Kafkaプラットフォームからのデータの取得
OCI GoldenGateを使用して、Kafkaプラットフォーム・ストリーミング・ソースからメッセージを抽出します。
概要
OCI GoldenGateを使用して、次のストリーミング・ソースからメッセージを取得できます。
-
Apache Kafka
-
OCIストリーミング
-
Confluent KafkaとConfluent Schema Registryの有無
-
Azure Event Hubs
-
Amazon MSK
-
Microsoft Fabric Eventstream
OCI GoldenGateは、1つ以上のKafkaトピックからメッセージを読み取り、GoldenGate証跡ファイルに書き込まれる論理変更レコードにデータを変換します。GoldenGate Replicatプロセスは、生成証跡ファイルを使用してデータを伝播し、RDBMS実装をサポートできます。
タスク1: コンシューマ・プロパティの構成
-
次のデシリアライザまたはコンバータのいずれかを使用して、Kafkaコンシューマ・プロパティ・ファイルを作成します。ソースがConfluent Kafka with Confluent Schema Registryのトピックである場合、Avroコンバータを使用できます。他のソースの場合は、必要に応じてJSONコンバータまたはデシリアライザを使用します:
-
JSONデシリアライザのKafkaコンシューマ・プロパティ:
key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer -
JSONコンバータのKafkaコンシューマ・プロパティ:
key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter -
AvroコンバータのKafkaコンシューマ・プロパティ:
key.converter=io.confluent.connect.avro.AvroConverter value.converter=io.confluent.connect.avro.AvroConverter
-
-
プロパティ・ファイルを保存し、その場所をメモします。
タスク2: OCI GoldenGateリソースの作成
このタスクでは、リソースがまだ存在しない場合に新しいリソースを作成する方法について説明します。使用しているビッグ・データ・デプロイメントが使用可能な最新バージョンにアップグレードされていることを確認します。
-
接続を作成します。
ノート:次の接続を作成する場合は、必ず「拡張オプションの表示」を選択し、コンシューマ・プロパティ・ファイルをアップロードしてください。
-
Apache KafkaまたはAmazon MSKの場合は、Kafka接続を作成します。
-
Confluent Kafkaの場合は、Confluent Kafka接続を作成します。
-
Confluentスキーマ・レジストリの場合は、Confluentスキーマ・レジストリ接続を作成します。
-
Azure Event Hubsの場合は、Azure Event Hubs接続を作成します。
-
OCIストリーミングの場合、OCIストリーミング接続を作成します。
-
Microsoft Fabric Eventstreamの場合は、Microsoft Fabric Eventstream接続を作成します
-
タスク3: 資格証明の作成
Extractを作成する前に、まず資格証明を作成する必要があります。
資格証明を作成するには:
-
「デプロイメント」ページで、ビッグ・データ・デプロイメントを選択します。
-
デプロイメントの詳細ページで、「コンソールの起動」を選択します。
-
タスク2のステップ1でデプロイメントを作成したときに指定したユーザー名とパスワードを使用して、ビッグ・データ・デプロイメントにログインします。
-
ナビゲーション・メニューから、「DB接続」を選択します。
-
「構成」ページの「データベース」タブで、「DB接続の追加」(プラス・アイコン)を選択し、次のようにフォームに入力します。
-
「資格証明ドメイン」に、
OracleGoldenGateと入力します。 -
「資格証明別名」に、
kafkaと入力します。 -
「ユーザーID」に、
kafka://と入力します -
「パスワード」および「パスワードの確認」に、パスワードを入力します。
-
「送信」を選択します。
-
タスク4: Extractの作成
-
ホーム・ページで、「Extractの追加」(プラス・アイコン)を選択します。
-
次の値を含むExtractの追加:
-
「Extract情報」ページで、次のようにフィールドに入力し、「次へ」を選択します。
-
「抽出タイプ」で、「統合抽出」を選択します。
-
「プロセス名」に、抽出の名前を入力します。
-
-
「Extractオプション」ページで、次のようにフィールドに入力します。「次」を選択します。
-
「ドメイン」で、ドメインを選択します。
-
「別名」で、デプロイメントに割り当てられている接続を選択します。
-
「名前」に、2文字の名前を入力します。
-
-
「管理対象オプション」ページで、フィールドをそのままにして、「次へ」を選択します。
-
「パラメータ・ファイル」ページで、次の手順を実行します。
-
指定されたブートストラップ・サーバー内のすべてのトピックをリスニングするには、表マッピングを
TABLE TESTSCHEMA.*;のままにします。また、表マッピングをTABLE TESTSCHEMA.<topic-name>;として設定して、指定したトピックから取得することもできます。 -
SOURCEDB USERIDALIASをSOURCEDB USERIDALIAS kafka DOMAIN OracleGoldenGateに更新します。
-
-
「作成および実行」を選択します。
-
Oracle GoldenGateのホーム・ページに戻り、Extractプロセスの開始およびイベント・メッセージを確認できます。