Using Oracle Cloud Infrastructure SDKs with Streaming
Oracle Cloud Infrastructure (OCI) provides SDKs so that you can interact with Streaming without having to create a framework. The OCI SDKs let you manage streams, stream pools, and Kafka Connect configurations, and publish and consume messages. Refer to the Streaming Service Overview for key concepts and additional information.
This topic includes examples that use the OCI SDK for Java, but basic Streaming usage examples are included with all our SDKs:
- OCI SDK for Python examples
- OCI SDK for TypeScript and JavaScript examples
- OCI SDK for .NET examples
- OCI SDK for Go examples
For more information about using the OCI SDKs, see the SDK Guides.
Streaming Clients
The SDKs encapsulate the Streaming service in two clients: the StreamAdminClient and the StreamClient.
StreamAdminClient
The StreamAdminClient incorporates the control plane operations of Streaming. You can use it to create, delete, update, and list streams.
To instantiate the StreamAdminClient object:
StreamAdminClient adminClient = new StreamAdminClient([authProvider]);
adminClient.setEndpoint("<streaming_endpoint>"); // You cannot use the setRegion method
StreamClient
The StreamClient is used to publish and consume messages.
To instantiate a StreamClient object:
// First you have to get the stream you want to consume from or publish to.
// You can make a CreateStream, GetStream, or ListStreams call. Each returns a "messagesEndpoint" as part of the stream it describes.
// That endpoint needs to be used when creating the StreamClient object.
GetStreamRequest getStreamRequest = GetStreamRequest.builder().streamId(streamId).build();
Stream stream = adminClient.getStream(getStreamRequest).getStream();
StreamClient streamClient = new StreamClient([authProvider]);
streamClient.setEndpoint(stream.getMessagesEndpoint());
Managing Streams
To create a stream, use the createStream method of StreamAdminClient.
Creating a stream is an asynchronous operation. You can check on the completion of the create operation by checking that the lifecycleState property of your new stream is either Active or Failed. See Managing Streams for more information.
The following is an example showing how to create a stream:
// No error handling
CreateStreamDetails createStreamDetails = CreateStreamDetails.builder()
    .partitions(5) // number of partitions you want in your stream
    .name("myStream") // the name of the stream - only used in the console
    .compartmentId(tenancy) // the compartment id you want your stream to live in
    .build();
// You can also add tags to the createStreamDetails object.
CreateStreamRequest createStreamRequest =
    CreateStreamRequest.builder()
        .createStreamDetails(createStreamDetails)
        .build();
Stream stream = adminClient.createStream(createStreamRequest).getStream();
while (stream.getLifecycleState() != Stream.LifecycleState.Active && stream.getLifecycleState() != Stream.LifecycleState.Failed) {
    GetStreamRequest getStreamRequest = GetStreamRequest.builder().streamId(stream.getId()).build();
    stream = adminClient.getStream(getStreamRequest).getStream();
}
// Handle stream Failure
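The polling loop in the example calls the service repeatedly with no delay and no upper bound on waiting time. The following is a minimal sketch of one way to add both, not the SDK's own mechanism. The class LifecyclePoller, the method waitForTerminalState, and the fetchState callback are hypothetical names introduced only for illustration; the sketch works on any state type so that it stays self-contained.

```java
import java.time.Duration;
import java.util.Set;
import java.util.function.Supplier;

/** Hypothetical helper: polls a state supplier until a terminal state or a timeout. */
final class LifecyclePoller {
    private LifecyclePoller() {}

    /**
     * @param fetchState     hypothetical callback that returns the current state;
     *                       in real code it could wrap a getStream call
     * @param terminalStates states that end the wait (for example Active and Failed)
     * @param interval       pause between two polls
     * @param timeout        maximum total time to wait
     * @return the terminal state that was observed
     */
    static <S> S waitForTerminalState(Supplier<S> fetchState, Set<S> terminalStates,
                                      Duration interval, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            S state = fetchState.get();
            if (terminalStates.contains(state)) {
                return state;
            }
            if (System.nanoTime() - deadline >= 0) {
                throw new IllegalStateException(
                    "Timed out waiting for a terminal state; last state: " + state);
            }
            try {
                Thread.sleep(interval.toMillis()); // pause instead of polling in a tight loop
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            }
        }
    }
}
```

With the SDK objects from the example, the supplier could be a lambda such as () -> adminClient.getStream(getStreamRequest).getStream().getLifecycleState(), with Stream.LifecycleState.Active and Stream.LifecycleState.Failed as the terminal states.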
Use the listStreams method to return a list of streams for a given compartment.
You can filter the returned list by OCID, lifecycle state, and name.
The results can be sorted in ascending or descending order by name or creation time.
The results are passed back in a paginated list. Each response carries a token, available from getOpcNextPage, that identifies the next page of results; pass this token as the page parameter of the next request to retrieve that page. A null token returned from getOpcNextPage indicates that no more results are available.
For example:
// No error handling
// You can filter by OCID (exact match only) [builder].id(streamId) -> This will return 0..1 item
// You can filter by name (exact match only) [builder].name(name) -> This will return 0..n items
// You can order the result per TimeCreated or Name [builder].sortBy(SortBy.[TimeCreated|Name])
// You can change the ordering [builder].sortOrder(SortOrder.[Asc|Desc])
// You can filter by lifecycleState [builder].lifecycleState(lifecycleState)
String page = null; // null requests the first page
do {
    ListStreamsRequest listStreamsRequest =
        ListStreamsRequest.builder()
            .compartmentId(tenancy)
            .page(page) // token returned by the previous response
            .build();
    ListStreamsResponse listStreamsResponse = adminClient.listStreams(listStreamsRequest);
    List<StreamSummary> streams = listStreamsResponse.getItems();
    // Do something with the streams
    page = listStreamsResponse.getOpcNextPage();
} while (page != null);
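The token-driven paging described above follows a general pattern: request a page, keep its items, and repeat with the returned token until the token is null. The following sketch isolates that pattern from the SDK so that it can run on its own. PageCollector, Page, and fetchPage are hypothetical names used only for illustration; they are not part of the OCI SDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical helper: collects every item from a token-paginated source. */
final class PageCollector {
    private PageCollector() {}

    /** One page of results plus the token of the next page (null when there is none). */
    static final class Page<T> {
        final List<T> items;
        final String nextPageToken;

        Page(List<T> items, String nextPageToken) {
            this.items = items;
            this.nextPageToken = nextPageToken;
        }
    }

    /**
     * @param fetchPage hypothetical callback that takes a page token (null for the
     *                  first page) and returns that page
     * @return the items of all pages, in the order they were returned
     */
    static <T> List<T> collectAll(Function<String, Page<T>> fetchPage) {
        List<T> all = new ArrayList<>();
        String token = null; // null requests the first page
        do {
            Page<T> page = fetchPage.apply(token);
            all.addAll(page.items);
            token = page.nextPageToken; // feed the token into the next request
        } while (token != null);
        return all;
    }
}
```

In SDK terms, fetchPage would build a ListStreamsRequest with the given token, call listStreams, and wrap getItems() and getOpcNextPage() in a Page.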
To get details about a stream, use the getStream method and then examine the properties of the stream. For example:
// No error handling
GetStreamRequest getStreamRequest =
    GetStreamRequest.builder()
        .streamId(streamId)
        .build();
Stream stream = adminClient.getStream(getStreamRequest).getStream();
To delete a stream, use the deleteStream method of the StreamAdminClient. Deleting a stream is an asynchronous operation; the stream state changes to Deleted once the delete operation is finished. During the deletion process, the stream can't be used for consuming or producing messages.
The following example shows how to use the deleteStream method to delete a stream:
// No error handling
DeleteStreamRequest deleteStreamRequest =
    DeleteStreamRequest.builder()
        .streamId(stream.getId())
        .build();
adminClient.deleteStream(deleteStreamRequest);
Managing Kafka Connect Configurations
To use Kafka Connect with Streaming, you need a Kafka Connect configuration, also called a Kafka Connect harness. You can retrieve the OCID for a harness when you create a new harness or use an existing one. For more information, see Using Kafka Connect.
The following code example shows how to create a Kafka Connect harness:
CreateConnectHarnessDetails createConnectHarnessDetails = CreateConnectHarnessDetails.builder()
    .compartmentId(compartment) // compartment where you want to create the connect harness
    .name("myConnectHarness") // connect harness name
    .build();
CreateConnectHarnessRequest connectHarnessRequest = CreateConnectHarnessRequest.builder()
    .createConnectHarnessDetails(createConnectHarnessDetails)
    .build();
// adminClient is the StreamAdminClient instantiated earlier
CreateConnectHarnessResponse createConnectHarnessResponse = adminClient.createConnectHarness(connectHarnessRequest);
ConnectHarness connectHarness = createConnectHarnessResponse.getConnectHarness();
while (connectHarness.getLifecycleState() != ConnectHarness.LifecycleState.Active && connectHarness.getLifecycleState() != ConnectHarness.LifecycleState.Failed) {
    GetConnectHarnessRequest getConnectHarnessRequest = GetConnectHarnessRequest.builder().connectHarnessId(connectHarness.getId()).build();
    connectHarness = adminClient.getConnectHarness(getConnectHarnessRequest).getConnectHarness();
}
The following code example shows how to list Kafka Connect harnesses:
ListConnectHarnessesRequest listConnectHarnessesRequest = ListConnectHarnessesRequest.builder()
    .compartmentId(compartment) // compartment id whose connect harnesses you want to list
    .lifecycleState(ConnectHarnessSummary.LifecycleState.Active)
    .build();
// adminClient is the StreamAdminClient instantiated earlier
ListConnectHarnessesResponse listConnectHarnessesResponse = adminClient.listConnectHarnesses(listConnectHarnessesRequest);
List<ConnectHarnessSummary> items = listConnectHarnessesResponse.getItems();
Publishing Messages
Once a stream is created and active, you can publish messages using the streamClient.putMessages method. See Publishing Messages for more information about publishing.
The following code example shows how to publish a message:
// No error handling
List<PutMessagesDetailsEntry> messages = new ArrayList<>();
for (int i = 0; i < 40; i++) {
    byte[] key = "<myKey>".getBytes(Charsets.UTF_8); // In this case, all messages will go on the same partition since the key is the same.
    byte[] value = UUID.randomUUID().toString().getBytes(Charsets.UTF_8);
    messages.add(new PutMessagesDetailsEntry(key, value));
}
PutMessagesDetails putMessagesDetails =
    PutMessagesDetails.builder()
        .messages(messages)
        .build();
PutMessagesRequest putMessagesRequest =
    PutMessagesRequest.builder()
        .streamId(streamId) // OCID of the stream to publish to
        .putMessagesDetails(putMessagesDetails)
        .build();
PutMessagesResult putMessagesResult = streamClient.putMessages(putMessagesRequest).getPutMessagesResult();
// A call that did not fail does not guarantee that every message was published!
int failures = putMessagesResult.getFailures();
// If failures is > 0, it means we have a partial-success call.
List<PutMessagesResultEntry> entries = putMessagesResult.getEntries();
// entries is a list of the same size as the list of messages you sent.
// It is guaranteed that the order of the entries is the same as the order of the messages you sent.
// Each entry contains either "offset/partition/timestamp" if the message was successfully published
// or "error/errorMessage" if it failed.
if (failures != 0) {
    entries.forEach(entry -> {
        if (StringUtils.isNotEmpty(entry.getError())) {
            // That particular message failed to get published.
            // It could be a throttle error and in that case the error would be "429" and errorMessage would contain a meaningful message.
            // Or it could be an internal error and the error would be "500".
            // Possible solution would be to republish only failed messages.
        }
    });
}
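The comments in the example suggest republishing only the messages that failed. Because the result entries come back in the same order as the messages sent, the failed ones can be picked out by index. The following is a minimal sketch of that idea under stated assumptions: FailedMessageRetrier, publishWithRetry, and the send callback are hypothetical names, and the callback is assumed to return one error code per message (null or empty meaning success), mirroring the "error" field described above. It adds no delay between attempts, which real code handling "429" errors would probably want.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical helper: republishes only the messages whose result entry has an error. */
final class FailedMessageRetrier {
    private FailedMessageRetrier() {}

    /**
     * @param send        hypothetical callback that publishes a batch and returns one
     *                    error code per message, in the same order (null or empty = success)
     * @param messages    the messages to publish
     * @param maxAttempts maximum number of publish calls
     * @return the messages that still failed after the last attempt
     */
    static <M> List<M> publishWithRetry(Function<List<M>, List<String>> send,
                                        List<M> messages, int maxAttempts) {
        List<M> pending = new ArrayList<>(messages);
        for (int attempt = 1; attempt <= maxAttempts && !pending.isEmpty(); attempt++) {
            List<String> errors = send.apply(pending);
            List<M> failed = new ArrayList<>();
            for (int i = 0; i < pending.size(); i++) {
                String error = errors.get(i);
                if (error != null && !error.isEmpty()) {
                    failed.add(pending.get(i)); // same index as the message that was sent
                }
            }
            pending = failed; // the next attempt carries only the failures
        }
        return pending;
    }
}
```

Note that a republished message lands after messages that succeeded in the earlier call, so this approach may change the relative order of messages within a partition.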
Consuming Messages
Consuming messages requires the use of a cursor, which is a pointer to an offset into a partition. First you must create a cursor, then you must use the cursor to get messages. For more information, see Using Cursors and Getting Messages.
Cursors
This example creates a TRIM_HORIZON cursor, which starts consuming from the oldest available message:
// No error handling
CreateCursorDetails createCursorDetails =
    CreateCursorDetails.builder()
        .type(Type.TrimHorizon)
        .partition("0")
        .build();
// If using AT_OFFSET or AFTER_OFFSET you need to specify the offset [builder].offset(offset)
// If using AT_TIME you need to specify the time [builder].time(new Date(xxx))
CreateCursorRequest createCursorRequest =
    CreateCursorRequest.builder()
        .streamId(streamId) // OCID of the stream to read from
        .createCursorDetails(createCursorDetails)
        .build();
String cursor = streamClient.createCursor(createCursorRequest).getCursor().getValue();
// Cursor will then be used to get messages from the stream.
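The comments in the example state which extra field each cursor type needs: AT_OFFSET and AFTER_OFFSET need an offset, and AT_TIME needs a time. The following sketch expresses those rules as a small check that could run before building the request. CursorSpec, its Type enum, and validate are hypothetical names for illustration only and are not SDK classes; the enum simply mirrors the cursor type names used in this topic.

```java
import java.util.Date;

/** Hypothetical helper: checks that a cursor type has the extra field it requires. */
final class CursorSpec {
    private CursorSpec() {}

    /** Mirrors the cursor type names used in this topic; not the SDK enum. */
    enum Type { TRIM_HORIZON, LATEST, AT_OFFSET, AFTER_OFFSET, AT_TIME }

    /**
     * @param type   the cursor type
     * @param offset required for AT_OFFSET and AFTER_OFFSET, otherwise ignored
     * @param time   required for AT_TIME, otherwise ignored
     * @throws IllegalArgumentException if a required field is missing
     */
    static void validate(Type type, Long offset, Date time) {
        switch (type) {
            case AT_OFFSET:
            case AFTER_OFFSET:
                if (offset == null) {
                    throw new IllegalArgumentException(type + " requires an offset");
                }
                break;
            case AT_TIME:
                if (time == null) {
                    throw new IllegalArgumentException(type + " requires a time");
                }
                break;
            default:
                break; // TRIM_HORIZON and LATEST need no extra field
        }
    }
}
```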
GetMessages
Once you've created a cursor, you can start to consume messages using the GetMessages method. Each call to GetMessages returns the cursor to use in the next GetMessages call.
Here's an example of using a cursor to retrieve messages:
// No error handling (there is a high chance of getting a throttling error using a tight loop)
while (true) { // or your own exit condition
    GetMessagesRequest getMessagesRequest =
        GetMessagesRequest.builder()
            .streamId(streamId) // OCID of the stream to read from
            .cursor(cursor)
            .build();
    GetMessagesResponse getMessagesResponse = streamClient.getMessages(getMessagesRequest);
    // This could be empty, but we will always return an updated cursor
    getMessagesResponse.getItems().forEach(message -> {
        // Process the message
    });
    cursor = getMessagesResponse.getOpcNextCursor();
}
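The example warns that a tight loop is likely to be throttled. One simple way to reduce that risk is to pause whenever a read returns no messages. The following sketch shows the cursor being threaded through successive reads with such a pause. CursorLoop, Batch, consume, and the getMessages callback are hypothetical names for illustration, and stopping after a number of consecutive empty reads is only one possible exit condition, chosen here so that the sketch terminates.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical helper: threads a cursor through repeated reads, pausing after empty ones. */
final class CursorLoop {
    private CursorLoop() {}

    /** One read result: the messages plus the cursor to use for the next read. */
    static final class Batch<T> {
        final List<T> items;
        final String nextCursor;

        Batch(List<T> items, String nextCursor) {
            this.items = items;
            this.nextCursor = nextCursor;
        }
    }

    /**
     * Reads until maxEmptyPolls consecutive reads return no messages.
     *
     * @param getMessages hypothetical callback standing in for a getMessages call
     * @return the cursor to resume from
     */
    static <T> String consume(Function<String, Batch<T>> getMessages, String startCursor,
                              Consumer<T> handler, int maxEmptyPolls, long pauseMillis) {
        String cursor = startCursor;
        int emptyPolls = 0;
        while (emptyPolls < maxEmptyPolls) {
            Batch<T> batch = getMessages.apply(cursor);
            batch.items.forEach(handler);
            cursor = batch.nextCursor; // advance even when the batch is empty
            if (batch.items.isEmpty()) {
                emptyPolls++;
                pause(pauseMillis); // back off instead of polling in a tight loop
            } else {
                emptyPolls = 0;
            }
        }
        return cursor;
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
    }
}
```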
Consuming Messages as a Group
Consumers can be configured to consume messages as part of a group. Stream partitions are distributed among members of a consumer group so that messages from any single partition are only sent to a single consumer. Group consumption is accomplished using the same cursor mechanism as with single consumers, but using a different kind of cursor.
For more information, see Using Consumer Groups.
To create a consumer group, create a group cursor, providing a group, instance name, and cursor type. Group cursors support the following cursor types:
- TRIM_HORIZON: The group will start consuming from the oldest available message in the stream.
- AT_TIME: The group will start consuming from a given time. The timestamp of the returned message will be on or after the supplied time.
- LATEST: The group will start consuming messages that were published after you created the cursor.
Consumer groups are created on the first request to create a cursor. For example:
CreateGroupCursorRequest groupRequest = CreateGroupCursorRequest.builder()
    .streamId(streamId)
    .createGroupCursorDetails(CreateGroupCursorDetails.builder()
        .groupName(groupName)
        .instanceName(instanceName)
        .type(CreateGroupCursorDetails.Type.TrimHorizon)
        .commitOnGet(true)
        .build())
    .build();
CreateGroupCursorResponse groupCursorResponse = streamClient.createGroupCursor(groupRequest);
String groupCursor = groupCursorResponse.getCursor().getValue();
// This groupCursor can be used in the same message loop as described above; subsequent getMessages calls return an updated groupCursor.
Once you've created a group cursor, you can start to consume messages using GetMessages.