5 Oracle Transactional Event QueuesのKafka API

Oracle Transactional Event Queue (TxEventQ)を使用すると、イベントベースのアプリケーションが簡単に実装できます。さらに、ScalaとJavaで作成されたオープンソースのストリーム処理ソフトウェア・プラットフォームであるApache Kafkaと高度に統合されています。このプラットフォームは、LinkedInによって開発され、Apache Software Foundationに寄贈されたものです。Kafka APIを使用するアプリは、透過的にOracle TxEventQの操作ができるようになります。さらに、Oracle TxEventQでは、TxEventQとKafkaの双方向の情報フローもサポートしているため、変更内容はほぼリアルタイムでTxEventQまたはKafkaですぐに利用できるようになります。

Apache Kafka Connectは、Kafkaとその他のシステムを統合するApache Kafkaに組み込まれているフレームワークです。Oracle TxEventQは、標準JMSパッケージおよび関連するJDBCのTransactionパッケージを提供することで、接続を確立してトランザクション・データ・フローを完了します。Oracle TxEventQは、標準のKafka JMSコネクタを構成することで、相互運用性を確立し、2つのメッセージング・システム間のデータ・フローを完成させます。

この章の内容は次のとおりです。

Apache Kafkaの概要

Apache Kafkaは、通信分散型イベント・ストリーミング・プラットフォームです。このプラットフォームは、水平方向のスケーラビリティと耐障害性を備えています。

Kafkaは、1つ以上のサーバーのクラスタにデプロイされます。各Kafkaクラスタには、トピックというカテゴリでレコードのストリームが格納されます。各レコードは、キー、値、およびタイムスタンプで構成されます。Kafka APIにより、アプリケーションはKafkaクラスタに接続し、Kafkaメッセージング・プラットフォームを使用できるようになります。

Transactional Event Queues対応のKafka Javaクライアント

Oracle Database 21cで、KafkaアプリケーションのOracle Databaseとの互換性が導入されました。Oracle Database 23cでは、KafkaアプリケーションとOracle Databaseの互換性がより洗練されています。これにより、Kafka JavaアプリケーションのTransactional Event Queues (TxEventQ)への移行が簡単になります。Kafka Java APIは、Oracleデータベース・サーバーに接続し、メッセージング・プラットフォームとしてTxEventQを使用できるようになりました。

図5-1 KafkaアプリケーションのTransactional Event Queueとの統合

図5-1の説明が続きます
「図5-1 KafkaアプリケーションのTransactional Event Queueとの統合」の説明

この図は、kafka-clients-2.8.0.jarファイルに依存するKafkaのJava APIのOracle固有の実装が含まれているKafka APIライブラリを示しています。この実装では、Oracle Databaseと通信するためにJDBCドライバを使用するAQ-JMS APIを内部的に呼び出します。

開発者は、Kafkaを使用する既存のJavaアプリケーションを、okafka.jarを使用してOracleデータベースに移行できるようになりました。このクライアント側ライブラリを使用すると、KafkaアプリケーションはKafkaクラスタのかわりにOracle Databaseデータベースに接続して、透過的にTxEventQのメッセージング・プラットフォームを使用できるようになります。

Transactional Event Queues対応のKafka Javaクライアントの構成

前提条件

次に、Oracle DatabaseのTxEventQに対応するKafka Javaクライアントを構成および実行するための前提条件を示します。

  1. データベース・ユーザーを作成します。

  2. ユーザーに次の権限を付与します。

    ノート:

    デフォルト表領域で無制限の割当て制限を付与するかわりに、表領域の特定の割当て制限をデータベース・ユーザーに割り当てたり付与することが一般的です。表領域を作成し、次のコマンドを使用して特定の表領域の割当て制限をデータベース・ユーザーに付与できます。

    ALTER USER user QUOTA  UNLIMITED /* or size-clause */ on tablespace_name
    • GRANT EXECUTE on DBMS_AQ to user

    • GRANT EXECUTE on DBMS_AQADM to user

    • GRANT SELECT on GV_$SESSION to user;

    • GRANT SELECT on V_$SESSION to user;

    • GRANT SELECT on GV_$INSTANCE to user;

    • GRANT SELECT on GV_$LISTENER_NETWORK to user;

    • GRANT SELECT on GV_$PDBS to user;

    • GRANT SELECT on USER_QUEUE_PARTITION_ASSIGNMENT_TABLE to user;

    • exec DBMS_AQADM.GRANT_PRIV_FOR_RM_PLAN('user');

  3. TxEventQを使用するための適切なデータベース構成パラメータを設定します。

    SET STREAMS_POOL_SIZE=400M

    ノート:

    ワークロードに基づいてサイズを適切に設定します。STREAMS_POOL_SIZEはAutonomous Database Sharedに設定できません。これは自動的に構成されます。設定しても無視されます。

  4. LOCAL_LISTENERデータベース・パラメータを設定します

    SET LOCAL_LISTENER= (ADDRESS=(PROTOCOL=TCP)(HOST=<HOST_NAME.DOMAIN_NAME/ IP> )(PORT=<PORT NUMBER>))

