Problema produtor-consumidor - Producer–consumer problem

Em computação , o problema do produtor-consumidor (também conhecido como o problema de tampão delimitada ) é um exemplo clássico de uma multi- processo de sincronização problema, a primeira versão do que foi proposto por Edsger Dijkstra em 1965 no seu manuscrito não publicado, em em que o buffer era ilimitado e subsequentemente publicado com um buffer limitado em 1972. Na primeira versão do problema, há dois processos cíclicos, um produtor e um consumidor, que compartilham um buffer comum de tamanho fixo usado como fila . O produtor gera dados repetidamente e os grava no buffer. O consumidor lê repetidamente os dados no buffer, removendo-os durante a leitura e usando esses dados de alguma forma. Na primeira versão do problema, com um buffer ilimitado, o problema é como projetar o código do produtor e do consumidor de forma que, em sua troca de dados, nenhum dado seja perdido ou duplicado, os dados sejam lidos pelo consumidor na ordem em que é escrito pelo produtor e ambos os processos progridem tanto quanto possível. Na formulação posterior do problema, Dijkstra propôs vários produtores e consumidores compartilhando uma coleção finita de buffers. Isso acrescentou o problema adicional de evitar que os produtores tentassem gravar nos buffers quando todos estivessem cheios e de evitar que os consumidores lessem um buffer quando todos estivessem vazios.

O primeiro caso a considerar é aquele em que há um único produtor e um único consumidor, e há um buffer de tamanho finito. A solução para o produtor é dormir ou descartar os dados se o buffer estiver cheio. Na próxima vez que o consumidor remover um item do buffer, ele notificará o produtor, que começará a preencher o buffer novamente. Da mesma forma, o consumidor pode dormir se encontrar o buffer vazio. Na próxima vez que o produtor colocar dados no buffer, ele acordará o consumidor adormecido. A solução pode ser alcançada por meio da comunicação entre processos , normalmente por meio de semáforos . Uma solução inadequada pode resultar em um deadlock em que ambos os processos aguardam para serem ativados.

Implementação inadequada

Para resolver o problema, é tentador propor a "solução" mostrada abaixo. Na solução, duas rotinas de biblioteca são usadas, sleep e wakeup . Quando o sono é chamado, o chamador é bloqueado até que outro processo o acorde usando a rotina de despertar. A variável global itemCount contém o número de itens no buffer.

int itemCount = 0;

procedure producer() 
{
    while (true) 
    {
        item = produceItem();

        if (itemCount == BUFFER_SIZE) 
        {
            sleep();
        }

        putItemIntoBuffer(item);
        itemCount = itemCount + 1;

        if (itemCount == 1) 
        {
            wakeup(consumer);
        }
    }
}

procedure consumer() 
{
    while (true) 
    {

        if (itemCount == 0) 
        {
            sleep();
        }

        item = removeItemFromBuffer();
        itemCount = itemCount - 1;

        if (itemCount == BUFFER_SIZE - 1) 
        {
            wakeup(producer);
        }

        consumeItem(item);
    }
}

O problema com essa solução é que ela contém uma condição de corrida que pode levar a um deadlock . Considere o seguinte cenário:

  1. O consumer acabou de ler a variável itemCount , percebeu que é zero e está prestes a se mover dentro do if bloco.
  2. Antes de chamar o sleep, o consumidor é interrompido e o produtor retomado.
  3. O produtor cria um item, coloca-o no buffer e aumenta itemCount .
  4. Como o buffer estava vazio antes da última adição, o produtor tenta acordar o consumidor.
  5. Infelizmente, o consumidor ainda não estava dormindo e a chamada de despertar foi perdida. Quando o consumidor retoma, ele vai dormir e nunca mais será acordado. Isso porque o consumidor só é despertado pelo produtor quando itemCount for igual a 1.
  6. O produtor fará um loop até que o buffer esteja cheio, após o que também entrará em hibernação.

Uma vez que ambos os processos ficarão inativos para sempre, chegamos a um impasse. Esta solução, portanto, não é satisfatória.

