Kafka Connectの使用
Oracle Cloud Infrastructure StreamingでKafka Connectを使用します。
Kafka接続構成の管理では、コンソール、CLIおよびAPIを使用するステップを示します。
KafkaコネクタをOracle Cloud Infrastructure Streamingで使用するには、Kafka Connect構成を作成します。ストリーミングAPIは、これらの構成ハーネスをコールします。
Kafka Connect構成は、同じコンパートメント内のストリームに対してのみ機能します。
同じKafka Connect構成を持つ複数のKafkaコネクタを使用できます。別個のコンパートメントでストリームを生成または消費する必要がある場合、またはKafka Connect構成でのスロットル制限に達すること(例: コネクタが多すぎる、コネクタのワーカーが多すぎる)を回避するためにさらに多くの容量が必要な場合は、Kafkaコネクタ構成を追加で作成できます。
Kafka Connect構成の管理の詳細は、Kafka接続構成の管理を参照してください。
Kafkaコネクタ
ストリーミングでのKafka Connect互換性とは、既存の多くのファーストパーティ・コネクタやサードパーティ・コネクタを利用して、ソースからターゲットにデータを移動できることを意味します。
Oracle製品用のKafkaコネクタ:
- Oracle Cloud Infrastructure Object Storage (Kafka Connect for S3を使用)
- Kafka Connect Amazon S3ソース・コネクタ(プロデューサ用)
- Kafka Connect Amazon S3シンク・コネクタ(コンシューマ用)
- Oracle Integration Cloud
- Oracle Database (Kafka Connect JDBCを使用)
- Oracle GoldenGate
サードパーティのKafkaソース・コネクタおよびシンク・コネクタの完全なリストは、公式のConfluent Kafkaハブを参照してください。
Kafka Connectトピック
ストリーミング・サービスは、Kafka Connect構成の作成時にKafka Connectを使用するために必要な3つのトピック(config、offsetおよびstatus)を自動的に作成します。これらのトピックの名前には、Kafka Connect構成のOCIDが含まれています。
これらのトピック名を、ストリーミングで使用するKafkaコネクタのconnect-distributed.properties
ファイルに配置します。
例:
# Relevant Kafka Connect setting
config.storage.topic:<connect_configuration_OCID>-config
offset.storage.topic:<connect_configuration_OCID>-offset
status.storage.topic:<connect_configuration_OCID>-status
これらの3つの圧縮されたトピックは、構成および状態管理データを格納するためにKafka ConnectおよびStreamingで使用されることを意図しています。データの保存に使用しないでください。Kafka Connect構成のトピックが、コネクタによって意図されている目的で使用されるように、これらのトピックに対して50kb/sと50rpsのハード・スロットル制限があります。
ブートストラップ・サーバー
Kafkaコネクタ・プロパティ・ファイルのブートストラップ・サーバーを、ポート9092のストリーミングのエンドポイントに設定します。例:
streaming.us-phoenix-1.oci.oraclecloud.com:9092
認証
Kafkaプロトコルを使用した認証では、認証トークンとSASL/PLAINメカニズムが使用されます。コンソールのユーザー詳細ページでトークンを生成できます。詳細は、認証トークンの作業を参照してください。
専用のグループを作成し、適切なコンパートメントまたはテナンシでストリームを管理するための権限を付与します。その後、作成したユーザーの認証トークンを生成し、Kafkaクライアント構成で使用できます。
Kafkaコネクタ・プロパティ・ファイルの例
次に、Kafkaコネクタのconnect-distributed.properties
ファイルの例を示します:
bootstrap.servers=<streaming_endpoint>:9092
sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<userid>" password="<authToken>";
producer.sasl.mechanism=PLAIN
producer.security.protocol=SASL_PLAINTEXT
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<userid>" password="<authToken>";
consumer.sasl.mechanism=PLAIN
consumer.security.protocol=SASL_PLAINTEXT
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<userid>" password="<authToken>";
config.storage.topic:<connect_configuration_OCID>-config
offset.storage.topic:<connect_configuration_OCID>-offset
status.storage.topic:<connect_configuration_OCID>-status
必要なIAMポリシー
Oracle Cloud Infrastructureを使用するには、管理者がテナンシ管理者によってポリシーでセキュリティ・アクセス権が付与されたグループのメンバーである必要があります。コンソールまたは(SDK、CLIまたはその他のツールを使用した) REST APIのどれを使用しているかにかかわらず、このアクセス権が必要です。権限がない、または認可されていないというメッセージが表示された場合は、どのタイプのアクセス権があり、どのコンパートメントでアクセスが機能するかをテナンシ管理者に確認してください。
グループがKafka Connect構成を管理できるようにするには、テナンシで正しいポリシーを作成する必要があります。例:
allow group <identity_domain_name>/<group_name> KafkaAdmins to manage connect-harnesses in tenancy
管理者の場合: ストリーミング管理者によるストリーミング・リソースの管理のポリシーにより、指定したグループは、ストリーミングおよび関連するストリーミング・サービス・リソースに対してすべてのことを実行できます。
ポリシーを初めて使用する場合は、IAMポリシーの概要を参照してください。詳細は、次を参照してください: