Einzelne Consumers verwenden

Nachrichten aus einem Stream mit einem einzelnen Consumer konsumieren.

Wenn Sie einzelne Consumer verwenden, um Nachrichten aus Streams anstatt Consumer-Gruppen zu verwenden, können Sie viele der Vorteile von Streaming nicht nutzen, wie z.B. serviceverwaltete Koordination, horizontale Skalierung und Offsetmanagement. Ihre Anwendungen müssen diese Szenarios und viele weitere programmgesteuert verarbeiten.

Aus diesen Gründen wird die Verwendung von Consumer-Gruppen in einer Produktionsumgebung empfohlen. Es kann jedoch nützlich sein, einzelne Consumer für Test- oder Proof-of-Concept-Anwendungen zu verwenden.

Cursor verwenden

Ein Cursor ist ein Zeiger auf eine Position in einem Stream. Die Position kann ein bestimmter Offset oder eine bestimmte Zeit in einer Partition sein.

Bevor Sie mit dem Konsumieren von Nachrichten beginnen, müssen Sie den Punkt angeben, an dem der Konsum beginnen soll. Dazu können Sie einen Cursor erstellen.

Fünf Cursortypen werden unterstützt:

  • TRIM_HORIZON: Konsumieren Sie Nachrichten ab der ältesten verfügbaren Nachricht im Stream. Erstellen Sie einen Cursor für TRIM_HORIZON, um alle Nachrichten in einem Stream zu konsumieren.
  • AT_OFFSET : Nachrichten beginnend mit einem angegebenen Offset konsumieren. Der Offset muss größer oder gleich dem Offset der ältesten Nachricht und kleiner oder gleich dem zuletzt veröffentlichten Offset sein.
  • AFTER_OFFSET : Nachrichten nach dem angegebenen Offset konsumieren. Dieser Cursor unterliegt denselben Einschränkungen wie der AT_OFFSET-Cursor.
  • AT_TIME: Konsumieren Sie Nachrichten ab einem bestimmten Zeitpunkt. Der Zeitstempel der zurückgegebenen Nachricht entspricht dem angegebenen oder einem späteren Zeitpunkt.
  • LATEST : Nachrichten, die nach der Erstellung des Cursors veröffentlicht wurden, konsumieren.

Wenn Sie einen Cursor für einen einzelnen Consumer erstellen, müssen Sie die Partition im Stream für den zu verwendenden Cursor angeben. Wenn Ihr Stream mehrere Partitionen mit Nachrichten enthält, müssen Sie mehrere Cursor erstellen, um diese zu lesen.

Nachdem Sie einen Cursor erstellt haben, können Sie beginnen, Nachrichten zu konsumieren, indem Sie sie abrufen.

Solange Sie weiterhin Nachrichten konsumieren, müssen Sie keinen Cursor neu erstellen. Erstellen Sie Cursor außerhalb Ihrer Schleifen, um Nachrichten abzurufen.

Nachrichten abrufen

Nachdem Sie einen Cursor erstellt haben, rufen Sie Nachrichten ab (lesen) und geben Sie diesen Cursor an, um mit dem Konsumieren von Nachrichten zu beginnen. Der Service antwortet mit Ihren Nachrichten und dem Headerwert opc-next-cursor. Verwenden Sie den zurückgegebenen Headerwert im nächsten GetMessages-Aufruf. Der zurückgegebene Cursor ist nie Null, läuft aber nach fünf Minuten ab. Wenn Sie mehr als fünf Minuten keine Nachrichten konsumieren, müssen Sie einen Cursor neu erstellen.

Wenn mehrere Consumer aus derselben Partition lesen, erhalten sie dieselben Nachrichten. Entscheiden Sie, wie Ihre Anwendung diese Nachrichten verarbeitet.

Wenn die Partition keine ungelesenen Nachrichten mehr enthält, gibt Streaming eine leere Nachrichtenliste zurück.

GetMessages-Batchgrößen basieren auf der durchschnittlichen Nachrichtengröße, die im Stream veröffentlicht wird. Standardmäßig gibt der Service so viele Nachrichten wie möglich zurück. Mit dem Parameter limit können Sie einen beliebigen Wert bis zu 10.000 angeben. Beachten Sie jedoch die durchschnittliche Nachrichtengröße, damit der Durchsatz im Stream nicht überschritten wird.

Rückstand

Um zu bestimmen, ob Ihr Consumer in Rückstand gerät (Sie produzieren Nachrichten schneller als Sie sie konsumieren), können Sie den Zeitstempel der Nachricht verwenden. Wenn der Consumer in Rückstand ist, sollten Sie mehr Consumer starten, um einige Partitionen des ersten Consumers zu übernehmen. Es gibt keine Abhilfemöglichkeit, wenn Sie auf einer einzelnen Partition in Rückstand geraten.

Sie haben folgende Optionen:

  • Erstellen Sie einen neuen Stream mit mehr Partitionen.
  • Verwenden Sie Consumer-Gruppen.
  • Wenn das Problem durch einen Hotspot verursacht wird, ändern Sie die Nachrichtenschlüsselstrategie.
  • Reduzieren Sie die Zeit für die Nachrichtenverarbeitung, oder verarbeiten Sie Anforderungen parallel.

Um herauszufinden, wie viele Nachrichten in einer bestimmten Partition zum Konsumieren verbleiben, verwenden Sie einen Cursor vom Typ LATEST, rufen den Offset der nächsten veröffentlichten Nachricht ab, und erstellen Sie das Delta mit dem Offset, den Sie derzeit konsumieren.

Offsets verwalten

Offsets geben die Position einer Nachricht innerhalb einer Partition an. Wenn der Consumer neu gestartet wird oder Sie einen Fehler beheben müssen, können Sie den Offset verwenden, um den Lesevorgang aus dem Stream neu zu starten.

Tipp

Consumer-Gruppen können Offset-Commits automatisch verwalten.

Wenn Sie einzelne Consumer verwenden, muss Ihre Consumer-Anwendung verarbeitete Offsets verwalten (siehe Offsets manuell festschreiben). Der Consumer ist für die Speicherung der Offsets verantwortlich, die er für jede Partition erreicht oder bei denen er gestoppt hat. Wenn der Consumer neu gestartet wird, lesen Sie den Offset der letzten verarbeiteten Nachricht. Erstellen Sie dann einen Cursor vom Typ AFTER_OFFSET, und geben Sie den gerade erhaltenen Offset an. Wir bieten keine Anleitung zum Speichern des Offsets der letzten verarbeiteten Nachricht. Sie können jede beliebige Methode verwenden, wie z.B. einen anderen Stream, eine Datei auf Ihrem Rechner oder Object Storage.

Hinweis

Nachrichten-Offsets sind nicht dicht besetzt. Offsets sind monoton aufsteigende Zahlen. Sie nehmen nicht ab, und manchmal erhöhen sie sich um mehr als einen. Beispiel: Wenn Sie zwei Nachrichten in derselben Partition veröffentlichen, könnte die erste Nachricht den Offset 42 aufweisen. Die zweite Nachricht könnte den Offset 45 aufweisen (Offsets 43 und 44 sind nicht vorhanden).