Using Kafka APIs

This topic describes how to configure Apache Kafka for API compatibility with Oracle Cloud Infrastructure Streaming. When your producers use Kafka APIs to interact with Streaming, the Kafka client decides on the client side which partition each message is published to.

Please refer to Kafka API Support for additional information.
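
The client-side partition selection mentioned above can be pictured with a small sketch. This is an illustration of the idea only, not Kafka's implementation: the real default partitioner hashes the key bytes with murmur2, while this sketch substitutes Arrays.hashCode, so the partition numbers it prints will not match a real client. The class name, method name, and partition count are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustrative only: Arrays.hashCode is a stand-in for Kafka's murmur2 hash,
// so results will NOT match the partitions chosen by a real Kafka client.
final class PartitionSketch {

    // Maps a message key to a partition index in the range [0, numPartitions).
    static int choosePartition(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes);
        // Math.floorMod keeps the result non-negative even when the hash is negative.
        return Math.floorMod(hash, numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 3; // hypothetical partition count
        for (String key : new String[] {"order-1", "order-2", "order-1"}) {
            System.out.println(key + " -> partition " + choosePartition(key, partitions));
        }
    }
}
```

Because the mapping depends only on the key and the partition count, messages that share a key land on the same partition as long as the partition count does not change.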

Endpoints

For bootstrap servers, use your region endpoint on port 9092. For example:

streaming.us-phoenix-1.oci.oraclecloud.com:9092
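
If you build the bootstrap server string in code, a helper along the following lines may be convenient. It is a minimal sketch that assumes the streaming.{region}.oci.oraclecloud.com pattern shown in this topic applies to your region; the class and method names are hypothetical.

```java
// Minimal sketch: builds the bootstrap server address from a region identifier,
// assuming the streaming.{region}.oci.oraclecloud.com:9092 pattern used in this topic.
final class StreamingEndpoint {

    static final int KAFKA_PORT = 9092;

    static String bootstrapServers(String region) {
        if (region == null || region.trim().isEmpty()) {
            throw new IllegalArgumentException("region must not be empty");
        }
        return "streaming." + region.trim() + ".oci.oraclecloud.com:" + KAFKA_PORT;
    }

    public static void main(String[] args) {
        // prints streaming.us-phoenix-1.oci.oraclecloud.com:9092
        System.out.println(bootstrapServers("us-phoenix-1"));
    }
}
```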

Authentication

Authentication with the Kafka protocol uses auth tokens and the SASL/PLAIN mechanism. You can generate auth tokens on the user details page in the Console. See Working with Auth Tokens for more information.

Tip

Create a dedicated group and user, and grant that group permission to manage streams in the appropriate compartment or tenancy. The policy in Let streaming admins manage streaming resources lets the specified group do everything with streams and related Streaming service resources. You can then generate an auth token for the user you created and use it in your Kafka client configuration.

Your username must be in the following format:

tenancyName/username/streamPoolId

Tip

If you are using the Java SDK, you can also use instance principal authorization.

Kafka Configuration

Set the following properties for your Kafka client.

For the Java SDK

Recommended settings for Java SDK:

Properties properties = new Properties();
properties.put("bootstrap.servers", "streaming.{region}.oci.oraclecloud.com:9092");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "PLAIN");
properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{tenancyName}/{username}/{streamPoolId}\" password=\"{authToken}\";");
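
The placeholders in the settings above can be filled in programmatically. The following is a minimal sketch that uses only java.util.Properties and assembles the username in the tenancyName/username/streamPoolId format described under Authentication. The class name, method names, and the example values in main are hypothetical, and the sketch assumes that none of the values contain double quotes.

```java
import java.util.Properties;

// Minimal sketch: assembles the SASL/PLAIN connection settings from their parts.
// Assumes the values contain no double quotes; no escaping is performed.
final class StreamingKafkaConfig {

    // Builds the username in the tenancyName/username/streamPoolId format.
    static String saslUsername(String tenancyName, String username, String streamPoolId) {
        return tenancyName + "/" + username + "/" + streamPoolId;
    }

    // Builds the sasl.jaas.config value for the PlainLoginModule.
    static String plainJaasConfig(String saslUsername, String authToken) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required"
                + " username=\"" + saslUsername + "\""
                + " password=\"" + authToken + "\";";
    }

    // Returns the connection settings shared by producers and consumers.
    static Properties commonProperties(String region, String tenancyName, String username,
                                       String streamPoolId, String authToken) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "streaming." + region + ".oci.oraclecloud.com:9092");
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.mechanism", "PLAIN");
        properties.put("sasl.jaas.config",
                plainJaasConfig(saslUsername(tenancyName, username, streamPoolId), authToken));
        return properties;
    }

    public static void main(String[] args) {
        // All argument values below are placeholders, not real identifiers.
        Properties properties = commonProperties("us-phoenix-1", "exampleTenancy",
                "exampleUser", "exampleStreamPoolId", "exampleAuthToken");
        System.out.println(properties.getProperty("bootstrap.servers"));
    }
}
```

In a real application, read the auth token from a secret store or the environment rather than hard-coding it.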

Recommended settings for Java SDK producers:

properties.put("retries", 5); // retries on transient errors and load balancing disconnection
properties.put("max.request.size", 1024 * 1024); // limit request size to 1 MB

Recommended settings for Java SDK consumers:

properties.put("max.partition.fetch.bytes", 1024 * 1024); // limit request size to 1MB per partition
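
One way to keep the producer and consumer settings apart is to copy the shared connection settings and layer the role-specific values on top. The following is a minimal sketch of that approach using only java.util.Properties; the class and method names are hypothetical.

```java
import java.util.Properties;

// Minimal sketch: derives producer and consumer settings from shared connection settings.
final class ClientSettings {

    static final int ONE_MB = 1024 * 1024;

    // Copies the shared settings so that tuning one client does not affect the other.
    static Properties copyOf(Properties base) {
        Properties copy = new Properties();
        copy.putAll(base);
        return copy;
    }

    static Properties producerProperties(Properties base) {
        Properties properties = copyOf(base);
        properties.put("retries", 5);                 // retry on transient errors and disconnections
        properties.put("max.request.size", ONE_MB);   // limit request size to 1 MB
        return properties;
    }

    static Properties consumerProperties(Properties base) {
        Properties properties = copyOf(base);
        properties.put("max.partition.fetch.bytes", ONE_MB); // limit fetch size to 1 MB per partition
        return properties;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.put("security.protocol", "SASL_SSL");
        System.out.println(producerProperties(base));
        System.out.println(consumerProperties(base));
    }
}
```

A working Kafka client typically needs further settings that this topic does not cover, such as serializers for producers and deserializers and a group ID for consumers.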

For the librdkafka SDK

Recommended settings for librdkafka SDK:

'metadata.broker.list': 'streaming.{region}.oci.oraclecloud.com:9092',
'security.protocol': 'SASL_SSL',
'sasl.mechanisms': 'PLAIN',
'sasl.username': '{tenancyName}/{username}/{streamPoolId}',
'sasl.password': '{authToken}'

Recommended settings for librdkafka SDK producers:

'message.send.max.retries': 5, // retries on transient errors and load balancing disconnection
'max.request.size': 1024 * 1024 // limit request size to 1 MB

Recommended settings for librdkafka SDK consumers:

'max.partition.fetch.bytes': 1024 * 1024 // limit request size to 1 MB per partition

Instance Principal Authorization for the Java SDK

If you are using the Java SDK, you can authorize an instance to interact with Streaming instead of using auth tokens.

To configure the Java SDK for instance principal authorization:

  1. Verify that you have a valid Oracle Cloud Infrastructure (OCI) SDK and CLI configuration file.
  2. Import the Oracle Cloud Infrastructure SDK for Java into your project. See Getting Started with the SDK for Java for more information.
  3. Add the following Oracle Cloud Infrastructure SDK for Java dependency:
    <dependency>
      <groupId>com.oracle.oci.sdk</groupId>
      <artifactId>oci-java-sdk-addons-sasl</artifactId>
      <optional>false</optional>
      <version>1.13.1</version> <!-- that's the minimum version to use -->
    </dependency>
  4. Modify the sasl.mechanism property of your Kafka client configuration:
    properties.put("sasl.mechanism", OciMechanism.OCI_RSA_SHA256.mechanismName());
  5. Modify the sasl.jaas.config property of your Kafka client configuration using one of the following options:
    • To authenticate as an instance principal:
      properties.put("sasl.jaas.config", "com.oracle.bmc.auth.sasl.InstancePrincipalsLoginModule required intent=\"streamPoolId:<streamPoolId>\";");
    • To authenticate as a user principal using an OCI configuration file:
      properties.put("sasl.jaas.config", "com.oracle.bmc.auth.sasl.UserPrincipalsLoginModule required config=\"<pathToConfig>\" profile=\"<profile>\" intent=\"streamPoolId:<streamPoolId>\";");
      If config is not specified, the default config path is used (~/.oci/config). If profile is not specified, the default profile is used (DEFAULT).
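
The two sasl.jaas.config values in step 5 can also be assembled from their parts. The following is a minimal sketch that uses plain string building and does not depend on the OCI SDK. The class and method names are hypothetical, the login module class names are the ones shown in step 5, and the sketch assumes that the values contain no double quotes. Passing null for configPath or profile leaves that option out so that the defaults described above apply.

```java
// Minimal sketch: builds the sasl.jaas.config strings from step 5.
// Assumes the values contain no double quotes; no escaping is performed.
final class OciJaasConfig {

    // JAAS configuration for instance principal authorization.
    static String instancePrincipal(String streamPoolId) {
        return "com.oracle.bmc.auth.sasl.InstancePrincipalsLoginModule required"
                + " intent=\"streamPoolId:" + streamPoolId + "\";";
    }

    // JAAS configuration for user principal authorization.
    // configPath and profile are optional; null omits the option entirely.
    static String userPrincipal(String streamPoolId, String configPath, String profile) {
        StringBuilder config = new StringBuilder(
                "com.oracle.bmc.auth.sasl.UserPrincipalsLoginModule required");
        if (configPath != null) {
            config.append(" config=\"").append(configPath).append('"');
        }
        if (profile != null) {
            config.append(" profile=\"").append(profile).append('"');
        }
        config.append(" intent=\"streamPoolId:").append(streamPoolId).append("\";");
        return config.toString();
    }

    public static void main(String[] args) {
        // "exampleStreamPoolId" is a placeholder, not a real identifier.
        System.out.println(instancePrincipal("exampleStreamPoolId"));
        System.out.println(userPrincipal("exampleStreamPoolId", null, null));
    }
}
```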

For More Information