接続構成

Kafka APIライブラリは、JDBC Thinドライバを使用してOracle Databaseに接続します。この接続を設定するために、Kafkaアプリケーションはプレーン・テキストでユーザー名とパスワードを提供できます。また、アプリケーションでSSLを構成することもできます。OCIのOracle Autonomous Database (ADB)に対してKafkaアプリケーションを実行する場合は、SSL構成のみがサポートされます。その他のデプロイメントの場合は、Oracle Databaseへの接続にはPLAINTEXTまたはSSLを使用できます。

  • PLAINTEXT: このセキュリティ・プロトコルでは、Oracle DatabaseへのJDBCコネクションは、ojdbc.propertiesファイルのプレーン・テキストで指定したユーザー名とパスワードを使用したTCPプロトコルを使用して設定されます。

    PLAINTEXTプロトコルを使用するには、アプリケーションは次のプロパティを設定する必要があります。

    • oracle.service.name = <インスタンスで実行しているサービスの名前>

    • bootstrap.servers = <ホスト:ポート>

    • security.protocol=PLAINTEXT

    • oracle.net.tns_admin = <location of ojdbc.properties file>

    次のプロパティが ojdbc.propertiesファイルに存在している必要があります。

    • user = <nameofdatabaseuser>

    • password = <userpassword>

  • SSL: ATPデータベースへの接続にSSLセキュリティ・プロトコルを使用するには、次の追加ステップを実行します。

    1. SSLセキュリティのためのJDBC Thinドライバ接続の要件は、次のとおりです。
      • JDK8u162以降。

      • oraclepki.jarosdt_cert.jar、およびosdt_core.jar

      • 18.3以降のJDBC Thinドライバ

    2. Oracle Databaseインスタンスへの接続にJDBCのSSLセキュリティを利用するには、ユーザーは次のプロパティを指定する必要があります。JDBCは、2通りの方法でOracle DatabaseへのSSLで保護された接続をサポートします。

      • ウォレットの使用。ウォレットを使用するには:

        1. Oracleウォレットを使用するために必須の依存jarをクラスパスに追加します。

          oraclepki.jarosdt_cert.jarおよびosdt_core.jarのファイルをJDBC Thinドライバとともにダウンロードし、それらのjarをクラスパスに追加します。

        2. Oracle PKIプロバイダを有効にします

          SSLセキュリティを提供するためにSSOウォレット(cwallet.sso)を使用する場合は、ファイルjava.security (場所: $JRE_HOME/jre/lib/security/java.security)の末尾にOraclePKIProviderを追加します。たとえば:

                           security.provider.1=sun.security.provider.Sun
                           security.provider.2=sun.security.rsa.SunRsaSign
                           security.provider.3=com.sun.net.ssl.internal.ssl.Provider
                           security.provider.4=com.sun.crypto.provider.SunJCE
                           security.provider.5=sun.security.jgss.SunProvider
                           security.provider.6=com.sun.security.sasl.Provider
                           security.provider.7=oracle.security.pki.OraclePKIProvider

          SSLセキュリティのためにewallet.p12を使用する場合は、ファイルjava.securityでsunプロバイダの前にOraclePKIProviderを配置します。たとえば:

                          security.provider.1=sun.security.provider.Sun
                          security.provider.2=sun.security.rsa.SunRsaSign
                          security.provider.3=oracle.security.pki.OraclePKIProvider
                          security.provider.4=com.sun.net.ssl.internal.ssl.Provider
                          security.provider.5=com.sun.crypto.provider.SunJCE
                          security.provider.6=sun.security.jgss.SunProvider
                          security.provider.7=com.sun.security.sasl.Provider
        3. アプリケーションで次のプロパティを設定します。

                    
                        security.protocol=SSL
                        oracle.net.tns_admin=<location of tnsnames.ora file>
                        tns.alias=<alias of connection string in tnsnames.ora>
                        

          ojdbc.propertiesファイルで次のプロパティを設定します。このファイルは、oracle.net.tns_adminプロパティで指定された場所で使用可能である必要があります。

                        user(in smallletters)=nameofdatabaseuser
                        password(in smallletters)=userpassword
                        oracle.net.ssl_server_dn_match=true
                        oracle.net.wallet_location=“(SOURCE=(METHOD=FILE)
                                                   (METHOD_DATA=(DIRECTORY=/location../wallet_dbname)))”
      • Java KeyStoreでJDBCのSSLセキュリティを使用するには、アプリケーションに次のプロパティを指定します。

               security.protocol=SSL             
               oracle.net.tns_admin=<location of tnsnames.ora file>              
               tns.alias=<alias of connection string in tnsnames.ora>

        ojdbc.propertiesファイルで次のプロパティを設定します。このファイルは、oracle.net.tns_adminプロパティで指定された場所で使用可能である必要があります。

                      user(in smallletters)=nameofdatabaseuser
                      password(in smallletters)=userpassword
                      oracle.net.ssl_server_dn_match=true
                      javax.net.ssl.trustStore==${TNS_ADMIN}/truststore.jks
                      javax.net.ssl.trustStorePassword=password
                      javax.net.ssl.keyStore=${TNS_ADMIN}/keystore.jks
                      javax.net.ssl.keyStorePassword=password

        ノート:

        ATPからダウンロードしたウォレット内のtnsnames.oraファイルには、JDBCコネクションの確立に使用されるJDBC接続文字列が格納されています。

