Usando Grupos de Consumidores

Consumir mensagens de um stream usando grupos de consumidores.

Os consumidores podem ser configurados para consumir mensagens como parte de um grupo. Em um ambiente de produção com várias partições, usar um grupo de consumidores é nosso método recomendado para consumir mensagens do serviço Streaming.

Cada partição de stream é designada a um membro de um grupo de consumidores. Um membro individual de um grupo de consumidores é chamado de instância. Cada instância em um grupo de consumidores recebe mensagens de uma ou mais partições, a menos que haja mais instâncias do que partições. Instâncias que excedam a contagem de partições do stream não recebem mensagens.

Os grupos de consumidores lidam com a coordenação necessária para vários consumidores compartilharem o consumo de um stream. Um grupo de consumidores automaticamente:

  • Designa uma ou mais partições a uma instância
  • Rastreia as mensagens recebidas pelo grupo e gerencia commits
  • Solicita as partições e contrapartidas apropriadas em nome de cada instância
  • Balanceia o grupo à medida que as instâncias entram ou saem

Até 50 grupos de consumidores podem ler um único stream. Cada grupo de consumidores recebe todas as mensagens no stream pelo menos uma vez.

Os grupos de consumidores são efêmeros. Eles desaparecem quando não são usados pelo período de retenção do stream.

Criando um Grupo de Consumidores

Um grupo de consumidores é criado na primeira solicitação CreateGroupCursor (consulte Criando um Cursor de Grupo). Os cursores de grupo definem um par nome do grupo/nome da instância. Ao criar o cursor do grupo, forneça o ID do stream, um nome de grupo, um nome de instância e um dos seguintes tipos de cursor com suporte:

  • TRIM_HORIZON - O grupo começa a consumir pela mensagem mais antiga disponível no stream.
  • AT_TIME - O grupo começa a consumir de um horário específico. O timestamp da mensagem retornada é no horário fornecido ou após o horário fornecido.
  • LATEST - O grupo inicia o consumo de mensagens que foram publicadas após a criação do cursor.

Os tipos de cursor de grupo são ignorados nas chamadas CreateGroupCursor que incluem o nome de um grupo existente. As referências com commit desse grupo são usadas em vez do tipo de cursor fornecido.

O serviço Streaming usa o nome da instância para identificar membros do grupo ao gerenciar referências. Use nomes de instância exclusivos para cada instância do grupo de consumidores.

Se quiser que o serviço Streaming manipule referências com commit, deixe o valor commitOnGet do cursor do grupo definido como true. Recomendamos o uso desse método para reduzir a complexidade do aplicativo para que seu aplicativo não precise tratar de commits.

Consumindo como um Grupo

Depois que suas instâncias ingressarem no grupo de consumidores, elas poderão ler mensagens do stream usando GetMessages (consulte Criando um Cursor de Grupo). Cada chamada GetMessages retorna o cursor a ser usado na próxima chamada GetMessages como o valor do cabeçalho opc-next-cursor. O cursor retornado nunca é nulo, mas expira em cinco minutos. Contanto que você continue consumindo, não é necessário recriar um cursor.

Quando o serviço Streaming recebe uma solicitação de mensagens de uma instância, o serviço:

Os tamanhos de batch GetMessages se baseiam no tamanho médio da mensagem publicada nesse stream. Por padrão, o serviço retorna o máximo de mensagens possível. Você pode usar o parâmetro limit para especificar qualquer valor até 10.000, mas considere seu tamanho médio de mensagem para evitar exceder o throughput no stream ou timeouts.

Se não houver mais mensagens não lidas na partição, o serviço Streaming retornará uma lista vazia de mensagens.

Como os grupos de consumidores removem instâncias que pararam de consumir mensagens por mais de 30 segundos, solicite menos mensagens para evitar timeouts ou estenda o timeout usando ConsumerHeartbeat (consulte Enviando uma pulsação).

Uma partição não pode ser designada a várias instâncias dentro do mesmo grupo de consumidores. Se houver mais instâncias do que partições, as instâncias não designadas poderão enviar solicitações GetMesages, mas elas não receberão mensagens. Elas permanecem ociosas até que o grupo de consumidores precise substituir uma instância, como quando um membro existente do grupo não atua dentro do período de timeout.

Se precisar atualizar manualmente a posição do grupo, você poderá usar UpdateGroup (consulte Atualizando um Grupo de Consumidores) para redefinir o local de todos os consumidores do grupo para o local especificado no stream.

