Kafka StreamsデータをOracle Autonomous Databaseに接続
コンピュート・インスタンスを作成し、Kafkaをインストールしてプロデューサとしてメッセージを送信するか、OCIコンソールで次のステップを使用できます。
- OCIコンソールにログインし、Marketplaceに移動します。
- 「すべてのアプリケーション」で、GoldenGateストリーム分析を検索します。
- 「スタックの起動」をクリックします。
- VCNの詳細を指定し、後で秘密キーを使用してGoldenGateストリーム分析インスタンスにログインするためのSSH公開キーを追加します。
ノート:
このGoldenGateストリーム分析インスタンスには、組込みのサンプルKafkaストリーム(プロデューサ)がインストールされています。独自のKafkaストリームがある場合は、これも使用できます。
Autonomous Databaseの作成
- 左上のメニューから「Oracle Database」をクリックし、「Autonomous Database」をクリックします。
- Autonomous Databasesホームページで、「Autonomous Databaseの作成」をクリックします。
- データベースを作成するコンパートメントを選択します。
- 表示名: Autonomous Databaseを簡単に識別できるようにするユーザーフレンドリな説明またはその他の情報を入力します。
- データベース名: 新しいAutonomous Databaseの名前を指定します。
- 「ワークロード・タイプ」に「データ・ウェアハウス」を選択します。
- 「サーバーレス」として「デプロイメント・タイプ」を選択します。
- 「データベース・バージョン」に19c、ECPUに2、ストレージ(TB)に1を選択します。
- 管理者資格証明を作成し、パスワードを入力します。パスワードは、Oracle Cloudセキュリティ標準に基づく強力なパスワード複雑性基準を満たしている必要があります。パスワード複雑性ルールの詳細は、データベース・ユーザーの作成を参照してください。
- 「すべての場所からのセキュア・アクセス」としてネットワーク・アクセスを選択します。
- 残りのパラメータをデフォルトのままにして、「Autonomous Databaseの作成」をクリックします。
新しいAutonomous Databaseが使用可能になるまで、画面に「プロビジョニング中」が表示されます。
Kafkaメッセージを格納する表の作成
- SQLDeveloperを開き、「+」アイコンをクリックして「Oracleデータベース接続の作成」をクリックします。
- 接続名を入力し、データベースのユーザー名とパスワードを指定します。
- 「接続タイプ」に「Cloud Wallet」を選択し、
wallet.zip
ファイルを参照して「接続」をクリックします。接続は正常に作成されます。 - 次のスクリプトを使用して、サンプル表を作成します。
CREATE TABLE "TEST"."SAMPLE" ( "FRUIT" VARCHAR2(50 BYTE)", "TOTAL" VARCHAR2(50 BYTE)", "COLOR" VARCHAR2(50 BYTE)" )
Oracle Integration 3インスタンスの作成
- OCIコンソールで「開発者サービス」をクリックし、「アプリケーション統合」で「統合」をクリックします。
- 統合インスタンスのホームページで、「インスタンスの作成」をクリックします。
- 「名前」を入力し、「バージョン」として「Oracle Integration 3」、「エディション」として「エンタープライズ」、「シェイプ」として「本番」、「ライセンス・タイプ」として「新規Oracle Integrationインスタンスのサブスクライブ」を選択します。
- 「Create」をクリックします。
インスタンスが作成されたら、サービス・コンソールをクリックすると、新しいウィンドウが表示されます。
Oracle Integration 3でのOracle Autonomous Data Warehouse接続の作成
- Oracle Integrationインスタンスで、ハンバーガー・メニューをクリックし、「デザイン」で「接続」を選択します。
- 「作成」をクリックし、Oracle ADWを検索して選択します。
- この接続に関する「名前」、「識別子」、「ロール」に「トリガーと呼出し」、「キーワード」および「説明」を入力します。
- 「プロパティ」で、「オプションのプロパティ」を展開し、「サービス名」を入力します。
- 「セキュリティ」で、「JDBC Over SSL」を選択し、
Wallet.zip
ファイルをアップロードします。 - データベースの作成時に指定されたWalletパスワードを入力します。
- データベース・サービス・ユーザー名をadminとして入力します。
- 管理ユーザーのデータベース・サービス・パスワードを入力します。
- 「アクセス・タイプ」に「パブリック・ゲートウェイ」を選択します。
- 右上の「テスト」をクリックします。
- 正常なメッセージが表示されたら、「保存」をクリックします。
Kafkaインスタンスでのエージェント・グループの関連付け
Kafka Streamsがインストールされているインスタンスで実行する必要があるエージェントが必要です。
- Oracle Integration 3インスタンスで、ハンバーガー・メニューをクリックし、「デザイン」で「エージェント」を選択します。
- 「エージェント」ページで、「作成」をクリックします。
- 「名前」、「識別子」、「説明」を入力して、「作成」をクリックします。
- エージェントが作成されたら、「ダウンロード」をクリックし、「接続エージェント」をクリックします。
- ダウンロードに成功したら、
oic_conn_agent_installer.zip
をKafkaトピックが実行されているKafkaインスタンスにコピーし、メッセージを生成します。このソリューション・プレイブックの例では、Oracle GoldenGate Stream Analyticsインスタンスです。 oic_conn_agent_installer.zip
を解凍します。- 「ステータス」列で、「...」アイコンをクリックし、「構成のダウンロード」をクリックします。
InstallerProfile.cfg
ファイルをコピーし、このファイルをKafkaインスタンスにコピーします。- Kafkaインスタンスの
InstallerProfile.cfg
ファイルを置き換えます。 InstallerProfile.cfg
を置換した後、次の文を実行して、このコマンドを使用してOracle GoldenGate Stream AnalyticsインスタンスまたはKafkaオンプレミス・インスタンスでエージェントを起動します:$ java -jar connectivityagent.jar
エージェントは正常に起動し、実行したままにして、停止しないでください。詳細は、Oracle Integration Generation 2での統合の使用を参照してください
Oracle Integration 3でのKafka接続の作成
- Oracle Integration 3インスタンスで、ハンバーガー・メニューをクリックし、「デザイン」で「接続」を選択します。
- 「作成」をクリックし、「Apache Kafka」を検索して選択します。
- この接続に関する「名前」、「識別子」、「ロール」に「トリガーと呼出し」、「キーワード」および「説明」を入力します。
- 「プロパティ」で、ブートストラップ・サーバー: instancename:9092を入力します。
ノート:
Kafkaインスタンスによって使用されているVCNの「セキュリティ・リスト」で、トラフィックを許可するためにポート9092を追加する必要があります。 - 「セキュリティ」で「セキュリティ・ポリシーなし」を選択します。
- 「アクセス・タイプ」に「接続性エージェント」を選択し、「エージェント・グループの関連付け」をクリックします。
- エージェントを選択して「使用」をクリックします。
- 「テスト」をクリックします。成功するメッセージが表示されます。
- 「保存」をクリックします。
Oracle Integration 3でのKafkaとOracle Autonomous Data Warehouseの統合の作成
- Oracle Integration 3インスタンスで、ハンバーガー・メニューをクリックし、「設計」で「統合」を選択します。
- 「作成」をクリックし、「統合の作成」ダイアログ・ボックスで「アプリケーション」を選択します。
- 統合名にKafkaToADWを入力し、「作成」をクリックします。
ノート:
Kafka用に作成した2つの接続と、トリガーとしてOracle Autonomous Data Warehouseがあります。 - Kafkaを選択すると、トリガーの名前を入力するように求められ、「メッセージ・タイプ」に「コンシューマ」を選択し、「続行」をクリックします。
- ドロップダウンからKafkaトピックを選択し、コンシューマ名を指定し、フィールドをデフォルトのままにして「続行」をクリックします。
- 「メッセージ構造」に「サンプルJSONドキュメント」を選択し、同じJSONファイルをドラッグ・アンド・ドロップします。
- 「サマリー」ウィンドウで、すべての詳細を確認し、「終了」をクリックします。
- 作成された「Trigger」の下に下矢印が表示されます。
- 「+」アイコンをクリックし、「起動」でADWを選択すると、「マップ」という名前の追加ボックスが表示されます。
- 「起動」にマウスを移動し、「...」をクリックして「編集」を選択します。
- 名前を指定し、「実行する操作」フィールドの「表に対する操作の実行」を選択します。「挿入」を選択し、「続行」をクリックします。
- 表を選択して「続行」をクリックし、「サマリー」ページで「終了」をクリックします。
- 次に、「マップ」ボックスにマウスを移動し、「...」をクリックして「編集」を選択します。
- 「マッピング」ページで、「ソース」および「ターゲット」フィールドに接続します。
- 「検証」をクリックすると、正常に検証されます。
- 「統合」ページで「保存」をクリックすると、統合が正常に構成されます。
- 統合ホームページで、「ステータス」フィールドにマウスを移動し、「電源」アイコンをクリックしてアクティブ化します。
- 「統合のアクティブ化」という名前のウィンドウが表示され、「本番」を選択して「アクティブ化」をクリックします。これにより、Kafkaメッセージを消費するために統合が正常にアクティブ化されます。
- Kafkaトピックを起動すると、データベースに格納されるメッセージが表示されます。SQLDeveloperを開き、表を開き、「データ」タブをクリックしてメッセージを表示します。
- 独自のKafkaストリームがある場合は、同じ形式でメッセージの作成を開始できます。
- Oracle GoldenGate Stream Analyticsを使用している場合は、Oracle GoldenGate Stream AnalyticsインスタンスにSSH接続し、
/u01/app/osa/utilities/kafka-utils
フォルダに移動します。 - 受信データとして
sample.json
を使用できます。次のjson形式を使用します。{"fruit": "Apple","total": "Large","color": "Red"}
- 次のコマンドを実行して、Kafkaトピックとしてデータ・フィードをループします:
opc@ggsanew kafka-utils]$ ./loop-file.sh ./sample.json | ./sampler.sh 1 1 | ./kafka.sh feed complex