5 Interoperability of Transactional Event Queue with Apache Kafka

Oracle Transactional Event Queue (TEQ) makes it easy to implement event-based applications. It is also highly integrated with Apache Kafka, an open-source stream-processing platform developed by LinkedIn and donated to the Apache Software Foundation, written in Scala and Java. In addition to enabling applications that use Kafka APIs to operate transparently on Oracle TEQ, Oracle TEQ supports bi-directional information flow between TEQ and Kafka, so that changes made in either system become available in the other in near real time.

Apache Kafka Connect is a framework included in Apache Kafka that integrates Kafka with other systems. Oracle TEQ provides the standard JMS package and the related JDBC and transaction packages to establish the connection and complete the transactional data flow. Oracle TEQ configures standard Kafka JMS connectors to establish interoperability and complete the data flow between the two messaging systems.

This chapter contains the following topics:

  • Setup and Prerequisites

  • Connecting from Apache Kafka to Oracle TEQ (Confluent Platform and CLI Example)

  • Connecting from Oracle TEQ to Apache Kafka (Confluent Platform and CLI Example)

  • Monitoring Message Transfer

Setup and Prerequisites

Kafka Connect uses the Java Naming and Directory Interface (JNDI) and the standard JMS interface to create a JMS ConnectionFactory instance for Oracle TEQ, and then enqueues messages to or dequeues messages from TEQ accordingly.
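
As an illustration of what the connector does internally, the following minimal Java sketch obtains a JMS ConnectionFactory for TEQ through JNDI using oracle.jms.AQjmsInitialContextFactory. It is only a sketch: it assumes aqapi.jar, ojdbc8.jar, and the JMS 1.1 API are on the classpath, the JDBC URL and credentials are placeholders, and the "QueueConnectionFactory" lookup name is an assumption to adapt to your environment.

    import java.util.Hashtable;
    import javax.jms.JMSException;
    import javax.jms.QueueConnection;
    import javax.jms.QueueConnectionFactory;
    import javax.naming.Context;
    import javax.naming.InitialContext;
    import javax.naming.NamingException;

    public class TeqJndiLookupSketch {
        public static void main(String[] args) throws NamingException, JMSException {
            // JNDI environment mirroring the connector properties; the JDBC URL is a placeholder.
            Hashtable<String, String> env = new Hashtable<>();
            env.put(Context.INITIAL_CONTEXT_FACTORY, "oracle.jms.AQjmsInitialContextFactory");
            env.put("db_url", "jdbc:oracle:thin:@//dbhost:1521/ORCLPDB1");

            Context ctx = new InitialContext(env);
            // "QueueConnectionFactory" is an assumed lookup name; use the name your
            // AQ/TEQ JMS client registers if it differs.
            QueueConnectionFactory cf = (QueueConnectionFactory) ctx.lookup("QueueConnectionFactory");

            // The connector passes credentials through java.naming.security.principal and
            // java.naming.security.credentials; here they are passed to the connection directly.
            QueueConnection connection = cf.createQueueConnection("<username>", "<password>");
            System.out.println("Connected to TEQ over JMS: " + connection);
            connection.close();
            ctx.close();
        }
    }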

The prerequisites are as follows:

  • Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above

  • Connect: Confluent Platform 4.1.0 or above, or Kafka 1.1.0 or above

  • Java 1.8

  • Oracle TEQ JMS 1.1+ Client Jars

Connecting from Apache Kafka to Oracle TEQ (Confluent Platform and CLI Example)

Steps for message transfer from Apache Kafka to TEQ are as follows.

  1. Start Oracle Database

  2. Set Up TEQ

    1. Create the TEQ user and grant the user the required privileges.

      CREATE USER <username> IDENTIFIED BY <password>;
      GRANT CONNECT, RESOURCE, AQ_ADMINISTRATOR_ROLE TO <username>;
      GRANT EXECUTE ON DBMS_AQ TO <username>;
      GRANT EXECUTE ON DBMS_AQADM TO <username>;
      -- alter table space privileges if needed
    2. Create and start the TEQ

      BEGIN
      	DBMS_AQADM.CREATE_SHARDED_QUEUE(
      		queue_name => '<username>.<queuename>',
      		multiple_consumers => FALSE, -- False: Queue True: Topic 
      		queue_payload_type => DBMS_AQADM.JMS_TYPE);
      
      	DBMS_AQADM.START_QUEUE(queue_name => '<username>.<queuename>');
      END;
      /

      Note:

      multiple_consumers: FALSE creates a JMS queue (single consumer); TRUE creates a JMS topic (multiple consumers).

  3. Install Kafka Connect Sink Component

    # run from your Confluent Platform installation directory
    confluent-hub install confluentinc/kafka-connect-jms-sink:latest
  4. Import TEQ Jars into Kafka JMS Sink Connector

    Copy the following jars into the JMS Sink Connector's plugin folder (share/confluent-hub-components/confluentinc-kafka-connect-jms-sink/lib). This needs to be done on every Connect worker node and the workers must be restarted to pick up the TEQ jars.

    • aqapi.jar : TEQ JMS library jar

    • ojdbc8.jar : Oracle JDBC Connection library jar

    • jta-1.1.jar : JTA: standard Java interfaces between a transaction manager and the parties involved in a distributed transaction system

  5. Start Confluent Platform

    confluent local start
  6. Configure JMS Sink Connector jms-sink.json

    {
      "name": "JmsSinkConnector",
      "config": {
        "connector.class": "io.confluent.connect.jms.JmsSinkConnector",
        "tasks.max": "1",
        "topics": "jms-messages",
        "java.naming.factory.initial": "oracle.jms.AQjmsInitialContextFactory",
        "java.naming.provider.url": <connection string>,
        "db_url": <connection string>,
        "java.naming.security.principal": <username>,
        "java.naming.security.credentials": <password>,
        "jndi.connection.factory": "javax.jms.XAQueueConnectionFactory",
        "jms.destination.type": "queue",
        "jms.destination.name": <queuename>,
        "key.converter":"org.apache.kafka.connect.storage.StringConverter",
        "value.converter":"org.apache.kafka.connect.storage.StringConverter",
        "confluent.topic.bootstrap.servers":"localhost:9092",
        "confluent.topic.replication.factor": "1"
      }
    }
  7. Load the JMS Sink Connector

    confluent local load jms -- -d jms-sink.json
  8. Post-Check Connector Status

    1. Using Confluent Platform Admin: Open the Confluent Platform admin console at http://localhost:9021 and check the connector status.

    2. Using Confluent CLI

      confluent local status jms
  9. Test Message Transfer

    Produce test messages into the Kafka topic.

    seq 10 | confluent local produce jms-messages

    Check the messages enqueued in TEQ; a JMS dequeue sketch also follows this procedure.

    SELECT * FROM GV$PERSISTENT_QUEUES;
    SELECT * FROM GV$AQ_SHARDED_SUBSCRIBER_STAT;
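
In addition to the queries above, the enqueued messages can be inspected with a small JMS consumer. The following sketch is not part of the connector setup; it uses the AQ/TEQ JMS client (AQjmsFactory from aqapi.jar, together with ojdbc8.jar) to dequeue directly from the queue created earlier. The JDBC URL, user, password, and queue name are placeholders, and message bodies are printed as text only when the sink wrote JMS text messages.

    import java.util.Properties;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.Queue;
    import javax.jms.QueueConnection;
    import javax.jms.QueueConnectionFactory;
    import javax.jms.QueueReceiver;
    import javax.jms.QueueSession;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import oracle.jms.AQjmsFactory;
    import oracle.jms.AQjmsSession;

    public class TeqDequeueSketch {
        public static void main(String[] args) throws JMSException {
            // Placeholder connection details; replace with your database and TEQ user.
            String url = "jdbc:oracle:thin:@//dbhost:1521/ORCLPDB1";
            QueueConnectionFactory cf = AQjmsFactory.getQueueConnectionFactory(url, new Properties());

            QueueConnection connection = cf.createQueueConnection("<username>", "<password>");
            QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            connection.start();

            // getQueue(owner, name) is the AQ/TEQ-specific lookup on the JMS session.
            Queue queue = ((AQjmsSession) session).getQueue("<username>", "<queuename>");
            QueueReceiver receiver = session.createReceiver(queue);

            // Drain whatever the sink connector enqueued, waiting up to 5 seconds per message.
            Message msg;
            while ((msg = receiver.receive(5000)) != null) {
                String body = (msg instanceof TextMessage) ? ((TextMessage) msg).getText() : msg.toString();
                System.out.println("Dequeued: " + body);
            }

            session.close();
            connection.close();
        }
    }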

Connecting from Oracle TEQ to Apache Kafka (Confluent Platform and CLI Example)

Steps for message transfer from TEQ to Apache Kafka are as follows.

  1. Start Oracle Database

  2. Set Up TEQ

    1. Create the TEQ user and grant the user the required privileges.

      CREATE USER <username> IDENTIFIED BY <password>;
      GRANT CONNECT, RESOURCE, AQ_ADMINISTRATOR_ROLE TO <username>;
      GRANT EXECUTE ON DBMS_AQ TO <username>;
      GRANT EXECUTE ON DBMS_AQADM TO <username>;
      -- alter table space privileges if needed
    2. Create and start the TEQ

      BEGIN
      	DBMS_AQADM.CREATE_SHARDED_QUEUE(
      		queue_name => '<username>.<queuename>',
      		multiple_consumers => FALSE, -- False: Queue True: Topic 
      		queue_payload_type => DBMS_AQADM.JMS_TYPE);
      
      	DBMS_AQADM.START_QUEUE(queue_name => '<username>.<queuename>');
      END;
      /

      Note:

      multiple_consumers: FALSE creates a JMS queue (single consumer); TRUE creates a JMS topic (multiple consumers).

  3. Install Kafka Connect Source Component

    confluent-hub install confluentinc/kafka-connect-jms:latest
  4. Import TEQ Jars into Kafka JMS Source Connector

    Copy the following jars into the JMS Source Connector's plugin folder (share/confluent-hub-components/confluentinc-kafka-connect-jms/lib). As with the sink connector, this needs to be done on every Connect worker node and the workers must be restarted to pick up the TEQ jars.

    • aqapi.jar : TEQ JMS library jar

    • ojdbc8.jar : Oracle JDBC Connection library jar

    • jta-1.1.jar : JTA: standard Java interfaces between a transaction manager and the parties involved in a distributed transaction system

  5. Start Confluent Platform

    confluent local start
  6. Configure JMS Source Connector jms-source.json

    {
      "name": " JmsSourceConnector",
      "config": {
        "connector.class": "io.confluent.connect.jms.JmsSourceConnector",
        "kafka.topic": "jms-messages",
        "jms.destination.name": <queuename>,
        "jms.destination.type": "queue",
        "java.naming.factory.initial": "oracle.jms.AQjmsInitialContextFactory",
        "java.naming.provider.url":  <connection string>,
        "db_url": <connection string>,
        "java.naming.security.principal": <username>,
        "java.naming.security.credentials": <password>,
        "confluent.license": "",
        "confluent.topic.bootstrap.servers": "localhost:9092"
      }
    }
  7. Load the JMS Source Connector

    confluent local load jms -- -d jms-source.json
  8. Post-Check Connector Status

    1. Using Confluent Platform Admin: Open the Confluent Platform admin console at http://localhost:9021 and check the connector status.

    2. Using Confluent CLI

      confluent local status jms
  9. Test Message Transfer

    Use the sink connector above to enqueue messages in the TEQ, then pause the sink connector and start the source connector. The messages are dequeued from the TEQ and produced into the Kafka topic. A Kafka consumer sketch for checking the topic follows this procedure.
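
To confirm that the source connector produced the dequeued messages into Kafka, the topic can be read back with a plain Kafka consumer. The sketch below uses the standard kafka-clients API against the localhost:9092 broker and the jms-messages topic from the example configuration; the consumer group id is arbitrary, and record values are printed as raw strings, so their exact shape depends on the converter configured on the Connect worker.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class JmsMessagesTopicCheck {
        public static void main(String[] args) {
            // Consumer properties for the local Confluent Platform broker used in this example.
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "teq-source-check");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // "jms-messages" is the kafka.topic configured for the source connector.
                consumer.subscribe(Collections.singletonList("jms-messages"));
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
            }
        }
    }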

Monitoring Message Transfer

Message transfer through the sink and source connectors can be monitored on both sides: