5 Oracle Transactional Event QueuesのKafka API
Oracle Transactional Event Queue (TxEventQ)を使用すると、イベントベースのアプリケーションが簡単に実装できます。さらに、ScalaとJavaで作成されたオープンソースのストリーム処理ソフトウェア・プラットフォームであるApache Kafkaと高度に統合されています。このプラットフォームは、LinkedInによって開発され、Apache Software Foundationに寄贈されたものです。Kafka APIを使用するアプリは、透過的にOracle TxEventQの操作ができるようになります。さらに、Oracle TxEventQでは、TxEventQとKafkaの双方向の情報フローもサポートしているため、変更内容はほぼリアルタイムでTxEventQまたはKafkaですぐに利用できるようになります。
Apache Kafka Connectは、Kafkaとその他のシステムを統合するApache Kafkaに組み込まれているフレームワークです。Oracle TxEventQは、標準JMSパッケージおよび関連するJDBCのTransactionパッケージを提供することで、接続を確立してトランザクション・データ・フローを完了します。Oracle TxEventQは、標準のKafka JMSコネクタを構成することで、相互運用性を確立し、2つのメッセージング・システム間のデータ・フローを完成させます。
この章の内容は次のとおりです。
Apache Kafkaの概要
Apache Kafkaは、通信分散型イベント・ストリーミング・プラットフォームです。このプラットフォームは、水平方向のスケーラビリティと耐障害性を備えています。
Kafkaは、1つ以上のサーバーのクラスタにデプロイされます。各Kafkaクラスタには、トピックというカテゴリでレコードのストリームが格納されます。各レコードは、キー、値、およびタイムスタンプで構成されます。Kafka APIにより、アプリケーションはKafkaクラスタに接続し、Kafkaメッセージング・プラットフォームを使用できるようになります。
Transactional Event Queues対応のKafka Javaクライアント
Oracle Database 21cで、KafkaアプリケーションのOracle Databaseとの互換性が導入されました。Oracle Database 23cでは、KafkaアプリケーションとOracle Databaseの互換性がより洗練されています。これにより、Kafka JavaアプリケーションのTransactional Event Queues (TxEventQ)への移行が簡単になります。Kafka Java APIは、Oracleデータベース・サーバーに接続し、メッセージング・プラットフォームとしてTxEventQを使用できるようになりました。
図5-1 KafkaアプリケーションのTransactional Event Queueとの統合

