5 Oracle Transactional Event QueuesのKafka API
Oracle Transactional Event Queue (TxEventQ)を使用すると、イベントベースのアプリケーションが簡単に実装できます。さらに、ScalaとJavaで作成されたオープンソースのストリーム処理ソフトウェア・プラットフォームであるApache Kafkaと高度に統合されています。このプラットフォームは、LinkedInによって開発され、Apache Software Foundationに寄贈されたものです。Kafka APIを使用するアプリは、透過的にOracle TxEventQの操作ができるようになります。さらに、Oracle TxEventQでは、TxEventQとKafkaの双方向の情報フローもサポートしているため、変更内容はほぼリアルタイムでTxEventQまたはKafkaですぐに利用できるようになります。
Apache Kafka Connectは、Kafkaとその他のシステムを統合するApache Kafkaに組み込まれているフレームワークです。Oracle TxEventQは、標準JMSパッケージおよび関連するJDBCのTransactionパッケージを提供することで、接続を確立してトランザクション・データ・フローを完了します。Oracle TxEventQは、標準のKafka JMSコネクタを構成することで、相互運用性を確立し、2つのメッセージング・システム間のデータ・フローを完成させます。
この章の内容は次のとおりです。
Apache Kafkaの概要
Apache Kafkaは、通信分散型イベント・ストリーミング・プラットフォームです。このプラットフォームは、水平方向のスケーラビリティと耐障害性を備えています。
Kafkaは、1つ以上のサーバーのクラスタにデプロイされます。各Kafkaクラスタには、トピックというカテゴリでレコードのストリームが格納されます。各レコードは、キー、値、およびタイムスタンプで構成されます。Kafka APIにより、アプリケーションはKafkaクラスタに接続し、Kafkaメッセージング・プラットフォームを使用できるようになります。
Transactional Event Queues対応のKafka Javaクライアント
Oracle Database 21cで、KafkaアプリケーションのOracle Databaseとの互換性が導入されました。Oracle Database 23aiは、KafkaアプリケーションとOracle Databaseとのより洗練された互換性を提供します。これにより、Kafka JavaアプリケーションのTransactional Event Queues (TxEventQ)への移行が簡単になります。Kafka Java APIは、Oracleデータベース・サーバーに接続し、メッセージング・プラットフォームとしてTxEventQを使用できるようになりました。
図5-1 KafkaアプリケーションのTransactional Event Queueとの統合

