Utilisation de groupes de destinataires

Consommer les messages provenant d'un flux de données à l'aide de groupes de destinataires.

Les destinataires peuvent être configurés de manière à utiliser les messages en tant que membres d'un groupe. Dans un environnement de production comportant plusieurs partitions, l'emploi d'un groupe de destinataires est la méthode recommandée pour utiliser les messages Streaming.

Chaque partition de flux de données est affectée à un membre d'un groupe de destinataires. Tout membre individuel d'un groupe de destinataires est appelé instance. Chaque instance d'un groupe de destinataires reçoit des messages provenant d'au moins une partition, sauf s'il existe plus d'instances que de partitions. Les instances qui dépassent le nombre de partitions du flux de données ne reçoivent pas de messages.

Les groupes de destinataires gèrent la coordination requise pour que plusieurs destinataires partagent l'utilisation d'un flux de données. Un groupe de destinataires effectue les opérations suivantes automatiquement :

  • Affectation de partitions à une instance
  • Suivi des messages reçus par le groupe et gestion des validations
  • Demande les partitions et les décalages appropriés pour le compte de chaque instance
  • Equilibrage du groupe au fur et à mesure de l'arrivée ou du départ d'instances

Jusqu'à 50 groupes de destinataires peuvent lire un flux de données unique. Chaque groupe de destinataires reçoit au moins une fois tous les messages du flux de données.

Les groupes de destinataires sont éphémères. Ils disparaissent lorsqu'ils ne sont pas utilisés pendant la période de conservation du flux de données.

Création d'un groupe de destinataires

Un groupe de destinataires est créé lors de la première demande CreateGroupCursor (reportez-vous à Création d'un curseur de groupe). Les curseurs de groupe définissent une paire nom de groupe/nom d'instance. Lorsque vous créez le curseur de groupe, indiquez l'ID du flux de données, un nom de groupe, un nom d'instance et l'un des types de curseur pris en charge suivants :

  • TRIM_HORIZON : le groupe commence l'utilisation à partir du plus ancien message disponible dans le flux de données.
  • AT_TIME : le groupe commence à consommer à partir d'une heure spécifique. L'horodatage du message renvoyé est identique ou postérieur à l'heure indiquée.
  • LATEST : le groupe commence l'utilisation des messages qui ont été publiés après la création du curseur.

Les types de curseur de groupe sont ignorés lors des appels CreateGroupCursor qui incluent le nom d'un groupe existant. Les positions validées de ce groupe sont utilisées à la place du type de curseur indiqué.

Streaming utilise le nom d'instance pour identifier les membres du groupe lors de la gestion des positions. Utilisez des noms uniques pour chaque instance du groupe de destinataires.

Si vous voulez que le service Streaming gère la validation des positions, laissez la valeur commitOnGet du curseur de groupe définie sur true. Nous vous recommandons d'utiliser cette méthode pour réduire la complexité de l'application afin qu'elle n'ait pas à gérer les validations.

Utilisation en tant que groupe

Une fois que vos instances ont rejoint le groupe de destinataires, elles peuvent lire les messages du flux de données à l'aide de GetMessages (reportez-vous à Création d'un curseur de groupe). Chaque appel GetMessages renvoie le curseur à utiliser lors du prochain appel GetMessages sous la forme d'une valeur d'en-tête opc-next-cursor. Le curseur renvoyé n'est jamais NULL, mais expire au bout de cinq minutes. Tant que vous continuez à consommer, vous n'avez pas besoin de recréer un curseur.

Lorsque le service Streaming reçoit d'une instance une demande de messages, il effectue les opérations suivantes :

  • Il détermine si un rééquilibrage de groupe est nécessaire.
  • Il valide les positions à partir de la demande précédente de l'instance, le cas échéant.
  • Il répond avec les messages définis par le curseur de la demande.

Les tailles de batch GetMessages reposent sur la taille moyenne des messages publiés dans le flux de données. Par défaut, le service renvoie autant de messages que possible. Vous pouvez utiliser le paramètre limit afin d'indiquer n'importe quelle valeur jusqu'à 10 000. Tenez toutefois compte de la taille moyenne de vos messages pour éviter de dépasser le débit sur le flux de données ou les délais d'expiration.

Si la partition ne comporte plus de messages non lus, Streaming renvoie une liste de messages vide.

Etant donné que les groupes de destinataires enlèvent les instances qui ont cessé d'utiliser des messages pendant plus de 30 secondes, demandez moins de messages pour éviter les expirations ou prolongez le délai d'expiration à l'aide de ConsumerHeartbeat (reportez-vous à Envoi d'un signal d'activité).

Une partition ne peut pas être affectée à plusieurs instances du même groupe de destinataires. Si vous avez plus d'instances que de partitions, les instances non affectées peuvent envoyer des demandes GetMesages, mais ne reçoivent aucun message. Elles restent inactives tant que le groupe de destinataires n'a pas besoin de remplacer une instance, par exemple lorsqu'un membre existant du groupe n'agit pas dans le délai d'expiration imparti.

