Consumer-Gruppen verwenden

Nachrichten aus einem Stream mit Consumer-Gruppen konsumieren.

Consumers können so konfiguriert werden, dass sie Nachrichten im Rahmen einer Gruppe konsumieren. In einer Produktionsumgebung mit mehreren Partitionen wird die Verwendung einer Consumer-Gruppe empfohlen, um Streaming-Nachrichten zu konsumieren.

Jede Streampartition wird einem Mitglied einer Consumer-Gruppe zugewiesen. Ein einzelnes Mitglied einer Consumer-Gruppe wird als Instanz bezeichnet. Jede Instanz in einer Consumer-Gruppe erhält Nachrichten von einer oder mehreren Partitionen, es sei denn, es sind mehr Instanzen als Partitionen vorhanden. Instanzen, die über die Partitionsanzahl für den Stream liegen, empfangen keine Nachrichten.

Consumer-Gruppen übernehmen die erforderliche Koordination, damit mehrere Consumer den Stream gemeinsam konsumieren können. Eine Consumer-Gruppe führt automatisch folgende Aktionen durch:

  • Sie weist einer Instanz eine oder mehrere Partitionen zu.
  • Sie erfasst die von der Gruppe empfangenen Nachrichten und verwaltet Commits.
  • fordert die richtigen Partitionen und Offsets für jede Instanz an
  • Sie gleicht die Gruppe aus, wenn Instanzen beitreten oder sie verlassen.

Bis zu 50 Consumer-Gruppen können aus einem einzelnen Stream lesen. Jede Consumer-Gruppe empfängt alle Nachrichten im Stream mindestens einmal.

Consumer-Gruppen sind ephemer. Sie verschwinden, wenn sie nicht für die Aufbewahrungszeit des Streams verwendet werden.

Consumer-Gruppe erstellen

Eine Consumer-Gruppe wird bei der ersten CreateGroupCursor-Anforderung erstellt (siehe Gruppencursor erstellen). Gruppencursor definieren ein Paar aus Gruppenname/Instanzname. Wenn Sie den Gruppencursor erstellen, geben Sie die ID des Streams, einen Gruppennamen, einen Instanznamen und einen der folgenden unterstützten Cursertypen an:

  • TRIM_HORIZON: Die Gruppe konsumiert ab der ältesten verfügbaren Nachricht im Stream.
  • AT_TIME: Die Gruppe beginnt ab einem bestimmten Zeitpunkt zu konsumieren. Der Zeitstempel der zurückgegebenen Nachricht entspricht oder liegt nach dem angegebenen Zeitpunkt.
  • LATEST: Die Gruppe konsumiert Nachrichten, die veröffentlicht wurden, nachdem Sie den Cursor erstellt haben.

Gruppencursortypen werden bei CreateGroupCursor-Aufrufen, die den Namen einer vorhandenen Gruppe enthalten, ignoriert. Anstelle des angegebenen Cursortyps werden die festgeschriebenen Offsets dieser Gruppe verwendet.

Streaming verwendet den Instanznamen, um Mitglieder der Gruppe beim Verwalten von Offsets zu identifizieren. Verwenden Sie eindeutige Instanznamen für jede Instanz der Consumer-Gruppe.

Wenn der Streaming-Service Offsets festschreiben soll, lassen Sie den Wert commitOnGet des Gruppencursors auf true gesetzt. Diese Methode wird empfohlen, um die Anwendungskomplexität zu reduzieren, damit Ihre Anwendung keine Commits verarbeiten muss.

Als Gruppe konsumieren

Nachdem Instanzen der Consumer-Gruppe beigetreten sind, können sie Nachrichten aus dem Stream mit GetMessages lesen (siehe Gruppencursor erstellen). Jeder GetMessages-Aufruf gibt den im nächsten GetMessages-Aufruf zu verwendenden Cursor als opc-next-cursor-Headerwert zurück. Der zurückgegebene Cursor ist nie Null, läuft aber nach fünf Minuten ab. Solange Sie weiter konsumieren, müssen Sie keinen Cursor neu erstellen.

