6 Kafka Java Client Interface for Oracle Transactional Event Queues

This chapter includes the following topics:

Apache Kafka Overview

Apache Kafka is a community distributed event streaming platform that is horizontally-scalable and fault-tolerant.

Kafka is run as a cluster run on one or more servers. Each Kafka cluster stores streams of records in categories called topics. Each record consists of a key, a value, and a timestamp. Kafka APIs allow application to connect to Kafka cluster and use Kafka messaging platform.

Kafka Java Client for Transactional Event Queues

Oracle Database 20c introduces Kafka application compatibility with Oracle database. This provides easy migration for Kafka Java applications to Transaction Event Queues (TEQ). The Kafka Java APIs can now connect to Oracle database server and use TEQ as a messaging platform.

Figure 6-1 Kafka Application Integration with Transactional Event Queue



The figure shows OKafka library, which contains Oracle specific implementation of Kafka's Java APIs. This implmentation internally invokes AQ-JMS APIs which in turn uses JDBC driver to communicate with Oracle Database.

Developers can now migrate an existing Java application that uses Kafka to the Oracle database. Oracle Database 20c provides client side library which allows Kafka applications to connect to Oracle Database instead of Kafka cluster and use TEQ's messaging platform transparently.

Configuring Kafka Java Client for Transactional Event Queues

Two levels of configuration are required to migrate Kafka application to TEQ messaging platform:

  • Database level configuration

  • Application level configuration.

Kafka application needs to set certain properties which will allow OKafka library to locate the Oracle Database. This is analogous to how Kafka application provides zoo keeper information. These connection properties can be set in the following two ways:

  • using database user and pasword provided in plain text

  • using JDBC wallet.

Prerequisites

The following are the prerequisites for configuring and running Kafka Java client for TEQ in an Oracle Database.

  1. Create a database user.

  2. Grant the following user privileges.

    • grant connect, resource to user.

    • grant execute on dbms_aq to user.

    • grant execute on dbms_aqadm to user.

    • grant execute on dbms_aqin to user.

    • grant execute on dbms_aqjms to user.

    • grant select_catalog_role to user.

  3. Set the correct database configuration parameter to use TEQ.

    set streams_pool_size=400M
  4. Set LOCAL_LISTENER database parameter

    set LOCAL_LISTENER= (ADDRESS=(PROTOCOL=TCP)(HOST=<HOST NAME/ IP> )(PORT=<PORT NUMBER>))

Connection Configuration

The OKafka library connects to Oracle Database using JDBC Thin Driver. To setup this connection, Kafka application can provide username password in plain text or applications can configure SSL. To run Kafka application against Oracle Autonomous Transaction Processing (ATP) Database on Cloud, only SSL configuration is supported. You can connect to the Oracle Database using PLAINTEXT or SSL.

  • PLAINTEXT: In this protocol JDBC connection uses username and password to connect to Oracle instance.

    To use plaintext protocol the user has to provide following properties through application

    • oracle.service.name = <name of the service running on the instance>

    • oracle.instance.name = <name of the Oracle Database instance>

    • bootstrap.servers = <host:port>

    The following properties in ojdbc.properties file and ojdbc.properties file should be in location oracle.net.tns_admin.

    • user = <nameofdatabaseuser>

    • password = <userpassword>

  • SSL: To use ssl secured connections to connect to and ATP Database, perform the following steps.

    1. JDBC Thin Driver Connection prerequisites for ssl security:
      • JDK8u162 or higher.

      • oraclepki.jar, osdt_cert.jar, and osdt_core.jar

      • 18.3 JDBC Thin driver or higher( recommended)

    2. To leverage JDBC ssl security to connect to Oracle Database instance, user has to provide following properties. JDBC supports ssl secured connections to Oracle Database in two ways.

      • Using wallets. To use wallets:

        1. Add the required dependant jars for using Oracle Wallets in classpath.

          Download oraclepki.jar, osdt_cert.jar, and osdt_core.jar files alomg with JDBC thin driver add these jars to classpath.

        2. Enable Oracle PKI provider

          Add OraclePKIProvider at the end of file java.security (located at $JRE_HOME/jre/lib/security/java.security) if SSO wallet, that is, cwallet.sso is used for providing ssl security. For example, java.security:

                           security.provider.1=sun.security.provider.Sun
                           security.provider.2=sun.security.rsa.SunRsaSign
                           security.provider.3=com.sun.net.ssl.internal.ssl.Provider
                           security.provider.4=com.sun.crypto.provider.SunJCE
                           security.provider.5=sun.security.jgss.SunProvider
                           security.provider.6=com.sun.security.sasl.Provider
                           security.provider.7=oracle.security.pki.OraclePKIProvider

          To use ewallet.p12 for ssl security then place OraclePKIProvider before sun provider in file java.security. For example, java.security:

                          security.provider.1=sun.security.provider.Sun
                          security.provider.2=sun.security.rsa.SunRsaSign
                          security.provider.3=oracle.security.pki.OraclePKIProvider
                          security.provider.4=com.sun.net.ssl.internal.ssl.Provider
                          security.provider.5=com.sun.crypto.provider.SunJCE
                          security.provider.6=sun.security.jgss.SunProvider
                          security.provider.7=com.sun.security.sasl.Provider
        3. Provide following properties through application.

                        security.protocol = “”SSL”
                        oracle.net.tns_admin = “location of tnsnames.ora file” (for parsing jdbc connection string)
                        tns.alias = “alias of connection string in tnsnames.ora”
                        

          And following properties in ojdbc.properties file and ojdbc.properties file should be in location oracle.net.tns_admin.

                        user(in smallletters)=nameofdatabaseuser
                        password(in smallletters)=userpassword
                        oracle.net.ssl_server_dn_match=true
                        oracle.net.wallet_location=“(SOURCE=(METHOD=FILE)
                                                   (METHOD_DATA=(DIRECTORY=/location../wallet_dbname)))”
      • Using Java key store. To Provide JDBC SSL security with Java key store, provide following properties through the application:

               security.protocol = "SSL"              
               oracle.net.tns_admin = "location of tnsnames.ora file"              
               tns.alias = "alias of connection string in tnsnames.ora"

        And following properties in ojdbc.properties file and ojdbc.properties file should be in location oracle.net.tns_admin.

                      user(in smallletters)=nameofdatabaseuser
                      password(in smallletters)=userpassword
                      oracle.net.ssl_server_dn_match=true
                      javax.net.ssl.trustStore==${TNS_ADMIN}/truststore.jks
                      javax.net.ssl.trustStorePassword = password
                      javax.net.ssl.keyStore= ${TNS_ADMIN}/keystore.jks
                      javax.net.ssl.keyStorePassword="password" 

        Note:

        tnsnames.ora file in wallet downloaded from ATP contains jdbc connection string which is used for establishing jdbc .

Kafka Client Interfaces

Kafka applications mainly use Producer, Consumer, and Admin APIs to communicate with Kafka cluster. This version of Kafka client for TEQ supports only subset of Kafka 2.0's Producer, Consumer, and Admin APIs and properties.

Overview of Kafka Producer Implementation for TEQ

Producer APIs, allow Kafka application to publish messages into Oracle Transaction Event Queues. Kafka application needs to provide Oracle specific properties which are oracle.host, oracle.port, oracle.servicename, and oracle.instancename. More details about this properties are mentioned in configuration section. These properties are used to setup the database connection and produce the message into TEQ. In the current release, Oracle's implementation of KafkaProducer supports only a subset of the APIs.

Internally, Oracle Kafka Producer object encapsulates AQJMS producer object which is used to publish messages into Oracle TEQ. Similar to Kafka Producer, Producer also stores messages in batches. Each send() call will append a Kafka Record into a particular batch based on its Topic and Partition. A background thread will publish the entire batch, one at a time into Oracle TEQ. Each batch publish is committed by Producer. In current release, a topic can have only one partition and hence all KafkaRecords will be published into a single partition of TEQ.

The following KafkaProducer APIs are supported in Oracle Database 20c.

  • Constructor:

    KafkaProducer: Creates a producer object and internal AQ JMS objects. KafkaProducer class has four types of constructors defined, which all takes configuration parameters as input.

  • Methods:

    • send(ProducerRecord) , send(ProducerRecord, Callback):

      The send method asynchronously publishes a message into TEQ. This method returns immediately once a Kafka Record has been stored in the buffer of records waiting to be sent. If buffer memory is full, then send call blocks for a maximum of time max.block.ms. This allows sending many records in parallel without blocking to wait for the response after each one. Records will be published into the topic using AQ JMS.

      The result of the send is a Future<RecordMetadata> specifying the partition the record was sent to, the offset it was assigned and the timestamp of the record. Both the version send(ProducerRecord) and send(ProducerRecord, Callback) will be supported.

    • close: Closes the producer, its sender thread and frees the accumulator. It also closes internal AQ JMS objects like connection, session JMS producer and so on.

  • Classes

    • ProducerRecord: A class that represents a message in Kafka platform. It is translated into a message for TEQ platform, namely, AQ JMS message. Relevant fields like Payload and Key can be directly translated into TEQ payload and message key for TEQ.

    • RecordMetadata: This contains metadata of the record like topic, partition, offset, timestamp etc of the Record in KafkaPlatform. This is assigned value relevant for TEQs. A message id of TEQ is converted into an offset of RecordMetadata.

    • Callback Interface: A callback function which is executed once a Record is successfully published into Kafka topic.

    • Partitioner Interface: Defines methods which maps a Key of the message to a partition number of the topic. A partition number is analogous to a stream id of TEQs.

  • Properties

    • Key Serializer and Value Serializer: Converts Key and payload into byte array respectively. The Accumulator module will store the payloads in the form of byte array. Sender thread will then form an AQjmsBytes message and publish the message using AQ JMS Array Enqueue API.

    • acks: For okafka, only value relevant for acks property is all. Any other field set by the user is ignored.

    • linger.ms: Time in miliseconds for which sender thread waits before publishing the records in TEQ.

    • batch.size: Total size of records to be batched in bytes for which sender thread waits before publishing records in TEQ.

    • buffer.memory: Total memory in bytes the accumulator can hold.

    • max.block.ms: If buffer.memory size is full in accumulator, then wait for max.block.ms amount of time before send() method can receive out of memory error.

    • retries: This property enables producer to resend the record in case of transient errors. This value is an upper limit on how many resends.

    • retry.backoff.ms : The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios

    • bootstrap.servers: IP address and port of a machine where database instance running.

Overview of Kafka Consumer implementation for TEQ

Consumer API allows applications to read streams of data from Transactional Event Queue. Kafka consumer for TEQ uses AQ JMS APIs and use JDBC driver to consume messages from Oracle TEQ. For Oracle Kafka, consuming message from a topic implies dequeuing messages from Transactional Event Queue.

Similar to Kafka, in TEQ's implementation , a consumer group contains many consumer instances. Each consumer group has a unique group-id. Each consumer internally maintains a single connection/session to Oracle Database instance provided by bootstrap.servers property. For this release, since a topic can have only one partition, only one of the consumer instances will be assigned this single partition. A partition once assigned to a consumer of a consumer group then remains with that consumer till the session is closed. No two consumers from same group are assigned same partition of a topic.

The following KafkaConsumer APIs are supported in Oracle Database 20c.

  • Constructor: KafkaConsumer: Creates a consumer that allows the application to consume message from key based TEQ. Internal client side TEQ objects created are not visible to client application. All variation of the KafkaConsumer constructor are supported in Oracle Database 20c.

  • Methods:

    • Subscribe: This method takes a list of topics to subscribe to. In Oracle Database 20c, only the first topic of the list will be subscribed to. An exception is thrown if size of list is > 1. This method creates a durable subscriber on TEQ server side with Group-Id as subscriber name.

    • Poll: poll method returns a batch of messages from the assigned partition from TEQ. It attempts to dequeue a message from the key based TEQ for the subscriber. TEQ uses array dequeue API of AQ JMS to receive a batch of messages dequeued from the queue. The size of the batch depends on the parameter max.poll.records set by the kafka client application. Poll takes time in milliseconds as an argument. AQ JMS API of array dequeue can pass this timeout time as a dequeue option to the TEQ Server and make dequeue call wait for messages till the timeout time, if the full array batch is not complete.

      When poll is invoked for the first time, Oracle TEQ assigns a single available partition to this Kafka consumer. This assignment stays for the entire lifetime of the Kafka consumer. Messages returned belongs to the partition assigned to the consumer. One queue partition is assigned to each Kafka consumer. It is the responsibility of the application developer to start as many consumers as number of partitions of the queue. If the number of Kafka consumers are less than the number of partitions, then messages from unassigned partitions are never consumed. If the number of Kafka consumers are more than the number of partitions, then extra consumers will not be assigned any partition and hence, will not be able to consume any messages. No two consumer application will consume from same partition at the same time.

    • commitSync: Commits all consumed messages. Commit to an offset is not supported in Oracle Database 20c. This call directly calls commit on the database which commits all consumed messages from TEQ.

    • commitAsync: This call is translated into commitSync. A callback function passed as argument gets executed once the commit is successful.

    • Unsubscribe: Unsubscribes the topic that it has subscribed to. A consumer can no longer consume messages from unsubscribed topics. This call does not remove a subscriber group from the TEQ metadata. Other consumer application can still continue to consume.

    • close: Closes the consumer and unsubscribes the topic it has subscribed to.

  • Class: ConsumerRecord: A class that represents a consumed record in Kafka platform. In Oracle Dataase 20c, AQ JMS message is converted into ConsumerRecord.

  • Properties:

    • key.deserializer and value.deserialzer: In Oracle TEQ's Key based partitioned queue key, value are stored as byte array in user property, payload of JMS message respectively. On consuming these byte arrays are deserialized into key, value having user provided format internally by the consumer using key.deserialize and value.deserializer respectively.

    • group.id: This is a consumer group name for which messages are consumed from the Kafka topic. This property is used as a durable subscriber name for key based TEQs.

    • max.poll.records: Maximum number of records to fetch in single array dequeue call from an Oracle TEQ server.
    • fetch.max.wait.ms: Maximum amount of time in milliseconds to wait for fetching messages if not available.

    • enable.auto.commit: Enables auto commit of consumed messages for every specified interval.

    • auto.commit.interval.ms: Interval in milliseconds for auto commit of messages.

    • bootstrap.servers: IP address and port of a machine where database instance running.

Overview of Kafka Admin Implementation for TEQ

Kafka admin API allows applications to perform administrative tasks like creating a topic, deeting a topic, add partition to a topic and so on. Oracle Database 20c supports the following admin APIs:

  • Methods

    • create(props) and create(config): Creates an object of KafkaAdmin class that uses passed parameters. The user creates a database session which is used for further operations. Client application has to provide oracle.host, oracle.port, oracle.servicename, oracle.instancename, oracle.user, and oracle.password. These Oracle Database properties are used to setup the database connection.

    • close(): Closes a database session and Admin client.

    • deleteTopic: Stops and drops a TEQ.

  • Classes: NewTopic: Class used for creating a new topic. This class contains parameters with which a transactional event queue is created.

  • Properties

    • bootstrap.servers: IP address and port of a machine where database instance running.

    • retention.ms: Amount of time in milliseconds a message is retained in queue after all consumer groups or subscribers dequed a message.

    • partitions: A parameter in class NewTopic. The number of partitions with which a new transactional event queue is created.

Examples: How to Use

Example 6-1 Producer.java

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.oracle.okafka.clients.admin.AdminClient;
import org.oracle.okafka.clients.admin.CreateTopicsOptions;
import org.oracle.okafka.clients.admin.NewTopic;
import org.oracle.okafka.clients.producer.KafkaProducer;
import org.oracle.okafka.clients.producer.ProducerRecord;
import org.oracle.okafka.common.KafkaFuture;

public class Producer {	
	
public static void main(String[] args) {			
		
		if(args.length != 1) {
			System.out.println("Please provide topic name to produce messages.");
			return ;
		}		
		String topic = args[0].trim(); 			
              KafkaProducer<String,String> prod = null;		
		Properties props = new Properties();
		
		props.put("oracle.instance.name", "kafka");
		props.put("oracle.service.name", "kafka.regress.rdbms.dev.us.oracle.com");	  
              props.put("oracle.user.name", "aq");
	       props.put("oracle.password", "aq");	    
              props.put("bootstrap.servers", "localhost:1521");
		props.put("batch.size", 200);
		props.put("linger.ms", 100);
		props.put("buffer.memory", 335544);
		props.put("key.serializer", "org.oracle.okafka.common.serialization.StringSerializer");
		props.put("value.serializer", "org.oracle.okafka.common.serialization.StringSerializer");	
		
		System.out.println("Creating producer now");		  
		prod=new KafkaProducer<String, String>(props);
		System.out.println("Producer created.");
		
		 try {
			 int i;	
			 for(i = 0; i < 10; i++)				 
			     prod.send(new ProducerRecord<String, String>(topic ,0, i+"000","This is new message"+i));
 		     System.out.println("Sent "+ i + "messages");	 
		 } catch(Exception ex) {			  
			 System.out.println("Failed to send messages:");
			 ex.printStackTrace();
		 }
		 finally {
			 prod.close();
		 }		
	}
}

Example 6-2 Consumer.java

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.oracle.okafka.clients.consumer.ConsumerRecord;
import org.oracle.okafka.clients.consumer.ConsumerRecords;
import org.oracle.okafka.clients.consumer.KafkaConsumer;
public class Consumer {	
	public static void main(String[] args) {		
	    Properties props = new Properties();	    
	    if(args.length != 1) {
			System.out.println("Please provide topic name to consume messages.");
			return ;
		}	  
	    String topic = args[0].trim(); 
           props.put("oracle.service.name", "kafka.regress.rdbms.dev.us.oracle.com");	    	     
           props.put("oracle.instance.name", "kafka");
	    props.put("oracle.user.name", "aq");
	    props.put("oracle.password", "aq");
	    props.put("bootstrap.servers", "localhost:1521");
           props.put("group.id", "kafka");
           props.put("enable.auto.commit", "true");
           props.put("auto.commit.interval.ms", "10000");
           props.put("key.deserializer",  "org.oracle.okafka.common.serialization.StringDeserializer");	      
           props.put("value.deserializer",    "org.oracle.okafka.common.serialization.StringDeserializer");  	    
           props.put("max.poll.records", 100);
      KafkaConsumer<String, String> consumer = null;
	    consumer = new KafkaConsumer<String, String>(props);
     consumer.subscribe(Arrays.asList(topic));
     ConsumerRecords<String, String> records = null;
	   try {
		   records = consumer.poll(Duration.ofMillis(1000));
	 	   for (ConsumerRecord<String, String> record : records) {		 	  	   
                    System.out.println("topic = , partition=  ,key= , value = \n"+ 		 	  	             
                                  record.topic()+ "  "+record.partition()+ "  "+record.key()+"  "+ record.value());                  
                    System.out.println(".......");
 	 	    }
	 	   consumer.commitSync();		 	  	    	 
	     }catch(Exception ex) {
	    	 ex.printStackTrace(); 
	     } finally {
	    	 consumer.close();
	     } 
	}    
}