Uma análise alternativa é que se a linguagem de programação não define a semântica de acessos concorrentes a variáveis ​​compartilhadas (neste caso itemCount ) com uso de sincronização, então a solução é insatisfatória por esse motivo, sem a necessidade de demonstrar explicitamente uma condição de corrida.

Usando semáforos

Os semáforos resolvem o problema de chamadas de despertar perdidas. Na solução abaixo, usamos dois semáforos, fillCount e emptyCount , para resolver o problema. fillCount é o número de itens já no buffer e disponíveis para leitura, enquanto emptyCount é o número de espaços disponíveis no buffer onde os itens podem ser gravados. fillCount é incrementado e emptyCount decrementado quando um novo item é colocado no buffer. Se o produtor tentar decrementar emptyCount quando seu valor for zero, o produtor será colocado no modo de hibernação. Na próxima vez que um item é consumido, emptyCount é incrementado e o produtor acorda. O consumidor trabalha de forma análoga.

semaphore fillCount = 0; // items produced
semaphore emptyCount = BUFFER_SIZE; // remaining space

procedure producer() 
{
    while (true) 
    {
        item = produceItem();
        down(emptyCount);
        putItemIntoBuffer(item);
        up(fillCount);
    }
}

procedure consumer() 
{
    while (true) 
    {
        down(fillCount);
        item = removeItemFromBuffer();
        up(emptyCount);
        consumeItem(item);
    }
}

A solução acima funciona bem quando há apenas um produtor e um consumidor. Com vários produtores compartilhando o mesmo espaço de memória para o buffer de item, ou vários consumidores compartilhando o mesmo espaço de memória, esta solução contém uma condição de corrida séria que pode resultar em dois ou mais processos lendo ou gravando no mesmo slot ao mesmo tempo. Para entender como isso é possível, imagine como o procedimento putItemIntoBuffer() pode ser implementado. Ele pode conter duas ações, uma determinando o próximo slot disponível e a outra escrevendo nele. Se o procedimento pode ser executado simultaneamente por vários produtores, o seguinte cenário é possível:

  1. Decréscimo de dois produtores emptyCount
  2. Um dos produtores determina o próximo slot vazio no buffer
  3. O segundo produtor determina o próximo slot vazio e obtém o mesmo resultado que o primeiro produtor
  4. Ambos os produtores escrevem no mesmo slot

Para superar esse problema, precisamos garantir que apenas um produtor esteja executando putItemIntoBuffer() por vez. Em outras palavras, precisamos de uma maneira de executar uma seção crítica com exclusão mútua . A solução para vários produtores e consumidores é mostrada abaixo.

mutex buffer_mutex; // similar to "semaphore buffer_mutex = 1", but different (see notes below)
semaphore fillCount = 0;
semaphore emptyCount = BUFFER_SIZE;

procedure producer() 
{
    while (true) 
    {
        item = produceItem();
        down(emptyCount);
        down(buffer_mutex);
        putItemIntoBuffer(item);
        up(buffer_mutex);
        up(fillCount);
    }
}

procedure consumer() 
{
    while (true) 
    {
        down(fillCount);
        down(buffer_mutex);
        item = removeItemFromBuffer();
        up(buffer_mutex);
        up(emptyCount);
        consumeItem(item);
    }
}

Observe que a ordem em que diferentes semáforos são incrementados ou decrementados é essencial: alterar a ordem pode resultar em um deadlock. É importante notar aqui que embora o mutex pareça funcionar como um semáforo com valor 1 (semáforo binário), há diferença no fato de que o mutex tem conceito de propriedade. Propriedade significa que o mutex só pode ser "incrementado" de volta (definido como 1) pelo mesmo processo que o "diminuiu" (definido como 0), e todas as outras tarefas esperam até que o mutex esteja disponível para redução (efetivamente significando que o recurso está disponível) , o que garante exclusividade mútua e evita impasses. Assim, usar mutexes incorretamente pode paralisar muitos processos quando o acesso exclusivo não é necessário, mas mutex é usado em vez de semáforo.