Wenn Streaming eine Anforderung nach Nachrichten von einer Instanz empfängt, führt der Service folgende Schritte aus:

  • Er prüft, ob ein Rebalancing einer Gruppe erforderlich ist.
  • Er schreibt die Offsets aus der vorherigen Anforderung dieser Instanz fest, sofern vorhanden.
  • Er antwortet mit den vom Cursor der Anforderung definierten Nachrichten.

GetMessages-Batchgrößen basieren auf der durchschnittlichen Nachrichtengröße, die in diesem Stream veröffentlicht wird. Standardmäßig gibt der Service so viele Nachrichten wie möglich zurück. Mit dem Parameter limit können Sie einen beliebigen Wert bis zu 10.000 angeben. Beachten Sie jedoch die durchschnittliche Nachrichtengröße, damit der Durchsatz im Stream nicht überschritten wird und keine Timeouts ausgelöst werden.

Wenn die Partition keine ungelesenen Nachrichten mehr enthält, gibt Streaming eine leere Nachrichtenliste zurück.

Da Consumer-Gruppen Instanzen entfernen, die seit mehr als 30 Sekunden keine Nachrichten mehr verwenden, fordern Sie weniger Nachrichten an, um Timeout zu vermeiden, oder erweitern Sie den Timeout mit ConsumerHeartbeat (siehe Heartbeat senden).

Eine Partition kann nicht mehreren Instanzen innerhalb derselben Consumer-Gruppe zugewiesen werden. Wenn mehr Instanzen als Partitionen vorhanden sind, können die nicht zugewiesenen Instanzen GetMesages-Anforderungen senden. Sie erhalten jedoch keine Nachrichten. Sie bleiben andernfalls inaktiv, bis die Consumer-Gruppe eine Instanz ersetzen muss, z.B. wenn ein vorhandenes Mitglied der Gruppe nicht innerhalb des Timeoutzeitraums handelt.

Wenn Sie die Position der Gruppe manuell aktualisieren müssen, können Sie UpdateGroup (siehe Nutzergruppe aktualisieren) verwenden, um die Position aller Consumer in der Gruppe auf die angegebene Position im Stream zurückzusetzen.

Offsets und Commits

Offsets geben die Position einer Nachricht innerhalb einer Partition an. Wenn ein Consumer neu gestartet wird oder Sie einen Fehler beheben müssen, können Sie den Offset verwenden, um den Lesevorgang aus dem Stream neu zu starten.

Wenn Sie eine Consumer-Gruppe verwenden, verarbeitet Streaming Offsets automatisch. Das Standardverhalten von commitOnGet=true bedeutet, dass Offsets aus der vorherigen Anforderung festgeschrieben werden. Beispiel:

Für Consumer A:

  • A ruft GetMessages auf und empfängt Nachrichten von einer beliebigen Partition mit Offsets 1-100.
  • A verarbeitet alle 100 Nachrichten erfolgreich.
  • A ruft GetMessages auf, und der Streaming-Service schreibt Offset 100 fest und gibt Nachrichten mit Offsets 101-200 zurück.
  • A verarbeitet 15 Nachrichten und geht dann unerwartet offline (länger als 30 Sekunden).

Ein neuer Consumer-B:

  • B ruft GetMessages auf, und der Streaming-Service verwendet den letzten festgeschriebenen Offset und gibt Nachrichten mit Offsets 101-200 zurück.
  • B setzt die Nachrichtenschleife fort.

In diesem Beispiel wurde ein Teil (15) der Nachrichten mindestens einmal verarbeitet. Das bedeutet, dass sie möglicherweise mehrmals verarbeitet wurden, aber keine Daten verloren gehen.