Referências e Commits

As referências indicam a localização de uma mensagem dentro de uma partição. Se um consumidor for reiniciado ou você precisar se recuperar de uma falha, poderá usar a referência para reiniciar a leitura do stream.

Quando você usa um grupo de consumidores, o serviço Streaming trata as referências automaticamente. O comportamento padrão de commitOnGet=true significa que as referências da solicitação anterior serão submetidas a commit. Por exemplo:

Para o consumidor A:

  • A chama GetMessages e recebe mensagens de uma partição arbitrária, com referências de 1 a 100.
  • A processa todas as 100 mensagens com sucesso.
  • A chama GetMessages e o serviço Streaming faz commit da referência 100 e retorna mensagens com as referências 101-200.
  • A processa 15 mensagens e fica off-line inesperadamente (por mais de 30 segundos).

Um novo consumidor B:

  • B chama GetMessages e o serviço Streaming usa a referência com commit mais recente e retorna mensagens com as referências 101-200.
  • B continua o loop de mensagens.

Neste exemplo, uma parte (15) das mensagens foi processada pelo menos uma vez, o que significa que elas poderiam ter sido processadas mais de uma vez, mas nenhum dado foi perdido.

O serviço Streaming fornece uma semântica "pelo menos uma vez" para grupos de consumidores. Considere quando as referências são submetidas a commit em um loop de mensagens. Se um consumidor ficar off-line antes de confirmar um batch de mensagens, esse batch poderá ser fornecido a outro consumidor. Quando uma partição é dada a outro consumidor, o consumidor usa a referência com commit recente para iniciar o consumo. O consumidor não recebe mensagens antes do commit da referência. Recomendamos que os aplicativos consumidores cuidem de duplicatas.

Observação

As referências de mensagem não são densas. As referências são números monotonicamente crescentes. Eles não diminuem, e às vezes aumentam em mais de uma unidade. Por exemplo, se você publicar duas mensagens na mesma partição, a primeira pode ter uma referência de 42 e a segunda pode ter uma referência de 45 (sendo que as referências 43 e 44 não existem).

Para substituir o comportamento de referência padrão e implementar um mecanismo de commit de referência personalizado, defina commitOnGet como false ao criar o cursor do grupo. Você pode usar ConsumerCommit (consulte Confirmando Manualmente uma Defasagem) para confirmar mensagens sem ler mais mensagens. ConsumerCommit retorna um cursor a ser usado na sua próxima solicitação.

Cuidado

Escrever uma lógica de commit personalizada é complicado e cheio de condições e considerações de corrida. Existem muitos casos em que algum estado interno é alterado e o cliente é obrigado a lidar com a situação.

Balanceando e Rebalanceando

O serviço Streaming considera o número de partições no stream e o número de instâncias no grupo de consumidores ao avaliar o saldo. O balanceamento de grupo é automático. Cada consumidor é designado a uma ou mais partições com base no seguinte cálculo:

(nPartições / nConsumidores) ± 1

Por exemplo, se houver oito partições no stream e quatro consumidores no grupo, cada consumidor será designado a duas partições. Se houver 10 partições no stream e quatro consumidores no grupo, dois consumidores serão designados a duas partições e dois consumidores serão designados a três partições.

À medida que as instâncias entram ou saem de um grupo de consumidores e há solicitações de mensagens, as designações de partição vão sendo reavaliadas. Se o stream tiver pelo menos uma partição a mais que o número de instâncias atuais no grupo, e uma nova instância se associar, as partições serão redesignadas a todas as instâncias, incluindo a nova. Se uma instância do grupo parar de consumir mensagens por mais de 30 segundos ou não enviar um ConsumerHeartbeat em 30 segundos, essa instância será removida do grupo de consumidores e sua partição será redesignada, se possível, a outra instância.

Esses eventos são chamados de rebalanceamento. As instâncias do grupo não estão conscientes do processo de rebalanceamento, mas o grupo coordenou a posse de um conjunto mutuamente exclusivo de partições no stream.

No final de uma operação de rebalanceamento bem-sucedida para um grupo de consumidores, cada partição dentro do stream pertence a uma instância dentro do grupo.

Dessa forma, você pode ampliar o número de instâncias até o número de partições até que cada instância esteja consumindo mensagens de apenas uma partição. Essa configuração maximiza o throughput disponível do seu stream. Após esse ponto, qualquer nova instância que ingressar no grupo permanecerá em estado inativo sem ser designada a qualquer partição.