18 Apache Kafkaアダプタ
Apache Kafkaアダプタを使用すると、Apache Kafkaメッセージング・システムに接続する統合を作成できます。Apache Kafkaアダプタは、Apache Kafka分散パブリッシュ/サブスクライブ・メッセージング・システムに接続し、Kafkaトピックからのメッセージのパブリッシュと消費を可能にします。
Apache Kafkaアダプタの概要
Apache Kafkaアダプタには、次の利点があります:
- Apache Kafkaメッセージング・システムへの接続を確立して、メッセージのパブリッシュと消費を可能にします。
- Kafkaトピックからメッセージを消費し、呼出し(アウトバウンド)方向でKafkaトピックへのメッセージを生成します。
- アダプタ・エンドポイント構成ウィザードを使用して、使用可能なメタデータ(つまり、メッセージをパブリッシュおよび消費する宛先のトピックとパーティション)を参照できます。
- コンシューマ・グループがサポートされます。
- ヘッダーがサポートされます。
- 次のメッセージ構造がサポートされます:
- Avroスキーマ
- サンプルJSON
- XMLスキーマ(XSD)
- サンプルXML
- 次のセキュリティ・ポリシーがサポートされます:
- Simple Authentication and Security Layer Plain (SASLPLAIN)
- SASL Plain over SSL
- TLS
- 相互TLS
- SSLを介したApache Kafkaメッセージング・システムへの直接接続がサポートされます。
- Kafkaプロデューサをトランザクション・プロデューサとして構成できます(オプション)。これにより、アプリケーションは複数のパーティションにメッセージをアトミックに送信できます。
https://kafka.apache.org/を参照してください。
Apache Kafkaアダプタは、Oracle SOA Suiteに含まれる多数の事前定義済アダプタの1つです。
Apache Kafkaアダプタを構成するワークフロー
Apache Kafkaアダプタを構成するには、非常に単純なワークフローに従います。
次の表に、アダプタ・タスクと全体的なSOAコンポジット・タスクの両方のワークフロー・ステップを示し、各ステップの手順説明へのリンクを記載します。
ステップ | 説明 | 参照先 |
---|---|---|
1 | BPELプロセス・サービス・コンポーネントを含むSOAアプリケーションをOracle JDeveloperで作成します。 | 「SOAコンポジット・アプリケーションの開発のスタート・ガイド」および「Oracle BPEL Process Managerのスタート・ガイド」を参照してください。 |
2 | Apache Kafkaアダプタを追加して構成します。 |
Oracle JDeveloperの「コンポーネント」ウィンドウから「Apache Kafkaアダプタ」をドラッグ・アンド・ドロップします。Apache Kafkaアダプタの「ようこそ」画面が表示されます。詳細は、「Apache Kafkaアダプタの構成」を参照してください。 Kafkaサーバーを構成し、実行中にする必要があります。 |
3 | サービス、サービス・コンポーネントおよび参照を接続します。 | 「SOAコンポジット・アプリケーションの開発のスタート・ガイド」のワイヤの追加に関する項を参照してください。 |
4 | SOAコンポジット・アプリケーションをデプロイします。 | 「SOAコンポジット・アプリケーションの開発のスタート・ガイド」のSOAコンポジット・アプリケーションのデプロイに関する項を参照してください。 |
5 | SOAコンポジット・アプリケーションを管理およびテストします | 「SOAコンポジット・アプリケーションの開発のスタート・ガイド」のSOAコンポジット・アプリケーションの管理およびテストに関する項を参照してください。 |
前提条件
SOAコンポジット・アプリケーションでApache Kafkaアダプタを構成および追加するには、次の前提条件を満たす必要があります。
セキュリティ・ポリシー詳細の取得
Apache Kafkaアダプタに関する次のセキュリティ・ポリシー詳細を取得します。
- Simple Authentication and Security Layer (SASL) Plain over SSLまたはSASL Plainセキュリティ・ポリシーを使用している場合は、SASLのユーザー名とパスワードを確認します。
- SASL Plain over SSL、TLS、または相互TLSのポリシーを使用するには、必要な証明書を準備します。
以下の制限に注意してください。
- Apache Kafkaアダプタでは、Apache Kafkaシリアライザ/デシリアライザ(String/ByteArray)がサポートされています。Confluentや他のシリアライザ/デシリアライザはサポートされていません。
- SASL PLAIN over SSLセキュリティ・ポリシーのみがサポートされます。
- XML/JSONおよびAVROメッセージ構造がサポートされます。それ以外の構造/形式はサポートされていません。
- スキーマ・レジストリは、Apache Kafkaアダプタではサポートされていません。
Apache Kafkaアダプタの構成
Apache Kafkaアダプタを外部参照スイムレーンにドラッグすると、アダプタ・エンドポイント構成ウィザードが起動します。このウィザードで示される順序に従って、Apache Kafkaアダプタのエンドポイントのプロパティを構成します。
次の各項では、Apache KafkaアダプタをSOAコンポジット・アプリケーションの統合のトリガーおよび呼出しとして構成する手順をガイドするウィザード・ページについて説明します。
「基本情報」ページ
アプリケーションで各アダプタの「基本情報」ページに名前と説明を入力できます。
要素 | 説明 |
---|---|
エンドポイントにどのような名前を付けますか。 |
他の人にもこのアダプタの役割がわかるように、意味のある名前を指定します。名前には、英語のアルファベット文字、数字、アンダースコアおよびハイフンを使用できます。次の文字を含めることはできません:
|
このエンドポイントでは何が行われますか。 |
アダプタの役割の説明(オプション)を入力します。たとえば: このアダプタは、インバウンド・リクエストを受信してアカウント情報とクラウド・アプリケーションを同期化します。 |
「接続情報」ページ
Apache Kafkaアダプタの接続詳細を指定します。
要素 | 説明 |
---|---|
接続URL |
Apache KafkaインスタンスのURLを入力します。 |
セキュリティ・ポリシー |
環境に適したセキュリティ・ポリシー(USERNAME_PASSWORD_TOKENなど)を選択します。
|
認証キー |
CSF認証キーを選択します。
|
テスト | クリックして、認証キーを検証します。 |
「操作」ページ
実行する操作を選択します。
要素 | 説明 |
---|---|
Kafkaトピックに対してどのような操作を実行しますか。 |
|
「トピックおよびパーティション」ページ
操作および操作を実行するトピックを選択し、オプションでメッセージ構造を指定します。
呼出し接続としてのApache Kafkaアダプタの構成
要素 | 説明 |
---|---|
トピックの選択 | 操作を実行するトピックを選択します。トピックの最初の数文字を入力して、トピックの表示をフィルタすることもできます。トピックとは、アプリケーションがメッセージを追加、処理および再処理できるカテゴリです。トピック単位でメッセージにサブスクライブします。 |
パーティションの指定(このフィールドは、「レコードをKafkaトピックにパブリッシュする」または「Kafkaトピックからレコードを消費する」を選択した場合にのみ表示されます。) | 選択したトピックをプッシュする宛先のパーティションを指定します。Kafkaトピックは、データを複数のブローカ間で分割できるようにパーティションに分割されています。特定のパーティションを選択せずに「デフォルト」の選択を使用すると、Kafkaによって使用可能なすべてのパーティションが考慮され、使用するパーティションが決定されます。 |
グループの消費(このフィールドは、「Kafkaトピックからレコードを消費する」を選択した場合にのみ表示されます。) | アタッチするコンシューマ・グループを指定します。コンシューマは、同じグループIDを使用してグループに参加します。Kafkaによって、トピックのパーティションがグループ内のコンシューマに割り当てられます。 |
メッセージ消費のオプションの指定(このフィールドは、「Kafkaトピックからレコードを消費する」を選択した場合にのみ表示されます。) |
最初からメッセージを読み取るようにアダプタを構成し、再編集した場合は、次のオプションが表示されます:
ノート: 「最初から読み取る」または「最新のものを読み取る」オプションを選択した場合、1回の実行ですべてのメッセージが消費されることは保証されません。ただし、残ったメッセージは後続の実行で消費されます。 |
フェッチする最大レコード数(このフィールドは、「Kafkaトピックからレコードを消費する」または「オフセットを指定してKafkaトピックからレコードを消費する」を選択した場合にのみ表示されます。) |
読み取るメッセージ数を指定します。完全なメッセージ・ペイロードのしきい値は10MBです。 ノート: このフィールドは、フェッチするレコードの上限を指定します。指定した量のレコードが1回の実行でストリームから取得されることは保証されません。残ったメッセージは後続の実行でフェッチされます。 |
メッセージ構造を指定しますか。 | ウィザードの「メッセージ構造」ページで、使用するメッセージ構造を定義する場合は、「はい」を選択します。そうでない場合は、「いいえ」を選択します。 |
メッセージのヘッダーを指定しますか。 | ウィザードの「ヘッダー」ページで、使用するメッセージ・ヘッダーを定義する場合は、「はい」を選択します。そうでない場合は、「いいえ」を選択します。 |
拡張構成の確認と更新 | 「編集」をクリックして「拡張オプション」セクションを開き、トランザクション・プロデューサを有効または無効にします。
|
トリガー接続としてのApache Kafkaアダプタの構成
要素 | 説明 |
---|---|
トピックの選択 | 操作を実行するトピックを選択します。トピックの最初の数文字を入力して、トピックの表示をフィルタすることもできます。トピックとは、アプリケーションがメッセージを追加、処理および再処理できるカテゴリです。トピック単位でメッセージにサブスクライブします。 |
パーティションの指定 | 選択したトピックをプッシュする宛先のパーティションを指定します。Kafkaトピックは、データを複数のブローカ間で分割できるようにパーティションに分割されています。特定のパーティションを選択せずに「デフォルト」の選択を使用すると、Kafkaによって使用可能なすべてのパーティションが考慮され、使用するパーティションが決定されます。 |
コンシューマ・グループ | アタッチするコンシューマ・グループを指定します。コンシューマは、同じグループIDを使用してグループに参加します。Kafkaによって、トピックのパーティションがグループ内のコンシューマに割り当てられます。 |
ポーリング頻度(秒) |
レコードをフェッチする頻度を指定します。 |
フェッチする最大レコード数 |
読み取るメッセージ数を指定します。完全なメッセージ・ペイロードのしきい値は10MBです。 ノート: このフィールドは、フェッチするレコードの上限を指定します。指定した数のレコードが1回の実行でストリームから取得されることは保証されません。残ったメッセージは後続の実行でフェッチされます。 |
メッセージ構造を指定しますか。 | ウィザードの「メッセージ構造」ページで、使用するメッセージ構造を定義する場合は、「はい」を選択します。そうでない場合は、「いいえ」を選択します。 |
メッセージのヘッダーを指定しますか。 | ウィザードの「ヘッダー」ページで、使用するメッセージ・ヘッダーを定義する場合は、「はい」を選択します。そうでない場合は、「いいえ」を選択します。 |
拡張構成の確認と更新 | 「編集」をクリックして「拡張オプション」セクションを開き、トランザクション・プロデューサを有効または無効にします。
|
「メッセージ構造」ページ
使用するメッセージ構造を選択します。このページは、「トピックおよびパーティション」ページの「メッセージ構造を指定しますか。」フィールドで「はい」を選択した場合に表示されます。
要素 | 説明 |
---|---|
メッセージ構造をどのように指定しますか。 |
|
ファイルの選択 | 「参照」をクリックしてファイルを選択します。選択すると、「ファイル名」フィールドにファイル名が表示されます。 |
要素 | XSDまたはAvroファイルを指定した場合は、要素を選択します。 |
Apache Kafkaアダプタを使用した共通パターンの実装
Apache Kafkaアダプタを使用して、次の共通パターンを実装できます。
Apache Kafkaトピックへのメッセージの発行
FTPアダプタおよびステージ・ファイルの読取りアクションを使用してレコードを読み取り、Apache Kafkaアダプタの発行操作を使用してレコードをApache Kafkaトピックにパブリッシュするように、SOAコンポジット・アプリケーションを構成できます。
次の統合では、このパターンの実装方法の一例を示します:
- プロジェクトを含むSOAコンポジット・アプリケーション。
- SOAコンポジットとFTPアダプタの間でソースからターゲットへの適切なマッピングを実行するBPELプロセス。
- 入力ディレクトリからファイル(レコード)をフェッチしてダウンロード・ディレクトリに保存するFTPアダプタ。
- 次のように構成されたステージ・ファイル・アクション:
- ダウンロード・ディレクトリ内の各ファイル(レコード)に対してセグメント内のファイル読取り操作を実行する。
- 使用するメッセージの内容に構造(この例では、XMLスキーマ(XSD)ドキュメント)を指定する。
- ステージ・ファイル・アクションとApache Kafkaアダプタの間でソースからターゲットへの適切なマッピングを実行する。
- 次のように構成されたApache Kafkaアダプタ:
- レコードをKafkaトピックにパブリッシュする。
- 使用するメッセージ構造(この例では、XMLスキーマ(XSD)ドキュメント)およびメッセージに使用するヘッダーを指定する。
- Apache KafkaアダプタとFTPアダプタの間でソースからターゲットへの適切なマッピングを実行するBPELプロセス。
- 処理の完了時にダウンロード・ディレクトリからファイルを削除するFTPアダプタ。
完了後のSOAコンポジット・アプリケーションは次のようになります:
Apache Kafkaトピックからのメッセージの消費
Apache Kafkaアダプタを使用してApache Kafkaトピックからメッセージを消費するようにSOAコンポジット・アプリケーションを構成できます。
次の使用例では、このパターンの実装方法の一例を示します:
- プロジェクトを含むSOAコンポジット・アプリケーション。
- SOAコンポジットとFTPアダプタの間でソースからターゲットへの適切なマッピングを実行するBPELプロセス。
- 次のように構成されたApache Kafkaアダプタ:
- Kafkaトピックからレコードを消費する。
- アタッチするコンシューマ・グループを指定します。Kafkaによって、トピックのパーティションがグループ内のコンシューマに割り当てられます。
- メッセージ消費のオプションとして「最新のものを読み取る」を指定します。SOAコンポジットがデプロイされた時点から始まる最新のメッセージが読み取られます。
- 使用するメッセージ構造(この例では、XMLスキーマ(XSD)ドキュメント)およびメッセージに使用するヘッダーを指定する。
完了後の設定は次のようになります: