Kafka StreamsデータをOracle Autonomous Databaseに接続
Oracle GoldenGate Stream Analyticsを使用すると、ユーザーはスケーラブルなデータ変換および分析パイプラインを構築して、俊敏なリアルタイムのビジネス・インサイトを得ることができます。
この項では、「開始する前に」セクションからOracle Autonomous Databaseがすでにインストールされていることを前提としています。ここでは、次の操作について学習します。
- GGSAコンソールにアクセスします。
- データを取り込むようにKafkaプロデューサを構成します。
- KafkaのGGSAをAutonomous Databaseに接続します。
GGSAコンソールへのアクセスの取得
- OCIコンソールの「コンピュート」で、「インスタンス」を選択します。GGSA Marketplaceインスタンスが稼働している必要があります。
- パブリックIPアドレスをコピーします。
- 秘密キーを使用してインスタンスにログインし、
/home/opc
の下のREADME.txt
を確認します。 - OSA UIパスワードをコピーします。
- ブラウザを開き、
https://<Public IP Address>/osa
と入力します。 - 「ユーザー名」に
osaadmin
と入力し、README.txt
からコピーしたパスワードを使用します。
データを取り込むためのKafkaプロデューサの構成
Kafkaストリームを作成するには、次のステップに従います。
- 「カタログ」ページで、「新規アイテムの作成」をクリックします。
- マウスを「接続」の上に置き、サブメニューから「Kafka」を選択します。
- 「タイプ・プロパティ」画面で、「名前」を入力し、「接続タイプ」として「Kafka」を選択します。この例では、GGSAインスタンスにインストールされているKafkaを使用します。
- 「接続の詳細」画面で、「Zookeepers」フィールドに
localhost:2181
と入力します。 - 「接続のテスト」をクリックします。「成功」というメッセージが表示されます。
- 「保存」をクリックします。
ノート:
ポート2181がイングレスで開いていることを確認します。
データを取り込むためにKafkaトピックを開始するには、次のステップに従います。
- GGSAインスタンスにSSHで接続し、
/u01/app/osa/utilities/kafka-utils
フォルダに移動します。 - 受信データとして
complex.json
を使用します。 - 次のコマンドを実行して、データ・フィードをKafkaトピックとしてループします:
複合Kafkaトピックは、データの生成を開始し、取込みの準備が整います。opc@ggsanew kafka-utils]$ ./loop-file.sh ./complex.json | ./sampler.sh 1 1 | ./kafka.sh feed complex
- 「カタログ」ページで、「新規アイテムの作成」をクリックし、Kafka接続を使用してストリームを作成します。
- マウスをStreamの上に置き、サブメニューからKafkaを選択します。
- 「タイプ・プロパティ」画面で、名前を入力し、「ストリーム・タイプ」として「Kafka」を選択します。
- 「次へ」をクリックします。
- 「ソースの詳細」画面で、「接続」で作成したKafka接続を選択します。
- 「トピック名」で「複合」を選択します。
- 「データ書式」で
JSON
を選択します。
- 「次へ」をクリックします。
- 「データ・フォーマット」画面で、デフォルト値のままにします。
- 「次へ」をクリックします。
- 「シェイプ」画面で、受信した
JSON
シェイプがStreamから推測されます。 - 「保存」をクリックします。Kafkaストリームは正常に作成されました。
KafkaのGGSAをAutonomous Databaseに接続します
GGSAからOracle Autonomous Databaseへの接続を作成するには、次のステップに従います。
- 「カタログ」ページで、「新規アイテムの作成」をクリックします。
- マウスを「接続」の上に置き、サブメニューから「Oracle Database」を選択します。
- 「タイプ・プロパティ」画面で、「名前」を入力し、「接続タイプ」で「Oracle Database」を選択します。
- 「次へ」をクリックします。
- 「接続の詳細」、「タイプ: Oracle Database」画面で、Autonomous Database接続の詳細を入力します。
- 「接続方法」で「Wallet」を選択し、ウォレット・ファイルをアップロードします。
- ドロップダウン・リストから「サービス名/SID」を選択します。
- 「ユーザー名」にadminと入力します。
- データベース管理パスワードを入力します。
- 「保存」をクリックします。Autonomous Database接続は正常に作成されます。
- Autonomous Databaseのスキーマにログインし、データを受信する表を作成します。
CREATE TABLE COMPLEX ( BOOLEANFIELD VARCHAR2(20) , NUMBERFIELD NUMBER , STRINGFIELD VARCHAR2(20) , OBJECTFIELD_A_KEY NUMBER , OBJECTFIELD_A_VALUE NUMBER , OBJECTFIELD_C VARCHAR2(20) , OBJECTFIELD_E VARCHAR2(20) , ARRAYFIELD_0 NUMBER , ARRAYFIELD_1 NUMBER );
次のステップに従って、GGSAにパイプラインを作成し、ソースとターゲットを設定します:
- 「カタログ」ページで、「新規アイテムの作成」をクリックし、ドロップダウン・リストから「パイプライン」を選択します。
- マウスを「ターゲット」の上に置き、サブメニューから「データベース表」を選択します。
- 「タイプ・プロパティ」画面で、「ターゲット」に「名前」を入力し、「データベース表」として「ターゲット・タイプ」を選択します。
- 「次へ」をクリックします。
- 「ターゲットの詳細」画面で、以前に作成したAutonomous Database表をドロップダウン・リストから選択します。
- 「次へ」をクリックします。
- 「形状」画面で、「表名」をドロップダウン・リストから「複合」として選択します。
- 「次へ」をクリックします。
- 「シェイプ」を推測し、「保存」をクリックします。
次のステップを実行して、ターゲットを設定します。
- 「カタログ」ページで、「新規アイテムの作成」をクリックし、ドロップダウン・リストから「パイプライン」を選択します。
- マウスを「ターゲット」の上に置き、サブメニューから「Kafkaストリーム」を選択します。
- 「保存」をクリックします。
- パイプラインで、ストリームを右クリックし、「ステージの追加」、「ターゲット」の順に選択します。
- 「ターゲット・ステージの作成」ウィンドウで、名前を入力して「保存」をクリックします。
- 以前に作成した「ターゲット」表を選択します。
- 「公開」をクリックしてパイプラインを公開し、ターゲット表のデータを使用可能にします。
-
データベースにログインして、データがAutonomous Databaseの複合表にロードされていることを確認します。