「図5-1 KafkaアプリケーションのTransactional Event Queueとの統合」の説明
この図は、kafka-clients-2.8.0.jar
ファイルに依存するKafkaのJava APIのOracle固有の実装が含まれているKafka APIライブラリを示しています。この実装では、Oracle Databaseと通信するためにJDBCドライバを使用するAQ-JMS APIを内部的に呼び出します。
開発者は、Kafkaを使用する既存のJavaアプリケーションを、okafka.jar
を使用してOracleデータベースに移行できるようになりました。このクライアント側ライブラリを使用すると、KafkaアプリケーションはKafkaクラスタのかわりにOracle Databaseデータベースに接続して、透過的にTxEventQのメッセージング・プラットフォームを使用できるようになります。
Transactional Event Queues対応のKafka Javaクライアントの構成
前提条件
次に、Oracle DatabaseのTxEventQに対応するKafka Javaクライアントを構成および実行するための前提条件を示します。
-
データベース・ユーザーを作成します。
-
ユーザーに次の権限を付与します。
ノート:
デフォルト表領域で無制限の割当て制限を付与するかわりに、表領域の特定の割当て制限をデータベース・ユーザーに割り当てたり付与することが一般的です。表領域を作成し、次のコマンドを使用して特定の表領域の割当て制限をデータベース・ユーザーに付与できます。
ALTER USER user QUOTA UNLIMITED /* or size-clause */ on tablespace_name
-
GRANT EXECUTE on DBMS_AQ to user
。 -
GRANT EXECUTE on DBMS_AQADM to user
。 -
GRANT SELECT on GV_$SESSION to user;
-
GRANT SELECT on V_$SESSION to user;
-
GRANT SELECT on GV_$INSTANCE to user;
-
GRANT SELECT on GV_$LISTENER_NETWORK to user;
-
GRANT SELECT on GV_$PDBS to user;
-
GRANT SELECT on USER_QUEUE_PARTITION_ASSIGNMENT_TABLE to user;
-
exec DBMS_AQADM.GRANT_PRIV_FOR_RM_PLAN('user');
-
-
TxEventQを使用するための適切なデータベース構成パラメータを設定します。
SET STREAMS_POOL_SIZE=400M
ノート:
ワークロードに基づいてサイズを適切に設定します。
STREAMS_POOL_SIZE
はAutonomous Database Sharedに設定できません。これは自動的に構成されます。設定しても無視されます。 -
LOCAL_LISTENER
データベース・パラメータを設定しますSET LOCAL_LISTENER= (ADDRESS=(PROTOCOL=TCP)(HOST=<HOST_NAME.DOMAIN_NAME/ IP> )(PORT=<PORT NUMBER>))
接続構成
Kafka APIライブラリは、JDBC Thinドライバを使用してOracle Databaseに接続します。この接続を設定するために、Kafkaアプリケーションはプレーン・テキストでユーザー名とパスワードを提供できます。また、アプリケーションでSSLを構成することもできます。OCIのOracle Autonomous Database (ADB)に対してKafkaアプリケーションを実行する場合は、SSL構成のみがサポートされます。その他のデプロイメントの場合は、Oracle Databaseへの接続にはPLAINTEXTまたはSSLを使用できます。
-
PLAINTEXT: このセキュリティ・プロトコルでは、Oracle DatabaseへのJDBCコネクションは、
ojdbc.properties
ファイルのプレーン・テキストで指定したユーザー名とパスワードを使用したTCPプロトコルを使用して設定されます。PLAINTEXTプロトコルを使用するには、アプリケーションは次のプロパティを設定する必要があります。
-
oracle.service.name = <インスタンスで実行しているサービスの名前>
-
bootstrap.servers = <ホスト:ポート>
-
security.protocol
=PLAINTEXT -
oracle.net.tns_admin
= <location of ojdbc.properties file
>
次のプロパティが
ojdbc.properties
ファイルに存在している必要があります。-
user = <nameofdatabaseuser>
-
password = <userpassword>
-
-
SSL: ATPデータベースへの接続にSSLセキュリティ・プロトコルを使用するには、次の追加ステップを実行します。
-
SSLセキュリティのためのJDBC Thinドライバ接続の要件は、次のとおりです。
-
JDK8u162以降。
-
oraclepki.jar
、osdt_cert.jar
、およびosdt_core.jar
-
18.3以降のJDBC Thinドライバ
-
-
Oracle Databaseインスタンスへの接続にJDBCのSSLセキュリティを利用するには、ユーザーは次のプロパティを指定する必要があります。JDBCは、2通りの方法でOracle DatabaseへのSSLで保護された接続をサポートします。
-
ウォレットの使用。ウォレットを使用するには:
-
Oracleウォレットを使用するために必須の依存jarをクラスパスに追加します。
oraclepki.jar
、osdt_cert.jar
およびosdt_core.jar
のファイルをJDBC Thinドライバとともにダウンロードし、それらのjarをクラスパスに追加します。 -
Oracle PKIプロバイダを有効にします
SSLセキュリティを提供するためにSSOウォレット(
cwallet.sso
)を使用する場合は、ファイルjava.security
(場所:$JRE_HOME/jre/lib/security/java.security
)の末尾にOraclePKIProvider
を追加します。たとえば:security.provider.1=sun.security.provider.Sun security.provider.2=sun.security.rsa.SunRsaSign security.provider.3=com.sun.net.ssl.internal.ssl.Provider security.provider.4=com.sun.crypto.provider.SunJCE security.provider.5=sun.security.jgss.SunProvider security.provider.6=com.sun.security.sasl.Provider security.provider.7=oracle.security.pki.OraclePKIProvider
SSLセキュリティのために
ewallet.p12
を使用する場合は、ファイルjava.security
でsunプロバイダの前にOraclePKIProvider
を配置します。たとえば:security.provider.1=sun.security.provider.Sun security.provider.2=sun.security.rsa.SunRsaSign security.provider.3=oracle.security.pki.OraclePKIProvider security.provider.4=com.sun.net.ssl.internal.ssl.Provider security.provider.5=com.sun.crypto.provider.SunJCE security.provider.6=sun.security.jgss.SunProvider security.provider.7=com.sun.security.sasl.Provider
-
アプリケーションで次のプロパティを設定します。
security.protocol=SSL oracle.net.tns_admin=<location of tnsnames.ora file> tns.alias=<alias of connection string in tnsnames.ora>
ojdbc.properties
ファイルで次のプロパティを設定します。このファイルは、oracle.net.tns_admin
プロパティで指定された場所で使用可能である必要があります。user(in smallletters)=nameofdatabaseuser password(in smallletters)=userpassword oracle.net.ssl_server_dn_match=true oracle.net.wallet_location=“(SOURCE=(METHOD=FILE) (METHOD_DATA=(DIRECTORY=/location../wallet_dbname)))”
-
-
Java KeyStoreでJDBCのSSLセキュリティを使用するには、アプリケーションに次のプロパティを指定します。
security.protocol=SSL oracle.net.tns_admin=<location of tnsnames.ora file> tns.alias=<alias of connection string in tnsnames.ora>
ojdbc.properties
ファイルで次のプロパティを設定します。このファイルは、oracle.net.tns_admin
プロパティで指定された場所で使用可能である必要があります。user(in smallletters)=nameofdatabaseuser password(in smallletters)=userpassword oracle.net.ssl_server_dn_match=true javax.net.ssl.trustStore==${TNS_ADMIN}/truststore.jks javax.net.ssl.trustStorePassword=password javax.net.ssl.keyStore=${TNS_ADMIN}/keystore.jks javax.net.ssl.keyStorePassword=password
ノート:
ATPからダウンロードしたウォレット内の
tnsnames.ora
ファイルには、JDBCコネクションの確立に使用されるJDBC接続文字列が格納されています。
-
-
関連項目:
Oracle Database SQL言語リファレンスのALTER USER
Kafkaクライアント・インタフェース
Kafkaアプリケーションは、Kafkaクラスタとの通信にプロデューサ、コンシューマ、および管理のAPIを主に使用します。このバージョンのTxEventQ対応Kafkaクライアントは、Apache Kafka 2.8.0のプロデューサ、コンシューマ、および管理のAPIとプロパティのサブセットのみをサポートしています。okafka.jar
クライアント・ライブラリを使用すると、KafkaアプリケーションはOracle TxEventQプラットフォームを使用できるようになります。okafka.jar
ライブラリにはJRE 9以降が必要です。
最初にKafkaクライアントAPIの使用方法を簡単な例で示し、後でその詳細を説明します。
Kafka APIの例
例: Oracle Kafkaトピックの作成
import java.util.Arrays; import java.util.Properties; import java.util.concurrent.ExecutionException; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.CreateTopicsResult; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.common.KafkaFuture; import org.oracle.okafka.clients.admin.AdminClient; public class SimpleAdminOKafka { public static void main(String[] args) { Properties props = new Properties(); //IP or Host name where Oracle Database 23c is running and Database Listener's Port props.put("bootstrap.servers", "localhost:1521"); //name of the service running on the database instance props.put("oracle.service.name", "freepdb1"); props.put("security.protocol","PLAINTEXT"); // location for ojdbc.properties file where user and password properties are saved props.put("oracle.net.tns_admin","."); try (Admin admin = AdminClient.create(props)) { //Create Topic named TEQ with 10 Partitions. CreateTopicsResult result = admin.createTopics( Arrays.asList(new NewTopic("TEQ", 10, (short)0))); try { KafkaFuture<Void> ftr = result.all(); ftr.get(); } catch ( InterruptedException | ExecutionException e ) { throw new IllegalStateException(e); } System.out.println("Closing OKafka admin now"); } catch(Exception e) { System.out.println("Exception while creating topic " + e); e.printStackTrace(); } } }
例: KafkaコンシューマAPI
import java.util.Properties; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.oracle.okafka.clients.consumer.KafkaConsumer; public class SimpleConsumerOKafka { // Dummy implementation of ConsumerRebalanceListener interface // It only maintains the list of assigned partitions in assignedPartitions list static class ConsumerRebalance implements ConsumerRebalanceListener { public List<TopicPartition> assignedPartitions = new ArrayList<>(); @Override public synchronized void onPartitionsAssigned(Collection<TopicPartition> partitions) { System.out.println("Newly Assigned Partitions:"); for (TopicPartition tp :partitions ) { System.out.println(tp); assignedPartitions.add(tp); } } @Override public synchronized void onPartitionsRevoked(Collection<TopicPartition> partitions) { System.out.println("Revoked previously assigned partitions. "); for (TopicPartition tp :assignedPartitions ) { System.out.println(tp); } assignedPartitions.clear(); } } public static void main(String[] args) { //System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "TRACE"); Properties props = new Properties(); //IP or Host name where Oracle Database 23c is running and Database Listener's Port props.put("bootstrap.servers", "localhost:1521"); //name of the service running on the database instance props.put("oracle.service.name", "freepdb1"); props.put("security.protocol","PLAINTEXT"); // location for ojdbc.properties file where user and password properties are saved props.put("oracle.net.tns_admin","."); //Consumer Group Name props.put("group.id" , "CG1"); props.put("enable.auto.commit","false"); // Maximum number of records fetched in single poll call props.put("max.poll.records", 2000); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); Consumer<String , String> consumer = new KafkaConsumer<String, String>(props); ConsumerRebalanceListener rebalanceListener = new ConsumerRebalance(); //Subscribe to a single topic named 'TEQ'. consumer.subscribe(Arrays.asList("TEQ"), rebalanceListener); int expectedMsgCnt = 40000; int msgCnt = 0; Instant startTime = Instant.now(); try { while(true) { try { //Consumes records from the assigned partitions of 'TEQ' topic ConsumerRecords <String, String> records = consumer.poll(Duration.ofMillis(10000)); //Print consumed records for (ConsumerRecord<String, String> record : records) { System.out.printf("partition = %d, offset = %d, key = %s, value =%s\n ", record.partition(), record.offset(), record.key(), record.value()); for(Header h: record.headers()) { System.out.println("Header: " +h.toString()); } } //Commit all the consumed records if(records != null && records.count() > 0) { msgCnt += records.count(); System.out.println("Committing records " + records.count()); try { consumer.commitSync(); }catch(Exception e) { System.out.println("Exception in commit " + e.getMessage()); continue; } if(msgCnt >= expectedMsgCnt ) { System.out.println("Received " + msgCnt + " Expected " + expectedMsgCnt +". Exiting Now."); break; } } else { System.out.println("No Record Fetched. Retrying in 1 second"); Thread.sleep(1000); } }catch(Exception e) { System.out.println("Inner Exception " + e.getMessage()); throw e; } } }catch(Exception e) { System.out.println("Exception from OKafka consumer " + e); e.printStackTrace(); }finally { long runDuration = Duration.between(startTime, Instant.now()).toMillis(); System.out.println("Closing OKafka Consumer. Received "+ msgCnt +" records. Run Duration " + runDuration); consumer.close(); } } }
例: KafkaプロデューサAPI
import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.header.internals.RecordHeader; import org.oracle.okafka.clients.producer.KafkaProducer; import java.time.Duration; import java.time.Instant; import java.util.Properties; import java.util.concurrent.Future; public class SimpleProducerOKafka { public static void main(String[] args) { try { Properties props = new Properties(); //IP or Host name where Oracle Database 23c is running and Database Listener's Port props.put("bootstrap.servers", "localhost:1521"); //name of the service running on the database instance props.put("oracle.service.name", "freepdb1"); props.put("security.protocol","PLAINTEXT"); // location for ojdbc.properties file where user and password properties are saved props.put("oracle.net.tns_admin","."); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); String baseMsg = "This is a test message "; // Creates OKafka Producer Producer<String, String> producer = new KafkaProducer<String, String>(props); Future<RecordMetadata> lastFuture = null; int msgCnt = 40000; Instant startTime = Instant.now(); //Headers, common for all records RecordHeader rH1 = new RecordHeader("CLIENT_ID", "FIRST_CLIENT".getBytes()); RecordHeader rH2 = new RecordHeader("REPLY_TO", "REPLY_TOPIC_NAME".getBytes()); //Produce 40000 messages into topic named "TEQ". for(int i=0;i<msgCnt;i++) { ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("TEQ", ""+i, baseMsg + i); producerRecord.headers().add(rH1).add(rH2); lastFuture =producer.send(producerRecord); } //Waits until the last message is acknowledged lastFuture.get(); long runTime = Duration.between( startTime, Instant.now()).toMillis(); System.out.println("Produced "+ msgCnt +" messages. Run Duration " + runTime); //Closes the OKafka producer producer.close(); } catch(Exception e) { System.out.println("Exception in Main " + e ); e.printStackTrace(); } } }
例: Oracle Kafkaトピックの削除
import java.util.Collections; import java.util.Properties; import org.apache.kafka.clients.admin.Admin; import org.oracle.okafka.clients.admin.AdminClient; public class SimpleAdminDeleteTopic { public static void main(String[] args) { Properties props = new Properties(); //IP or Host name where Oracle Database 23c is running and Database Listener's Port props.put("bootstrap.servers", "localhost:1521"); //name of the service running on the database instance props.put("oracle.service.name", "freepdb1"); props.put("security.protocol","PLAINTEXT"); // location for ojdbc.properties file where user and password properties are saved props.put("oracle.net.tns_admin","."); try (Admin admin = AdminClient.create(props)) { //Throws Exception if failed to delete the topic. Returns null on successful deletion. org.apache.kafka.clients.admin.DeleteTopicsResult delResult = admin.deleteTopics(Collections.singletonList("TEQ")); Thread.sleep(1000); System.out.println("Closing admin now"); } catch(Exception e) { System.out.println("Exception while creating topic " + e); e.printStackTrace(); } } }
ノート:
-
KafkaAdminインタフェースを使用して作成されたトピックには、KafkaProducerまたはKafkaConsumerインタフェースでのみアクセスできます。
-
同様に、KafkaProducerおよびKafkaConsumerインタフェースは、KafkaAdminインタフェースを使用して作成されたトピックからのみレコードを生成または消費できます。
TxEventQのKafka REST API
TxEventQ REST APIを使用すると、トピックおよびパーティションから一般的な操作を生成および消費でき、Oracle DatabaseでOracle REST Data Services (ORDS)を使用して実装されます。一般的な操作には、トピックの作成と削除、メッセージの生成と消費、トピック上のコンシューマ・ラグの取得、オフセットへの検索などのための多くの操作APIが含まれます。
Kafkaの次の3つのAPIにより、TxEventQはKafkaデプロイメントと共存できるようになり、トランザクション送信ボックス、JMSメッセージングおよびデータベース内のpub/sub、およびOracle Databaseのイベント・キューへの高スループットのイベント・ストリーミングの利点が提供されます。
関連項目:
Oracle REST Data Services APIドキュメントのOracle Transactional Event Queues RESTエンドポイント
TxEventQ対応のKafkaプロデューサ実装の概要
プロデューサAPIにより、KafkaアプリケーションはメッセージをOracle Transactional Event Queues (TxEventQ)に発行できるようになります。Kafkaアプリケーションでは、Oracle固有のプロパティ(bootstrap.servers
、oracle.servicename
およびoracle.net.tns_admin
)を指定する必要があります。これらのプロパティの詳細は、構成の項で説明します。これらのプロパティは、データベース接続を設定し、TxEventQにメッセージを生成するために使用されます。現在のリリースでは、OracleのKafkaProducer
の実装はプロデューサAPIのサブセットのみをサポートしています。
内部的には、Oracle Kafkaプロデューサ・オブジェクトが、Oracle TxEventQにメッセージをパブリッシュするために使用されるAQ JMSプロデューサ・オブジェクトをカプセル化します。Apache Kafkaプロデューサと同様に、各プロデューサのsend()
コールでは、そのトピックとパーティションに基づいて、Kafkaレコードがバッチに追加されます。Apache Kafkaの内部アルゴリズムに基づいて、バックグラウンド・スレッドがバッチ全体をOracle TxEventQにパブリッシュします。
次のKafkaProducer
APIは、Oracle Database 23cでサポートされています。
-
コンストラクタ:
KafkaProducer
: プロシージャ・オブジェクトと内部AQ JMSオブジェクトを作成します。KafkaProducer
クラスでは、4種類のコンストラクタが定義されています。これらはすべて、入力として構成パラメータを受け取ります。 -
メソッド:
-
send(ProducerRecord)、send(ProducerRecord, Callback)
:send
メソッドは、非同期的にメッセージをTxEventQにパブリッシュします。このメソッドは、送信を待機しているレコードのバッファーにKafkaレコードが格納された直後に復帰します。バッファがいっぱいの場合、送信コールは最大max.block.ms
の間ブロックされます。レコードは、AQ JMSを使用してトピックに発行されます。このメソッドは、レコードのパーティション、オフセットおよびパブリッシュ・タイムスタンプを含む
Future<RecordMetadata>
を返します。send(ProducerRecord)
とsend(ProducerRecord, Callback)
の両方のバージョンがサポートされます。 -
close
: プロデューサを閉じてメモリーを解放します。また、Oracle Databaseへの内部接続も閉じます。
-
-
クラス
-
ProducerRecord
: Kafkaプラットフォームでメッセージを表すクラス。Kafka APIライブラリは、ProducerRecord
をTxEventQプラットフォームのJMS BytesMessageに変換します。 -
RecordMetadata
: Kafkaプラットフォームのレコードのトピック、パーティション、オフセット、タイムスタンプなどのメタデータが含まれます。TxEventQに関連する値が割り当てられます。TxEventQのメッセージIDは、RecordMetadata
のオフセットに変換されます。 -
Callbackインタフェース: レコードがKafkaトピックに正常にパブリッシュされたときに1回実行されるコールバック関数。
-
パーティショナ・インタフェース: メッセージの
Key
をトピックのパーティション番号にマップするメソッドを定義します。パーティション番号は、TxEventQのストリームIDと同様のものです。
-
-
プロパティ
-
key.serializer
およびvalue.serializer
: キーとペイロードをそれぞれバイト配列に変換します。 -
acks
: Kafka APIの場合、acks
プロパティに関連する値はall
のみです。ユーザーが設定したその他のフィールドは無視されます。 -
linger.ms
: 送信側スレッドがTxEventQのレコードをパブリッシュするまでに待機する時間(ミリ秒単位)。 -
batch.size
: バッチ処理されるレコードの合計サイズ(バイト単位)。送信側スレッドは、このサイズになるまで待機してからTxEventQのレコードをパブリッシュします。 -
buffer.memory
: アキュムレータが保持できる合計メモリー(バイト単位)。 -
max.block.ms
: アキュムレータでbuffer.memory
サイズがいっぱいになったときに、send()
メソッドがメモリー不足エラーを受信できるようになるまでmax.block.ms
の時間待機します。 -
retries
: このプロパティにより、プロデューサは一時的な障害の発生時にレコードを再送信できます。この値は、バッチ当たりの再試行回数を制限します。 -
retry.backoff.ms
: 特定のトピック・パーティションに対する失敗した要求を再試行するまでに待機する時間。これにより、一部の障害シナリオで発生する隙間のないループによるリクエストの繰り返し送信を回避します -
bootstrap.servers
: データベースのインスタンスを実行しているマシンのIPアドレスとポート。
-
TxEventQ対応のKafkaコンシューマ実装の概要
コンシューマAPIにより、アプリケーションはTransactional Event Queue (TxEventQ)からのデータのストリームを読み取れるようになります。TxEventQ対応のKafkaコンシューマはAQ JMS APIを使用し、Oracle TxEventQからのメッセージをバッチで利用するためにJDBCドライバを使用します。Oracle Kafkaの場合、Transactional Event Queueからメッセージをデキューすることで、トピックからのメッセージを消費できます。
Apache Kafkaと同様に、TxEventQの実装では、コンシューマ・グループ(サブスクライバ)に多数のコンシューマ・インスタンス(サブスクライバに対して消費している一意のデータベース・セッション)が含まれる場合があります。コンシューマ・グループごとに、一意のグループID (サブスクライバ名)があります。各コンシューマ・インスタンスは、bootstrap.servers
プロパティで指定されるOracle Databaseに対する単一の接続/セッションを内部的に維持します。Oracle Database 23cでは、コンシューマ・グループ・リバランスのKafka APIサポートが導入されています。トピックのパーティションは、同じコンシューマ・グループの2つのコンシューマに同時にトピックの同じパーティションが割り当てられないように、コンシューマ・グループのアクティブなコンシューマ間に分配されます。新しいコンシューマがコンシューマ・グループに参加したり、既存のコンシューマがグループから退出すると、パーティションはアクティブなコンシューマ間で再分配されます。
Kafka APIの23cリリースでは、コンシューマは1つのトピックにのみサブスクライブできます。
Oracle Database 23cでは、次のKafkaConsumer
APIがサポートされています。
-
コンストラクタ:
KafkaConsumer
: アプリケーションがキー・ベースのTxEventQからのメッセージを利用できるようになるコンシューマを作成します。作成された内部クライアント側TxEventQオブジェクトは、クライアント・アプリケーションから認識されません。KafkaConsumer
コンストラクタのすべてのバリエーションが、Oracle Database 23cでサポートされています。 -
メソッド:
-
Subscribe
: このメソッドは、サブスクライブ先のトピックのリストを受け取ります。Oracle Database 23cでは、リストの最初のトピックのみがサブスクライブされます。リストのサイズが1を超えると、例外がスローされます。このメソッドは、TxEventQサーバー側に、サブスクライバ名としてグループIDが設定された永続サブスクライバを作成します。アプリケーションは、ConsumerRebalanceListener
インタフェースを実装し、実装されたクラスのオブジェクトをサブスクライブ・メソッドに渡すこともできます。これにより、コンシューマは、パーティションが取り消されたとき、または割り当てられたときにコールバックを実行できます。 -
Poll
:poll
メソッドは、TxEventQの割当て済パーティションからメッセージのバッチを返します。このメソッドは、サブスクライバに対応するキー・ベースのTxEventQからメッセージをデキューしようとします。TxEventQは、AQ JMSの配列デキューAPIを使用してキューからメッセージのバッチを受信します。バッチのサイズは、Kafkaクライアント・アプリケーションで設定したパラメータmax.poll.records
によって決まります。Poll
は、引数としてミリ秒単位の時間を受け取ります。配列デキューのAQ JMS APIは、このタイムアウトをデキュー・オプションとしてTxEventQサーバーに渡し、デキュー・コールを実行できます。これにより、完全な配列バッチが完了していない場合はタイムアウトまでメッセージを待機します。コンシューマによって初めてpollが起動されると、コンシューマ・グループの存続しているすべてのコンシューマに対してコンシューマ・リバランスがトリガーされます。コンシューマ・リバランスの終了時に、存続しているすべてのコンシューマにトピック・パーティションが割り当てられ、後続のpollリクエストでは、割り当てられたパーティションからのみメッセージがフェッチされます。
アプリケーションは、
ConsumerRebalanceListener
インタフェースおよびpartition.assignment.strategy
構成を使用してリバランスに参加し、影響を与えることができます。partition.assignment.strategy
構成を使用すると、アプリケーションはコンシューマ・ストリームにパーティションを割り当てるための戦略を選択できます。OKafkaは、Apache Kafka 2.8.0ドキュメントに記載されている、この構成パラメータのすべての値をサポートしています。この構成のデフォルト値は
org.oracle.okafka.clients.consumer.TXEQAssignor
で、Oracle RACを認識し、Oracle TxEventQからのスループットを向上させるのに最適な戦略が実装されています。この戦略は、ライブ・セッション間でパーティションを分散しながら、パーティションの公平な分散とメッセージのローカル消費を優先します。
ConsumerRebalanceListener
を使用すると、アプリケーションは、パーティションが取り消されたとき、またはコンシューマに割り当てられたときにコールバックを起動できます。データベース・ビュー
USER_QUEUE_PARTITION_ASSIGNMENT_TABLE
により、開発者は存続しているコンシューマ間のパーティションの現在の分散を表示できます。 -
commitSync
: すべての処理済メッセージをコミットします。Oracle Database 23cでは、オフセットに対するコミットはサポートされていません。この呼び出しは、TxEventQからのすべての処理済メッセージをコミットするデータベースのcommitを直接呼び出します。 -
commitAsync
: この呼び出しは、commitSync
に変換されます。引数として渡されたコールバック関数は、コミットが正常に完了したときに1回実行されます。 -
Unsubscribe
: コンシューマがサブスクライブしていたトピックのサブスクライブを解除します。コンシューマは、サブスクライブが解除されたトピックからのメッセージを処理できなくなります。この呼び出しによって、TxEventQメタデータからサブスクライバ・グループが削除されることはありません。他のコンシューマ・アプリケーションは、同じコンシューマ・グループに対して消費を続行できます。 -
close
: コンシューマをクローズして、サブスクライブしていたトピックのサブスクライブを解除します。
-
-
Class:
ConsumerRecord
: Kafkaプラットフォームで消費されたレコードを表すクラス。Kafka APIは、TxEventQからAQ JMSメッセージを受信し、それぞれをConsumerRecord
に変換してアプリケーションに配信します。 -
プロパティ:
-
auto.offset.reset
: このコンシューマ・グループに対する初期オフセットが見つからない場合、このプロパティの値は、トピック・パーティションの先頭からメッセージを消費するか、新しいメッセージのみを消費するかを制御します。このプロパティとその使用方法の値は、次のとおりです:-
earliest
: トピック・パーティションの先頭から消費します。 -
latest
: トピック・パーティションの最後から消費します(デフォルト)。 -
none
: コンシューマ・グループにオフセットが存在しない場合は例外をスローします。
-
-
key.deserializer
およびvalue.deserializer
: Oracle Kafkaメッセージング・プラットフォームの場合、キーおよび値はOracleのTxEventQのJMSメッセージにバイト配列として格納されます。消費時には、これらのバイト配列はそれぞれkey.deserializer
およびvalue.deserializer
を使用してキーおよび値にデシリアライズされます。これらのプロパティを使用すると、アプリケーションはバイト配列形式で格納されているKey
およびValue
を、アプリケーション指定のデータ型に変換できます。 -
group.id
: Kafkaトピックからのメッセージが処理されるコンシューマ・グループの名前です。このプロパティは、キー・ベースのTxEventQの永続サブスクライバ名として使用されます。 max.poll.records
: Oracle TxEventQサーバーからの単一の配列デキュー呼出しでフェッチするレコードの最大数。-
enable.auto.commit
: 指定した間隔で処理済メッセージの自動コミットを有効にします。 -
auto.commit.interval.ms
: メッセージの自動コミットの間隔(ミリ秒単位)。 -
bootstrap.servers
: データベース・インスタンスを実行しているマシンのIPアドレスとポート。
-
TxEventQ対応のKafka Admin実装の概要
Kafka管理APIにより、アプリケーションで管理タスク(トピックの作成、トピックの削除、トピックへのパーティションの追加など)を実行できるようになります。Oracle Database 23cリリースは、次の管理APIのみをサポートします
-
メソッド
-
create(props)
およびcreate(config)
: 渡されたパラメータを使用するKafkaAdmin
クラスのオブジェクトを作成します。このメソッドは、それ以降の操作に使用するデータベース・セッションを作成します。アプリケーションは、接続構成の項の説明に従って、接続構成パラメータを指定する必要があります。 -
createTopics()
: アプリケーションでKafkaトピックを作成できます。これにより、ユーザーのスキーマにTxEventQが作成されます。 -
close()
: データベース・セッションと管理クライアントをクローズします。 -
deleteTopic
: Kafkaトピックを削除します。これにより、トピックが正常に削除されるとnull
が返されます。それ以外の場合、メソッドは例外をスローします。このメソッドは、トピックが正常に削除されるかエラーが発生するまで戻りません。
-
-
クラス:
NewTopic
: 新しいトピックの作成に使用するクラス。このクラスには、トランザクション・イベント・キューの作成に使用するパラメータが含まれています。
TxEventQのKafka REST API
TxEventQ REST APIを使用すると、トピックおよびパーティションから一般的な操作を生成および消費でき、Oracle DatabaseでOracle REST Data Services (ORDS)を使用して実装されます。一般的な操作には、トピックの作成と削除、メッセージの生成と消費、トピック上のコンシューマ・ラグの取得、オフセットへの検索などのための多くの操作APIが含まれます。
Kafkaの次の3つのAPIにより、TxEventQはKafkaデプロイメントと共存できるようになり、トランザクション送信ボックス、JMSメッセージングおよびデータベース内のpub/sub、およびOracle Databaseのイベント・キューへの高スループットのイベント・ストリーミングの利点が提供されます。
関連項目:
Oracle REST Data Services APIドキュメントのOracle Transactional Event Queues RESTエンドポイント
TxEventQのKafkaコネクタ
Kafka SinkコネクタおよびSourceコネクタには、トランザクション・イベント・キューを作成するために、最低21cのOracle Databaseバージョンが必要です。アプリケーションを使用するには、最低3.1.0のバージョン番号のKafkaをサーバーにダウンロードし、インストールする必要があります。
関連項目:
詳細は、https://github.com/oracle/okafka/tree/master/connectorsを参照してください。
メッセージ転送の監視
Sink/Sourceコネクタのメッセージ転送は、Oracle TxEventQから監視できます。
関連項目:
Monitoring Transactional Event Queuesの監視: TxEventQ Monitorシステムを起動してエンキュー率/デキュー率、TxEventQの深さ、および詳細なDB/システム・レベルの統計情報を確認します。