5 Kafka APIs for Oracle Transactional Event Queues
Oracle Transactional Event Queue (TxEventQ) makes it easy to implement event-based applications. It is also highly integrated with Apache Kafka, an open-source stream-processing software platform developed by LinkedIn and donated to the Apache Software Foundation, written in Scala and Java. Apart from enabling applications that use Kafka APIs to transparently operate on Oracle TxEventQ, Oracle TxEventQ also supports bi-directional information flow between TxEventQ and Kafka, so that changes are available in TxEventQ or Kafka as soon as possible in near-real-time.
Apache Kafka Connect is a framework included in Apache Kafka that integrates Kafka with other systems. Oracle TxEventQ will provide standard JMS package and related JDBC, Transaction packages to establish the connection and complete the transactional data flow. Oracle TxEventQ configures standard Kafka JMS connectors to establish interoperability and complete the data flow between the two messaging systems.
This chapter includes the following topics:
Apache Kafka Overview
Apache Kafka is a community distributed event streaming platform that is horizontally-scalable and fault-tolerant.
Kafka is deployed on a cluster of one or more servers. Each Kafka cluster stores streams of records in categories called topics. Each record consists of a key, a value, and a timestamp. The Kafka APIs allow an application to connect to a Kafka cluster and use the Kafka messaging platform.
Kafka Java Client for Transactional Event Queues
Oracle Database 21c introduced Kafka application compatibility with the Oracle Database. Oracle Database 23ai provides more refined compatibility for a Kafka application with the Oracle Database. This provides easy migration for Kafka Java applications to Transactional Event Queues (TxEventQ). The Kafka Java APIs can now connect to an Oracle database server and use TxEventQ as a messaging platform.
Figure 5-1 Kafka Application Integration with Transactional Event Queue
Description of "Figure 5-1 Kafka Application Integration with Transactional Event Queue"
This figure shows the Kafka API library, which contains Oracle specific implementation of Kafka's Java APIs which depends on the kafka-clients-2.8.0.jar
file. This implementation internally invokes AQ-JMS APIs which in turn use the JDBC driver to communicate with the Oracle Database.
Developers can now migrate an existing Java application that uses Kafka to the Oracle database using okafka.jar
. This client side library allows Kafka applications to connect to the Oracle Database instead of a Kafka cluster and use TxEventQ's messaging platform transparently.
Configuring Kafka Java Client for Transactional Event Queues
Prerequisites
The following are the prerequisites for configuring and running the Kafka Java client for TxEventQ in an Oracle Database.
-
Create a database user.
-
Grant the following privileges to the user.
Note:
It is preferred in general to assign or grant a specific quota on a tablespace to a database user instead of granting unlimited quota in default tablespace. One can create a table space and use the following command to grant quota on a specific tablespace to a database user.
ALTER USER user QUOTA UNLIMITED /* or size-clause */ on tablespace_name
-
GRANT EXECUTE on DBMS_AQ to user
. -
GRANT EXECUTE on DBMS_AQADM to user
. -
GRANT SELECT on GV_$SESSION to user;
-
GRANT SELECT on V_$SESSION to user;
-
GRANT SELECT on GV_$INSTANCE to user;
-
GRANT SELECT on GV_$LISTENER_NETWORK to user;
-
GRANT SELECT on GV_$PDBS to user;
-
GRANT SELECT on USER_QUEUE_PARTITION_ASSIGNMENT_TABLE to user;
-
exec DBMS_AQADM.GRANT_PRIV_FOR_RM_PLAN('user');
-
-
Set the correct database configuration parameter to use TxEventQ.
SET STREAMS_POOL_SIZE=400M
Note:
Set the size appropriately based on your workload.
STREAMS_POOL_SIZE
cannot be set for Autonomous Database Shared. It is automatically configured. If set, then it is ignored. -
Set
LOCAL_LISTENER
database parameterSET LOCAL_LISTENER= (ADDRESS=(PROTOCOL=TCP)(HOST=<HOST_NAME.DOMAIN_NAME/ IP> )(PORT=<PORT NUMBER>))
Connection Configuration
The Kafka API library connects to the Oracle Database using the JDBC Thin Driver. To set up this connection, the Kafka application can provide username and password in plain text, or applications can configure SSL. To run a Kafka application against Oracle Autonomous Database (ADB) on OCI, only SSL configuration is supported. For other deployments, you can connect to the Oracle Database using PLAINTEXT or SSL.
-
PLAINTEXT: In this security protocol, JDBC connections to an Oracle Database are set up using the TCP protocol with username and password provided in plaintext in the
ojdbc.properties
file.To use the PLAINTEXT protocol, the application must set the following properties:
-
oracle.service.name = <name of the service running on the instance>
-
bootstrap.servers = <host:port>
-
security.protocol
=PLAINTEXT -
oracle.net.tns_admin
= <location of ojdbc.properties file
>
The following properties should be present in the
ojdbc.properties
file:-
user = <nameofdatabaseuser>
-
password = <userpassword>
-
-
SSL: To use the SSL security protocol to connect to an ATP database, perform the following additional steps.
-
JDBC Thin Driver Connection prerequisites for SSL security:
-
JDK8u162 or higher.
-
oraclepki.jar
,osdt_cert.jar
, andosdt_core.jar
-
18.3 or higher JDBC Thin Driver
-
-
To leverage the JDBC SSL security to connect to the Oracle Database instance, the user has to provide the following properties. JDBC supports SSL secured connections to Oracle Database in two ways.
-
Using wallets. To use wallets:
-
Add the required dependant jars for using Oracle Wallets in classpath.
Download
oraclepki.jar
,osdt_cert.jar
, andosdt_core.jar
files along with the JDBC Thin Driver, and add these jars to classpath. -
Enable Oracle PKI provider
Add
OraclePKIProvider
at the end of the filejava.security
(located at$JRE_HOME/jre/lib/security/java.security
) if SSO wallet, that is,cwallet.sso
is used for providing SSL security. For example:security.provider.1=sun.security.provider.Sun security.provider.2=sun.security.rsa.SunRsaSign security.provider.3=com.sun.net.ssl.internal.ssl.Provider security.provider.4=com.sun.crypto.provider.SunJCE security.provider.5=sun.security.jgss.SunProvider security.provider.6=com.sun.security.sasl.Provider security.provider.7=oracle.security.pki.OraclePKIProvider
To use
ewallet.p12
for SSL security then placeOraclePKIProvider
before sun provider in filejava.security
. For example:security.provider.1=sun.security.provider.Sun security.provider.2=sun.security.rsa.SunRsaSign security.provider.3=oracle.security.pki.OraclePKIProvider security.provider.4=com.sun.net.ssl.internal.ssl.Provider security.provider.5=com.sun.crypto.provider.SunJCE security.provider.6=sun.security.jgss.SunProvider security.provider.7=com.sun.security.sasl.Provider
-
Set the following properties in the application.
security.protocol=SSL oracle.net.tns_admin=<location of tnsnames.ora file> tns.alias=<alias of connection string in tnsnames.ora>
Set the following properties in the
ojdbc.properties
file. This file must be available at the location specified by theoracle.net.tns_admin
property.user(in smallletters)=nameofdatabaseuser password(in smallletters)=userpassword oracle.net.ssl_server_dn_match=true oracle.net.wallet_location=“(SOURCE=(METHOD=FILE) (METHOD_DATA=(DIRECTORY=/location../wallet_dbname)))”
-
-
To use JDBC SSL security with a Java KeyStore, provide the following properties in the application:
security.protocol=SSL oracle.net.tns_admin=<location of tnsnames.ora file> tns.alias=<alias of connection string in tnsnames.ora>
Set the following properties in the
ojdbc.properties
file. This file must be available at the location specified by theoracle.net.tns_admin
property.user(in smallletters)=nameofdatabaseuser password(in smallletters)=userpassword oracle.net.ssl_server_dn_match=true javax.net.ssl.trustStore==${TNS_ADMIN}/truststore.jks javax.net.ssl.trustStorePassword=password javax.net.ssl.keyStore=${TNS_ADMIN}/keystore.jks javax.net.ssl.keyStorePassword=password
Note:
tnsnames.ora
file in wallet downloaded from ATP contains a JDBC connection string which is used for establishing a JDBC connection.
-
-
See Also:
ALTER USER in Oracle Database SQL Language Reference
Kafka Client Interfaces
Kafka applications mainly use Producer, Consumer, and Admin APIs to communicate with a Kafka cluster. This version of Kafka client for TxEventQ supports only a subset of Apache Kafka 2.8.0's Producer, Consumer and Admin APIs and properties. With the okafka.jar
client library, Kafka applications will be able to use the Oracle TxEventQ platform. The okafka.jar
library requires JRE 9 or above.
We first illustrate the use of the Kafka Client APIs by way of simple examples, and later describe more details on the same.
Kafka API Examples
Example: Creating an Oracle Kafka Topic
import java.util.Arrays; import java.util.Properties; import java.util.concurrent.ExecutionException; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.CreateTopicsResult; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.common.KafkaFuture; import org.oracle.okafka.clients.admin.AdminClient; public class SimpleAdminOKafka { public static void main(String[] args) { Properties props = new Properties(); //IP or Host name where Oracle Database 23ai is running and Database Listener's Port props.put("bootstrap.servers", "localhost:1521"); //name of the service running on the database instance props.put("oracle.service.name", "freepdb1"); props.put("security.protocol","PLAINTEXT"); // location for ojdbc.properties file where user and password properties are saved props.put("oracle.net.tns_admin","."); try (Admin admin = AdminClient.create(props)) { //Create Topic named TEQ with 10 Partitions. CreateTopicsResult result = admin.createTopics( Arrays.asList(new NewTopic("TEQ", 10, (short)0))); try { KafkaFuture<Void> ftr = result.all(); ftr.get(); } catch ( InterruptedException | ExecutionException e ) { throw new IllegalStateException(e); } System.out.println("Closing OKafka admin now"); } catch(Exception e) { System.out.println("Exception while creating topic " + e); e.printStackTrace(); } } }
Example: Kafka Consumer API
import java.util.Properties; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.oracle.okafka.clients.consumer.KafkaConsumer; public class SimpleConsumerOKafka { // Dummy implementation of ConsumerRebalanceListener interface // It only maintains the list of assigned partitions in assignedPartitions list static class ConsumerRebalance implements ConsumerRebalanceListener { public List<TopicPartition> assignedPartitions = new ArrayList<>(); @Override public synchronized void onPartitionsAssigned(Collection<TopicPartition> partitions) { System.out.println("Newly Assigned Partitions:"); for (TopicPartition tp :partitions ) { System.out.println(tp); assignedPartitions.add(tp); } } @Override public synchronized void onPartitionsRevoked(Collection<TopicPartition> partitions) { System.out.println("Revoked previously assigned partitions. "); for (TopicPartition tp :assignedPartitions ) { System.out.println(tp); } assignedPartitions.clear(); } } public static void main(String[] args) { //System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "TRACE"); Properties props = new Properties(); //IP or Host name where Oracle Database 23ai is running and Database Listener's Port props.put("bootstrap.servers", "localhost:1521"); //name of the service running on the database instance props.put("oracle.service.name", "freepdb1"); props.put("security.protocol","PLAINTEXT"); // location for ojdbc.properties file where user and password properties are saved props.put("oracle.net.tns_admin","."); //Consumer Group Name props.put("group.id" , "CG1"); props.put("enable.auto.commit","false"); // Maximum number of records fetched in single poll call props.put("max.poll.records", 2000); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); Consumer<String , String> consumer = new KafkaConsumer<String, String>(props); ConsumerRebalanceListener rebalanceListener = new ConsumerRebalance(); //Subscribe to a single topic named 'TEQ'. consumer.subscribe(Arrays.asList("TEQ"), rebalanceListener); int expectedMsgCnt = 40000; int msgCnt = 0; Instant startTime = Instant.now(); try { while(true) { try { //Consumes records from the assigned partitions of 'TEQ' topic ConsumerRecords <String, String> records = consumer.poll(Duration.ofMillis(10000)); //Print consumed records for (ConsumerRecord<String, String> record : records) { System.out.printf("partition = %d, offset = %d, key = %s, value =%s\n ", record.partition(), record.offset(), record.key(), record.value()); for(Header h: record.headers()) { System.out.println("Header: " +h.toString()); } } //Commit all the consumed records if(records != null && records.count() > 0) { msgCnt += records.count(); System.out.println("Committing records " + records.count()); try { consumer.commitSync(); }catch(Exception e) { System.out.println("Exception in commit " + e.getMessage()); continue; } if(msgCnt >= expectedMsgCnt ) { System.out.println("Received " + msgCnt + " Expected " + expectedMsgCnt +". Exiting Now."); break; } } else { System.out.println("No Record Fetched. Retrying in 1 second"); Thread.sleep(1000); } }catch(Exception e) { System.out.println("Inner Exception " + e.getMessage()); throw e; } } }catch(Exception e) { System.out.println("Exception from OKafka consumer " + e); e.printStackTrace(); }finally { long runDuration = Duration.between(startTime, Instant.now()).toMillis(); System.out.println("Closing OKafka Consumer. Received "+ msgCnt +" records. Run Duration " + runDuration); consumer.close(); } } }
Example: Kafka Producer API
import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.header.internals.RecordHeader; import org.oracle.okafka.clients.producer.KafkaProducer; import java.time.Duration; import java.time.Instant; import java.util.Properties; import java.util.concurrent.Future; public class SimpleProducerOKafka { public static void main(String[] args) { try { Properties props = new Properties(); //IP or Host name where Oracle Database 23ai is running and Database Listener's Port props.put("bootstrap.servers", "localhost:1521"); //name of the service running on the database instance props.put("oracle.service.name", "freepdb1"); props.put("security.protocol","PLAINTEXT"); // location for ojdbc.properties file where user and password properties are saved props.put("oracle.net.tns_admin","."); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); String baseMsg = "This is a test message "; // Creates OKafka Producer Producer<String, String> producer = new KafkaProducer<String, String>(props); Future<RecordMetadata> lastFuture = null; int msgCnt = 40000; Instant startTime = Instant.now(); //Headers, common for all records RecordHeader rH1 = new RecordHeader("CLIENT_ID", "FIRST_CLIENT".getBytes()); RecordHeader rH2 = new RecordHeader("REPLY_TO", "REPLY_TOPIC_NAME".getBytes()); //Produce 40000 messages into topic named "TEQ". for(int i=0;i<msgCnt;i++) { ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("TEQ", ""+i, baseMsg + i); producerRecord.headers().add(rH1).add(rH2); lastFuture =producer.send(producerRecord); } //Waits until the last message is acknowledged lastFuture.get(); long runTime = Duration.between( startTime, Instant.now()).toMillis(); System.out.println("Produced "+ msgCnt +" messages. Run Duration " + runTime); //Closes the OKafka producer producer.close(); } catch(Exception e) { System.out.println("Exception in Main " + e ); e.printStackTrace(); } } }
Example: Deleting an Oracle Kafka Topic
import java.util.Collections; import java.util.Properties; import org.apache.kafka.clients.admin.Admin; import org.oracle.okafka.clients.admin.AdminClient; public class SimpleAdminDeleteTopic { public static void main(String[] args) { Properties props = new Properties(); //IP or Host name where Oracle Database 23ai is running and Database Listener's Port props.put("bootstrap.servers", "localhost:1521"); //name of the service running on the database instance props.put("oracle.service.name", "freepdb1"); props.put("security.protocol","PLAINTEXT"); // location for ojdbc.properties file where user and password properties are saved props.put("oracle.net.tns_admin","."); try (Admin admin = AdminClient.create(props)) { //Throws Exception if failed to delete the topic. Returns null on successful deletion. org.apache.kafka.clients.admin.DeleteTopicsResult delResult = admin.deleteTopics(Collections.singletonList("TEQ")); Thread.sleep(1000); System.out.println("Closing admin now"); } catch(Exception e) { System.out.println("Exception while creating topic " + e); e.printStackTrace(); } } }
Note:
-
Topics created using the KafkaAdmin interface can be accessed only by KafkaProducer or KafkaConsumer interfaces.
-
Similarly, the KafkaProducer and KafkaConsumer interfaces can produce or consume records only from topics which are created using the KafkaAdmin interface.
Kafka REST APIs for TxEventQ
The TxEventQ REST APIs allow common operations to produce and consume from topics and partitions, and are implemented using Oracle REST Data Services (ORDS) in the Oracle Database. Common operations include creating and deleting topics, producing and consuming messages, and operational APIs for getting consumer lag on a topic, seeking to an offset, among many others.
The following three APIs for Kafka allow TxEventQ to co-exist with Kafka deployments, and provide the advantages of transactional outbox, JMS messaging and pub/sub in the database and high throughput streaming of events to the event queue in the Oracle Database.
See Also:
Oracle Transactional Event Queues REST Endpoints for the Oracle REST Data Services API documentation
Overview of Kafka Producer Implementation for TxEventQ
Producer APIs allow a Kafka application to publish messages into Oracle Transactional Event Queues (TxEventQ). A Kafka application needs to provide Oracle specific properties: bootstrap.servers
, oracle.servicename
, and oracle.net.tns_admin
. More details about these properties are mentioned in the configuration section. These properties are used to set up the database connection and produce the message into TxEventQ. In the current release, Oracle's implementation of KafkaProducer
supports only a subset of the Producer APIs.
Internally, an Oracle Kafka Producer object encapsulates an AQ JMS producer object
which is used to publish messages into Oracle TxEventQ. Similar to Apache Kafka
Producer, each Producer send()
call will append a Kafka Record into
a batch based on its topic and partition. Based on Apache Kafka's internal
algorithm, a background thread will publish the entire batch to an Oracle
TxEventQ.
The following KafkaProducer
APIs are supported in Oracle Database 23ai.
-
Constructor:
KafkaProducer
: Creates a producer object and internal AQ JMS objects. TheKafkaProducer
class has four types of constructors defined, which all take configuration parameters as input. -
Methods:
-
send(ProducerRecord) , send(ProducerRecord, Callback)
:The
send
method asynchronously publishes a message into TxEventQ. This method returns immediately once a Kafka Record has been stored in the buffer of records waiting to be sent. If the buffer is full, then the send call blocks for a maximum time ofmax.block.ms
. Records will be published into the topic using AQ JMS.The method returns a
Future<RecordMetadata>
, which contains the partition, offset, and publish timestamp of the record. Both thesend(ProducerRecord)
andsend(ProducerRecord, Callback)
versions are supported. -
close
: Closes the producer and frees the memory. It will also close the internal connection to the Oracle Database.
-
-
Classes
-
ProducerRecord
: A class that represents a message in the Kafka platform. The Kafka API library translates aProducerRecord
into a JMS BytesMessage for the TxEventQ platform. -
RecordMetadata
: This contains metadata of the record like topic, partition, offset, timestamp etc. of the Record in the Kafka platform. This is assigned values relevant for TxEventQs. A message id of TxEventQ is converted into an offset ofRecordMetadata
. -
Callback Interface: A callback function which is executed once a Record is successfully published into a Kafka topic.
-
Partitioner Interface: Defines methods which map a
Key
of the message to a partition number of the topic. A partition number is analogous to a stream id of TxEventQs.
-
-
Properties
-
key.serializer
andvalue.serializer
: Converts Key and payload into byte array respectively. -
acks
: For Kafka APIs, the only value relevant for theacks
property isall
. Any other field set by the user is ignored. -
linger.ms
: Time in miliseconds for which the sender thread waits before publishing the records in TxEventQ. -
batch.size
: Total size of records to be batched in bytes for which the sender thread waits before publishing records in TxEventQ. -
buffer.memory
: Total memory in bytes the accumulator can hold. -
max.block.ms
: Ifbuffer.memory
size is full in the accumulator, then wait formax.block.ms
amount of time before thesend()
method can receive an out of memory error. -
retries
: This property enables a producer to resend a record in case of transient errors. This value limits the number of retries per batch. -
retry.backoff.ms
: The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios. -
bootstrap.servers
: IP address and port of a machine where an instance of the database is running.
-
Overview of Kafka Consumer implementation for TxEventQ
The Consumer API allows applications to read streams of data from a Transactional Event Queue (TxEventQ). Kafka consumer for TxEventQ uses AQ JMS APIs and a JDBC driver to consume messages in batches from Oracle TxEventQ. For Oracle Kafka, consuming messages from a topic is achieved by dequeuing messages from a Transactional Event Queue.
Similar to Apache Kafka, in TxEventQ's implementation, a consumer group (subscriber) may contains many consumer instances (unique database sessions that are consuming for the subscriber). Each consumer group has a unique group-id (subscriber name). Each consumer instance internally maintains a single connection/session to an Oracle Database instance provided by the bootstrap.servers
property. Oracle Database 23ai introduces Kafka API support for Consumer Group Rebalancing. Partitions of a topic will be distributed among the active consumers of a consumer group such that no two consumers from the same consumer group are assigned the same partition of the topic simultaneously. Whenever new consumers join a consumer group or an existing consumer leaves the group, the partitions will be redistributed among the active consumers.
For the 23ai release of Kafka APIs, a consumer can subscribe to only one topic.
The following KafkaConsumer
APIs are supported in Oracle Database 23ai.
-
Constructor:
KafkaConsumer
: Creates a consumer that allows the application to consume messages from a key based TxEventQ. Internal client side TxEventQ objects created are not visible to a client application. All variations of theKafkaConsumer
constructor are supported in Oracle Database 23ai. -
Methods:
-
Subscribe
: This method takes a list of topics to subscribe to. In Oracle Database 23ai, only the first topic of the list will be subscribed to. An exception is thrown if the size of the list is greater than 1. This method creates a durable subscriber on TxEventQ server side with Group-Id as subscriber name. An application can also implement theConsumerRebalanceListener
interface and pass an object of the implemented class to the subscribe method. This allows a consumer to execute callbacks when a partition is revoked or assigned. -
Poll
: Thepoll
method returns a batch of messages from assigned partitions from TxEventQ. It attempts to dequeue a message from the key based TxEventQ for the subscriber. TxEventQ uses the array dequeue API of AQ JMS to receive a batch of messages from the queue. The size of the batch depends on the parametermax.poll.records
set by the Kafka client application.Poll
takes time in milliseconds as an argument. The AQ JMS API for array dequeue can pass this timeout as a dequeue option to the TxEventQ server and make the dequeue call, which will wait for messages till the timeout if the full array batch is not complete.When poll is invoked for the first time by a consumer, it triggers consumer rebalancing for all the alive consumers of the consumer group. At the end of the consumer rebalancing, all alive consumers are assigned topic partitions, and subsequent poll requests will fetch messages from the assigned partitions only.
An application can participate and influence rebalancing using the
ConsumerRebalanceListener
interface andpartition.assignment.strategy
configuration.The
partition.assignment.strategy
configuration allows an application to select a strategy for assigning partitions to consumer streams. OKafka supports all values for this configuration parameter which are documented in the Apache Kafka 2.8.0 documentation.The default value for this configuration is
org.oracle.okafka.clients.consumer.TXEQAssignor
which is aware of Oracle RAC and implements a strategy that is best for achieving higher throughput from Oracle TxEventQ.This strategy prioritizes fair distribution of partitions and local consumption of messages while distributing partitions among alive sessions.
ConsumerRebalanceListener
allows an application to invoke callbacks when partitions are revoked or assigned to a consumer.The database view
USER_QUEUE_PARTITION_ASSIGNMENT_TABLE
allows a developer to view the current distribution of partitions among the alive consumers. -
commitSync
: Commits all consumed messages. Commit to an offset is not supported in Oracle Database 23ai. This call directly calls commit on the database which commits all consumed messages from TxEventQ. -
commitAsync
: This call is translated intocommitSync
. A callback function passed as an argument gets executed once the commit is successful. -
Unsubscribe
: Unsubscribes the topic that a consumer has subscribed to. A consumer can no longer consume messages from unsubscribed topics. This call does not remove a subscriber group from the TxEventQ metadata. Other consumer applications can still continue to consume for the same consumer group. -
close
: Closes the consumer and unsubscribes the topic it has subscribed to.
-
-
Class:
ConsumerRecord
: A class that represents a consumed record in the Kafka platform. Kafka APIs receive AQ JMS messages from TxEventQ and convert each of them into aConsumerRecord
and deliver it to the application. -
Properties:
-
auto.offset.reset
: When there is no initial offset found for this consumer group, then the value of this property controls whether to consume the messages from the beginning of the topic partition or to only consume new messages. Values for this property and its usage are as follows:-
earliest
: Consume from the beginning of the topic partition. -
latest
: Consume from the end of the topic partition (default). -
none
: Throw an exception if no offset present for the consumer group.
-
-
key.deserializer
andvalue.deserializer
: For Oracle Kafka messaging platform, key and value are stored as byte arrays in a JMS Message in Oracle's TxEventQ. On consuming, these byte arrays are deserialized into a key and a value usingkey.deserializer
andvalue.deserializer
respectively. These properties allow an application to convertKey
andValue
, which are stored in byte array format, into application specified data types. -
group.id
: This is a consumer group name for which messages are consumed from the Kafka topic. This property is used as a durable subscriber name for key based TxEventQs. max.poll.records
: Maximum number of records to fetch in a single array dequeue call from an Oracle TxEventQ server.-
enable.auto.commit
: Enables auto commit of consumed messages for every specified interval. -
auto.commit.interval.ms
: Interval in milliseconds for auto commit of messages. -
bootstrap.servers
: IP address and port of a machine where a database instance is running.
-
Overview of Kafka Admin Implementation for TxEventQ
The Kafka admininistrative API allows applications to perform administrative tasks like creating a topic, deleting a topic, adding a partition to a topic and so on. Oracle Database 23ai release supports only the following administrative APIs.
-
Methods
-
create(props)
andcreate(config)
: Creates an object ofKafkaAdmin
class that uses passed parameters. The method creates a database session which is used for further operations. An application has to provide connection configuration parameters as explained in the Connection Configuration section. -
createTopics()
: Allows an application to create a Kafka Topic. This creates a TxEventQ in the user's schema. -
close()
: Closes a database session and Admin client. -
deleteTopic
: Deletes a Kafka Topic. This returnsnull
when a topic is deleted successfully. Otherwise, the method throws an exception. The method does not return until the topic is successfully deleted or any error is encountered.
-
-
Classes:
NewTopic
: Class used for creating a new topic. This class contains parameters with which a transactional event queue is created.
Kafka REST APIs for TxEventQ
The TxEventQ REST APIs allow common operations to produce and consume from topics and partitions, and are implemented using Oracle REST Data Services (ORDS) in the Oracle Database. Common operations include creating and deleting topics, producing and consuming messages, and operational APIs for getting consumer lag on a topic, seeking to an offset, among many others.
The following three APIs for Kafka allow TxEventQ to co-exist with Kafka deployments, and provide the advantages of transactional outbox, JMS messaging and pub/sub in the database and high throughput streaming of events to the event queue in the Oracle Database.
See Also:
Oracle Transactional Event Queues REST Endpoints for the Oracle REST Data Services API documentation
Kafka Connectors for TxEventQ
The Kafka Sink and Source Connector requires a minimum Oracle Database version of 21c in order to create a Transactional Event Queue. To use the application, Kafka with a minimum version number of 3.1.0 will need to be downloaded and installed on a server.
See Also:
https://github.com/oracle/okafka/tree/master/connectors for more information.
Monitoring Message Transfer
The Sink/Source connector messages transfer can be monitored from Oracle TxEventQ.
See Also:
Monitoring Transactional Event Queues to startup TxEventQ Monitor System to check enqueue/dequeue rate, TxEventQ depth, and more DB/System Level statistics.