Usando monitores

O pseudocódigo a seguir mostra uma solução para o problema produtor-consumidor usando monitores . Como a exclusão mútua está implícita nos monitores, nenhum esforço extra é necessário para proteger a seção crítica. Em outras palavras, a solução mostrada abaixo funciona com qualquer número de produtores e consumidores sem nenhuma modificação. Também é digno de nota que é menos provável para um programador escrever código que sofre de condições de corrida ao usar monitores do que ao usar semáforos.

monitor ProducerConsumer 
{
    int itemCount = 0;
    condition full;
    condition empty;

    procedure add(item) 
    {
        if (itemCount == BUFFER_SIZE) 
        {
            wait(full);
        }

        putItemIntoBuffer(item);
        itemCount = itemCount + 1;

        if (itemCount == 1)
        {
            notify(empty);
        }
    }

    procedure remove() 
    {
        if (itemCount == 0) 
        {
            wait(empty);
        }

        item = removeItemFromBuffer();
        itemCount = itemCount - 1;

        if (itemCount == BUFFER_SIZE - 1)
        {
            notify(full);
        }

        return item;
    }
}

procedure producer() 
{
    while (true) 
    {
        item = produceItem();
        ProducerConsumer.add(item);
    }
}

procedure consumer() 
{
    while (true) 
    {
        item = ProducerConsumer.remove();
        consumeItem(item);
    }
}

Sem semáforos ou monitores

O problema produtor-consumidor, particularmente no caso de um único produtor e único consumidor, está fortemente relacionado à implementação de um PEPS ou de um canal . O padrão produtor-consumidor pode fornecer comunicação de dados altamente eficiente sem depender de semáforos, mutexes ou monitores para transferência de dados . O uso dessas primitivas pode ser caro em termos de desempenho, em comparação com a operação atômica básica de leitura / gravação. Canais e FIFOs são populares apenas porque evitam a necessidade de sincronização atômica ponta a ponta. Um exemplo básico codificado em C é mostrado abaixo. Observe que:

  • O acesso atômico de leitura-modificação-gravação a variáveis ​​compartilhadas é evitado, pois cada uma das duas Count variáveis ​​é atualizada apenas por um único encadeamento. Além disso, essas variáveis ​​oferecem suporte a um número ilimitado de operações de incremento; a relação permanece correta quando seus valores envolvem um estouro de inteiro.
  • Este exemplo não suspende os threads, o que pode ser aceitável dependendo do contexto do sistema. O schedulerYield() é inserido como uma tentativa de melhorar o desempenho e pode ser omitido. Bibliotecas de threads geralmente requerem semáforos ou variáveis ​​de condição para controlar a suspensão / ativação das threads. Em um ambiente com vários processadores, a suspensão / ativação do thread ocorreria com muito menos frequência do que a passagem de tokens de dados, portanto, evitar operações atômicas na passagem de dados é benéfico.
  • Este exemplo não funciona para vários produtores e / ou consumidores porque há uma condição de corrida ao verificar o estado. Por exemplo, se apenas um token estiver no buffer de armazenamento e dois consumidores encontrarem o buffer não vazio, ambos consumirão o mesmo token e possivelmente aumentarão o contador dos tokens consumidos acima do contador dos tokens produzidos.
  • Este exemplo, conforme escrito, requer que UINT_MAX + 1 seja igualmente divisível por BUFFER_SIZE ; se não é divisível, [Count % BUFFER_SIZE] produz o índice tampão errado após Count envoltórios passados UINT_MAX de volta a zero. Uma solução alternativa que evita essa limitação emprega duas Idx variáveis adicionais para rastrear o índice de buffer atual para a cabeça (produtor) e a cauda (consumidor). Estas Idx variáveis seria usado no lugar de [Count % BUFFER_SIZE] , e cada um deles teria que ser incrementado, ao mesmo tempo que a respectiva Count variável é incrementado, como segue: Idx = (Idx + 1) % BUFFER_SIZE .
  • As duas Count variáveis ​​precisam ser suficientemente pequenas para suportar ações atômicas de leitura e gravação. Caso contrário, há uma condição de corrida em que o outro segmento lê um valor parcialmente atualizado e, portanto, incorreto.