関連項目:

Oracle Database SQL言語リファレンスALTER USER

Kafkaクライアント・インタフェース

Kafkaアプリケーションは、Kafkaクラスタとの通信にプロデューサ、コンシューマ、および管理のAPIを主に使用します。このバージョンのTxEventQ対応Kafkaクライアントは、Apache Kafka 2.8.0のプロデューサ、コンシューマ、および管理のAPIとプロパティのサブセットのみをサポートしています。okafka.jarクライアント・ライブラリを使用すると、KafkaアプリケーションはOracle TxEventQプラットフォームを使用できるようになります。okafka.jarライブラリにはJRE 9以降が必要です。

最初にKafkaクライアントAPIの使用方法を簡単な例で示し、後でその詳細を説明します。

Kafka APIの例

例: Oracle Kafkaトピックの作成


import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;

import org.oracle.okafka.clients.admin.AdminClient;

public class SimpleAdminOKafka {

	public static void main(String[] args) {
		Properties props = new Properties();
		//IP or Host name where Oracle Database 23c is running and Database Listener's Port
		props.put("bootstrap.servers", "localhost:1521");

		//name of the service running on the database instance
		props.put("oracle.service.name", "freepdb1");
		props.put("security.protocol","PLAINTEXT");

		// location for ojdbc.properties file where user and password properties are saved
		props.put("oracle.net.tns_admin","."); 

		try (Admin admin = AdminClient.create(props)) {
			//Create Topic named TEQ with 10 Partitions.
			CreateTopicsResult result = admin.createTopics(
					Arrays.asList(new NewTopic("TEQ", 10, (short)0)));
			try {
				KafkaFuture<Void> ftr =  result.all();
				ftr.get();
			} catch ( InterruptedException | ExecutionException e ) {

				throw new IllegalStateException(e);
			}
			System.out.println("Closing  OKafka admin now");
		}
		catch(Exception e)
		{
			System.out.println("Exception while creating topic " + e);
			e.printStackTrace();
		}
	}
}

例: KafkaコンシューマAPI


import java.util.Properties;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.oracle.okafka.clients.consumer.KafkaConsumer;

public class SimpleConsumerOKafka {

	// Dummy implementation of ConsumerRebalanceListener interface
	// It only maintains the list of assigned partitions in assignedPartitions list
	static class ConsumerRebalance implements ConsumerRebalanceListener {

		public List<TopicPartition> assignedPartitions = new ArrayList<>();
		
		@Override
		public synchronized void onPartitionsAssigned(Collection<TopicPartition>  partitions) { 
			System.out.println("Newly Assigned Partitions:");
			for (TopicPartition tp :partitions ) {
				System.out.println(tp);
				assignedPartitions.add(tp);
			}
		} 
		
		@Override
		public synchronized void onPartitionsRevoked(Collection<TopicPartition> partitions) {
			System.out.println("Revoked previously assigned partitions. ");
			for (TopicPartition tp :assignedPartitions ) {
				System.out.println(tp);
			}
			assignedPartitions.clear();
		}
	}

	public static void main(String[] args) {
		//System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "TRACE");
		Properties props = new Properties();
		//IP or Host name where Oracle Database 23c is running and Database Listener's Port
		props.put("bootstrap.servers", "localhost:1521");
		
		//name of the service running on the database instance
		props.put("oracle.service.name", "freepdb1");
		props.put("security.protocol","PLAINTEXT");
		
		// location for ojdbc.properties file where user and password properties are saved
		props.put("oracle.net.tns_admin","."); 
		
		//Consumer Group Name
		props.put("group.id" , "CG1");
		props.put("enable.auto.commit","false");
		
		// Maximum number of records fetched in single poll call
		props.put("max.poll.records", 2000);

		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

		Consumer<String , String> consumer = new KafkaConsumer<String, String>(props);		
		ConsumerRebalanceListener rebalanceListener = new ConsumerRebalance();
		
		//Subscribe to a single topic named 'TEQ'.
		consumer.subscribe(Arrays.asList("TEQ"), rebalanceListener);
		
		int expectedMsgCnt = 40000;
		int msgCnt = 0;
		Instant startTime = Instant.now();
		try {
			while(true) {
				try {
					//Consumes records from the assigned partitions of 'TEQ' topic
					ConsumerRecords <String, String> records = consumer.poll(Duration.ofMillis(10000));
					//Print consumed records
					for (ConsumerRecord<String, String> record : records)	
					{
						System.out.printf("partition = %d, offset = %d, key = %s, value =%s\n ", record.partition(), record.offset(), record.key(), record.value());
						for(Header h: record.headers())
						{
							System.out.println("Header: " +h.toString());
						} 
					}
					//Commit all the consumed records
					if(records != null && records.count() > 0) {
						msgCnt += records.count();
						System.out.println("Committing records " + records.count());
						try {
							consumer.commitSync();
						}catch(Exception e)
						{
							System.out.println("Exception in commit " + e.getMessage());
							continue;
						}
						if(msgCnt >= expectedMsgCnt )
						{
							System.out.println("Received " + msgCnt + " Expected " + expectedMsgCnt +". Exiting Now.");
							break;
						}
					}
					else {
						System.out.println("No Record Fetched. Retrying in 1 second");
						Thread.sleep(1000);
					}

				}catch(Exception e)
				{
					System.out.println("Inner Exception " + e.getMessage());
					throw e;
				}			
			}
		}catch(Exception e)
		{
			System.out.println("Exception from OKafka consumer " + e);
			e.printStackTrace();
		}finally {
			long runDuration = Duration.between(startTime, Instant.now()).toMillis();
			System.out.println("Closing OKafka Consumer. Received "+ msgCnt +" records. Run Duration " + runDuration);
			consumer.close();
		}
	}
}

例: KafkaプロデューサAPI


import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.internals.RecordHeader;

import org.oracle.okafka.clients.producer.KafkaProducer;

import java.time.Duration;
import java.time.Instant;
import java.util.Properties;
import java.util.concurrent.Future;

public class SimpleProducerOKafka {
	public static void main(String[] args) {
		try {
			Properties props = new Properties();
			//IP or Host name where Oracle Database 23c is running and Database Listener's Port
			props.put("bootstrap.servers", "localhost:1521");
			
			//name of the service running on the database instance
			props.put("oracle.service.name", "freepdb1");
			props.put("security.protocol","PLAINTEXT");
			
			// location for ojdbc.properties file where user and password properties are saved
			props.put("oracle.net.tns_admin","."); 

			
			props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
			props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");		
			
			String baseMsg = "This is a test message ";
			// Creates OKafka Producer
			Producer<String, String> producer = new KafkaProducer<String, String>(props);
			
			Future<RecordMetadata> lastFuture = null;
			int msgCnt = 40000;
			Instant startTime = Instant.now();
			
			//Headers, common for all records
			RecordHeader rH1 = new RecordHeader("CLIENT_ID", "FIRST_CLIENT".getBytes());
			RecordHeader rH2 = new RecordHeader("REPLY_TO", "REPLY_TOPIC_NAME".getBytes());
			
			//Produce 40000 messages into topic named "TEQ".
			for(int i=0;i<msgCnt;i++) {
				ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("TEQ", ""+i, baseMsg + i);
				producerRecord.headers().add(rH1).add(rH2);
				lastFuture =producer.send(producerRecord);
			}
			//Waits until the last message is acknowledged
			lastFuture.get();
			long runTime = Duration.between( startTime, Instant.now()).toMillis();
			System.out.println("Produced "+ msgCnt +" messages. Run Duration " + runTime);
			//Closes the OKafka producer
			producer.close();
		}		
		catch(Exception e)
		{
			System.out.println("Exception in Main " + e );
			e.printStackTrace();
		}
	}
}

例: Oracle Kafkaトピックの削除


import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;

import org.oracle.okafka.clients.admin.AdminClient;

public class SimpleAdminDeleteTopic {

	public static void main(String[] args) {
		
		Properties props = new Properties();
		//IP or Host name where Oracle Database 23c is running and Database Listener's Port
		props.put("bootstrap.servers", "localhost:1521");

		//name of the service running on the database instance
		props.put("oracle.service.name", "freepdb1");
		props.put("security.protocol","PLAINTEXT");

		// location for ojdbc.properties file where user and password properties are saved
		props.put("oracle.net.tns_admin","."); 

		try (Admin admin = AdminClient.create(props)) {
			//Throws Exception if failed to delete the topic. Returns null on successful deletion.
			org.apache.kafka.clients.admin.DeleteTopicsResult delResult =
					admin.deleteTopics(Collections.singletonList("TEQ"));
			Thread.sleep(1000);
			System.out.println("Closing admin now");
		}
		catch(Exception e)
		{
			System.out.println("Exception while creating topic " + e);
			e.printStackTrace();
		}
	}

}

ノート:

  • KafkaAdminインタフェースを使用して作成されたトピックには、KafkaProducerまたはKafkaConsumerインタフェースでのみアクセスできます。

  • 同様に、KafkaProducerおよびKafkaConsumerインタフェースは、KafkaAdminインタフェースを使用して作成されたトピックからのみレコードを生成または消費できます。

TxEventQのKafka REST API

TxEventQ REST APIを使用すると、トピックおよびパーティションから一般的な操作を生成および消費でき、Oracle DatabaseでOracle REST Data Services (ORDS)を使用して実装されます。一般的な操作には、トピックの作成と削除、メッセージの生成と消費、トピック上のコンシューマ・ラグの取得、オフセットへの検索などのための多くの操作APIが含まれます。

Kafkaの次の3つのAPIにより、TxEventQはKafkaデプロイメントと共存できるようになり、トランザクション送信ボックス、JMSメッセージングおよびデータベース内のpub/sub、およびOracle Databaseのイベント・キューへの高スループットのイベント・ストリーミングの利点が提供されます。

関連項目:

Oracle REST Data Services APIドキュメントのOracle Transactional Event Queues RESTエンドポイント

TxEventQ対応のKafkaプロデューサ実装の概要

プロデューサAPIにより、KafkaアプリケーションはメッセージをOracle Transactional Event Queues (TxEventQ)に発行できるようになります。Kafkaアプリケーションでは、Oracle固有のプロパティ(bootstrap.serversoracle.servicenameおよびoracle.net.tns_admin)を指定する必要があります。これらのプロパティの詳細は、構成の項で説明します。これらのプロパティは、データベース接続を設定し、TxEventQにメッセージを生成するために使用されます。現在のリリースでは、OracleのKafkaProducerの実装はプロデューサAPIのサブセットのみをサポートしています。

内部的には、Oracle Kafkaプロデューサ・オブジェクトが、Oracle TxEventQにメッセージをパブリッシュするために使用されるAQ JMSプロデューサ・オブジェクトをカプセル化します。Apache Kafkaプロデューサと同様に、各プロデューサのsend()コールでは、そのトピックとパーティションに基づいて、Kafkaレコードがバッチに追加されます。Apache Kafkaの内部アルゴリズムに基づいて、バックグラウンド・スレッドがバッチ全体をOracle TxEventQにパブリッシュします。

