Kafka APIの使用

このトピックでは、Oracle Cloud Infrastructure StreamingとのAPI互換性のためにApache Kafkaを構成する方法について説明します。プロデューサがKafka APIを使用してストリーミングと対話する場合、どのパーティションに一意のメッセージを公開するかを決定するのは、Kafkaによってクライアント側で処理されます。

詳細は、Kafka APIサポートを参照してください。

エンドポイント

ブートストラップ・サーバーの場合、ポート9092のリージョン・エンドポイントを使用します。例:

streaming.us-phoenix-1.oci.oraclecloud.com:9092

認証

Kafkaプロトコルを使用した認証では、認証トークンとSASL/PLAINメカニズムが使用されます。コンソールのユーザー詳細ページでトークンを生成できます。詳細は、認証トークンの作業を参照してください。

ヒント

専用のグループ/ユーザーを作成し、適切なコンパートメントまたはテナンシでストリームを管理する権限をそのグループに付与します。ストリーミング管理者によるストリーミング・リソースの管理のポリシーにより、指定したグループは、ストリーミングおよび関連するストリーミング・サービス・リソースに対してすべてのことを実行できます。その後、作成したユーザーの認証トークンを生成し、Kafkaクライアント構成で使用できます。

ユーザー名は次のフォーマットである必要があります:

tenancyName/username/streamPoolId
ヒント

Java SDKを使用している場合は、インスタンス・プリンシパル認可を使用することもできます。

Kafka構成

Kafkaクライアントに次のプロパティを設定します。

Java SDKの場合

Java SDKの推奨設定:

Properties properties = new Properties();
	properties.put("bootstrap.servers", "streaming.{region}.oci.oraclecloud.com:9092");
	properties.put("security.protocol", "SASL_SSL");
	properties.put("sasl.mechanism", "PLAIN");
	properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{tenancyName}/{username}/{streamPoolId}\" password=\"{authToken}\";");

Java SDKプロデューサの推奨設定:

properties.put("retries", 5); // retries on transient errors and load balancing disconnection
		properties.put("max.request.size", 1024 * 1024); // limit request size to 1MB

Java SDKコンシューマの推奨設定:

properties.put("max.partition.fetch.bytes", 1024 * 1024); // limit request size to 1MB per partition
Librdkafka SDKの場合

Librdkafka SDKの推奨設定:

'metadata.broker.list': 'streaming.{region}.oci.oraclecloud.com:9092',
			 'security.protocol': 'SASL_SSL',
		 	 'sasl.mechanisms': 'PLAIN',
			 'sasl.username': '{tenancyName}/{username}/{streamPoolID}',
		         'sasl.password': '{authToken}'

Librdkafka SDKプロデューサの推奨設定:

'message.send.max.retries': 5 // retries on transient errors and load balancing disconnection
		'max.request.size': 1024 * 1024 // limit request size to 1 MB

Librdkafka SDKコンシューマの推奨設定:

'max.partition.fetch.bytes': 1024 * 1024 // limit request size to 1 MB per partition

Java SDKのインスタンス・プリンシパル認可

Java SDKを使用している場合、認証トークンを使用するかわりに、インスタンスがストリーミングと対話することを認可できます。

インスタンス・プリンシパル認可用のJava SDKを構成するには:

  1. 有効なOracle Cloud Infrastructure (OCI) SDKおよびCLI構成ファイルがあることを確認します。
  2. Oracle Cloud Infrastructure SDK for Javaをプロジェクトにインポートします。詳細は、SDK for Javaの開始を参照してください。
  3. 次のOracle Cloud Infrastructure SDK for Javaの依存性を追加します:
    <dependency>
      <groupId>com.oracle.oci.sdk</groupId>
      <artifactId>oci-apisdk-java-sdk-addons-sasl</artifactId>
      <optional>false</optional>
      <version>1.13.1</version> <!-- that's the minimum version to use -->
    </dependency>
  4. Kafkaクライアント構成sasl.mechanismプロパティを変更します:
    properties.put("sasl.mechanism", OciMechanism.OCI_RSA_SHA256.mechanismName());
  5. 次のオプションのいずれかを使用して、Kafkaクライアント構成sasl.jaas.configプロパティを変更します:
    properties.put("sasl.jaas.config", "com.oracle.bmc.auth.sasl.InstancePrincipalsLoginModule required intent=\"streamPoolId:<streamPoolId>\";");
    properties.put("sasl.jaas.config", "com.oracle.bmc.auth.sasl.UserPrincipalsLoginModule required config=\"<pathToConfig>\" profile=\"<profile>\" intent=\"streamPoolId:<streamPoolId>\";");
    • configが指定されていない場合、デフォルトの構成パスが使用されます(~/.oci/config)。
    • profileが指定されていない場合、デフォルトのプロファイルが使用されます(DEFAULT)。

詳細情報