「図5-1 KafkaアプリケーションのTransactional Event Queueとの統合」の説明
この図は、kafka-clients-2.8.0.jarファイルに依存するKafkaのJava APIのOracle固有の実装が含まれているKafka APIライブラリを示しています。この実装では、Oracle Databaseと通信するためにJDBCドライバを使用するAQ-JMS APIを内部的に呼び出します。
開発者は、Kafkaを使用する既存のJavaアプリケーションを、okafka.jarを使用してOracleデータベースに移行できるようになりました。このクライアント側ライブラリを使用すると、KafkaアプリケーションはKafkaクラスタのかわりにOracle Databaseデータベースに接続して、透過的にTxEventQのメッセージング・プラットフォームを使用できるようになります。
Transactional Event Queues対応のKafka Javaクライアントの構成
前提条件
次に、Oracle DatabaseのTxEventQに対応するKafka Javaクライアントを構成および実行するための前提条件を示します。
-
データベース・ユーザーを作成します。
-
ユーザーに次の権限を付与します。
ノート:
デフォルト表領域で無制限の割当て制限を付与するかわりに、表領域の特定の割当て制限をデータベース・ユーザーに割り当てたり付与することが一般的です。表領域を作成し、次のコマンドを使用して特定の表領域の割当て制限をデータベース・ユーザーに付与できます。
ALTER USER user QUOTA UNLIMITED /* or size-clause */ on tablespace_name
-
GRANT EXECUTE on DBMS_AQ to user。 -
GRANT EXECUTE on DBMS_AQADM to user。 -
GRANT SELECT on GV_$SESSION to user; -
GRANT SELECT on V_$SESSION to user; -
GRANT SELECT on GV_$INSTANCE to user; -
GRANT SELECT on GV_$LISTENER_NETWORK to user; -
GRANT SELECT on GV_$PDBS to user; -
GRANT SELECT on USER_QUEUE_PARTITION_ASSIGNMENT_TABLE to user; -
exec DBMS_AQADM.GRANT_PRIV_FOR_RM_PLAN('user');
-
-
TxEventQを使用するための適切なデータベース構成パラメータを設定します。
SET STREAMS_POOL_SIZE=400M
ノート:
ワークロードに基づいてサイズを適切に設定します。
STREAMS_POOL_SIZEはAutonomous Database Sharedに設定できません。これは自動的に構成されます。 -
LOCAL_LISTENERデータベース・パラメータを設定しますSET LOCAL_LISTENER= (ADDRESS=(PROTOCOL=TCP)(HOST=<HOST_NAME.DOMAIN_NAME/ IP> )(PORT=<PORT NUMBER>))
ノート:
LOCAL_LISTENERは、次に対しては設定する必要がありません:-
Autonomous Database、および
-
1つのデータベース・インスタンスのみを含むOracleデータベース(つまり、非クラスタ・デプロイメントの場合)。
-
接続構成
OKafkaは、JDBC (Thinドライバ)接続を使用し、2つのセキュリティ・プロトコルのいずれかを使用してOracle Databaseインスタンスに接続します。
-
PLAINTEXT
-
SSL
PLAINTEXT: このプロトコルでは、ojdbc.prpertiesファイルのプレーン・テキストでユーザー名とパスワードを指定することで、JDBC接続が設定されます。PLAINTEXTプロトコルを使用するには、ユーザーはアプリケーションから次のプロパティを指定する必要があります。
security.protocol = "PLAINTEXT"
bootstrap.servers = "host:port"
oracle.service.name = "name of the service running on the instance"
oracle.net.tns_admin = "location of ojdbc.properties file"ojdbc.propertiesファイルには、次のプロパティが必要です。
user(in lowercase)=DatabaseUserName
password(in lowercase)=PasswordSSL: このプロトコルでは、JDBCドライバはOracle Walletを使用してOracleデータベースに接続します。このプロトコルは、通常、Oracle Autonomous CloudでOracle Database 23aiインスタンスに接続するために使用されます。このプロトコルを使用するには、Okafkaアプリケーションで次のプロパティを指定する必要があります。
security.protocol = "SSL"
oracle.net.tns_admin = "location containing Oracle Wallet, tnsname.ora and ojdbc.properties file"
tns.alias = "alias of connection string in tnsnames.ora"oracle.net.tns_adminプロパティで指定されたディレクトリの場所には次が含まれます:
-
Oracle Wallet
-
tnsnames.oraファイル -
ojdbc.propertiesファイル(オプション)
これは、Oracle Walletの構成方法によって異なります。
関連項目:
セキュアなJDBC接続を確立する方法の詳細は、ウォレットを使用したJDBC Thin接続(mTLS)を参照してください
Kafkaクライアント・インタフェース
Kafkaアプリケーションは、Kafkaクラスタとの通信にプロデューサ、コンシューマ、および管理のAPIを主に使用します。このバージョンのTxEventQ対応Kafkaクライアントは、Apache Kafka 2.8.0のプロデューサ、コンシューマ、および管理のAPIとプロパティのサブセットのみをサポートしています。okafka.jarクライアント・ライブラリを使用すると、KafkaアプリケーションはOracle TxEventQプラットフォームを使用できるようになります。okafka.jarライブラリにはJRE 9以降が必要です。
最初にKafkaクライアントAPIの使用方法を簡単な例で示し、後でその詳細を説明します。
Kafka APIの例
例: Oracle Kafkaトピックの作成
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;
import org.oracle.okafka.clients.admin.AdminClient;
public class SimpleAdminOKafka {
public static void main(String[] args) {
Properties props = new Properties();
//IP or Host name where Oracle Database 23ai is running and Database Listener's Port
props.put("bootstrap.servers", "localhost:1521");
//name of the service running on the database instance
props.put("oracle.service.name", "freepdb1");
props.put("security.protocol","PLAINTEXT");
// location for ojdbc.properties file where user and password properties are saved
props.put("oracle.net.tns_admin",".");
try (Admin admin = AdminClient.create(props)) {
//Create Topic named TEQ with 10 Partitions.
CreateTopicsResult result = admin.createTopics(
Arrays.asList(new NewTopic("TEQ", 10, (short)0)));
try {
KafkaFuture<Void> ftr = result.all();
ftr.get();
} catch ( InterruptedException | ExecutionException e ) {
throw new IllegalStateException(e);
}
System.out.println("Closing OKafka admin now");
}
catch(Exception e)
{
System.out.println("Exception while creating topic " + e);
e.printStackTrace();
}
}
}
例: 単純なOKafkaコンシューマの作成
import java.util.Properties;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.oracle.okafka.clients.consumer.KafkaConsumer;
public class SimpleConsumerOKafka {
// Dummy implementation of ConsumerRebalanceListener interface
// It only maintains the list of assigned partitions in assignedPartitions list
static class ConsumerRebalance implements ConsumerRebalanceListener {
public List<TopicPartition> assignedPartitions = new ArrayList<>();
@Override
public synchronized void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.println("Newly Assigned Partitions:");
for (TopicPartition tp :partitions ) {
System.out.println(tp);
assignedPartitions.add(tp);
}
}
@Override
public synchronized void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("Revoked previously assigned partitions. ");
for (TopicPartition tp :assignedPartitions ) {
System.out.println(tp);
}
assignedPartitions.clear();
}
}
public static void main(String[] args) {
//System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "TRACE");
Properties props = new Properties();
//IP or Host name where Oracle Database 23ai is running and Database Listener's Port
props.put("bootstrap.servers", "localhost:1521");
//name of the service running on the database instance
props.put("oracle.service.name", "freepdb1");
props.put("security.protocol","PLAINTEXT");
// location for ojdbc.properties file where user and password properties are saved
props.put("oracle.net.tns_admin",".");
//Consumer Group Name
props.put("group.id" , "CG1");
props.put("enable.auto.commit","false");
// Maximum number of records fetched in single poll call
props.put("max.poll.records", 2000);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String , String> consumer = new KafkaConsumer<String, String>(props);
ConsumerRebalanceListener rebalanceListener = new ConsumerRebalance();
//Subscribe to a single topic named 'TEQ'.
consumer.subscribe(Arrays.asList("TEQ"), rebalanceListener);
int expectedMsgCnt = 40000;
int msgCnt = 0;
Instant startTime = Instant.now();
try {
while(true) {
try {
//Consumes records from the assigned partitions of 'TEQ' topic
ConsumerRecords <String, String> records = consumer.poll(Duration.ofMillis(10000));
//Print consumed records
for (ConsumerRecord<String, String> record : records)
{
System.out.printf("partition = %d, offset = %d, key = %s, value =%s\n ", record.partition(), record.offset(), record.key(), record.value());
for(Header h: record.headers())
{
System.out.println("Header: " +h.toString());
}
}
//Commit all the consumed records
if(records != null && records.count() > 0) {
msgCnt += records.count();
System.out.println("Committing records " + records.count());
try {
consumer.commitSync();
}catch(Exception e)
{
System.out.println("Exception in commit " + e.getMessage());
continue;
}
if(msgCnt >= expectedMsgCnt )
{
System.out.println("Received " + msgCnt + " Expected " + expectedMsgCnt +". Exiting Now.");
break;
}
}
else {
System.out.println("No Record Fetched. Retrying in 1 second");
Thread.sleep(1000);
}
}catch(Exception e)
{
System.out.println("Inner Exception " + e.getMessage());
throw e;
}
}
}catch(Exception e)
{
System.out.println("Exception from OKafka consumer " + e);
e.printStackTrace();
}finally {
long runDuration = Duration.between(startTime, Instant.now()).toMillis();
System.out.println("Closing OKafka Consumer. Received "+ msgCnt +" records. Run Duration " + runDuration);
consumer.close();
}
}
}
例: 単純なOKafkaプロデューサの作成
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.oracle.okafka.clients.producer.KafkaProducer;
import java.time.Duration;
import java.time.Instant;
import java.util.Properties;
import java.util.concurrent.Future;
public class SimpleProducerOKafka {
public static void main(String[] args) {
try {
Properties props = new Properties();
//IP or Host name where Oracle Database 23ai is running and Database Listener's Port
props.put("bootstrap.servers", "localhost:1521");
//name of the service running on the database instance
props.put("oracle.service.name", "freepdb1");
props.put("security.protocol","PLAINTEXT");
// location for ojdbc.properties file where user and password properties are saved
props.put("oracle.net.tns_admin",".");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
String baseMsg = "This is a test message ";
// Creates OKafka Producer
Producer<String, String> producer = new KafkaProducer<String, String>(props);
Future<RecordMetadata> lastFuture = null;
int msgCnt = 40000;
Instant startTime = Instant.now();
//Headers, common for all records
RecordHeader rH1 = new RecordHeader("CLIENT_ID", "FIRST_CLIENT".getBytes());
RecordHeader rH2 = new RecordHeader("REPLY_TO", "REPLY_TOPIC_NAME".getBytes());
//Produce 40000 messages into topic named "TEQ".
for(int i=0;i<msgCnt;i++) {
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("TEQ", ""+i, baseMsg + i);
producerRecord.headers().add(rH1).add(rH2);
lastFuture =producer.send(producerRecord);
}
//Waits until the last message is acknowledged
lastFuture.get();
long runTime = Duration.between( startTime, Instant.now()).toMillis();
System.out.println("Produced "+ msgCnt +" messages. Run Duration " + runTime);
//Closes the OKafka producer
producer.close();
}
catch(Exception e)
{
System.out.println("Exception in Main " + e );
e.printStackTrace();
}
}
}
例: Oracle Kafkaトピックの削除
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.oracle.okafka.clients.admin.AdminClient;
public class SimpleAdminDeleteTopic {
public static void main(String[] args) {
Properties props = new Properties();
//IP or Host name where Oracle Database 23ai is running and Database Listener's Port
props.put("bootstrap.servers", "localhost:1521");
//name of the service running on the database instance
props.put("oracle.service.name", "freepdb1");
props.put("security.protocol","PLAINTEXT");
// location for ojdbc.properties file where user and password properties are saved
props.put("oracle.net.tns_admin",".");
try (Admin admin = AdminClient.create(props)) {
//Throws Exception if failed to delete the topic. Returns null on successful deletion.
org.apache.kafka.clients.admin.DeleteTopicsResult delResult =
admin.deleteTopics(Collections.singletonList("TEQ"));
Thread.sleep(1000);
System.out.println("Closing admin now");
}
catch(Exception e)
{
System.out.println("Exception while creating topic " + e);
e.printStackTrace();
}
}
}
例: トランザクションOKafkaプロデューサ
import org.oracle.okafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.header.internals.RecordHeader;
import java.sql.Connection;
import java.util.Properties;
public class TransactionalProducerOKafka {
public static void main(String[] args) {
Producer<String, String> producer = null;
try {
Properties props = new Properties();
// Option 1: Connect to Oracle Database with database username and password
props.put("security.protocol","PLAINTEXT");
//IP or Host name where Oracle Database 23ai is running and Database Listener's Port
props.put("bootstrap.servers", "localhost:1521");
props.put("oracle.service.name", "freepdb1"); //name of the service running on the database instance
// location for ojdbc.properties file where user and password properties are saved
props.put("oracle.net.tns_admin",".");
/*
//Option 2: Connect to Oracle Autonomous Database using Oracle Wallet
//This option to be used when connecting to Oracle autonomous database instance on OracleCloud
props.put("security.protocol","SSL");
// location for Oracle Wallet, tnsnames.ora file and ojdbc.properties file
props.put("oracle.net.tns_admin",".");
props.put("tns.alias","Oracle23ai_high");
*/
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//Property to create a Transactional Producer
props.put("oracle.transactional.producer", "true");
producer = new KafkaProducer<String, String>(props);
int msgCnt = 100;
String jsonPayload = "{\"name\":\"Programmer"+msgCnt+"\",\"status\":\"classy\",\"catagory\":\"general\",\"region\":\"north\",\"title\":\"programmer\"}";
System.out.println(jsonPayload);
producer.initTransactions();
Connection conn = ((KafkaProducer<String, String> )producer).getDBConnection();
String topicName = "TXEQ";
// Produce 100 records in a transaction and commit.
try {
producer.beginTransaction();
boolean fail = false;
for( int i=0;i<msgCnt;i++) {
//Optionally set RecordHeaders
RecordHeader rH1 = new RecordHeader("CLIENT_ID", "FIRST_CLIENT".getBytes());
RecordHeader rH2 = new RecordHeader("REPLY_TO", "TXEQ_2".getBytes());
ProducerRecord<String, String> producerRecord =
new ProducerRecord<String, String>(topicName, i+"", jsonPayload);
producerRecord.headers().add(rH1).add(rH2);
try {
processRecord(conn, producerRecord);
} catch(Exception e) {
//Retry processRecord or abort the Okafka transaction and close the producer
fail = true;
break;
}
producer.send(producerRecord);
}
if(fail) // Failed to process the records. Abort Okafka transaction
producer.abortTransaction();
else // Successfully process all the records. Commit OKafka transaction
producer.commitTransaction();
System.out.println("Produced 100 messages.");
}catch( DisconnectException dcE) {
producer.close();
}catch (KafkaException e) {
producer.abortTransaction();
}
}
catch(Exception e)
{
System.out.println("Exception in Main " + e );
e.printStackTrace();
}
finally {
try {
if(producer != null)
producer.close();
}catch(Exception e)
{
System.out.println("Exception while closing producer " + e);
e.printStackTrace();
}
System.out.println("Producer Closed");
}
}
private static void processRecord(Connection conn, ProducerRecord<String, String> record) throws Exception
{
//Application specific logic
}
}例: トランザクションOKafkaコンシューマ
import java.util.Properties;
import java.sql.Connection;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.oracle.okafka.clients.consumer.KafkaConsumer;
public class TransactionalConsumerOKafka {
// Dummy implementation of ConsumerRebalanceListener interface
// It only maintains the list of assigned partitions in assignedPartitions list
static class ConsumerRebalance implements ConsumerRebalanceListener {
public List<TopicPartition> assignedPartitions = new ArrayList();
@Override
public synchronized void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.println("Newly Assigned Partitions:");
for (TopicPartition tp :partitions ) {
System.out.println(tp);
assignedPartitions.add(tp);
}
}
@Override
public synchronized void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("Revoked previously assigned partitions. ");
for (TopicPartition tp :assignedPartitions ) {
System.out.println(tp);
}
assignedPartitions.clear();
}
}
public static void main(String[] args) {
Properties props = new Properties();
// Option 1: Connect to Oracle Database with database username and password
props.put("security.protocol","PLAINTEXT");
//IP or Host name where Oracle Database 23ai is running and Database Listener's Port
props.put("bootstrap.servers", "localhost:1521");
props.put("oracle.service.name", "freepdb1"); //name of the service running on the database instance
// location for ojdbc.properties file where user and password properties are saved
props.put("oracle.net.tns_admin",".");
/*
//Option 2: Connect to Oracle Autonomous Database using Oracle Wallet
//This option to be used when connecting to Oracle autonomous database instance on OracleCloud
props.put("security.protocol","SSL");
// location for Oracle Wallet, tnsnames.ora file and ojdbc.properties file
props.put("oracle.net.tns_admin",".");
props.put("tns.alias","Oracle23ai_high");
*/
//Consumer Group Name
props.put("group.id" , "CG1");
props.put("enable.auto.commit","false");
// Maximum number of records fetched in single poll call
props.put("max.poll.records", 10);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String , String> consumer = new KafkaConsumer<String, String>(props);
ConsumerRebalanceListener rebalanceListener = new ConsumerRebalance();
consumer.subscribe(Arrays.asList("TXEQ"), rebalanceListener);
int expectedMsgCnt = 100;
int msgCnt = 0;
Connection conn = null;
boolean fail = false;
try {
while(true) {
try {
//Consumes records from the assigned partitions of 'TXEQ' topic
ConsumerRecords <String, String> records = consumer.poll(Duration.ofMillis(10000));
if (records.count() > 0 )
{
conn = ((KafkaConsumer<String, String>)consumer).getDBConnection();
fail = false;
for (ConsumerRecord<String, String> record : records)
{
System.out.printf("partition = %d, offset = %d, key = %s, value =%s\n ", record.partition(), record.offset(), record.key(), record.value());
for(Header h: record.headers())
{
System.out.println("Header: " +h.toString());
}
try {
processRecord(conn, record);
} catch(Exception e) {
fail = true;
break;
}
}
if(fail){
conn.rollback();
}
else {
msgCnt += records.count();
consumer.commitSync();
}
if(msgCnt >= (expectedMsgCnt )) {
System.out.println("Received " + msgCnt + " Expected " + expectedMsgCnt +". Exiting Now.");
break;
}
}
else {
System.out.println("No Record Fetched. Retrying in 1 second");
Thread.sleep(1000);
}
}catch(Exception e)
{
System.out.println("Exception while consuming messages: " + e.getMessage());
throw e;
}
}
}catch(Exception e)
{
System.out.println("Exception from OKafka consumer " + e);
e.printStackTrace();
}finally {
System.out.println("Closing OKafka Consumer. Received "+ msgCnt +" records.");
consumer.close();
}
}
private static void processRecord(Connection conn, ConsumerRecord<String, String> record)
{
//Application specific logic to process the message
}
}
例: OKafka消費-変換-生成
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.oracle.okafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.header.Header;
import org.oracle.okafka.clients.consumer.KafkaConsumer;
public class TransactionalConsumerProducer {
static int msgNo =0;
static PreparedStatement instCStmt = null;
static PreparedStatement instPStmt = null;
public static void main(String[] args) {
Properties commonProps = new Properties();
Properties cProps = new Properties();
Properties pProps =new Properties();
// Option 1: Connect to Oracle Database with database username and password
commonProps.put("security.protocol","PLAINTEXT");
//IP or Host name where Oracle Database 23ai is running and Database Listener's Port
commonProps.put("bootstrap.servers", "localhost:1521");
commonProps.put("oracle.service.name", "freepdb1"); //name of the service running on the database instance
// directory location where ojdbc.properties file is stored which contains user and password properties
commonProps.put("oracle.net.tns_admin",".");
/*
//Option 2: Connect to Oracle Autonomous Database using Oracle Wallet
//This option to be used when connecting to Oracle autonomous database instance on OracleCloud
commonProps.put("security.protocol","SSL");
// location for Oracle Wallet, tnsnames.ora file and ojdbc.properties file
commonProps.put("oracle.net.tns_admin",".");
commonProps.put("tns.alias","Oracle23ai_high");
*/
cProps.putAll(commonProps);
pProps.putAll(commonProps);
//Consumer Group Name
cProps.put("group.id" , "CG1");
cProps.put("enable.auto.commit","false");
// Maximum number of records fetched in single poll call
cProps.put("max.poll.records", 10);
cProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
cProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
pProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
pProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
pProps.put("oracle.transactional.producer", "true");
Consumer<String , String> consumer = new KafkaConsumer<String, String>(cProps);
ConsumerRebalanceListener rebalanceListener = new ConsumerRebalance();
consumer.subscribe(Arrays.asList("TXEQ"), rebalanceListener);
int expectedMsgCnt = 100;
int msgCnt = 0;
Connection conn = null;
Producer<String, String> producer = null;
try {
conn = ((KafkaConsumer<String, String>)consumer).getDBConnection();
producer = new KafkaProducer<String,String>(pProps, conn);
producer.initTransactions();
while(true) {
try {
//Consumes records from the assigned partitions of 'TXEQ' topic
ConsumerRecords <String, String> records = consumer.poll(Duration.ofMillis(10000));
if(records != null && records.count() > 0) {
msgCnt += records.count();
producer.beginTransaction();
boolean fail =false;
for (ConsumerRecord<String, String> record : records) {
ProducerRecord<String,String> pr = null;
try {
String outRecord = processConsumerRecord(conn, record);
pr = new ProducerRecord<String,String>("TXEQ_2", record.key(), outRecord);
processProducerRecord(conn, pr);
}catch(Exception e)
{
// Stop processing of this batch
fail =true;
break;
}
producer.send(pr);
}
if(fail) {
//Abort consumed and produced records along with any DML operations done using connection object.
//Next consumer.poll will fetch the same records again.
producer.abortTransaction();
}
else {
//Commit consumed and produced records along with any DML operations done using connection object
producer.commitTransaction();
}
}
else {
System.out.println("No Record Fetched. Retrying in 1 second");
Thread.sleep(1000);
}
if(msgCnt >= expectedMsgCnt )
{
System.out.println("Received " + msgCnt + " Expected " + expectedMsgCnt +". Exiting Now.");
break;
}
}catch(DisconnectException dcE) {
System.out.println("Disconnect Exception while committing or aborting records "+ dcE);
throw dcE;
}
catch(KafkaException e)
{
System.out.println("Re-triable Exception while committing records "+ e);
producer.abortTransaction();
}
catch(Exception e)
{
System.out.println("Exception while processing records " + e.getMessage());
throw e;
}
}
}catch(Exception e)
{
System.out.println("Exception from OKafka consumer " + e);
e.printStackTrace();
}finally {
System.out.println("Closing OKafka Consumer. Received "+ msgCnt);
producer.close();
consumer.close();
}
}
static String processConsumerRecord(Connection conn, ConsumerRecord <String, String> record) throws Exception
{
//Application specific logic to process the record
System.out.println("Received: " + record.partition() +"," + record.offset() +":" + record.value());
return record.value();
}
static void processProducerRecord(Connection conn, ProducerRecord <String, String> records) throws Exception
{
//Application specific logic to process the record
}
static void processRecords(Producer<String,String> porducer, Consumer<String,String> consumer, ConsumerRecords <String, String> records) throws Exception
{
Connection conn = ((KafkaProducer<String,String>)porducer).getDBConnection();
String jsonPayload = null;
ProducerRecord<String,String> pr = null;
Future<RecordMetadata> lastFuture = null;
for (ConsumerRecord<String, String> record : records)
{
msgNo++;
System.out.println("Processing " + msgNo + " record.value() " + record.value());
System.out.printf("partition = %d, offset = %d, key = %s, value =%s\n ", record.partition(), record.offset(), record.key(), record.value());
for(Header h: record.headers())
{
System.out.println("Header: " +h.toString());
}
jsonPayload = "{\"name\":\"Programmer"+msgNo+"\",\"status\":\"classy\",\"catagory\":\"general\",\"region\":\"north\",\"title\":\"programmer\"}";
pr = new ProducerRecord<String,String>("KTOPIC1", record.key(), jsonPayload);
lastFuture = porducer.send(pr);
RecordMetadata metadata = lastFuture.get();
}
}
// Dummy implementation of ConsumerRebalanceListener interface
// It only maintains the list of assigned partitions in assignedPartitions list
static class ConsumerRebalance implements ConsumerRebalanceListener {
public List<TopicPartition> assignedPartitions = new ArrayList<TopicPartition>();
@Override
public synchronized void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.println("Newly Assigned Partitions:");
for (TopicPartition tp :partitions ) {
System.out.println(tp);
assignedPartitions.add(tp);
}
}
@Override
public synchronized void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("Revoked previously assigned partitions. ");
for (TopicPartition tp :assignedPartitions ) {
System.out.println(tp);
}
assignedPartitions.clear();
}
}
}
ノート:
-
KafkaAdminインタフェースを使用して作成されたトピックには、KafkaProducerまたはKafkaConsumerインタフェースでのみアクセスできます。
-
KafkaProducerは、通常のJMSトランザクション・イベント・キューのトピック/キューにレコードを送信できます。ただし、KafkaConsumerは、KafkaAdminインタフェースまたは
DBMS_AQADM.CREATE_DATABASE_KAFKA_TOPICプロシージャを使用して作成されたトピックからのみレコードを消費できます。
TxEventQのKafka REST API
TxEventQ REST APIを使用すると、トピックおよびパーティションから一般的な操作を生成および消費でき、Oracle DatabaseでOracle REST Data Services (ORDS)を使用して実装されます。一般的な操作には、トピックの作成と削除、メッセージの生成と消費、トピック上のコンシューマ・ラグの取得、オフセットへの検索などのための多くの操作APIが含まれます。
Kafkaの次の3つのAPIにより、TxEventQはKafkaデプロイメントと共存できるようになり、トランザクション送信ボックス、JMSメッセージングおよびデータベース内のpub/sub、およびOracle Databaseのイベント・キューへの高スループットのイベント・ストリーミングの利点が提供されます。
関連項目:
Oracle REST Data Services APIドキュメントのOracle Transactional Event Queues RESTエンドポイント
TxEventQ対応のKafkaプロデューサ実装の概要
プロデューサAPIにより、KafkaアプリケーションはメッセージをOracle Transactional Event Queues (TxEventQ)に発行できるようになります。Kafkaアプリケーションでは、Oracle固有のプロパティ(bootstrap.servers、oracle.servicenameおよびoracle.net.tns_admin)を指定する必要があります。これらのプロパティの詳細は、構成の項で説明します。これらのプロパティは、データベース接続を設定し、TxEventQにメッセージを生成するために使用されます。現在のリリースでは、OracleのKafkaProducerの実装はプロデューサAPIのサブセットのみをサポートしています。
内部的には、Oracle Kafkaプロデューサ・オブジェクトが、Oracle TxEventQにメッセージをパブリッシュするために使用されるAQ JMSプロデューサ・オブジェクトをカプセル化します。Apache Kafkaプロデューサと同様に、各プロデューサのsend()コールでは、そのトピックとパーティションに基づいて、Kafkaレコードがバッチに追加されます。Apache Kafkaの内部アルゴリズムに基づいて、バックグラウンド・スレッドがバッチ全体をOracle TxEventQにパブリッシュします。
次のKafkaProducer APIは、Oracle Database 23aiでサポートされています。
-
コンストラクタ:
KafkaProducer: OKafkaプロデューサおよび内部サポート・オブジェクトを作成します。アプリケーションは、使用可能な4つのコンストラクタのいずれかを使用して、OKafkaプロデューサを作成できます。各コンストラクタには、引数としてOracle接続オブジェクトを使用するオーバーロードされたバージョンがあります。アプリケーションが事前作成された接続オブジェクトを渡すと、OKafkaプロデューサはそれを使用してOracle Transaction Event Queueにレコードを送信します。OKafkaプロデューサで外部データベース接続を使用できるようにするには、アプリケーションでoracle.transactional.producerプロパティをtrueに設定する必要があります。 -
メソッド:
-
send(ProducerRecord)、send(ProducerRecord, Callback):sendメソッドは、非同期的にメッセージをTxEventQにパブリッシュします。このメソッドは、送信を待機しているレコードのバッファーにKafkaレコードが格納された直後に復帰します。バッファがいっぱいの場合、送信コールは最大max.block.msの間ブロックされます。レコードは、AQ JMSを使用してトピックに発行されます。このメソッドは、レコードのパーティション、オフセットおよびパブリッシュ・タイムスタンプを含む
Future<RecordMetadata>を返します。send(ProducerRecord)とsend(ProducerRecord, Callback)の両方のバージョンがサポートされます。 -
getDBConnection: このメソッドは、このKafkaProducer.OKafkaで使用されるデータベース接続を返します。このメソッドを使用してデータベース接続をフェッチするには、プロデューサ・プロパティoracle.transactional.producerをtrueに設定する必要があります。 -
close: プロデューサを閉じてメモリーを解放します。また、Oracle Databaseへの内部接続も閉じます。
-
-
クラス
-
ProducerRecord: Kafkaプラットフォームでメッセージを表すクラス。Kafka APIライブラリは、ProducerRecordをTxEventQプラットフォームのJMS BytesMessageに変換します。 -
RecordMetadata: Kafkaプラットフォームのレコードのトピック、パーティション、オフセット、タイムスタンプなどのメタデータが含まれます。TxEventQに関連する値が割り当てられます。TxEventQのメッセージIDは、RecordMetadataのオフセットに変換されます。 -
Callbackインタフェース: レコードがKafkaトピックに正常にパブリッシュされたときに1回実行されるコールバック関数。
-
パーティショナ・インタフェース: メッセージの
Keyをトピックのパーティション番号にマップするメソッドを定義します。パーティション番号は、TxEventQのストリームIDと同様のものです。
-
-
プロパティ
-
key.serializerおよびvalue.serializer: キーとペイロードをそれぞれバイト配列に変換します。 -
acks: Kafka APIの場合、acksプロパティに関連する値はallのみです。ユーザーが設定したその他のフィールドは無視されます。 -
linger.ms: 送信側スレッドがTxEventQのレコードをパブリッシュするまでに待機する時間(ミリ秒単位)。 -
batch.size: バッチ処理されるレコードの合計サイズ(バイト単位)。送信側スレッドは、このサイズになるまで待機してからTxEventQのレコードをパブリッシュします。 -
buffer.memory: アキュムレータが保持できる合計メモリー(バイト単位)。 -
max.block.ms: アキュムレータでbuffer.memoryサイズがいっぱいになったときに、send()メソッドがメモリー不足エラーを受信できるようになるまでmax.block.msの時間待機します。 -
retries: このプロパティにより、プロデューサは一時的な障害の発生時にレコードを再送信できます。この値は、バッチ当たりの再試行回数を制限します。 -
retry.backoff.ms: 特定のトピック・パーティションに対する失敗した要求を再試行するまでに待機する時間。これにより、一部の障害シナリオで発生する隙間のないループによるリクエストの繰り返し送信を回避します -
bootstrap.servers: データベースのインスタンスを実行しているマシンのIPアドレスとポート。 -
enable.idempotence: TrueまたはFalse。冪等プロデューサによって、OKafkaの配信セマンティクスが少なくとも1回から必ず1回の配信に強化されます。特定のプロデューサでは、再試行による重複は発生しなくなります。 -
oracle.transactional.producer: TrueまたはFalse。このプロパティは、トランザクションOKafkaプロデューサを作成します。トランザクション・プロデューサは、アプリケーションが複数のパーティションおよびトピックにアトミックにメッセージを送信できるようにします。また、アプリケーションが同じトランザクション内で任意のDML操作を実行することもできるようにします。トランザクション・プロデューサは、
getDBConnection()メソッドを使用して、OracleのTransactional Event Queueブローカにレコードを送信するために使用されるデータベース接続をフェッチできます。アプリケーションは、この接続を使用して、DML操作を実行できます。commitTransaction()メソッドは、現在のトランザクション内で実行されたDML操作および送信操作をアトミックにコミットします。同様に、abortTransaction()は、DML操作をアトミックにロールバックし、現在のトランザクション内で送信された生成済レコードを中止します。トランザクション・プロデューサはスレッド・セーフではありません。アプリケーションは、トランザクション・プロデューサの同時アクセスを管理する必要があります。トランザクション・プロデューサはバッチ処理のメリットを得られません。各メッセージは、個別のリクエストでOracle Transactional Event Queueブローカに送信されます。
-
TxEventQ対応のKafkaコンシューマ実装の概要
コンシューマAPIにより、アプリケーションはTransactional Event Queue (TxEventQ)からのデータのストリームを読み取れるようになります。TxEventQ対応のKafkaコンシューマはAQ JMS APIを使用し、Oracle TxEventQからのメッセージをバッチで利用するためにJDBCドライバを使用します。Oracle Kafkaの場合、Transactional Event Queueからメッセージをデキューすることで、トピックからのメッセージを消費できます。
Apache Kafkaと同様に、TxEventQの実装では、コンシューマ・グループ(サブスクライバ)に多数のコンシューマ・インスタンス(サブスクライバに対して消費している一意のデータベース・セッション)が含まれる場合があります。コンシューマ・グループごとに、一意のグループID (サブスクライバ名)があります。各コンシューマ・インスタンスは、bootstrap.serversプロパティで指定されるOracle Databaseに対する単一の接続/セッションを内部的に維持します。Oracle Database 23aiでは、コンシューマ・グループ・リバランスのKafka APIサポートが導入されています。トピックのパーティションは、同じコンシューマ・グループの2つのコンシューマに同時にトピックの同じパーティションが割り当てられないように、コンシューマ・グループのアクティブなコンシューマ間に分配されます。新しいコンシューマがコンシューマ・グループに参加したり、既存のコンシューマがグループから退出すると、パーティションはアクティブなコンシューマ間で再分配されます。
Kafka APIの23aiリリースでは、コンシューマは1つのトピックにのみサブスクライブできます。
Oracle Database 23aiでは、次のKafkaConsumer APIがサポートされています。
-
コンストラクタ:
KafkaConsumer: アプリケーションがキー・ベースのTxEventQからのメッセージを利用できるようになるコンシューマを作成します。作成された内部クライアント側TxEventQオブジェクトは、クライアント・アプリケーションから認識されません。KafkaConsumerコンストラクタのすべてのバリエーションが、Oracle Database 23aiでサポートされています。 -
メソッド:
-
Subscribe: このメソッドは、サブスクライブ先のトピックのリストを受け取ります。Oracle Database 23aiでは、リストの最初のトピックのみがサブスクライブされます。リストのサイズが1を超えると、例外がスローされます。このメソッドは、TxEventQサーバー側に、サブスクライバ名としてグループIDが設定された永続サブスクライバを作成します。アプリケーションは、ConsumerRebalanceListenerインタフェースを実装し、実装されたクラスのオブジェクトをサブスクライブ・メソッドに渡すこともできます。これにより、コンシューマは、パーティションが取り消されたとき、または割り当てられたときにコールバックを実行できます。 -
Poll:pollメソッドは、TxEventQの割当て済パーティションからメッセージのバッチを返します。このメソッドは、サブスクライバに対応するキー・ベースのTxEventQからメッセージをデキューしようとします。TxEventQは、AQ JMSの配列デキューAPIを使用してキューからメッセージのバッチを受信します。バッチのサイズは、Kafkaクライアント・アプリケーションで設定したパラメータmax.poll.recordsによって決まります。Pollは、引数としてミリ秒単位の時間を受け取ります。配列デキューのAQ JMS APIは、このタイムアウトをデキュー・オプションとしてTxEventQサーバーに渡し、デキュー・コールを実行できます。これにより、完全な配列バッチが完了していない場合はタイムアウトまでメッセージを待機します。コンシューマによって初めてpollが起動されると、コンシューマ・グループの存続しているすべてのコンシューマに対してコンシューマ・リバランスがトリガーされます。コンシューマ・リバランスの終了時に、存続しているすべてのコンシューマにトピック・パーティションが割り当てられ、後続のpollリクエストでは、割り当てられたパーティションからのみメッセージがフェッチされます。
アプリケーションは、
ConsumerRebalanceListenerインタフェースおよびpartition.assignment.strategy構成を使用してリバランスに参加し、影響を与えることができます。partition.assignment.strategy構成を使用すると、アプリケーションはコンシューマ・ストリームにパーティションを割り当てるための戦略を選択できます。OKafkaは、Apache Kafka 2.8.0ドキュメントに記載されている、この構成パラメータのすべての値をサポートしています。この構成のデフォルト値は
org.oracle.okafka.clients.consumer.TXEQAssignorで、Oracle RACを認識し、Oracle TxEventQからのスループットを向上させるのに最適な戦略が実装されています。この戦略は、ライブ・セッション間でパーティションを分散しながら、パーティションの公平な分散とメッセージのローカル消費を優先します。
ConsumerRebalanceListenerを使用すると、アプリケーションは、パーティションが取り消されたとき、またはコンシューマに割り当てられたときにコールバックを起動できます。データベース・ビュー
USER_QUEUE_PARTITION_ASSIGNMENT_TABLEにより、開発者は存続しているコンシューマ間のパーティションの現在の分散を表示できます。 -
commitSync: すべての処理済メッセージをコミットします。Oracle Database 23aiでは、オフセットに対するコミットはサポートされていません。この呼び出しは、TxEventQからのすべての処理済メッセージをコミットするデータベースのcommitを直接呼び出します。 -
commitAsync: この呼び出しは、commitSyncに変換されます。引数として渡されたコールバック関数は、コミットが正常に完了したときに1回実行されます。 -
Unsubscribe: コンシューマがサブスクライブしていたトピックのサブスクライブを解除します。コンシューマは、サブスクライブが解除されたトピックからのメッセージを処理できなくなります。この呼び出しによって、TxEventQメタデータからサブスクライバ・グループが削除されることはありません。他のコンシューマ・アプリケーションは、同じコンシューマ・グループに対して消費を続行できます。 -
getDBConnection: Oracle Transactional Event Queueからレコードを消費するために使用されるOracleデータベース接続を取得します。 -
close: コンシューマをクローズして、サブスクライブしていたトピックのサブスクライブを解除します。
-
-
Class:
ConsumerRecord: Kafkaプラットフォームで消費されたレコードを表すクラス。Kafka APIは、TxEventQからAQ JMSメッセージを受信し、それぞれをConsumerRecordに変換してアプリケーションに配信します。 -
プロパティ:
-
auto.offset.reset: このコンシューマ・グループに対する初期オフセットが見つからない場合、このプロパティの値は、トピック・パーティションの先頭からメッセージを消費するか、新しいメッセージのみを消費するかを制御します。このプロパティとその使用方法の値は、次のとおりです:-
earliest: トピック・パーティションの先頭から消費します。 -
latest: トピック・パーティションの最後から消費します(デフォルト)。 -
none: コンシューマ・グループにオフセットが存在しない場合は例外をスローします。
-
-
key.deserializerおよびvalue.deserializer: Oracle Kafkaメッセージング・プラットフォームの場合、キーおよび値はOracleのTxEventQのJMSメッセージにバイト配列として格納されます。消費時には、これらのバイト配列はそれぞれkey.deserializerおよびvalue.deserializerを使用してキーおよび値にデシリアライズされます。これらのプロパティを使用すると、アプリケーションはバイト配列形式で格納されているKeyおよびValueを、アプリケーション指定のデータ型に変換できます。 -
group.id: Kafkaトピックからのメッセージが処理されるコンシューマ・グループの名前です。このプロパティは、キー・ベースのTxEventQの永続サブスクライバ名として使用されます。 max.poll.records: Oracle TxEventQサーバーからの単一の配列デキュー呼出しでフェッチするレコードの最大数。-
enable.auto.commit: 指定した間隔で処理済メッセージの自動コミットを有効にします。 -
auto.commit.interval.ms: メッセージの自動コミットの間隔(ミリ秒単位)。 -
bootstrap.servers: データベース・インスタンスを実行しているマシンのIPアドレスとポート。
-
TxEventQ対応のKafka Admin実装の概要
Kafka管理APIにより、アプリケーションで管理タスク(トピックの作成、トピックの削除、トピックへのパーティションの追加など)を実行できるようになります。Oracle Database 23aiリリースは、次の管理APIのみをサポートします。
-
メソッド
-
create(props)およびcreate(config): 渡されたパラメータを使用するKafkaAdminクラスのオブジェクトを作成します。このメソッドは、それ以降の操作に使用するデータベース・セッションを作成します。アプリケーションは、接続構成の項の説明に従って、接続構成パラメータを指定する必要があります。 -
createTopics(): アプリケーションでKafkaトピックを作成できます。これにより、ユーザーのスキーマにTxEventQが作成されます。 -
close(): データベース・セッションと管理クライアントをクローズします。 -
deleteTopic: Kafkaトピックを削除します。これにより、トピックが正常に削除されるとnullが返されます。それ以外の場合、メソッドは例外をスローします。このメソッドは、トピックが正常に削除されるかエラーが発生するまで戻りません。
-
-
クラス:
NewTopic: 新しいトピックの作成に使用するクラス。このクラスには、トランザクション・イベント・キューの作成に使用するパラメータが含まれています。
TxEventQのKafka REST API
TxEventQ REST APIを使用すると、トピックおよびパーティションから一般的な操作を生成および消費でき、Oracle DatabaseでOracle REST Data Services (ORDS)を使用して実装されます。一般的な操作には、トピックの作成と削除、メッセージの生成と消費、トピック上のコンシューマ・ラグの取得、オフセットへの検索などのための多くの操作APIが含まれます。
Kafkaの次の3つのAPIにより、TxEventQはKafkaデプロイメントと共存できるようになり、トランザクション送信ボックス、JMSメッセージングおよびデータベース内のpub/sub、およびOracle Databaseのイベント・キューへの高スループットのイベント・ストリーミングの利点が提供されます。
関連項目:
Oracle REST Data Services APIドキュメントのOracle Transactional Event Queues RESTエンドポイント
TxEventQのKafkaコネクタ
Kafka SinkコネクタおよびSourceコネクタには、トランザクション・イベント・キューを作成するために、最低21cのOracle Databaseバージョンが必要です。アプリケーションを使用するには、最低3.1.0のバージョン番号のKafkaをサーバーにダウンロードし、インストールする必要があります。
関連項目:
詳細は、https://github.com/oracle/okafka/tree/master/connectorsを参照してください。
メッセージ転送の監視
Sink/Sourceコネクタのメッセージ転送は、Oracle TxEventQから監視できます。
関連項目:
Monitoring Transactional Event Queuesの監視: TxEventQ Monitorシステムを起動してエンキュー率/デキュー率、TxEventQの深さ、および詳細なDB/システム・レベルの統計情報を確認します。