次のKafkaProducer APIは、Oracle Database 23cでサポートされています。

  • コンストラクタ:

    KafkaProducer: プロシージャ・オブジェクトと内部AQ JMSオブジェクトを作成します。KafkaProducerクラスでは、4種類のコンストラクタが定義されています。これらはすべて、入力として構成パラメータを受け取ります。

  • メソッド:

    • send(ProducerRecord)、send(ProducerRecord, Callback):

      sendメソッドは、非同期的にメッセージをTxEventQにパブリッシュします。このメソッドは、送信を待機しているレコードのバッファーにKafkaレコードが格納された直後に復帰します。バッファがいっぱいの場合、送信コールは最大max.block.msの間ブロックされます。レコードは、AQ JMSを使用してトピックに発行されます。

      このメソッドは、レコードのパーティション、オフセットおよびパブリッシュ・タイムスタンプを含むFuture<RecordMetadata>を返します。send(ProducerRecord)send(ProducerRecord, Callback)の両方のバージョンがサポートされます。

    • close: プロデューサを閉じてメモリーを解放します。また、Oracle Databaseへの内部接続も閉じます。

  • クラス

    • ProducerRecord: Kafkaプラットフォームでメッセージを表すクラス。Kafka APIライブラリは、ProducerRecordをTxEventQプラットフォームのJMS BytesMessageに変換します。

    • RecordMetadata: Kafkaプラットフォームのレコードのトピック、パーティション、オフセット、タイムスタンプなどのメタデータが含まれます。TxEventQに関連する値が割り当てられます。TxEventQのメッセージIDは、RecordMetadataのオフセットに変換されます。

    • Callbackインタフェース: レコードがKafkaトピックに正常にパブリッシュされたときに1回実行されるコールバック関数。

    • パーティショナ・インタフェース: メッセージのKeyをトピックのパーティション番号にマップするメソッドを定義します。パーティション番号は、TxEventQのストリームIDと同様のものです。

  • プロパティ

    • key.serializerおよびvalue.serializer: キーとペイロードをそれぞれバイト配列に変換します。

    • acks: Kafka APIの場合、acksプロパティに関連する値はallのみです。ユーザーが設定したその他のフィールドは無視されます。

    • linger.ms: 送信側スレッドがTxEventQのレコードをパブリッシュするまでに待機する時間(ミリ秒単位)。

    • batch.size: バッチ処理されるレコードの合計サイズ(バイト単位)。送信側スレッドは、このサイズになるまで待機してからTxEventQのレコードをパブリッシュします。

    • buffer.memory: アキュムレータが保持できる合計メモリー(バイト単位)。

    • max.block.ms: アキュムレータでbuffer.memoryサイズがいっぱいになったときに、send()メソッドがメモリー不足エラーを受信できるようになるまでmax.block.msの時間待機します。

    • retries: このプロパティにより、プロデューサは一時的な障害の発生時にレコードを再送信できます。この値は、バッチ当たりの再試行回数を制限します。

    • retry.backoff.ms : 特定のトピック・パーティションに対する失敗した要求を再試行するまでに待機する時間。これにより、一部の障害シナリオで発生する隙間のないループによるリクエストの繰り返し送信を回避します

    • bootstrap.servers: データベースのインスタンスを実行しているマシンのIPアドレスとポート。

TxEventQ対応のKafkaコンシューマ実装の概要

コンシューマAPIにより、アプリケーションはTransactional Event Queue (TxEventQ)からのデータのストリームを読み取れるようになります。TxEventQ対応のKafkaコンシューマはAQ JMS APIを使用し、Oracle TxEventQからのメッセージをバッチで利用するためにJDBCドライバを使用します。Oracle Kafkaの場合、Transactional Event Queueからメッセージをデキューすることで、トピックからのメッセージを消費できます。

Apache Kafkaと同様に、TxEventQの実装では、コンシューマ・グループ(サブスクライバ)に多数のコンシューマ・インスタンス(サブスクライバに対して消費している一意のデータベース・セッション)が含まれる場合があります。コンシューマ・グループごとに、一意のグループID (サブスクライバ名)があります。各コンシューマ・インスタンスは、bootstrap.serversプロパティで指定されるOracle Databaseに対する単一の接続/セッションを内部的に維持します。Oracle Database 23cでは、コンシューマ・グループ・リバランスのKafka APIサポートが導入されています。トピックのパーティションは、同じコンシューマ・グループの2つのコンシューマに同時にトピックの同じパーティションが割り当てられないように、コンシューマ・グループのアクティブなコンシューマ間に分配されます。新しいコンシューマがコンシューマ・グループに参加したり、既存のコンシューマがグループから退出すると、パーティションはアクティブなコンシューマ間で再分配されます。

Kafka APIの23cリリースでは、コンシューマは1つのトピックにのみサブスクライブできます。

