5 Transactional Event QueueとApache Kafkaの相互運用性

Oracle Transactional Event Queue (TEQ)により、イベントベースのアプリケーションが簡単に実装できます。さらに、ScalaとJavaで作成されたオープンソースのストリーム処理ソフトウェア・プラットフォームであるApache Kafkaと高度に統合されています。このプラットフォームは、LinkedInによって開発され、Apache Software Foundationに寄贈されたものです。Kafka APIを使用するアプリは、透過的にOracle TEQを操作できるようになります。さらに、Oracle TEQでは、TEQとKafkaの双方向の情報フローもサポートされるため、変更内容はほぼリアルタイムでTEQまたはKafkaですぐに利用できるようになります。

Apache Kafka Connectは、Kafkaとその他のシステムを統合するApache Kafkaに組み込まれているフレームワークです。Oracle TEQは、標準JMSパッケージおよび関連するJDBCのTransactionパッケージを提供することで、接続を確立してトランザクション・データ・フローを完了します。Oracle TEQでは、標準のKafka JMSコネクタを構成することで、相互運用性を確立し、2つのメッセージング・システム間のデータ・フローを完了します。

この章の内容は以下のとおりです。

設定と前提条件

Kafka Connectは、Java Naming and Directory Interface (JNDI)とJMS標準インタフェースを使用して、Oracle TEQ用のJMS ConnectionFactoryインスタンスを作成し、TEQとの間でメッセージをエンキューまたはデキューします。

前提条件を次に示します。

  • Kafka Broker: Confluent Platform 3.3.0以降、またはKafka 0.11.0以降

  • Connect: Confluent Platform 4.1.0以降、またはKafka 1.1.0以降

  • Java 1.8

  • Oracle TEQ JMS 1.1+クライアントのJar

Apache KafkaからOracle TEQへの接続(Confluent PlatformおよびCLIの例)

Apache KafkaからTEQへのメッセージ転送のステップは、次のとおりです。

  1. Oracle Databaseを起動します

  2. TEQを設定します

    1. TEQユーザーを作成して、ユーザーに相応の権限を付与します。

      CREATE USER <username> IDENTIFIED BY <password>;
      GRANT CONNECT, RESOURCE, AQ_ADMINISTRATOR_ROLE TO <username>;
      GRANT EXECUTE ON DBMS_AQ TO <username>;
      GRANT EXECUTE ON DBMS_AQADM TO <username>;
      -- alter table space privileges if needed
    2. TEQを作成および起動します

      BEGIN
      	DBMS_AQADM.CREATE_SHARDED_QUEUE(
      		queue_name => '<username>.<queuename>',
      		multiple_consumers => FALSE, -- False: Queue True: Topic 
      		queue_payload_type => DBMS_AQADM.JMS_TYPE);
      
      	DBMS_AQADM.START_QUEUE(queue_name => '<username>.<queuename>');
      END;
      /

      ノート:

      multiple_consumers: Falseはキューを表し、TrueはJMSのトピックを表します。

  3. Kafka Connect Sinkコンポーネントをインストールします

    # run from your Confluent Platform installation directory
    confluent-hub install confluentinc/kafka-connect-jms-sink:latest
  4. TEQのJarをKafka JMS Sink Connectorにインポートします

    次のjarをJMS Sink Connectorのプラグイン・フォルダ(share/confluent-hub-components/confluentinc-kafka-connect-jms-sink/lib)にコピーします。この作業は、Connectワーカー・ノードごとに実行する必要があります。また、ワーカーはTEQのjarを採用するために再起動する必要があります。

    • aqapi.jar : TEQ JMSライブラリjar

    • ojdbc8.jar : Oracle JDBC Connectionライブラリjar

    • jta-1.1.jar : JTA: トランザクション・マネージャと分散トランザクション・システムに関与する要素との間の標準Javaインタフェース

  5. Confluent Platformを起動します

    confluent local start
  6. JMS Sink Connectorを構成します: d jms-source.json

    {
      "name": "JmsSinkConnector",
      "config": {
        "connector.class": "io.confluent.connect.jms.JmsSinkConnector",
        "tasks.max": "1",
        "topics": "jms-messages",
        "java.naming.factory.initial": "oracle.jms.AQjmsInitialContextFactory",
        "java.naming.provider.url": <connection string>,
        "db_url": <connection string>,
        "java.naming.security.principal": <username>,
        "java.naming.security.credentials": <password>,
        "jndi.connection.factory": "javax.jms.XAQueueConnectionFactory",
        "jms.destination.type": "queue",
        "jms.destination.name": <queuename>,
        "key.converter":"org.apache.kafka.connect.storage.StringConverter",
        "value.converter":"org.apache.kafka.connect.storage.StringConverter",
        "confluent.topic.bootstrap.servers":"localhost:9092",
        "confluent.topic.replication.factor": "1"
      }
    }
  7. JMS Sink Connectorをロードします

    confluent local load jms -- -d jms-sink.json
  8. Connectorステータスの事後チェックを実行します

    1. Confluent Platform管理を使用する場合: http://localhost:9021のConfluent Platform管理に移動して、コネクタのステータスを確認します。

    2. Confluent CLIを使用する場合

      confluent local status jms
  9. メッセージ転送をテストします

    Kafkaトピックにランダムなメッセージを生成します。

    seq 10 | confluent local produce jms-messages

    TEQのエンキュー済メッセージを確認します。

    SELECT * FROM GV$PERSISTENT_QUEUES;
    SELECT * FROM GV$AQ_SHARDED_SUBSCRIBER_STAT;

