I/O por reação à disponibilidade: aplicações

Published by

on

Em um post anterior, recapitulei antigos estudos sobre concorrência no processamento de I/O com mecanismos de reação à disponibilidade, delineando três reactors, select_reactor, poll_reactor e epoll_reactor definidos genericamente para qualquer Handler conforme este concept, onde device_type, interest_type e event_type ficaram como exercício para o leitor.

concept Handler
{
  device_type device ();

  interest_type interest ();

  void dispatch (event_type);
}

Grosso modo, device_type é um “file descriptor”; interest_type e event_type são algum tipo de conjunto com operação “definir” e predicado “contém”.

Na discussão abaixo, vamos ignorar o tratamento de erro nas primitivas do sistema, como read e write, a cerimônia necessária para RAII etc.

Uma das mais simples aplicações para demonstrar I/O é um descartador de dados, que recebe tudo e joga fora. Esta aplicação tem interesse apenas em consumir a entrada.

struct discard
{
  device_type m_device;

  interest_type m_interest;

  std::vector<std::byte> m_buffer;

  discard () : 
    m_device { ::acquire() },
    m_interest {}
  {
    using event_nature::in;
    m_interest.set(in);
    m_buffer.resize(...);
  }

  ~discard ()
  {
    ::release(m_device);
  }

  device_type device () { return m_device; }

  interest_type interest () { return m_interest; }

  void dispatch (event_type event)
  {
    using event_nature::in;
    if (event.has(in)) do_read();
  }

  void do_read ()
  {
    ::read(m_device,m_buffer.data(),m_buffer.size());
  }
};

Nesta definição, não damos solução para a condição de término. No caso de aplicações de networking, o remoto pode desconectar a qualquer momento. Existem dois casos gerais para isso. Em um caso, read retorna um valor especial TERMINATED.

void do_read ()
{
  auto r = ::read(m_device,m_buffer.data(),m_buffer.size());
  if (r == TERMINATED) do_terminate();
}

Em outro caso, existe um event_nature especificamente reservado para significar término.

void dispatch (event_type event)
{
  using event_nature::in;
  using event_nature::terminated;

  if (event.has(terminated)) do_terminate();
  else if (event.has(in)) do_read();
}

Em qualquer caso, do_terminate tem a complicada missão de se comunicar com o reactor, que deve cessar a observação deste dispositivo. Este problema será discutido em outra oportunidade.

Outra aplicação muito simples para demonstrar I/O é um gerador de barulho, que produz informação qualquer sempre que disponível.

struct chargen
{
  device_type m_device;

  interest_type m_interest;

  std::vector<std::byte> m_buffer;

  chargen () : 
    m_device { ::acquire() },
    m_interest {},
    m_buffer { ... }
  {
    using event_nature::write;
    m_interest.set(write);
  }

  ~chargen ()
  {
    ::release(m_device);
  }

  device_type device () { return m_device; }

  interest_type interest () { return m_interest; }

  void dispatch (event_type event)
  {
    using event_nature::in;
    if (event.has(write)) do_write();
  }

  void do_write ()
  {
    ::write(m_device,m_buffer.data(),m_buffer.size());
  }
};

Observe que, neste caso, não existe um interesse natural na entrada. Porém, se o término da aplicação é sinalizado por read, então é preciso artificialmente incluir interesse na entrada com o intuito de detectar a condição de término. Havendo um sinal específico para término, a solução é como no caso anterior.

A diferença no modo de sinalizar o término, que varia entre os mecanismos concretos do reactor, torna difícil definir o contrato para Handler::dispatch. No meu entendimento, isto é uma consequência da severa limitação de select; caso este mecanismo seja excluído, uma API muito mais regular e enxuta deve ser possível.

Combinando as duas funções, enxergamos uma aplicação de demonstração que produz na saída tudo aquilo que consome na entrada. Esta aplicação já permite discutir alguns problemas básicos de reactors baseados em disponibilidade.

struct echo
{
  device_type m_device;

  interest_type m_interest;

  std::queue<std::vector<std::byte>> m_buffers;

  echo () : 
    m_device { ::acquire() },
    m_interest {},
    m_buffers {}
  {
    using event_nature::in;
    using event_nature::out;
    m_interest.set(in);
    m_interest.set(out);
  }

  ~echo()
  {
    ::release(m_device);
  }

  device_type device () { return m_device; }

  interest_type interest () { return m_interest; }

  void dispatch (event_type event)
  {
    using event_nature::in;
    using event_nature::out;
    if (event.has(in))  do_read();
    if (event.has(out)) do_write();
  }

  void do_read ()
  {
    std::vector<std::byte> buffer {};
    buffer.resize(...);
    auto r = ::read(m_device,buffer.data(),buffer.size());
    buffer.resize(r);
    buffers.push(std::move(buffer));
  }

  void do_write ()
  {
    while (! buffers.empty())
    {
      auto& buffer = buffers.front();
      ::write(m_device,buffer.data(),buffer.size());
      buffers.pop();
    }
  }
};

A definição para do_write acima está incorreta. A disponibilidade de saída significa alguma quantidade de saída, não qualquer quantidade; não existe garantia de qual quantidade seja essa. A aplicação deve escrever até que a capacidade seja totalmente consumida e o dispositivo se torne indisponível; toda operação tipo write deve contemplar essa possibilidade na sua sinalização de resultado.

void do_write ()
{
  while (! buffers.empty())
  {
    auto& buffer = buffers.front();
    auto r = ::write(m_device,buffer.data(),buffer.size());
    if (r == UNAVAILABLE) return;
    buffers.pop();
  }
}

A definição para do_write permanece incorreta. Ela é ingênua: ela pressupõe que a quantidade de bytes transferidos será ou zero ou buffer.size(); mas não existe garantia de quanto será transferido. Operações tipo write estão sujeitas a transferências parciais; portanto, é preciso sempre lembrar o estado de uma transferência parcial. Existem várias maneiras de fazer isso; vamos nos valer de um buffer_type rico com essa informação.

void do_write ()
{
  while (! buffers.empty())
  {
    auto& buffer = buffers.front();
    auto range = buffer.remaining();
    auto r = ::write(m_device,range.data(),range.size());
    if (r == UNAVAILABLE) return;
    buffer.advance(r);
    if (buffer.empty()) buffers.pop();
  }
}

Esta aplicação, apesar de ainda ser uma aplicação de demonstração, já permite vislumbrar a arquitetura de um serviço solicitação/resposta. Após do_read obter a entrada, algum processo transformará essa entrada em saídas, que serão enfileiradas para o consumo de do_write.

Este hipotético do_transform levará algum tempo. O reactor permanece esperando o handler retornar; durante este tempo, pode haver mais trabalho disponível para ser realizado. A solução natural é introduzir concorrência na arquitetura; vamos discutir isso em outra oportunidade.

Deixe um comentário