Streaming stellt "At-Least-Once"-Semantik für Consumer-Gruppen bereit. Berücksichtigen Sie, wann Offsets in einer Nachrichtenschleife festgeschrieben wurden. Wenn ein Consumer offline geht, bevor ein Nachrichtenbatch festgeschrieben wurde, kann dieser Batch einem anderen Consumer übergeben werden. Wenn eine Partition einem anderen Consumer zugewiesen wird, verwendet der Consumer den letzten festgeschriebenen Offset, um den Konsum zu starten. Der Consumer erhält keine Nachrichten vor dem festgeschriebenen Offset. Wir empfehlen, dass der Umgang mit Duplikaten von Consumer-Anwendungen übernommen wird.

Hinweis

Nachrichten-Offsets sind nicht dicht besetzt. Offsets sind monoton aufsteigende Zahlen. Sie nehmen nicht ab, und manchmal erhöhen sie sich um mehr als einen. Beispiel: Wenn Sie zwei Nachrichten in derselben Partition veröffentlichen, könnte die erste Nachricht den Offset 42 aufweisen. Die zweite Nachricht könnte den Offset 45 aufweisen (Offsets 43 und 44 sind nicht vorhanden).

Um das Standard-Offsetverhalten zu überschreiben und einen benutzerdefinierten Offset-Commit zu implementieren, setzen Sie commitOnGet auf false, wenn Sie den Gruppencursor erstellen. Mit ConsumerCommit (siehe Offset manuell festschreiben) können Sie Nachrichten festschreiben, ohne weitere Nachrichten zu lesen. ConsumerCommit gibt einen Cursor zurück, den Sie in der nächsten Anforderung verwenden können.

Achtung

Das Schreiben benutzerdefinierter Commit-Logik ist kompliziert und enthält Rennbedingungen und Erwägungen. Es gibt viele Fälle, in denen sich ein interner Status ändert und der Client die Situation bewältigen muss.

Balancing und Rebalancing

Streaming berücksichtigt die Anzahl der Partitionen im Stream und die Anzahl der Instanzen in der Consumer-Gruppe bei der Balancing-Bewertung. Das Gruppen-Balancing erfolgt automatisch. Jeder Consumer wird einer oder mehreren Partitionen basierend auf der folgenden Berechnung zugewiesen:

(nPartitionen / nConsumers) ± 1

Beispiel: Wenn im Stream acht Partitionen und in der Gruppe vier Consumers vorhanden sind, wird jeder Consumer zwei Partitionen zugewiesen. Wenn sich 10 Partitionen im Stream und vier Consumers in der Gruppe befinden, werden zwei Consumers zwei Partitionen zugewiesen, und zwei Consumers werden drei Partitionen zugewiesen.

Wenn Instanzen einer Consumer-Gruppe beitreten oder diese verlassen und Anforderungen für Nachrichten gestellt werden, werden Partitionszuweisungen neu bewertet. Wenn der Stream mindestens eine Partition mehr als aktuelle Instanzen in der Gruppe enthält und eine neue Instanz beitritt, werden Partitionen allen Instanzen, einschließlich der neuen, neu zugewiesen. Wenn eine Instanz in der Gruppe länger als 30 Sekunden keine Nachrichten konsumiert oder nicht innerhalb von 30 Sekunden einen ConsumerHeartbeat sendet, wird diese Instanz aus der Consumer-Gruppe entfernt. Ihre Partition wird dann nach Möglichkeit einer anderen Instanz neu zugewiesen.

Diese Ereignisse werden als Rebalancing bezeichnet. Die Instanzen in der Gruppe sind sich des Rebalancing-Prozesses nicht bewusst, aber die Gruppe wurde so koordiniert, dass sie Eigentümer eines sich gegenseitig ausschließenden Sets von Partitionen im Stream ist.

Am Ende eines erfolgreichen Rebalancing-Vorgangs für eine Consumer-Gruppe gehört jede Partition innerhalb des Streams zu einer Instanz in der Gruppe.

Auf diese Weise können Sie die Anzahl der Instanzen bis zur Anzahl der Partitionen skalieren, bis jede Instanz Nachrichten von nur einer Partition konsumiert. Diese Konfiguration maximiert den verfügbaren Durchsatz Ihres Streams. Danach bleibt jede neue Instanz, die der Gruppe beitritt, inaktiv, ohne einer Partition zugewiesen zu werden.