5 Transactional Event QueueとApache Kafkaの相互運用性
Oracle Transactional Event Queue (TEQ)により、イベントベースのアプリケーションが簡単に実装できます。さらに、ScalaとJavaで作成されたオープンソースのストリーム処理ソフトウェア・プラットフォームであるApache Kafkaと高度に統合されています。このプラットフォームは、LinkedInによって開発され、Apache Software Foundationに寄贈されたものです。Kafka APIを使用するアプリは、透過的にOracle TEQを操作できるようになります。さらに、Oracle TEQでは、TEQとKafkaの双方向の情報フローもサポートされるため、変更内容はほぼリアルタイムでTEQまたはKafkaですぐに利用できるようになります。
Apache Kafka Connectは、Kafkaとその他のシステムを統合するApache Kafkaに組み込まれているフレームワークです。Oracle TEQは、標準JMSパッケージおよび関連するJDBCのTransactionパッケージを提供することで、接続を確立してトランザクション・データ・フローを完了します。Oracle TEQでは、標準のKafka JMSコネクタを構成することで、相互運用性を確立し、2つのメッセージング・システム間のデータ・フローを完了します。
この章の内容は以下のとおりです。
設定と前提条件
Kafka Connectは、Java Naming and Directory Interface (JNDI)とJMS標準インタフェースを使用して、Oracle TEQ用のJMS ConnectionFactory
インスタンスを作成し、TEQとの間でメッセージをエンキューまたはデキューします。
前提条件を次に示します。
-
Kafka Broker: Confluent Platform 3.3.0以降、またはKafka 0.11.0以降
-
Connect: Confluent Platform 4.1.0以降、またはKafka 1.1.0以降
-
Java 1.8
-
Oracle TEQ JMS 1.1+クライアントのJar
Apache KafkaからOracle TEQへの接続(Confluent PlatformおよびCLIの例)
Apache KafkaからTEQへのメッセージ転送のステップは、次のとおりです。
-
Oracle Databaseを起動します
-
TEQを設定します
-
TEQユーザーを作成して、ユーザーに相応の権限を付与します。
CREATE USER <username> IDENTIFIED BY <password>; GRANT CONNECT, RESOURCE, AQ_ADMINISTRATOR_ROLE TO <username>; GRANT EXECUTE ON DBMS_AQ TO <username>; GRANT EXECUTE ON DBMS_AQADM TO <username>; -- alter table space privileges if needed
-
TEQを作成および起動します
BEGIN DBMS_AQADM.CREATE_SHARDED_QUEUE( queue_name => '<username>.<queuename>', multiple_consumers => FALSE, -- False: Queue True: Topic queue_payload_type => DBMS_AQADM.JMS_TYPE); DBMS_AQADM.START_QUEUE(queue_name => '<username>.<queuename>'); END; /
ノート:
multiple_consumers
:False
はキューを表し、True
はJMSのトピックを表します。
-
-
Kafka Connect Sinkコンポーネントをインストールします
# run from your Confluent Platform installation directory confluent-hub install confluentinc/kafka-connect-jms-sink:latest
-
TEQのJarをKafka JMS Sink Connectorにインポートします
次のjarをJMS Sink Connectorのプラグイン・フォルダ
(share/confluent-hub-components/confluentinc-kafka-connect-jms-sink/lib
)にコピーします。この作業は、Connect
ワーカー・ノードごとに実行する必要があります。また、ワーカーはTEQのjarを採用するために再起動する必要があります。-
aqapi.jar
: TEQ JMSライブラリjar -
ojdbc8.jar
: Oracle JDBC Connectionライブラリjar -
jta-1.1.jar
: JTA: トランザクション・マネージャと分散トランザクション・システムに関与する要素との間の標準Javaインタフェース
-
-
Confluent Platformを起動します
confluent local start
-
JMS Sink Connectorを構成します:
d jms-source.json
{ "name": "JmsSinkConnector", "config": { "connector.class": "io.confluent.connect.jms.JmsSinkConnector", "tasks.max": "1", "topics": "jms-messages", "java.naming.factory.initial": "oracle.jms.AQjmsInitialContextFactory", "java.naming.provider.url": <connection string>, "db_url": <connection string>, "java.naming.security.principal": <username>, "java.naming.security.credentials": <password>, "jndi.connection.factory": "javax.jms.XAQueueConnectionFactory", "jms.destination.type": "queue", "jms.destination.name": <queuename>, "key.converter":"org.apache.kafka.connect.storage.StringConverter", "value.converter":"org.apache.kafka.connect.storage.StringConverter", "confluent.topic.bootstrap.servers":"localhost:9092", "confluent.topic.replication.factor": "1" } }
-
JMS Sink Connectorをロードします
confluent local load jms -- -d jms-sink.json
-
Connectorステータスの事後チェックを実行します
-
Confluent Platform管理を使用する場合:
http://localhost:9021
のConfluent Platform管理に移動して、コネクタのステータスを確認します。 -
Confluent CLIを使用する場合
confluent local status jms
-
-
メッセージ転送をテストします
Kafkaトピックにランダムなメッセージを生成します。
seq 10 | confluent local produce jms-messages
TEQのエンキュー済メッセージを確認します。
SELECT * FROM GV$PERSISTENT_QUEUES; SELECT * FROM GV$AQ_SHARDED_SUBSCRIBER_STAT;
Oracle TEQからApache Kafkaへの接続(Confluent PlatformおよびCLIの例)
TEQからApache Kafkaへのメッセージ転送のステップは、次のとおりです。
-
Oracle Databaseを起動します
-
TEQを設定します
-
TEQユーザーを作成して、ユーザーに相応の権限を付与します。
CREATE USER <username> IDENTIFIED BY <password>; GRANT CONNECT, RESOURCE, AQ_ADMINISTRATOR_ROLE TO <username>; GRANT EXECUTE ON DBMS_AQ TO <username>; GRANT EXECUTE ON DBMS_AQADM TO <username>; -- alter table space privileges if needed
-
TEQを作成および起動します
BEGIN DBMS_AQADM.CREATE_SHARDED_QUEUE( queue_name => '<username>.<queuename>', multiple_consumers => FALSE, -- False: Queue True: Topic queue_payload_type => DBMS_AQADM.JMS_TYPE); DBMS_AQADM.START_QUEUE(queue_name => '<username>.<queuename>'); END; /
ノート:
multiple_consumers
:False
はキューを表し、True
はJMSのトピックを表します。
-
-
Kafka Connect Sourceコンポーネントをインストールします
confluent-hub install confluentinc/kafka-connect-jms:latest
-
TEQのJarをKafka JMS Source Connectorにインポートします
次のjarをJMS Source Connectorのプラグイン・フォルダ(
share/confluent-hub-components/confluentinc-kafka-connect-jms/lib
)にコピーします。-
aqapi.jar
: TEQ JMSライブラリjar -
ojdbc8.jar
: Oracle JDBC Connectionライブラリjar -
jta-1.1.jar
: JTA: トランザクション・マネージャと分散トランザクション・システムに関与する要素との間の標準Javaインタフェース
-
-
Confluent Platformを起動します
confluent local start
-
JMS Source Connector
jms-source.json
を構成します{ "name": " JmsSourceConnector", "config": { "connector.class": "io.confluent.connect.jms.JmsSourceConnector", "kafka.topic": "jms-messages", "jms.destination.name": <queuename>, "jms.destination.type": "queue", "java.naming.factory.initial": "oracle.jms.AQjmsInitialContextFactory", "java.naming.provider.url": <connection string>, "db_url": <connection string>, "java.naming.security.principal": <username>, "java.naming.security.credentials": <password>, "confluent.license": "", "confluent.topic.bootstrap.servers": "localhost:9092" } }
-
JMS Source Connectorをロードします
confluent local load jms -- -d jms-source.json
-
Connectorステータスの事後チェックを実行します
-
Confluent Platform管理を使用する場合:
http://localhost:9021
のConfluent Platform管理に移動して、コネクタのステータスを確認します。 -
Confluent CLIを使用する場合
confluent local status jms
-
-
メッセージ転送をテストします
前述のSink Connectorを使用してメッセージをTEQにエンキューし、Sink Connectorを一時停止してからSource Connectorを起動します。メッセージがTEQからデキューされ、Kafkaトピック内に生成されます。
メッセージ転送の監視
Sink/Sourceコネクタのメッセージ転送は、両側から監視できます。
-
Apache Kafka:
http://localhost:9021
のConfluent Platform管理に移動して、統計についてのプロデューサ/コンシューマ・コンソールを確認します。 -
Oracle TEQ: 「Oracle Transactional Event QueuesおよびAdvanced Queuingの監視」を参照し、TEQ Monitorシステムを起動してエンキュー率/デキュー率、TEQの深さ、および詳細なDB/システム・レベルの統計情報を確認してください。