Oracle TEQからApache Kafkaへの接続(Confluent PlatformおよびCLIの例)

TEQからApache Kafkaへのメッセージ転送のステップは、次のとおりです。

  1. Oracle Databaseを起動します

  2. TEQを設定します

    1. TEQユーザーを作成して、ユーザーに相応の権限を付与します。

      CREATE USER <username> IDENTIFIED BY <password>;
      GRANT CONNECT, RESOURCE, AQ_ADMINISTRATOR_ROLE TO <username>;
      GRANT EXECUTE ON DBMS_AQ TO <username>;
      GRANT EXECUTE ON DBMS_AQADM TO <username>;
      -- alter table space privileges if needed
    2. TEQを作成および起動します

      BEGIN
      	DBMS_AQADM.CREATE_SHARDED_QUEUE(
      		queue_name => '<username>.<queuename>',
      		multiple_consumers => FALSE, -- False: Queue True: Topic 
      		queue_payload_type => DBMS_AQADM.JMS_TYPE);
      
      	DBMS_AQADM.START_QUEUE(queue_name => '<username>.<queuename>');
      END;
      /

      ノート:

      multiple_consumers: Falseはキューを表し、TrueはJMSのトピックを表します。

  3. Kafka Connect Sourceコンポーネントをインストールします

    confluent-hub install confluentinc/kafka-connect-jms:latest
  4. TEQのJarをKafka JMS Source Connectorにインポートします

    次のjarをJMS Source Connectorのプラグイン・フォルダ(share/confluent-hub-components/confluentinc-kafka-connect-jms/lib)にコピーします。

    • aqapi.jar : TEQ JMSライブラリjar

    • ojdbc8.jar : Oracle JDBC Connectionライブラリjar

    • jta-1.1.jar : JTA: トランザクション・マネージャと分散トランザクション・システムに関与する要素との間の標準Javaインタフェース

  5. Confluent Platformを起動します

    confluent local start
  6. JMS Source Connector jms-source.jsonを構成します

    {
      "name": " JmsSourceConnector",
      "config": {
        "connector.class": "io.confluent.connect.jms.JmsSourceConnector",
        "kafka.topic": "jms-messages",
        "jms.destination.name": <queuename>,
        "jms.destination.type": "queue",
        "java.naming.factory.initial": "oracle.jms.AQjmsInitialContextFactory",
        "java.naming.provider.url":  <connection string>,
        "db_url": <connection string>,
        "java.naming.security.principal": <username>,
        "java.naming.security.credentials": <password>,
        "confluent.license": "",
        "confluent.topic.bootstrap.servers": "localhost:9092"
      }
    }
  7. JMS Source Connectorをロードします

    confluent local load jms -- -d jms-source.json
  8. Connectorステータスの事後チェックを実行します

    1. Confluent Platform管理を使用する場合: http://localhost:9021のConfluent Platform管理に移動して、コネクタのステータスを確認します。

    2. Confluent CLIを使用する場合

      confluent local status jms
  9. メッセージ転送をテストします

    前述のSink Connectorを使用してメッセージをTEQにエンキューし、Sink Connectorを一時停止してからSource Connectorを起動します。メッセージがTEQからデキューされ、Kafkaトピック内に生成されます。

メッセージ転送の監視

Sink/Sourceコネクタのメッセージ転送は、両側から監視できます。

  • Apache Kafka: http://localhost:9021のConfluent Platform管理に移動して、統計についてのプロデューサ/コンシューマ・コンソールを確認します。

  • Oracle TEQ: 「Oracle Transactional Event QueuesおよびAdvanced Queuingの監視」を参照し、TEQ Monitorシステムを起動してエンキュー率/デキュー率、TEQの深さ、および詳細なDB/システム・レベルの統計情報を確認してください。