Si vous devez mettre à jour manuellement la position du groupe, vous pouvez utiliser UpdateGroup (reportez-vous à Mise à jour d'un groupe de destinataires) pour réinitialiser l'emplacement de tous les destinataires du groupe sur l'emplacement indiqué dans le flux de données.

Positions et validations

Les positions indiquent l'emplacement d'un message dans une partition. Si un destinataire redémarre ou que vous devez effectuer une récupération suite à une panne, vous pouvez utiliser la position pour redémarrer la lecture du flux de données.

Lorsque vous utilisez un groupe de destinataires, Streaming gère automatiquement les positions. Avec le comportement par défaut de commitOnGet=true, les positions de la demande précédente sont validées. Par exemple :

Pour le destinataire A :

  • A appelle GetMessages et reçoit des messages d'une partition arbitraire, avec les positions 1 à 100.
  • A traite les 100 messages.
  • A appelle GetMessages. Le service Streaming valide la position 100 et renvoie les messages avec les positions 101 à 200.
  • A traite 15 messages, puis se met hors ligne de manière inattendue (pendant plus de 30 secondes).

Nouveau destinataire B :

  • B appelle GetMessages. Le service Streaming utilise la dernière position validée et renvoie les messages avec les positions 101 à 200.
  • B poursuit la boucle de message.

Dans cet exemple, une partie des messages (15) a été traitée au moins une fois, ce qui signifie qu'ils ont pu être traités plusieurs fois, mais aucune donnée n'est perdue.

Streaming fournit une sémantique "au moins une fois" pour les groupes de destinataires. Examinez le moment où les positions sont validées dans une boucle de message. Si un destinataire passe hors ligne avant de valider un batch de messages, ce batch peut être attribué à un autre destinataire. Lorsqu'une partition est donnée à un autre destinataire, celui-ci utilise la dernière position validée pour démarrer l'utilisation. Le destinataire n'obtient pas de messages situés avant la position validée. Il est recommandé que les applications destinataires se chargent des doublons.

Remarque

Les positions de message ne sont pas denses. Les positions sont des nombres à croissance monotone. Elles ne diminuent pas et augmentent parfois de plus de un. Par exemple, si vous publiez deux messages dans la même partition, le premier peut avoir la position 42 et le second la position 45 (les positions 43 et 44 étant inexistantes).

Pour remplacer le comportement de position par défaut et implémenter un mécanisme de validation de position personnalisé, définissez commitOnGet sur false lors de la création du curseur de groupe. Vous pouvez utiliser ConsumerCommit (reportez-vous à Validation manuelle d'un décalage) pour valider des messages sans lire d'autres messages. ConsumerCommit renvoie un curseur à utiliser lors de la demande suivante.

Attention

L'écriture d'une logique de validation personnalisée est compliquée et soumise à de nombreuses conditions de concurrence et considérations. Il existe de nombreux cas où un état interne est modifié et où le client doit gérer la situation.

Equilibrage et rééquilibrage

Streaming prend en compte le nombre de partitions du flux de données et le nombre d'instances du groupe de destinataires lors de l'évaluation de l'équilibre. L'équilibrage de groupe est automatique. Chaque destinataire est affecté à au moins une partition en fonction du calcul suivant :

(nPartitions / nDestinataires) ± 1

Par exemple, s'il existe huit partitions dans le flux de données et quatre destinataires dans le groupe, chaque destinataire est affecté à deux partitions. S'il existe 10 partitions dans le flux de données et quatre destinataires dans le groupe, deux destinataires sont affectés à deux partitions et les deux autres à trois partitions.

Au fur et à mesure que des instances rejoignent ou quittent un groupe de destinataires et que des demandes sont émises pour les messages, les affectations de partition sont réévaluées. Si le flux de données comporte des partitions en surnombre par rapport au nombre d'instances en cours dans le groupe et qu'une nouvelle instance rejoint ce dernier, les partitions sont réaffectées à toutes les instances, y compris la nouvelle. Si une instance du groupe cesse d'utiliser des messages pendant plus de 30 secondes ou n'envoie pas de signal d'activité ConsumerHeartbeat pendant 30 secondes, elle est enlevée du groupe de destinataires et sa partition réaffectée, si possible, à une autre instance.

Ces événements sont appelés rééquilibrage. Les instances du groupe n'ont pas conscience du processus de rééquilibrage, mais le groupe s'est coordonné de manière à posséder un ensemble de partitions mutuellement exclusives du flux de données.

Une fois que l'opération de rééquilibrage d'un groupe de destinataires est terminée, chaque partition du flux de données appartient à une instance du groupe.

De cette façon, vous pouvez augmenter le nombre d'instances jusqu'au nombre de partitions jusqu'à ce que chaque instance utilise des messages provenant d'une seule partition. Cette configuration optimise le débit disponible du flux de données. Une fois ce stade atteint, toute nouvelle instance rejoignant le groupe reste inactive sans être affectée à une partition.