Usando Consumidores Individuais

Consumir mensagens de um stream usando um consumidor individual.

Se usar consumidores individuais para consumir mensagens de seus streams em vez de usar grupos de consumidores, você não poderá aproveitar muitos dos benefícios do Streaming, como coordenação gerenciada por serviços, dimensionamento horizontal e gerenciamento de contrapartidas. Seus aplicativos precisarão lidar com esses cenários, e muito mais, de modo programático.

Por esses motivos, recomendamos usar grupos de consumidores em um ambiente de produção, mas pode ser útil usar consumidores individuais para testes ou aplicativos de prova de conceito.

Usando Cursores

Um cursor é um ponteiro para um local em um stream. O local pode ser uma referência ou horário específico em uma partição.

Antes de começar a consumir mensagens, você precisa indicar o ponto a partir do qual deseja iniciar o consumo. Você pode fazer isso criando um cursor.

Há cinco tipos de cursor suportados:

  • TRIM_HORIZON - Inicia o consumo da mensagem mais antiga disponível no stream. Crie um cursor no TRIM_HORIZON para consumir todas as mensagens em um stream.
  • AT_OFFSET - Inicia o consumo em um deslocamento especificado. O deslocamento deve ser maior ou igual ao deslocamento da mensagem mais antiga e menor ou igual ao deslocamento publicado mais recente.
  • AFTER_OFFSET - Inicia o consumo após o deslocamento especificado. Este cursor tem as mesmas restrições do cursor AT_OFFSET.
  • AT_TIME - Inicia o consumo a partir de um determinado horário. O timestamp da mensagem retornada será no horário fornecido ou após o horário fornecido.
  • LATEST - Inicia o consumo de mensagens que foram publicadas após a criação do cursor.

Ao criar um cursor para um consumidor individual, você precisa especificar a partição no stream para o cursor usar. Se o seu stream tiver mais de uma partição com mensagens, será necessário criar vários cursores para lê-las.

Depois de criar um cursor, você pode começar a consumir mensagens obtendo-as.

Contanto que você continue consumindo mensagens, não é necessário recriar um cursor. Crie cursores fora dos loops para obter mensagens.

Obtendo Mensagens

Depois de criar um cursor, obtenha (leia) mensagens, especificando esse cursor para começar a consumir mensagens. O serviço responde com suas mensagens e o valor do cabeçalho opc-next-cursor. Use o valor de cabeçalho retornado na sua próxima chamada GetMessages. O cursor retornado nunca é nulo, mas expira em cinco minutos. Se você parar de consumir mensagens por mais de cinco minutos, será necessário recriar um cursor.

Se houver mais de um consumidor lendo na mesma partição, eles receberão as mesmas mensagens. Decida como o aplicativo processa essas mensagens.

Se não houver mais mensagens não lidas na partição, o serviço Streaming retornará uma lista vazia de mensagens.

Os tamanhos de batch GetMessages se baseiam no tamanho médio da mensagem publicada no stream. Por padrão, o serviço retorna o máximo de mensagens possível. Você pode usar o parâmetro limit para especificar qualquer valor até 10.000, mas considere seu tamanho médio de mensagem para evitar exceder o throughput no stream.

Ficando para trás

Para determinar se o seu consumidor está ficando para trás (produção maior do que o consumo), você pode usar o timestamp da mensagem. Se o consumidor estiver ficando para trás, considere a geração de mais consumidores para assumir algumas das partições do primeiro consumidor. Não haverá recuperação possível se você estiver ficando para trás em uma única partição.

Considere as seguintes opções:

  • Crie um novo stream com mais partições.
  • Use grupos de consumidores.
  • Se o problema for causado por um ponto de acesso, altere a estratégia da chave da mensagem.
  • Reduza o tempo de processamento das mensagens ou lide com as solicitações em paralelo.

Para descobrir quantas mensagens permanecem para serem consumidas em uma partição específica, use um cursor do tipo LATEST, obtenha o deslocamento da próxima mensagem publicada e faça o delta com o deslocamento que você está consumindo no momento.

Gerenciando Referências

As referências indicam a localização de uma mensagem dentro de uma partição. Se o seu consumidor for reiniciado ou você precisar se recuperar de uma falha, poderá usar a referência para reiniciar a leitura do stream.

Dica

Grupos de consumidores podem gerenciar commits de referência automaticamente.

Quando você usa consumidores individuais, seu aplicativo consumidor deve gerenciar referências processadas (consulte Confirmando Manualmente uma Defasagem). O consumidor é responsável por armazenar quais referências ele atingiu ou nas quais parou em cada partição. Quando o consumidor reiniciar, leia a referência da última mensagem que você processou e, em seguida, crie um cursor do tipo AFTER_OFFSET e especifique a referência que você acabou de receber. Não fornecemos orientação para armazenar a referência da última mensagem processada. Você pode usar qualquer método, como outro stream, um arquivo em sua máquina ou o Object Storage.

Observação

As referências de mensagem não são densas. As referências são números monotonicamente crescentes. Eles não diminuem, e às vezes aumentam em mais de uma unidade. Por exemplo, se você publicar duas mensagens na mesma partição, a primeira pode ter uma referência de 42 e a segunda pode ter uma referência de 45 (sendo que as referências 43 e 44 não existem).