Kafka Connectの使用

この情報では、Kafka Connectの使用について説明します。

Kafka接続構成の管理には、コンソール、CLIおよびAPIを使用するステップがあります。OCI SDKの使用ステップは、次のとおりです。

KafkaコネクタをOracle Cloud Infrastructure Streamingで使用するには、コンソールまたはコマンドライン・インタフェース(CLI)を使用してKafka Connect構成を作成します。ストリーミングAPIは、これらの構成ハーネスをコールします。

ノート

特定のコンパートメントで作成されたKafka Connect構成は、同じコンパートメント内のストリームに対してのみ機能します。

同じKafka Connect構成を持つ複数のKafkaコネクタを使用できます。別個のコンパートメントでストリームを生成または消費する必要がある場合、またはKafka Connect構成でのスロットル制限に達すること(例: コネクタが多すぎる、コネクタのワーカーが多すぎる)を回避するためにさらに多くの容量が必要な場合は、Kafkaコネクタ構成を追加で作成できます。

コンソールおよびストリーミングAPIを使用してKafka Connect構成を管理する方法の詳細は、Kafka接続構成の管理を参照してください。

Kafkaコネクタ

ストリーミングのKafka Connect互換性とは、既存の多くのファーストパーティ・コネクタやサードパーティ・コネクタを利用して、ソースからターゲットにデータを移動できることを意味します。

Oracle製品用のKafkaコネクタ:

サードパーティのKafkaソース・コネクタおよびシンク・コネクタの完全なリストは、公式のConfluent Kafkaハブを参照してください。

Kafka Connectトピック

ストリーミング・サービスは、Kafka Connect構成の作成時にKafka Connectを使用するために必要な3つのトピック(configoffsetおよびstatus)を自動的に作成します。これらのトピックの名前には、Kafka Connect構成のOCIDが含まれています。

これらのトピック名を、ストリーミングで使用するKafkaコネクタのconnect-distributed.propertiesファイルに配置します。

例:

# Relevant Kafka Connect setting
config.storage.topic:<connect_configuration_OCID>-config
offset.storage.topic:<connect_configuration_OCID>-offset
status.storage.topic:<connect_configuration_OCID>-status
ノート

これらの3つの圧縮されたトピックは、構成および状態管理データを格納するためにKafka Connectおよびストリーミングで使用されることを意図しているため、データの格納には使用しないでください。Kafka Connect構成のトピックが、コネクタによって意図されている目的で使用されるように、これらのトピックに対して50kb/sと50rpsのハード・スロットル制限があります。

ブートストラップ・サーバー

Kafkaコネクタ・プロパティ・ファイルのブートストラップ・サーバーを、ポート9092のストリーミングのエンドポイントに設定します。例:

streaming.us-phoenix-1.oci.oraclecloud.com:9092
ノート

ストリーミングのエンドポイントのリストは、APIリファレンスおよびエンドポイントストリーミングに関する項を参照してください。

認証

Kafkaプロトコルを使用した認証では、認証トークンとSASL/PLAINメカニズムが使用されます。コンソールのユーザー詳細ページでトークンを生成できます。詳細は、認証トークンの作業を参照してください。

ヒント

専用のグループ/ユーザーを作成し、適切なコンパートメントまたはテナンシでストリームを管理する権限をそのグループに付与することをお薦めします。その後、作成したユーザーの認証トークンを生成し、Kafkaクライアント構成で使用できます。

Kafkaコネクタ・プロパティ・ファイルの例

次に、Kafkaコネクタのconnect-distributed.propertiesファイルの例を示します:

bootstrap.servers=<streaming_endpoint>:9092
sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<userid>" password="<authToken>";
producer.sasl.mechanism=PLAIN
producer.security.protocol=SASL_PLAINTEXT
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<userid>" password="<authToken>";
consumer.sasl.mechanism=PLAIN
consumer.security.protocol=SASL_PLAINTEXT
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<userid>" password="<authToken>";
config.storage.topic:<connect_configuration_OCID>-config
offset.storage.topic:<connect_configuration_OCID>-offset
status.storage.topic:<connect_configuration_OCID>-status

必要なIAMポリシー

Oracle Cloud Infrastructureを使用するには、管理者によってポリシーでセキュリティ・アクセス権が付与されている必要があります。このアクセス権は、コンソール、あるいはSDK、CLIまたはその他のツールを使用したREST APIのいずれを使用している場合でも必要です。権限がない、または認可されていないというメッセージが表示される場合は、管理者に連絡して、どのタイプのアクセス権があり、どのコンパートメントで作業するかを確認してください。

グループがKafka Connect構成を管理できるようにするには、テナンシで正しいポリシーを作成する必要があります。例:

allow group <identity_domain_name>/<group_name> KafkaAdmins to manage connect-harnesses in tenancy

管理者の場合: ストリーミング管理者によるストリーミング・リソースの管理のポリシーにより、指定したグループは、ストリーミングおよび関連するストリーミング・サービス・リソースに対してすべてのことを実行できます。

ポリシーを初めて使用する場合は、ポリシーの開始共通ポリシーを参照してください。ストリーミング・サービス用のポリシーの作成についてさらに詳しく調べる場合は、IAMポリシー・リファレンスの「ストリーミング・サービスの詳細」を参照してください。詳細は、次を参照してください:

SDKを使用したKafka Connect構成の管理

ストリーミングでKafka Connectを使用するには、Kafka Connect構成またはKafka Connectハーネスが必要です。新しいハーネスを作成するか、既存のハーネスを使用する場合、ハーネスのOCIDを取得できます。詳細は、Kafka Connectの使用を参照してください。

Kafka Connectハーネスの作成

次のコード例は、OCI SDK for Javaを使用してKafka Connectハーネスを作成する方法を示しています:

CreateConnectHarnessDetails createConnectHarnessDetails = CreateConnectHarnessDetails.builder()
    .compartmentId(compartment) //compartment where you want to create connect harness
    .name("myConnectHarness") //connect harness name
    .build();
 
CreateConnectHarnessRequest connectHarnessRequest = CreateConnectHarnessRequest.builder()
    .createConnectHarnessDetails(createConnectHarnessDetails)
    .build();
 
CreateConnectHarnessResponse createConnectHarnessResponse = streamAdminClient.createConnectHarness(connectHarnessRequest);
ConnectHarness connectHarness = createConnectHarnessResponse.getConnectHarness();
 
while (connectHarness.getLifecycleState() != ConnectHarness.LifecycleState.Active && connectHarness.getLifecycleState() != ConnectHarness.LifecycleState.Failed) {
    GetConnectHarnessRequest getConnectHarnessRequest = GetConnectHarnessRequest.builder().connectHarnessId(connectHarness.getId()).build();
     connectHarness = streamAdminClient.getConnectHarness(getConnectHarnessRequest).getConnectHarness();
}
Kafka Connectハーネスのリスト

次のコード例は、OCI SDK for Javaを使用してKafka Connectハーネスをリストする方法を示しています:

ListConnectHarnessesRequest listConnectHarnessesRequest = ListConnectHarnessesRequest.builder()
    .compartmentId(compartment) // compartment id to list all the connect harnesses.
    .lifecycleState(ConnectHarnessSummary.LifecycleState.Active)
    .build();
 
ListConnectHarnessesResponse listConnectHarnessesResponse = streamAdminClient.listConnectHarnesses(listConnectHarnessesRequest);
List<ConnectHarnessSummary> items = listConnectHarnessesResponse.getItems();