Utilisation de groupes de consommateurs
Consommer des messages à partir d'un flux à l'aide de groupes de consommateurs.
Vous pouvez configurer les consommateurs pour qu'ils consomment les messages dans le cadre d'un groupe. Dans un environnement de production comportant plusieurs partitions, il est recommandé d'utiliser un groupe de consommateurs pour consommer des messages du service de diffusion en continu.
Chaque partition de flux est affectée à un membre d'un groupe de consommateurs. Un membre individuel d'un groupe de consommateurs est appelé instance. Chaque instance d'un groupe de consommateurs reçoit les messages d'une ou de plusieurs partitions, sauf s'il existe plus d'instances que de partitions. Les instances dépassant le nombre de partitions du flux ne reçoivent pas de messages.
Les groupes de consommateurs gèrent la coordination requise pour que plusieurs consommateurs partagent la consommation d'un flux. Un groupe de consommateurs :
- Affecte une ou plusieurs partitions à une instance.
- Effectue le suivi des messages reçus par le groupe et gère les validations.
- Demande les partitions et les décalages appropriés pour le compte de chaque instance
- Équilibre le groupe au fur et à mesure que des instances rejoignent ou quittent le groupe.
Jusqu'à 50 groupes de consommateurs peuvent lire depuis un seul flux. Chaque groupe de consommateurs reçoit au moins une fois tous les messages du flux.
Les groupes de consommateurs sont éphémères. Ils disparaissent lorsqu'ils ne sont pas utilisés pendant la période de conservation du flux.
Création d'un groupe de consommateurs
Un groupe de consommateurs est créé lors de la première demande CreateGroupCursor (voir Création d'un curseur de groupe). Les curseurs de groupe définissent une paire nom de groupe/nom d'instance. Lorsque vous créez votre curseur de groupe, fournissez l'ID flux, un nom de groupe, un nom d'instance et l'un des types de curseur pris en charge suivants :
TRIM_HORIZON
- Le groupe commence à consommer à partir du message disponible le plus ancien dans le flux.AT_TIME
- Le groupe commence à consommer à partir d'une heure spécifique. L'heure du message retourné est identique ou postérieure à l'heure fournie.LATEST
- Le groupe commence à consommer les messages publiés après la création du curseur.
Les types de curseur de groupe sont ignorés dans les appels CreateGroupCursor qui incluent le nom d'un groupe existant. Les décalages validés de ce groupe sont utilisés au lieu du type de curseur fourni.
Le service de diffusion en continu utilise le nom d'instance pour identifier les membres du groupe lors de la gestion des décalages. Utilisez des noms d'instance uniques pour chaque instance du groupe de consommateurs.
Si vous voulez que le service de flux traite la validation des décalages, laissez la valeur commitOnGet
du curseur de groupe réglée à true
. Nous vous recommandons d'utiliser cette méthode pour réduire la complexité de l'application afin que votre application n'ait pas à gérer les commits.
Consommation en tant que groupe
Une fois que vos instances se sont jointes au groupe de consommateurs, elles peuvent lire les messages du flux à l'aide de GetMessages (voir Création d'un curseur de groupe). Chaque appel GetMessages retourne le curseur à utiliser dans l'appel GetMessages suivant en tant que valeur d'en-tête opc-next-cursor
. Le curseur retourné n'est jamais nul, mais il expire dans les 5 minutes qui suivent. Tant que vous consommez, vous n'avez pas besoin de recréer un curseur.
Lorsque le service de diffusion en continu reçoit une demande de messages d'une instance, il :
- Vérifie si un rééquilibrage de groupe est nécessaire.
- Valide le ou les décalages par rapport à la demande précédente de cette instance, le cas échéant.
- Répond aux messages définis par le curseur de la demande.
Les tailles de lot GetMessages sont basées sur la taille moyenne des messages publiés pour ce flux. Par défaut, le service retourne le plus grand nombre de messages possible. Vous pouvez utiliser le paramètre limit
pour spécifier n'importe quelle valeur allant jusqu'à 10 000, mais tenez compte de la taille moyenne des messages pour éviter de dépasser le débit du flux ou les temporisations.
S'il n'y a plus de messages non lus dans la partition, le service de diffusion en continu retourne une liste vide de messages.
Comme les groupes de consommateurs suppriment les instances qui ont cessé de consommer des messages pendant plus de 30 secondes, demandez moins de messages pour éviter les temporisations ou prolongez la temporisation à l'aide de ConsumerHeartbeat (voir Envoi d'une pulsation).
Il n'est pas possible d'affecter une partition à plusieurs instances dans le même groupe de consommateurs. Si vous avez plus d'instances que de partitions, les instances non affectées peuvent envoyer des demandes GetMesages, mais elles ne reçoivent aucun message. Elles restent inactives jusqu'à ce que le groupe de consommateurs remplace une instance, par exemple lorsqu'un membre existant du groupe n'agit pas dans le temps imparti.
Si vous devez mettre à jour manuellement la position du groupe, vous pouvez utiliser UpdateGroup (voir Mise à jour d'un groupe de consommateurs) pour réinitialiser l'emplacement de tous les consommateurs du groupe à l'emplacement spécifié dans le flux.
Décalages et validations
Les décalages indiquent l'emplacement d'un message dans une partition. Si un consommateur redémarre ou que vous devez récupérer après une panne, vous pouvez utiliser le décalage pour relancer la lecture à partir du flux.
Lorsque vous utilisez un groupe de consommateurs, le service de diffusion en continu gère les décalages automatiquement. Le comportement par défaut de commitOnGet=true
signifie que les décalages de la demande précédente sont validés. Par exemple :
Pour le consommateur A :
- A appelle GetMessages et reçoit les messages d'une partition arbitraire, avec des décalages de 1 à 100.
- A traite les 100 messages.
- A appelle GetMessages et le service de diffusion en continu valide le décalage 100 et retourne les messages comportant des décalages de 101 à 200.
- A traite 15 messages, puis passe en hors ligne de manière inattendue (pendant plus de 30 secondes).
Un nouveau consommateur B :
- B appelle GetMessages et le service de diffusion en continu utilise le dernier décalage validé et retourne les messages avec des décalages de 101 à 200.
- B poursuit la boucle de message.
Dans cet exemple, une partie (15) des messages ont été traités au moins une fois, ce qui signifie qu'ils auraient pu être traités plusieurs fois, mais aucune donnée n'est perdue.
Le service de diffusion en continu fournit une sémantique "au moins une fois" pour les groupes de consommateurs. Tenez compte du moment où les décalages sont validés dans une boucle de message. Si un consommateur est hors ligne avant de valider un lot de messages, ce lot peut être donné à un autre consommateur. Lorsqu'une partition est donnée à un autre consommateur, celui-ci utilise le dernier décalage validé pour démarrer la consommation. Le consommateur n'obtient pas les messages avant le décalage validé. Nous recommandons que les applications du consommateur traitent les doubles.
Les décalages de message ne sont pas denses. Les décalages augmentent de façon monotone. Ils ne diminuent pas, et parfois ils augmentent de plus d'un. Par exemple, si vous publiez deux messages dans la même partition, le premier message peut avoir un décalage de 42 et le deuxième message peut avoir un décalage de 45 (les décalages de 43 et de 44 sont inexistants).
Pour remplacer le comportement de décalage par défaut et mettre en oeuvre un mécanisme de validation de décalage personnalisé, réglez commitOnGet
à false
lors de la création du curseur de groupe. Vous pouvez utiliser ConsumerCommit (voir Validation manuelle d'une compensation) pour valider des messages sans lire plus de messages. ConsumerCommit retourne un curseur que vous pouvez utiliser dans votre prochaine demande.
L'écriture d'une logique de validation personnalisée est compliquée et remplie de conditions de course et de considérations. Il existe de nombreux cas dans lesquels l'état interne est modifié et le client doit gérer la situation.
Équilibrage et rééquilibrage
Le service de diffusion en continu tient compte du nombre de partitions dans le flux et du nombre d'instances du groupe de consommateurs lors de l'évaluation de l'équilibrage. L'équilibrage de groupe est automatique. Chaque consommateur est affecté à une ou plusieurs partitions selon le calcul suivant :
(npartitions / nconsommateurs) ± 1
Par exemple, s'il existe huit partitions dans le flux et quatre consommateurs dans le groupe, chaque consommateur est affecté à deux partitions. S'il y a 10 partitions dans le flux et quatre consommateurs dans le groupe, deux consommateurs sont affectés à deux partitions et deux consommateurs sont affectés à trois partitions.
Lorsque des instances rejoignent ou quittent un groupe de consommateurs et que des demandes de messages sont faites, les affectations de partition sont réévaluées. Si le flux comporte au moins une partition en plus du nombre d'instances courantes dans le groupe et qu'une nouvelle instance le rejoint, les partitions sont réaffectées à toutes les instances, y compris la nouvelle. Si une instance du groupe cesse de consommer des messages pendant plus de 30 secondes, ou si elle n'envoie pas ConsumerHeartbeat dans les 30 secondes, cette instance est supprimée du groupe de consommateurs et sa partition est réaffectée, si possible, à une autre instance.
Ces événements sont appelés rééquilibrage. Le processus de rééquilibre n'est pas connu pour les instances du groupe, mais le groupe a coordonné la propriété d'un jeu de partitions mutuellement exclusif dans le flux.
À la fin d'une opération de rééquilibrage réussie pour un groupe de consommateurs, chaque partition du flux appartient à une instance du groupe.
De cette façon, vous pouvez adapter le nombre d'instances au nombre de partitions jusqu'à ce que chaque instance consomme des messages d'une seule partition. Cette configuration optimise le débit disponible de votre flux. Après ce point, toute nouvelle instance rejoignant le groupe reste dans un état inactif sans être affectée à une partition.