Oracle Database 23cでは、次のKafkaConsumer APIがサポートされています。

  • コンストラクタ: KafkaConsumer: アプリケーションがキー・ベースのTxEventQからのメッセージを利用できるようになるコンシューマを作成します。作成された内部クライアント側TxEventQオブジェクトは、クライアント・アプリケーションから認識されません。KafkaConsumerコンストラクタのすべてのバリエーションが、Oracle Database 23cでサポートされています。

  • メソッド:

    • Subscribe: このメソッドは、サブスクライブ先のトピックのリストを受け取ります。Oracle Database 23cでは、リストの最初のトピックのみがサブスクライブされます。リストのサイズが1を超えると、例外がスローされます。このメソッドは、TxEventQサーバー側に、サブスクライバ名としてグループIDが設定された永続サブスクライバを作成します。アプリケーションは、ConsumerRebalanceListenerインタフェースを実装し、実装されたクラスのオブジェクトをサブスクライブ・メソッドに渡すこともできます。これにより、コンシューマは、パーティションが取り消されたとき、または割り当てられたときにコールバックを実行できます。

    • Poll: pollメソッドは、TxEventQの割当て済パーティションからメッセージのバッチを返します。このメソッドは、サブスクライバに対応するキー・ベースのTxEventQからメッセージをデキューしようとします。TxEventQは、AQ JMSの配列デキューAPIを使用してキューからメッセージのバッチを受信します。バッチのサイズは、Kafkaクライアント・アプリケーションで設定したパラメータmax.poll.recordsによって決まります。Pollは、引数としてミリ秒単位の時間を受け取ります。配列デキューのAQ JMS APIは、このタイムアウトをデキュー・オプションとしてTxEventQサーバーに渡し、デキュー・コールを実行できます。これにより、完全な配列バッチが完了していない場合はタイムアウトまでメッセージを待機します。

      コンシューマによって初めてpollが起動されると、コンシューマ・グループの存続しているすべてのコンシューマに対してコンシューマ・リバランスがトリガーされます。コンシューマ・リバランスの終了時に、存続しているすべてのコンシューマにトピック・パーティションが割り当てられ、後続のpollリクエストでは、割り当てられたパーティションからのみメッセージがフェッチされます。

      アプリケーションは、ConsumerRebalanceListenerインタフェースおよびpartition.assignment.strategy構成を使用してリバランスに参加し、影響を与えることができます。

      partition.assignment.strategy構成を使用すると、アプリケーションはコンシューマ・ストリームにパーティションを割り当てるための戦略を選択できます。OKafkaは、Apache Kafka 2.8.0ドキュメントに記載されている、この構成パラメータのすべての値をサポートしています。

      この構成のデフォルト値はorg.oracle.okafka.clients.consumer.TXEQAssignorで、Oracle RACを認識し、Oracle TxEventQからのスループットを向上させるのに最適な戦略が実装されています。

      この戦略は、ライブ・セッション間でパーティションを分散しながら、パーティションの公平な分散とメッセージのローカル消費を優先します。

      ConsumerRebalanceListenerを使用すると、アプリケーションは、パーティションが取り消されたとき、またはコンシューマに割り当てられたときにコールバックを起動できます。

      データベース・ビューUSER_QUEUE_PARTITION_ASSIGNMENT_TABLEにより、開発者は存続しているコンシューマ間のパーティションの現在の分散を表示できます。

    • commitSync: すべての処理済メッセージをコミットします。Oracle Database 23cでは、オフセットに対するコミットはサポートされていません。この呼び出しは、TxEventQからのすべての処理済メッセージをコミットするデータベースのcommitを直接呼び出します。

    • commitAsync: この呼び出しは、commitSyncに変換されます。引数として渡されたコールバック関数は、コミットが正常に完了したときに1回実行されます。

    • Unsubscribe: コンシューマがサブスクライブしていたトピックのサブスクライブを解除します。コンシューマは、サブスクライブが解除されたトピックからのメッセージを処理できなくなります。この呼び出しによって、TxEventQメタデータからサブスクライバ・グループが削除されることはありません。他のコンシューマ・アプリケーションは、同じコンシューマ・グループに対して消費を続行できます。

    • close: コンシューマをクローズして、サブスクライブしていたトピックのサブスクライブを解除します。

  • Class: ConsumerRecord: Kafkaプラットフォームで消費されたレコードを表すクラス。Kafka APIは、TxEventQからAQ JMSメッセージを受信し、それぞれをConsumerRecordに変換してアプリケーションに配信します。

  • プロパティ:

    • auto.offset.reset: このコンシューマ・グループに対する初期オフセットが見つからない場合、このプロパティの値は、トピック・パーティションの先頭からメッセージを消費するか、新しいメッセージのみを消費するかを制御します。このプロパティとその使用方法の値は、次のとおりです:

      • earliest: トピック・パーティションの先頭から消費します。

      • latest: トピック・パーティションの最後から消費します(デフォルト)。

      • none: コンシューマ・グループにオフセットが存在しない場合は例外をスローします。

    • key.deserializerおよびvalue.deserializer: Oracle Kafkaメッセージング・プラットフォームの場合、キーおよび値はOracleのTxEventQのJMSメッセージにバイト配列として格納されます。消費時には、これらのバイト配列はそれぞれkey.deserializerおよびvalue.deserializerを使用してキーおよび値にデシリアライズされます。これらのプロパティを使用すると、アプリケーションはバイト配列形式で格納されているKeyおよびValueを、アプリケーション指定のデータ型に変換できます。

    • group.id: Kafkaトピックからのメッセージが処理されるコンシューマ・グループの名前です。このプロパティは、キー・ベースのTxEventQの永続サブスクライバ名として使用されます。

    • max.poll.records: Oracle TxEventQサーバーからの単一の配列デキュー呼出しでフェッチするレコードの最大数。
    • enable.auto.commit: 指定した間隔で処理済メッセージの自動コミットを有効にします。

    • auto.commit.interval.ms: メッセージの自動コミットの間隔(ミリ秒単位)。

    • bootstrap.servers: データベース・インスタンスを実行しているマシンのIPアドレスとポート。

