18 Apache Kafkaアダプタ

Apache Kafkaアダプタを使用すると、Apache Kafkaメッセージング・システムに接続する統合を作成できます。Apache Kafkaアダプタは、Apache Kafka分散パブリッシュ/サブスクライブ・メッセージング・システムに接続し、Kafkaトピックからのメッセージのパブリッシュと消費を可能にします。

Apache Kafkaアダプタの概要

Apache Kafkaアダプタには、次の利点があります:

  • Apache Kafkaメッセージング・システムへの接続を確立して、メッセージのパブリッシュと消費を可能にします。
  • Kafkaトピックからメッセージを消費し、呼出し(アウトバウンド)方向でKafkaトピックへのメッセージを生成します。
  • アダプタ・エンドポイント構成ウィザードを使用して、使用可能なメタデータ(つまり、メッセージをパブリッシュおよび消費する宛先のトピックとパーティション)を参照できます。
  • コンシューマ・グループがサポートされます。
  • ヘッダーがサポートされます。
  • 次のメッセージ構造がサポートされます:
    • Avroスキーマ
    • サンプルJSON
    • XMLスキーマ(XSD)
    • サンプルXML
  • 次のセキュリティ・ポリシーがサポートされます:
    • Simple Authentication and Security Layer Plain (SASLPLAIN)
    • SASL Plain over SSL
    • TLS
    • 相互TLS
  • SSLを介したApache Kafkaメッセージング・システムへの直接接続がサポートされます。
  • Kafkaプロデューサをトランザクション・プロデューサとして構成できます(オプション)。これにより、アプリケーションは複数のパーティションにメッセージをアトミックに送信できます。

https://kafka.apache.org/を参照してください。

Apache Kafkaアダプタは、Oracle SOA Suiteに含まれる多数の事前定義済アダプタの1つです。

Apache Kafkaアダプタを構成するワークフロー

Apache Kafkaアダプタを構成するには、非常に単純なワークフローに従います。

次の表に、アダプタ・タスクと全体的なSOAコンポジット・タスクの両方のワークフロー・ステップを示し、各ステップの手順説明へのリンクを記載します。

ステップ 説明 参照先
1 BPELプロセス・サービス・コンポーネントを含むSOAアプリケーションをOracle JDeveloperで作成します。 「SOAコンポジット・アプリケーションの開発のスタート・ガイド」および「Oracle BPEL Process Managerのスタート・ガイド」を参照してください。
2 Apache Kafkaアダプタを追加して構成します。

Oracle JDeveloperの「コンポーネント」ウィンドウから「Apache Kafkaアダプタ」をドラッグ・アンド・ドロップします。Apache Kafkaアダプタの「ようこそ」画面が表示されます。詳細は、「Apache Kafkaアダプタの構成」を参照してください。

Kafkaサーバーを構成し、実行中にする必要があります。

3 サービス、サービス・コンポーネントおよび参照を接続します。 「SOAコンポジット・アプリケーションの開発のスタート・ガイド」ワイヤの追加に関する項を参照してください。
4 SOAコンポジット・アプリケーションをデプロイします。 「SOAコンポジット・アプリケーションの開発のスタート・ガイド」SOAコンポジット・アプリケーションのデプロイに関する項を参照してください。
5 SOAコンポジット・アプリケーションを管理およびテストします 「SOAコンポジット・アプリケーションの開発のスタート・ガイド」SOAコンポジット・アプリケーションの管理およびテストに関する項を参照してください。

前提条件

SOAコンポジット・アプリケーションでApache Kafkaアダプタを構成および追加するには、次の前提条件を満たす必要があります。

ブートストラップ・サーバーのホストおよびポートの確認

Kafkaブローカのリストへの接続に使用するブートストラップ・サーバーのホストおよびポートを確認します。

セキュリティ・ポリシー詳細の取得

Apache Kafkaアダプタに関する次のセキュリティ・ポリシー詳細を取得します。

  • Simple Authentication and Security Layer (SASL) Plain over SSLまたはSASL Plainセキュリティ・ポリシーを使用している場合は、SASLのユーザー名とパスワードを確認します。
  • SASL Plain over SSL、TLS、または相互TLSのポリシーを使用するには、必要な証明書を準備します。

以下の制限に注意してください。

  • Apache Kafkaアダプタでは、Apache Kafkaシリアライザ/デシリアライザ(String/ByteArray)がサポートされています。Confluentや他のシリアライザ/デシリアライザはサポートされていません。
  • SASL PLAIN over SSLセキュリティ・ポリシーのみがサポートされます。
  • XML/JSONおよびAVROメッセージ構造がサポートされます。それ以外の構造/形式はサポートされていません。
  • スキーマ・レジストリは、Apache Kafkaアダプタではサポートされていません。