volatile unsigned int produceCount = 0, consumeCount = 0;
TokenType sharedBuffer[BUFFER_SIZE];

void producer(void) {
	while (1) {
		while (produceCount - consumeCount == BUFFER_SIZE) {
			schedulerYield(); /* sharedBuffer is full */
		}
		/* Write to sharedBuffer _before_ incrementing produceCount */
		sharedBuffer[produceCount % BUFFER_SIZE] = produceToken();
		/* Memory barrier required here to ensure update of the sharedBuffer is 
		visible to other threads before the update of produceCount */
		++produceCount;
	}
}

void consumer(void) {
	while (1) {
		while (produceCount - consumeCount == 0) {
			schedulerYield(); /* sharedBuffer is empty */
		}
		consumeToken(&sharedBuffer[consumeCount % BUFFER_SIZE]);
		++consumeCount;
	}
}

A solução acima emprega contadores que, quando usados ​​com frequência, podem ficar sobrecarregados e atingir seu valor máximo UINT_MAX . A ideia delineada no quarto item, sugerida originalmente por Leslie Lamport , explica como os contadores podem ser substituídos por contadores de alcance finito. Especificamente, eles podem ser substituídos por contadores de faixa finita com valor máximo N, a capacidade do buffer.

Quatro décadas após a apresentação do problema produtor-consumidor, Aguilera, Gafni e Lamport mostraram que o problema pode ser resolvido de forma que os processos acessem apenas contadores de alcance fixo (ou seja, um alcance que é independente do tamanho do buffer) ao determinar se o buffer está vazio ou cheio. A motivação para essa medida de eficiência é acelerar as interações entre um processador e dispositivos que interagem por meio de canais FIFO. Eles propuseram uma solução na qual contadores de valor máximo são lidos para determinar se é seguro acessar o buffer. No entanto, sua solução ainda emprega contadores ilimitados que crescem infinitamente, apenas para que esses contadores não sejam acessados ​​durante a fase de verificação descrita.

Posteriormente, Abraham e Amram propuseram uma solução mais simples, apresentada a seguir em pseudocódigo, que possui a propriedade de alcance fixo discutida. A solução emprega contadores de valor máximo N. No entanto, para determinar se o buffer está vazio ou cheio, os processos acessam apenas registros de gravador único de intervalo finito . Cada um dos processos possui um gravador único de 12 valores. O processo produtor grava Flag_p e o processo consumidor grava Flag_c , ambos são matrizes de 3 campos. Flag_p[2] e Flag_c[2] pode armazenar "cheio", "vazio" ou "seguro", que indicam correspondentemente se o buffer está cheio, vazio ou nem cheio nem vazio.

A ideia por trás do algoritmo é a seguinte. Os processos contabilizam a quantidade de itens entregues e retirados módulo N + 1 por meio de registros CountDelivered e CountRemoved . Quando um processo entrega ou remove um item, ele compara esses contadores e, portanto, determina com sucesso o status do buffer e armazena esses dados em Flag_p[2] ou Flag_c[2] . Em uma fase de verificação, o processo de execução lê Flag_p e Flag_c tenta estimar qual valor entre Flag_p[2] e Flag_c[2] reflete o status atual do buffer. Duas técnicas de sincronização ajudam a atingir esse objetivo.

  1. Depois de entregar um item, o produtor escreve para Flag_p[0] o valor que ler a partir Flag_c[0] , e após a remoção de um item, as gravações de consumo para Flag_c[1] o valor: 1-Flag_p[0] . Portanto, a condição Flag_p[0] == Flag_c[0] sugere que o produtor verificou recentemente o status do buffer, enquanto Flag_p[0] != Flag_c[0] sugere o contrário.
  2. Uma operação de entrega (remoção) termina escrevendo para Flag_p[1] ( Flag_c[1] ) o valor armazenado em Flag_p[0] ( Flag_c[0] ). Portanto, a condição Flag_p[0] == Flag_p[1] sugere que o produtor concluiu sua última operação de entrega. Da mesma forma, a Condição Flag_c[0] == Flag_c[1] sugere que a última remoção do consumidor já foi encerrada.

Portanto, na fase de verificação, se o produtor constatar isso Flag_c[0] != Flag_p[0] & Flag_c[0] == Flag_c[1] , ele age de acordo com o valor de Flag_c[2] , e caso contrário, de acordo com o valor armazenado em Flag_p[2] . Analogamente, se o consumidor descobrir isso Flag_p[0] == Flag_c[0] & Flag_p[0] == Flag_p[1] , ele age de acordo com o valor de Flag_p[2] e, caso contrário, de acordo com o valor armazenado em Flag_c[2] . No código abaixo, as variáveis ​​em maiúsculas indicam registros compartilhados, escritos por um dos processos e lidos por ambos os processos. Variáveis ​​não capitalizadas são variáveis ​​locais nas quais os processos copiam os valores lidos dos registradores compartilhados.

countDelivered = 0;
countRemoved = 0;
Flag_p[0] = 0; Flag_p[1] = 0; Flag_p[2] = "empty";
Flag_c[0] = 0; Flag_c[1] = 0; Flag_c[2] = "empty"; 

procedure producer() 
{
    while (true) {
        item = produceItem();
        
        /* check phase: busy wait until the buffer is not full */       	
        repeat {
            flag_c = Flag_c;
    	      if (flag_c[0] != Flag_p[0] & flag_c[0] == flag_c[1]) ans = flag_c[2];
    	      else ans = Flag_p[2];
        } until (ans != "full")
    
         /* item delivery phase */
         putItemIntoBuffer(item);
         CountDeliverd = countDelivered+1 % N+1;
         flag_c = Flag_c;
         Flag_p[0] = flag_c[0];
         removed = CountRemoved;
         if (CountDelivered  removed == N) { Flag_p[1] = flag_c[0]; Flag_p[2] = "full"; }
         if (CountDelivered  removed == 0) { Flag_p[1] = flag_c[0]; Flag_p[2] = "empty"; }
         if (0 < CountDelivered  removed < N) { Flag_p[1] = flag_c[0]; Flag_p[2] = "safe"; }
     }
}

procedure consumer() 
{
    while (true) {
        /* check phase: busy wait until the buffer is not empty */       	
        repeat {
            flag_p = Flag_p;
    	      if (flag_p[0] == Flag_c[0] & flag_p[1] == flag_p[0]) ans = flag_p[2]);
    	      else ans = Flag_c[2];
         } until (ans != "empty")
    
         /* item removal phase */
         Item = removeItemFromBuffer();
         countRemoved = countRemoved+1 % N+1;
         flag_p = Flag_p;
         Flag_c[0] = 1-flag_p[0];
         delivered = CountDelivered;
         if (delivered  CountRemoved == N) { Flag_c[1] = 1-flag_p[0]; Flag_c[2] = "full"; }
         if (delivered  CountRemoved == 0) { Flag_c[1] = 1-flag_p[0]; Flag_c[2] = "empty"; }
         if (0 < delivered  CountRemoved < N) { Flag_c[1] = 1-flag_p[0]; Flag_c[2] ="safe"; }
     }
}

A exatidão do código se baseia na suposição de que os processos podem ler um array inteiro, ou gravar em vários campos de um array, em uma única ação atômica. Como essa suposição não é realista, na prática, deve-se substituir Flag_p e Flag_c por inteiros (log (12) -bit) que codificam os valores dessas matrizes. Flag_p e Flag_c são apresentados aqui como matrizes apenas para a legibilidade do código.

Veja também

Referências

Leitura adicional