Uso dei gruppi di consumer

Utilizza i messaggi di un flusso utilizzando i gruppi di consumer.

È possibile configurare i consumer in modo che utilizzino i messaggi nell'ambito di un gruppo. In un ambiente di produzione con più partizioni, l'utilizzo di un gruppo di consumer è il metodo consigliato per utilizzare i messaggi di streaming.

Ogni partizione di flusso viene assegnata a un membro di un gruppo di consumer. Un singolo membro di un gruppo di consumer viene definito istanza. Ogni istanza di un gruppo di consumer riceve messaggi da una o più partizioni, a meno che non vi siano più istanze di partizioni. Le istanze in eccesso rispetto al conteggio delle partizioni per il flusso non ricevono messaggi.

I gruppi di consumatori gestiscono il coordinamento necessario per consentire a più consumatori di condividere il consumo di un flusso. Un gruppo di consumatori automaticamente:

  • Assegna una o più partizioni a un'istanza
  • Tiene traccia dei messaggi ricevuti dal gruppo e gestisce i commit
  • Richiede le partizioni e gli offset appropriati per conto di ogni istanza
  • Salda il gruppo quando le istanze si uniscono o lasciano

Fino a 50 gruppi di consumer possono leggere da un singolo flusso. Ogni gruppo di consumer riceve tutti i messaggi nel flusso almeno una volta.

I gruppi di consumer sono effimeri. Essi scompaiono quando non vengono utilizzati per il periodo di conservazione del flusso.

Creazione di un gruppo di consumer

Nella prima richiesta CreateGroupCursor viene creato un gruppo di consumer (vedere Creazione di un cursore di gruppo). I cursori dei gruppi definiscono una coppia nome gruppo/nome istanza. Quando si crea il cursore del gruppo, fornire l'ID del flusso, il nome di un gruppo, il nome di un'istanza e uno dei seguenti tipi di cursore supportati:

  • TRIM_HORIZON: il gruppo inizia a utilizzare il messaggio meno recente disponibile nel flusso.
  • AT_TIME: il gruppo inizia a utilizzare da un'ora specifica. L'indicatore orario del messaggio restituito corrisponde o è successivo all'ora specificata.
  • LATEST: il gruppo inizia a utilizzare i messaggi pubblicati dopo la creazione del cursore.

I tipi di cursore gruppo vengono ignorati nelle chiamate CreateGroupCursor che includono il nome di un gruppo esistente. I offset impegnati del gruppo vengono utilizzati al posto del tipo di cursore fornito.

Il servizio di streaming utilizza il nome dell'istanza per identificare i membri del gruppo durante la gestione degli offset. Utilizzare nomi di istanza univoci per ogni istanza del gruppo di consumer.

Se si desidera che il servizio di streaming gestisca l'impegno degli offset, lasciare il valore commitOnGet del cursore del gruppo impostato su true. Si consiglia di utilizzare questo metodo per ridurre la complessità dell'applicazione in modo che l'applicazione non debba gestire i commit.

Consumare come gruppo

Dopo aver aderito al gruppo di consumer, le istanze possono leggere i messaggi dal flusso utilizzando GetMessages (vedere Creazione di un cursore di gruppo). Ogni chiamata GetMessages restituisce il cursore da utilizzare nella successiva chiamata GetMessages come valore di intestazione opc-next-cursor. Il cursore restituito non è mai nullo, ma scade tra cinque minuti. Finché si continua a consumare, non è necessario ricreare un cursore.

Quando Streaming riceve una richiesta di messaggi da un'istanza, il servizio effettua le operazioni riportate di seguito.

Le dimensioni batch di GetMessages si basano sulla dimensione media dei messaggi pubblicati nel flusso. Per impostazione predefinita, il servizio restituisce il maggior numero possibile di messaggi. È possibile utilizzare il parametro limit per specificare qualsiasi valore fino a 10.000, ma considerare la dimensione media dei messaggi per evitare di superare il throughput nel flusso o nei timeout.

Se non ci sono più messaggi non letti nella partizione, Streaming restituisce un elenco vuoto di messaggi.

Poiché i gruppi di consumer rimuovono le istanze che hanno interrotto il consumo dei messaggi per più di 30 secondi, richiedere un numero inferiore di messaggi per evitare timeout o estendere il timeout utilizzando ConsumerHeartbeat (vedere Invio di un heartbeat).

Impossibile assegnare una partizione a più istanze all'interno dello stesso gruppo di consumer. Se si dispone di più istanze rispetto alle partizioni, le istanze non assegnate possono inviare richieste GetMesages, ma non ricevono messaggi. Restano inattivi finché il gruppo di consumer non deve sostituire un'istanza, ad esempio quando un membro esistente del gruppo non agisce entro il periodo di timeout.

Se è necessario aggiornare manualmente la posizione del gruppo, è possibile utilizzare UpdateGroup (vedere Aggiornamento di un gruppo di consumer) per reimpostare la posizione di tutti i consumer del gruppo sulla posizione specificata nel flusso.

Offset e commit

Gli offset indicano la posizione di un messaggio all'interno di una partizione. Se un consumer si riavvia o è necessario eseguire il ripristino da un errore, è possibile utilizzare l'offset per riavviare la lettura dal flusso.

Quando si utilizza un gruppo di consumer, Streaming gestisce automaticamente gli offset. Il comportamento predefinito di commitOnGet=true indica che viene eseguito il commit degli offset dalla richiesta precedente. Ad esempio:

Per il consumatore A:

  • A chiama GetMessages e riceve i messaggi da una partizione arbitraria, con offset compresi tra 1 e 100.
  • A elabora tutti i 100 messaggi correttamente.
  • A chiama GetMessages e il servizio di streaming esegue il commit dell'offset 100 e restituisce i messaggi con offset 101-200.
  • A elabora 15 messaggi, quindi va offline in modo imprevisto (per più di 30 secondi).

Un nuovo consumatore B:

  • B chiama GetMessages e il servizio di streaming utilizza l'offset di cui è stato eseguito il commit più recente e restituisce i messaggi con offset 101-200.
  • B continua il ciclo dei messaggi.

In questo esempio, una parte (15) dei messaggi sono stati elaborati almeno una volta, il che significa che potrebbero essere stati elaborati più di una volta, ma nessun dato viene perso.

Lo streaming fornisce la semantica "al-least-once" per i gruppi di consumatori. Considerare quando viene eseguito il commit degli offset in un loop di messaggi. Se un consumer va offline prima di eseguire il commit di un batch di messaggi, tale batch potrebbe essere assegnato a un altro consumer. Quando una partizione viene assegnata a un altro consumer, il consumer utilizza l'offset impegnato più recente per avviare il consumo. Il consumer non riceve messaggi prima dell'offset di cui è stato eseguito il commit. Ti consigliamo di fare in modo che siano le applicazioni consumer a occuparsi dei duplicati.

Nota

Gli offset dei messaggi non sono densi. Gli offset sono numeri monotoni in aumento. Non diminuiscono, e a volte aumentano di più di uno. Ad esempio, se si pubblicano due messaggi nella stessa partizione, il primo messaggio potrebbe avere un offset di 42 e il secondo messaggio potrebbe avere un offset di 45 (offset di 43 e 44 inesistenti).

Per sostituire il comportamento di offset predefinito e implementare un meccanismo di commit offset personalizzato, impostare commitOnGet su false durante la creazione del cursore del gruppo. È possibile utilizzare ConsumerCommit (vedere Commit manuale di un offset) per eseguire il commit dei messaggi senza leggere altri messaggi. ConsumerCommit restituisce un cursore da utilizzare nella richiesta successiva.

Attenzione

La scrittura della logica di commit personalizzata è complicata e ricca di condizioni e considerazioni relative alla razza. Esistono molti casi in cui viene modificato uno stato interno e il cliente è tenuto a gestire la situazione.

Bilanciamento e ribilanciamento

Lo streaming considera il numero di partizioni nel flusso e il numero di istanze nel gruppo di consumer durante la valutazione del saldo. Il bilanciamento del gruppo è automatico. Ogni consumer viene assegnato a una o più partizioni in base al seguente calcolo:

(nPartizioni / nConsumatori) ± 1

Ad esempio, se nel flusso sono presenti otto partizioni e quattro consumer, ogni consumer viene assegnato a due partizioni. Se nel flusso sono presenti 10 partizioni e quattro consumer, due consumer vengono assegnati a due partizioni e due consumer vengono assegnati a tre partizioni.

Quando le istanze si uniscono o lasciano un gruppo di consumer e vengono effettuate richieste per i messaggi, le assegnazioni delle partizioni vengono rivalutate. Se il flusso dispone di almeno una partizione superiore al numero di istanze correnti nel gruppo e di join di una nuova istanza, le partizioni vengono riassegnate a tutte le istanze, inclusa la nuova. Se un'istanza del gruppo interrompe il consumo dei messaggi per più di 30 secondi o non invia un messaggio ConsumerHeartbeat entro 30 secondi, tale istanza viene rimossa dal gruppo di consumer e la relativa partizione viene riassegnata, se possibile, a un'altra istanza.

Questi eventi sono denominati ribilanciamento. Le istanze del gruppo non sono a conoscenza del processo di ribilanciamento, ma il gruppo si è coordinato per possedere un set di partizioni reciprocamente esclusive nel flusso.

Al termine di un'operazione di ribilanciamento riuscita per un gruppo di consumer, ogni partizione all'interno del flusso è di proprietà di un'istanza all'interno del gruppo.

In questo modo, è possibile ridimensionare il numero di istanze fino al numero di partizioni finché ogni istanza non utilizza messaggi da una sola partizione. Questa configurazione ottimizza il throughput disponibile del flusso. Dopo questo punto, qualsiasi nuova istanza che entra a far parte del gruppo rimane in stato inattivo senza essere assegnata ad alcuna partizione.