Apache Kafkaアダプタの構成

Apache Kafkaアダプタを外部参照スイムレーンにドラッグすると、アダプタ・エンドポイント構成ウィザードが起動します。このウィザードで示される順序に従って、Apache Kafkaアダプタのエンドポイントのプロパティを構成します。

次の各項では、Apache KafkaアダプタをSOAコンポジット・アプリケーションの統合のトリガーおよび呼出しとして構成する手順をガイドするウィザード・ページについて説明します。

「基本情報」ページ

アプリケーションで各アダプタの「基本情報」ページに名前と説明を入力できます。

要素 説明
エンドポイントにどのような名前を付けますか。

他の人にもこのアダプタの役割がわかるように、意味のある名前を指定します。名前には、英語のアルファベット文字、数字、アンダースコアおよびハイフンを使用できます。次の文字を含めることはできません:

  • 空白文字(My Inbound Connectionなど)は含めない
  • アンダースコアおよびハイフン以外の特殊文字(#;83&、righ(t)now4など)は含めない
  • マルチバイト文字は含めない
このエンドポイントでは何が行われますか。

アダプタの役割の説明(オプション)を入力します。たとえば:

このアダプタは、インバウンド・リクエストを受信してアカウント情報とクラウド・アプリケーションを同期化します。

「接続情報」ページ

Apache Kafkaアダプタの接続詳細を指定します。

要素 説明
接続URL

Apache KafkaインスタンスのURLを入力します。

セキュリティ・ポリシー

環境に適したセキュリティ・ポリシー(USERNAME_PASSWORD_TOKENなど)を選択します。

  • ウィザードには、適用できないものを含むすべてのポリシーが表示されます。正しい選択を行うには、ポリシーに関する知識が必要です。たとえば、アイデンティティは伝播されないため、SAMLベースのポリシーは選択できません。
  • クラウド・アダプタに適用するポリシーは、そのクラウド・アダプタに一意のもので、コンポジットの他のエンドポイントに影響を与えることはありません。
認証キー
CSF認証キーを選択します。
  • 追加: クリックして新しい認証キーを作成します。キー名、ユーザー名およびパスワードを指定する必要があります。
  • 編集: クリックして認証キーを編集します。
  • 削除: クリックして認証キーを削除します。
テスト クリックして、認証キーを検証します。

「操作」ページ

実行する操作を選択します。

要素 説明
Kafkaトピックに対してどのような操作を実行しますか。
  • レコードをKafkaトピックにパブリッシュする。
  • Kafkaトピックからレコードを消費する。
  • オフセットを指定してKafkaトピックからレコードを消費する。

「トピックおよびパーティション」ページ

操作および操作を実行するトピックを選択し、オプションでメッセージ構造を指定します。

呼出し接続としてのApache Kafkaアダプタの構成
要素 説明
トピックの選択 操作を実行するトピックを選択します。トピックの最初の数文字を入力して、トピックの表示をフィルタすることもできます。トピックとは、アプリケーションがメッセージを追加、処理および再処理できるカテゴリです。トピック単位でメッセージにサブスクライブします。
パーティションの指定(このフィールドは、「レコードをKafkaトピックにパブリッシュする」または「Kafkaトピックからレコードを消費する」を選択した場合にのみ表示されます。) 選択したトピックをプッシュする宛先のパーティションを指定します。Kafkaトピックは、データを複数のブローカ間で分割できるようにパーティションに分割されています。特定のパーティションを選択せずに「デフォルト」の選択を使用すると、Kafkaによって使用可能なすべてのパーティションが考慮され、使用するパーティションが決定されます。
グループの消費(このフィールドは、「Kafkaトピックからレコードを消費する」を選択した場合にのみ表示されます。) アタッチするコンシューマ・グループを指定します。コンシューマは、同じグループIDを使用してグループに参加します。Kafkaによって、トピックのパーティションがグループ内のコンシューマに割り当てられます。
メッセージ消費のオプションの指定(このフィールドは、「Kafkaトピックからレコードを消費する」を選択した場合にのみ表示されます。)
  • 最新のものを読み取る: アプリケーションがデプロイされた時点から始まる最新のメッセージが読み取られます。
  • 最初から読み取る: 最初からメッセージを読み取る場合に選択します。たとえば、最初から読み取るように選択してアプリケーションをデプロイした場合、最初のスケジュール済実行では20件のレコードが選択され、その次のスケジュール済実行では次の20件のレコードが選択されます。その後にアプリケーションが非アクティブ化、編集および再アクティブ化されると、次の20件のレコードが選択されます。

最初からメッセージを読み取るようにアダプタを構成し、再編集した場合は、次のオプションが表示されます:

  • 先頭にリセット: メッセージを最初から再度読み取ります(重複が発生する可能性があります)。
  • 読取りを続行: 非アクティブ化の前に中断したところから次のメッセージを読み取ります。

ノート:

「最初から読み取る」または「最新のものを読み取る」オプションを選択した場合、1回の実行ですべてのメッセージが消費されることは保証されません。ただし、残ったメッセージは後続の実行で消費されます。
フェッチする最大レコード数(このフィールドは、「Kafkaトピックからレコードを消費する」または「オフセットを指定してKafkaトピックからレコードを消費する」を選択した場合にのみ表示されます。)

読み取るメッセージ数を指定します。完全なメッセージ・ペイロードのしきい値は10MBです。

ノート:

このフィールドは、フェッチするレコードの上限を指定します。指定した量のレコードが1回の実行でストリームから取得されることは保証されません。残ったメッセージは後続の実行でフェッチされます。
メッセージ構造を指定しますか。 ウィザードの「メッセージ構造」ページで、使用するメッセージ構造を定義する場合は、「はい」を選択します。そうでない場合は、「いいえ」を選択します。
メッセージのヘッダーを指定しますか。 ウィザードの「ヘッダー」ページで、使用するメッセージ・ヘッダーを定義する場合は、「はい」を選択します。そうでない場合は、「いいえ」を選択します。
拡張構成の確認と更新 「編集」をクリックして「拡張オプション」セクションを開き、トランザクション・プロデューサを有効または無効にします。
  1. トランザクション・プロデューサ: このオプションには次のような機能があります:
    1. 選択した場合、トランザクション・プロデューサはアプリケーションからメッセージを複数のパーティションにアトミックに送信できるようになります。
    2. 選択しない場合、Apache Kafkaアダプタは非トランザクション・プロデューサとして構成されます。
  2. メッセージ・タイプ: このオプションでは、メッセージ・タイプを定義します。選択可能なオプションは、「文字列」または「バイト」です。これによってメッセージに使用するシリアライザが定義されます。ここで選択した内容は、メッセージ・キーとメッセージ値に適用されます。
トリガー接続としてのApache Kafkaアダプタの構成
要素 説明
トピックの選択 操作を実行するトピックを選択します。トピックの最初の数文字を入力して、トピックの表示をフィルタすることもできます。トピックとは、アプリケーションがメッセージを追加、処理および再処理できるカテゴリです。トピック単位でメッセージにサブスクライブします。
パーティションの指定 選択したトピックをプッシュする宛先のパーティションを指定します。Kafkaトピックは、データを複数のブローカ間で分割できるようにパーティションに分割されています。特定のパーティションを選択せずに「デフォルト」の選択を使用すると、Kafkaによって使用可能なすべてのパーティションが考慮され、使用するパーティションが決定されます。
コンシューマ・グループ アタッチするコンシューマ・グループを指定します。コンシューマは、同じグループIDを使用してグループに参加します。Kafkaによって、トピックのパーティションがグループ内のコンシューマに割り当てられます。
ポーリング頻度(秒)

レコードをフェッチする頻度を指定します。

フェッチする最大レコード数

読み取るメッセージ数を指定します。完全なメッセージ・ペイロードのしきい値は10MBです。

ノート:

このフィールドは、フェッチするレコードの上限を指定します。指定した数のレコードが1回の実行でストリームから取得されることは保証されません。残ったメッセージは後続の実行でフェッチされます。
メッセージ構造を指定しますか。 ウィザードの「メッセージ構造」ページで、使用するメッセージ構造を定義する場合は、「はい」を選択します。そうでない場合は、「いいえ」を選択します。
メッセージのヘッダーを指定しますか。 ウィザードの「ヘッダー」ページで、使用するメッセージ・ヘッダーを定義する場合は、「はい」を選択します。そうでない場合は、「いいえ」を選択します。
拡張構成の確認と更新 「編集」をクリックして「拡張オプション」セクションを開き、トランザクション・プロデューサを有効または無効にします。
  1. トランザクション・プロデューサ: このオプションには次のような機能があります:
    1. 選択した場合、トランザクション・プロデューサはアプリケーションからメッセージを複数のパーティションにアトミックに送信できるようになります。
    2. 選択しない場合、Apache Kafkaアダプタは非トランザクション・プロデューサとして構成されます。
  2. メッセージ・タイプ: このオプションでは、メッセージ・タイプを定義します。選択可能なオプションは、「文字列」または「バイト」です。これによってメッセージに使用するシリアライザが定義されます。ここで選択した内容は、メッセージ・キーとメッセージ値に適用されます。

「メッセージ構造」ページ

使用するメッセージ構造を選択します。このページは、「トピックおよびパーティション」ページの「メッセージ構造を指定しますか。」フィールドで「はい」を選択した場合に表示されます。

要素 説明
メッセージ構造をどのように指定しますか。
  • Avroスキーマ・ドキュメント
  • サンプルJSONドキュメント
  • XMLスキーマ(XSD)ドキュメント
  • サンプルXMLドキュメント
ファイルの選択 「参照」をクリックしてファイルを選択します。選択すると、「ファイル名」フィールドにファイル名が表示されます。
要素 XSDまたはAvroファイルを指定した場合は、要素を選択します。

「ヘッダー」ページ

メッセージにアタッチするメッセージ・ヘッダー構造を定義します。このページは、「トピックおよびパーティション」ページの「メッセージのヘッダーを指定しますか。」フィールドで「はい」を選択した場合に表示されます。

要素 説明
メッセージ・ヘッダーの指定 ヘッダーと説明(オプション)を指定します。

「サマリー」ページ

「サマリー」ページで、指定したアダプタの構成値を確認できます。

要素 説明
サマリー

ウィザードの前のページで定義した構成値のサマリーを表示します。

表示される情報は、アダプタによって異なる場合があります。一部のアダプタでは、選択したビジネス・オブジェクトおよび操作名が表示されます。生成されたXSDファイルが提供されるアダプタの場合は、XSDリンクをクリックすると、ファイルの読取り専用バージョンが表示されます。

前のページに戻って値を更新するには、左パネルの適切なタブをクリックするか、「戻る」をクリックします。

構成詳細を取り消すには、「取消」をクリックします。

Apache Kafkaアダプタを使用した共通パターンの実装

Apache Kafkaアダプタを使用して、次の共通パターンを実装できます。

Apache Kafkaトピックへのメッセージの発行

FTPアダプタおよびステージ・ファイルの読取りアクションを使用してレコードを読み取り、Apache Kafkaアダプタの発行操作を使用してレコードをApache Kafkaトピックにパブリッシュするように、SOAコンポジット・アプリケーションを構成できます。

次の統合では、このパターンの実装方法の一例を示します:

  • プロジェクトを含むSOAコンポジット・アプリケーション。
  • SOAコンポジットとFTPアダプタの間でソースからターゲットへの適切なマッピングを実行するBPELプロセス。
  • 入力ディレクトリからファイル(レコード)をフェッチしてダウンロード・ディレクトリに保存するFTPアダプタ。
  • 次のように構成されたステージ・ファイル・アクション:
    • ダウンロード・ディレクトリ内の各ファイル(レコード)に対してセグメント内のファイル読取り操作を実行する。
    • 使用するメッセージの内容に構造(この例では、XMLスキーマ(XSD)ドキュメント)を指定する。
    • ステージ・ファイル・アクションとApache Kafkaアダプタの間でソースからターゲットへの適切なマッピングを実行する。
  • 次のように構成されたApache Kafkaアダプタ:
    • レコードをKafkaトピックにパブリッシュする。
    • 使用するメッセージ構造(この例では、XMLスキーマ(XSD)ドキュメント)およびメッセージに使用するヘッダーを指定する。
  • Apache KafkaアダプタとFTPアダプタの間でソースからターゲットへの適切なマッピングを実行するBPELプロセス。
  • 処理の完了時にダウンロード・ディレクトリからファイルを削除するFTPアダプタ。

完了後のSOAコンポジット・アプリケーションは次のようになります:



Apache Kafkaトピックからのメッセージの消費

Apache Kafkaアダプタを使用してApache Kafkaトピックからメッセージを消費するようにSOAコンポジット・アプリケーションを構成できます。

次の使用例では、このパターンの実装方法の一例を示します:

  • プロジェクトを含むSOAコンポジット・アプリケーション。
  • SOAコンポジットとFTPアダプタの間でソースからターゲットへの適切なマッピングを実行するBPELプロセス。
  • 次のように構成されたApache Kafkaアダプタ:
    • Kafkaトピックからレコードを消費する。
    • アタッチするコンシューマ・グループを指定します。Kafkaによって、トピックのパーティションがグループ内のコンシューマに割り当てられます。
    • メッセージ消費のオプションとして「最新のものを読み取る」を指定します。SOAコンポジットがデプロイされた時点から始まる最新のメッセージが読み取られます。
    • 使用するメッセージ構造(この例では、XMLスキーマ(XSD)ドキュメント)およびメッセージに使用するヘッダーを指定する。

完了後の設定は次のようになります: