Using Individual Consumers

If you choose to use individual consumers to consume messages from your streams instead of using consumer groups, you can't take advantage of many of the benefits of Streaming, such as service-managed coordination, horizontal scaling, and offset management. Your applications will need to handle these scenarios, and many more, programmatically.

For these reasons, we recommend using consumer groups in a production environment, but it may be useful to use individual consumers for testing or proof-of-concept applications.

Using Cursors

A cursor is a pointer to a location in a stream. The location could be a specific offset or time in a partition.

Before you start to consume messages, you need to indicate the point from which you want to start consumption. You can do this by creating a cursor using the CreateCursor API.

There are five supported cursor types:

  • TRIM_HORIZON - Start consuming from the oldest available message in the stream. Create a cursor at the TRIM_HORIZON to consume all messages in a stream.
  • AT_OFFSET - Start consuming at a specified offset. The offset must be greater than or equal to the offset of the oldest message and less than or equal to the latest published offset.
  • AFTER_OFFSET - Start consuming after the given offset. This cursor has the same restrictions as the AT_OFFSET cursor.
  • AT_TIME - Start consuming from a given time. The timestamp of the returned message will be on or after the supplied time.
  • LATEST - Start consuming messages that were published after you created the cursor.

When you create a cursor for an individual consumer, you need to specify the partition in the stream that the cursor should use. If your stream has more than one partition with messages, you need to create multiple cursors to read them.

Once you've created a cursor, you can start to consume messages using GetMessages.

As long as you keep consuming messages, there is no need to re-create a cursor, so cursors should be created outside of your loops to get messages.

Getting Messages

Once you've created a cursor, call GetMessages and specify that cursor to start consuming messages. The service responds with your messages and the opc-next-cursor header value that you should use in your next GetMessages call. The returned cursor is never null, but it expires in five minutes. If you stop consuming messages for longer than five minutes, you will need to re-create a cursor.

If you have more than one consumer reading from the same partition, they will receive the same messages. Your application should decide how to process messages.

If there are no more unread messages in the partition, Streaming returns a list of empty messages.

GetMessages batch sizes are based on the average message size published to the stream. By default, the service returns as many messages as possible. You can use the limit parameter to specify any value up to 10,000, but consider your average message size to avoid exceeding throughput on the stream.

Falling behind

To determine if your consumer is falling behind (you're producing faster than you're consuming), you can use the timestamp of the message. If the consumer is falling behind, consider spawning additional consumers to take over some of the partitions from the first consumer. There's no way to recover if you're falling behind on a single partition.

Consider the following options:

  • Create a new stream with more partitions.
  • Use consumer groups.
  • If the issue is caused by a hotspot, change the message key strategy.
  • Reduce message processing time, or handle requests in parallel.

If you want to know how many messages are left to consume in a given partition, use a cursor of type LATEST, get the offset of the next published message, and make the delta with the offset that you are currently consuming.

Managing Offsets

Offsets indicate the location of a message within a partition. If your consumer restarts or you need to recover from a failure, you can use the offset to restart reading from the stream.

Tip

Consumer groups can manage offset commits automatically.

When you use individual consumers, your consumer application must manage processed offsets. The consumer is responsible for storing which offsets it reached or stopped at, for each partition. When your consumer restarts, read the offset of the last message that you processed, and then create a cursor of type AFTER_OFFSET and specify the offset that you just got. We don't provide any guidance for storing the offset of the last message that you processed. You may use any method, such as another stream, a file on your machine, or Object Storage.

Note

Message offsets aren't dense. Offsets are monotonically increasing numbers. They do not decrease, and sometimes they increase by more than one. For example, if you publish two messages to the same partition, the first message could have an offset of 42 and the second message could have an offset of 45 (offsets 43 and 44 being non-existent).

Using the Command Line Interface (CLI)

For information about using the CLI, see Command Line Interface (CLI). For a complete list of flags and options available for CLI commands, see the Command Line Reference.

To create a cursor

oci streaming stream cursor create-cursor --stream-id <stream_OCID> --partition <partition> --type <cursor_type> --endpoint <messages_endpoint>

For example:

oci streaming stream cursor create-cursor --stream-id ocid1.stream.oc1.phx.exampleuniqueID --partition 0 --type TRIM_HORIZON --endpoint https://cell-1.streaming.us-phoenix-1.oci.oraclecloud.com
{
  "data": {
    "value": "examplecursorvalue"
  }
}
To get messages

oci streaming stream message get --stream-id <stream_OCID> --cursor <cursor> --endpoint <messages_endpoint>

Your first request to get messages should use the value returned when you created a cursor. Each subsequent request should use the opc-next-cursor value returned in the previous response.

For example:

oci streaming stream message get --stream-id ocid1.stream.oc1.phx.exampleuniqueID --cursor examplecursorvalue --endpoint https://cell-1.streaming.us-phoenix-1.oci.oraclecloud.com
{
  "data": [
    {
      "key": "a2V5MQ==",
      "offset": 0,
      "partition": "0",
      "stream": "MyStream",
      "timestamp": "2020-11-03T21:52:58.470000+00:00",
      "value": "dmFsdWUx"
    },
    {
      "key": "a2V5Mg==",
      "offset": 1,
      "partition": "0",
      "stream": "MyStream",
      "timestamp": "2020-11-03T21:52:58.470000+00:00",
      "value": "dmFsdWUy"
    },
    {
      "key": "a2V5MQ==",
      "offset": 2,
      "partition": "0",
      "stream": "MyStream",
      "timestamp": "2020-11-03T22:00:48.305000+00:00",
      "value": "dmFsdWUx"
    },
    {
      "key": "a2V5Mg==",
      "offset": 3,
      "partition": "0",
      "stream": "MyStream",
      "timestamp": "2020-11-03T22:00:48.305000+00:00",
      "value": "dmFsdWUy"
    }
  ],
  "opc-next-cursor": "examplenextcursorvalue"
}