TxEventQ対応のKafka Admin実装の概要

Kafka管理APIにより、アプリケーションで管理タスク(トピックの作成、トピックの削除、トピックへのパーティションの追加など)を実行できるようになります。Oracle Database 23cリリースは、次の管理APIのみをサポートします

  • メソッド

    • create(props)およびcreate(config): 渡されたパラメータを使用するKafkaAdminクラスのオブジェクトを作成します。このメソッドは、それ以降の操作に使用するデータベース・セッションを作成します。アプリケーションは、接続構成の項の説明に従って、接続構成パラメータを指定する必要があります。

    • createTopics(): アプリケーションでKafkaトピックを作成できます。これにより、ユーザーのスキーマにTxEventQが作成されます。

    • close(): データベース・セッションと管理クライアントをクローズします。

    • deleteTopic: Kafkaトピックを削除します。これにより、トピックが正常に削除されるとnullが返されます。それ以外の場合、メソッドは例外をスローします。このメソッドは、トピックが正常に削除されるかエラーが発生するまで戻りません。

  • クラス: NewTopic: 新しいトピックの作成に使用するクラス。このクラスには、トランザクション・イベント・キューの作成に使用するパラメータが含まれています。

TxEventQのKafka REST API

TxEventQ REST APIを使用すると、トピックおよびパーティションから一般的な操作を生成および消費でき、Oracle DatabaseでOracle REST Data Services (ORDS)を使用して実装されます。一般的な操作には、トピックの作成と削除、メッセージの生成と消費、トピック上のコンシューマ・ラグの取得、オフセットへの検索などのための多くの操作APIが含まれます。

Kafkaの次の3つのAPIにより、TxEventQはKafkaデプロイメントと共存できるようになり、トランザクション送信ボックス、JMSメッセージングおよびデータベース内のpub/sub、およびOracle Databaseのイベント・キューへの高スループットのイベント・ストリーミングの利点が提供されます。

関連項目:

Oracle REST Data Services APIドキュメントのOracle Transactional Event Queues RESTエンドポイント

TxEventQのKafkaコネクタ

Kafka SinkコネクタおよびSourceコネクタには、トランザクション・イベント・キューを作成するために、最低21cのOracle Databaseバージョンが必要です。アプリケーションを使用するには、最低3.1.0のバージョン番号のKafkaをサーバーにダウンロードし、インストールする必要があります。

関連項目:

詳細は、https://github.com/oracle/okafka/tree/master/connectorsを参照してください。

メッセージ転送の監視

Sink/Sourceコネクタのメッセージ転送は、Oracle TxEventQから監視できます。

関連項目:

Monitoring Transactional Event Queuesの監視: TxEventQ Monitorシステムを起動してエンキュー率/デキュー率、TxEventQの深さ、および詳細なDB/システム・レベルの統計情報を確認します。