6 Oracle Transactional Event Queues対応のKafka Javaクライアント・インタフェース

この章の内容は次のとおりです。

Apache Kafkaの概要

Apache Kafkaは、通信分散型イベント・ストリーミング・プラットフォームです。このプラットフォームは、水平方向のスケーラビリティと耐障害性を備えています。

Kafkaは、1つ以上のサーバーでクラスタとして実行されます。各Kafkaクラスタには、トピックというカテゴリでレコードのストリームが格納されます。各レコードは、キー、値、およびタイムスタンプで構成されます。Kafka APIにより、アプリケーションはKafkaクラスタに接続して、Kafkaメッセージング・プラットフォームを使用できるようになります。

Transactional Event Queues対応のKafka Javaクライアント

Oracle Database 20cには、KafkaアプリケーションのOracleデータベースとの互換性が導入されています。これにより、Kafka JavaアプリケーションのTransaction Event Queues (TEQ)への移行が簡単になります。The Kafka Java APIは、Oracleデータベース・サーバーに接続し、メッセージング・プラットフォームとしてTEQを使用できるようになりました。

図6-1 KafkaアプリケーションのTransactional Event Queueとの統合



この図は、KafkaのJava APIのうちOracle固有の実装が含まれているOKafkaライブラリを示しています。この実装では、Oracle Databaseと通信するためにJDBCドライバを使用するAQ-JMS APIを内部的に呼び出します。

開発者は、Kafkaを使用する既存のJavaアプリケーションをOracleデータベースに移行できるようになりました。Oracle Database 20cには、KafkaアプリケーションがKafkaクラスタのかわりにOracle Databaseデータベースに接続して、TEQのメッセージング・プラットフォームを透過的に使用できるようになるクライアント側ライブラリが用意されています。

Transactional Event Queues対応のKafka Javaクライアントの構成

KafkaアプリケーションをTEQメッセージング・プラットフォームに移行するには、2レベルの構成が必要になります。

  • データベース・レベルの構成

  • アプリケーション・レベルの構成。

Kafkaアプリケーションでは、OKafkaライブラリがOracle Databaseを見つけられるようにする特定のプロパティを設定する必要があります。これは、Kafkaアプリケーションがzoo keeper情報を提供する方法と類似しています。こうした接続プロパティは、次の2つの方法で設定できます。

  • プレーン・テキストで提供されるデータベース・ユーザーとパスワードの使用

  • JDBCウォレットの使用。

前提条件

次に、Oracle DatabaseのTEQに対応するKafka Javaクライアントを構成および実行するための前提条件を示します。

  1. データベース・ユーザーを作成します。

  2. 次のユーザー権限を付与します。

    • userに対するgrant connectresource

    • userに対するgrant execute on dbms_aq

    • userに対するgrant execute on dbms_aqadm

    • userに対するgrant execute on dbms_aqin

    • userに対するgrant execute on dbms_aqjms

    • userに対するgrant select_catalog_role

  3. TEQを使用するための適切なデータベース構成パラメータを設定します。

    set streams_pool_size=400M
  4. LOCAL_LISTENERデータベース・パラメータを設定します

    set LOCAL_LISTENER= (ADDRESS=(PROTOCOL=TCP)(HOST=<HOST NAME/ IP> )(PORT=<PORT NUMBER>))

接続構成

OKafkaライブラリは、JDBC Thinドライバを使用してOracle Databaseに接続します。この接続を設定するために、Kafkaアプリケーションはプレーン・テキストでユーザー名とパスワードを提供できます。また、アプリケーションでSSLを構成することもできます。クラウドのOracle Autonomous Transaction Processing (ATP) Databaseに対してKafkaアプリケーションを実行する場合は、SSL構成のみがサポートされます。Oracle Databaseへの接続にはPLAINTEXTまたはSSLを使用できます。

  • PLAINTEXT: このプロトコルのJDBC接続では、Oracleインスタンスへの接続にユーザー名とパスワードを使用します。

    プレーンテキスト・プロトコルを使用する場合、ユーザーはアプリケーションから次のプロパティを提供する必要があります

    • oracle.service.name = <インスタンスで実行しているサービスの名前>

    • oracle.instance.name = <Oracle Databaseインスタンスの名前>

    • bootstrap.servers = <ホスト:ポート>

    ojdbc.propertiesファイルとojdbc.propertiesファイルの次のプロパティは、oracle.net.tns_adminに存在している必要があります。

    • user = <nameofdatabaseuser>

    • password = <userpassword>

  • SSL: ATP Databaseへの接続にSSLで保護された接続を使用するには、次のステップを実行します。

    1. SSLセキュリティのためのJDBC Thinドライバ接続の要件は、次のとおりです。
      • JDK8u162以降。

      • oraclepki.jarosdt_cert.jar、およびosdt_core.jar

      • 18.3 JDBC Thinドライバ以降(推奨)

    2. Oracle Databaseインスタンスへの接続にJDBCのSSLセキュリティを利用するには、ユーザーは次のプロパティを指定する必要があります。JDBCは、2通りの方法でOracle DatabaseへのSSLで保護された接続をサポートします。

      • ウォレットの使用。ウォレットを使用するには:

        1. Oracleウォレットを使用するために必須の依存jarをクラスパスに追加します。

          oraclepki.jarosdt_cert.jar、およびosdt_core.jarのファイルをダウンロードして、それらのjarをJDBC thinドライバとともにクラスパスに追加します。

        2. Oracle PKIプロバイダを有効にします

          SSLセキュリティを提供するためにSSOウォレット(cwallet.sso)を使用する場合は、ファイルjava.security (場所: $JRE_HOME/jre/lib/security/java.security)の末尾にOraclePKIProviderを追加します。次に、java.securityの例を示します。

                           security.provider.1=sun.security.provider.Sun
                           security.provider.2=sun.security.rsa.SunRsaSign
                           security.provider.3=com.sun.net.ssl.internal.ssl.Provider
                           security.provider.4=com.sun.crypto.provider.SunJCE
                           security.provider.5=sun.security.jgss.SunProvider
                           security.provider.6=com.sun.security.sasl.Provider
                           security.provider.7=oracle.security.pki.OraclePKIProvider

          SSLセキュリティのためにewallet.p12を使用する場合は、ファイルjava.securityでsunプロバイダの前にOraclePKIProviderを配置します。次に、java.securityの例を示します。

                          security.provider.1=sun.security.provider.Sun
                          security.provider.2=sun.security.rsa.SunRsaSign
                          security.provider.3=oracle.security.pki.OraclePKIProvider
                          security.provider.4=com.sun.net.ssl.internal.ssl.Provider
                          security.provider.5=com.sun.crypto.provider.SunJCE
                          security.provider.6=sun.security.jgss.SunProvider
                          security.provider.7=com.sun.security.sasl.Provider
        3. 次のプロパティをアプリケーションから指定します。

                        security.protocol = “”SSL”
                        oracle.net.tns_admin = “location of tnsnames.ora file” (for parsing jdbc connection string)
                        tns.alias = “alias of connection string in tnsnames.ora”
                        

          また、ojdbc.propertiesファイルとojdbc.propertiesファイルの次のプロパティは、oracle.net.tns_adminに存在している必要があります。

                        user(in smallletters)=nameofdatabaseuser
                        password(in smallletters)=userpassword
                        oracle.net.ssl_server_dn_match=true
                        oracle.net.wallet_location=“(SOURCE=(METHOD=FILE)
                                                   (METHOD_DATA=(DIRECTORY=/location../wallet_dbname)))”
      • Javaキー・ストアを使用する場合。JDBC SSLセキュリティをJavaキー・ストアに提供するには、アプリケーションから次のプロパティを指定します。

               security.protocol = "SSL"              
               oracle.net.tns_admin = "location of tnsnames.ora file"              
               tns.alias = "alias of connection string in tnsnames.ora"

        また、ojdbc.propertiesファイルとojdbc.propertiesファイルの次のプロパティは、oracle.net.tns_adminに存在している必要があります。

                      user(in smallletters)=nameofdatabaseuser
                      password(in smallletters)=userpassword
                      oracle.net.ssl_server_dn_match=true
                      javax.net.ssl.trustStore==${TNS_ADMIN}/truststore.jks
                      javax.net.ssl.trustStorePassword = password
                      javax.net.ssl.keyStore= ${TNS_ADMIN}/keystore.jks
                      javax.net.ssl.keyStorePassword="password" 

        ノート:

        ATPからダウンロードしたウォレット内のtnsnames.oraファイルには、jdbcの確立に使用されるjdbc接続文字列が格納されています。

Kafkaクライアント・インタフェース

Kafkaアプリケーションは、Kafkaクラスタとの通信にプロデューサ、コンシューマ、および管理のAPIを主に使用します。このバージョンのTEQ対応Kafkaクライアントは、Kafka 2.0のプロデューサ、コンシューマ、および管理のAPIとプロパティのサブセットのみをサポートしています。

TEQ対応のKafkaプロデューサ実装の概要

プロデューサAPIにより、KafkaアプリケーションはメッセージをOracle Transaction Event Queuesに発行できるようになります。Kafkaアプリケーションでは、Oracle固有のプロパティ(oracle.hostoracle.portoracle.servicename、およびoracle.instancename)を指定する必要があります。このプロパティの詳細は、構成のセクションで説明します。これらのプロパティは、データベース接続の設定と、TEQにメッセージを生成するために使用されます。現行リリースでは、OracleのKafkaProducerの実装はAPIのサブセットのみをサポートしています。

内部的には、Oracle Kafkaプロデューサ・オブジェクトは、Oracle TEQにメッセージを発行するために使用されるAQJMSプロデューサ・オブジェクトをカプセル化します。Kafkaプロデューサと同様に、プロデューサはバッチにもメッセージを格納します。send()の呼び出しごとに、Kafkaレコードがトピックとパーティションに基づいて特定のバッチに追加されます。バックグラウンド・スレッドにより、バッチ全体が一度にOracle TEQに発行されます。それぞれのバッチの発行は、プロデューサによってコミットされます。現行リリースでは、トピックに保持できるパーティションは1つのみのため、すべてのKafkaRecordsがTEQの単一パーティションに発行されます。

次のKafkaProducer APIは、Oracle Database 20cでサポートされています。

  • コンストラクタ:

    KafkaProducer: プロシージャ・オブジェクトと内部AQ JMSオブジェクトを作成します。KafkaProducerクラスでは、4種類のコンストラクタが定義されています。それらのすべては、入力として構成パラメータを受け取ります。

  • メソッド:

    • send(ProducerRecord)、send(ProducerRecord, Callback):

      sendメソッドは、非同期的にメッセージをTEQに発行します。このメソッドは、送信を待機しているレコードのバッファーにKafkaレコードが格納された直後に復帰します。バッファ・メモリーが満杯の場合、max.block.msの最大時間までコール・ブロックを送信します。これにより、個別の応答の待機のためにブロックされることなく、多数のレコードをパラレルに送信できるようになります。レコードは、AQ JMSを使用してトピックに発行されます。

      送信結果のFuture<RecordMetadata>では、レコードの送信先になったパーティション、そのレコードが割り当てられたオフセット、およびレコードのタイムスタンプが指定されます。send(ProducerRecord)send(ProducerRecord, Callback)の両方のバージョンがサポートされます。

    • close: プロデューサとその送信元スレッドをクローズして、アキュムレータを解放します。また、内部AQ JMSオブジェクト(接続やセッションJMSプロデューサなど)もクローズします。

  • クラス

    • ProducerRecord: Kafkaプラットフォームでメッセージを表すクラス。これは、AQ JMSメッセージというTEQプラットフォーム用のメッセージに変換されます。PayloadKeyなどの関連フィールドは、TEQペイロードとTEQ用のメッセージ・キーに直接変換できます。

    • RecordMetadata: レコードのメタデータ(KafkaPlatformのレコードのトピック、パーティション、オフセット、タイムスタンプなど)が格納されます。TEQに関連する値が割り当てられます。TEQのメッセージIDは、RecordMetadataのオフセットに変換されます。

    • Callbackインタフェース: レコードがKafkaトピックに正常に発行されたときに1回実行されるコールバック関数。

    • パーティショナ・インタフェース: メッセージのKeyをトピックのパーティション番号にマップするメソッドを定義します。パーティション番号は、TEQのストリームIDと同様のものです。

  • プロパティ

    • キー・シリアライザおよび値シリアライザ: キーとペイロードのそれぞれをバイト配列に変換します。アキュムレータ・モジュールは、バイト配列形式でペイロードを格納します。その後、送信側スレッドはAQjmsBytesメッセージを形成し、AQ JMS配列エンキューAPIを使用してメッセージを発行します。

    • acks: okafkaの場合、acksプロパティに関連する値はallのみです。ユーザーが設定したその他のフィールドは無視されます。

    • linger.ms: 送信側スレッドがTEQのレコードを発行するまでに待機する時間(ミリ秒単位)。

    • batch.size: バッチ処理されるレコードの合計サイズ(バイト単位)。送信側スレッドは、このサイズになるまで待機してからTEQのレコードを発行します。

    • buffer.memory: アキュムレータが保持できる合計メモリー(バイト単位)。

    • max.block.ms: アキュムレータでbuffer.memoryサイズが満杯になったときに、send()メソッドがメモリー不足エラーを受信できるようになるまでmax.block.msの時間待機します。

    • retries: このプロパティにより、プロデューサは一時的な障害の発生時にレコードを再送信できるようになります。この値は、再送信回数の上限です。

    • retry.backoff.ms : 特定のトピック・パーティションに対する失敗した要求を再試行するまでに待機する時間。これにより、一部の障害シナリオで発生する隙間のないループによるリクエストの繰り返し送信を回避します

    • bootstrap.servers: データベース・インスタンスを実行しているマシンのIPアドレスとポート。

TEQ対応のKafkaコンシューマ実装の概要

コンシューマAPIにより、アプリケーションはTransactional Event Queueからのデータのストリームを読み取れるようになります。TEQ対応のKafkaコンシューマはAQ JMS APIを使用し、Oracle TEQからのメッセージを利用するためにJDBCドライバを使用します。Oracle Kafkaの場合、トピックからのメッセージの利用は、Transactional Event Queueからメッセージをデキューすることを意味します。

Kafkaと同様に、TEQの実装では、コンシューマ・グループに多数のコンシューマ・インスタンスが含まれています。コンシューマ・グループごとに、一意のグループIDがあります。各コンシューマは、bootstrap.serversプロパティで指定されるOracle Databaseに対する単一の接続/セッションを内部的に維持します。このリリースでは、1つのトピックが保持できるパーティションは1つに限定されるため、この単一パーティションに割り当てられるコンシューマ・インスタンスは1つのみになります。コンシューマ・グループの1つのコンシューマに割り当てられたパーティションは、セッションが閉じられるまで、そのコンシューマに保持されます。同じグループの2つのコンシューマに、トピックの同じパーティションが割り当てられることはありません。

Oracle Database 20cでは、次のKafkaConsumer APIがサポートされています。

  • コンストラクタ: KafkaConsumer: アプリケーションがキー・ベースのTEQからのメッセージを利用できるようになるコンシューマを作成します。作成された内部クライアント側TEQオブジェクトは、クライアント・アプリケーションから認識できません。KafkaConsumerコンストラクタのすべてのバリエーションが、Oracle Database 20cでサポートされています。

  • メソッド:

    • Subscribe: このメソッドは、サブスクライブ先のトピックのリストを受け取ります。Oracle Database 20cでは、リストの最初のトピックのみがサブスクライブされます。リストのサイズが1未満の場合は、例外がスローされます。このメソッドは、TEQサーバー側に、サブスクライバ名としてグループIDが設定された永続サブスクライバを作成します。

    • Poll: pollメソッドは、TEQの割当て済パーティションからメッセージのバッチを返します。このメソッドは、サブスクライバに対応するキー・ベースのTEQからメッセージをデキューしようとします。TEQは、AQ JMSの配列デキューAPIを使用してキューからデキューしたメッセージのバッチを取得します。バッチのサイズは、kafkaクライアント・アプリケーションで設定したパラメータmax.poll.recordsによって決まります。Pollは、引数としてミリ秒単位の時間を受け取ります。配列デキューのAQ JMS APIは、このタイムアウト時間をデキュー・オプションとしてTEQサーバーに渡して、完全な配列バッチが完了していないときには、タイムアウト時間までデキュー呼び出しがメッセージを待機するようになります。

      初めてpollが呼び出されたときに、Oracle TEQは、このKafkaコンシューマに単一の使用可能パーティションを割り当てます。この割当ては、そのKafkaコンシューマの存続期間が終わるまで維持されます。返されるメッセージは、コンシューマに割り当てられたパーティションに属しています。Kafkaコンシューマごとに、1つのキュー・パーティションが割り当てられます。キューのパーティション数と同数のコンシューマを開始することは、アプリケーション開発者の責任になります。Kafkaコンシューマの数がパーティションの数より少ない場合、未割当てのパーティションからのメッセージが利用されることはありません。Kafkaコンシューマの数がパーティションの数よりも多い場合、余分なコンシューマはパーティションに割り当てられることがないため、メッセージの利用はできなくなります。2つのコンシューマ・アプリケーションが、同じパーティションから同時に処理することはありません。

    • commitSync: すべての処理済メッセージをコミットします。Oracle Database 20cでは、オフセットに対するコミットはサポートされていません。この呼び出しは、TEQからのすべての処理済メッセージをコミットするデータベースのcommitを直接呼び出します。

    • commitAsync: この呼び出しは、commitSyncに変換されます。引数として渡されたコールバック関数は、コミットが正常に完了したときに1回実行されます。

    • Unsubscribe: サブスクライブされていたトピックのサブスクライブを解除します。コンシューマは、サブスクライブが解除されたトピックからのメッセージを処理できなくなります。この呼び出しによって、TEQメタデータからサブスクライバ・グループが削除されることはありません。別のコンシューマ・アプリケーションは、そのまま処理を継続できます。

    • close: コンシューマをクローズして、サブスクライブしていたトピックのサブスクライブを解除します。

  • Class: ConsumerRecord: Kafkaプラットフォームで処理されたレコードを表すクラス。Oracle Dataase 20cでは、AQ JMSメッセージがConsumerRecordに変換されます。

  • プロパティ:

    • key.deserializerおよびvalue.deserialzer: Oracle TEQのキー・ベースのパーティション化キューでは、キーと値がそれぞれユーザー・プロパティとJMSメッセージのペイロードにバイト配列として保存されます。これらのバイト配列は、処理時にユーザーが指定した形式のキーと値に内部的にデシリアライズされます。そのために、コンシューマではkey.deserializevalue.deserializerが使用されます。

    • group.id: Kafkaトピックからのメッセージが処理されるコンシューマ・グループの名前です。このプロパティは、キー・ベースのTEQの永続サブスクライバ名として使用されます。

    • max.poll.records: Oracle TEQサーバーからの単一の配列デキュー呼び出しでフェッチするレコードの最大数。
    • fetch.max.wait.ms: 取得できなかったメッセージのフェッチを待機する時間(ミリ秒)。

    • enable.auto.commit: 指定した間隔で処理済メッセージの自動コミットを有効にします。

    • auto.commit.interval.ms: メッセージの自動コミットの間隔(ミリ秒単位)。

    • bootstrap.servers: データベース・インスタンスを実行しているマシンのIPアドレスとポート。

TEQ対応のKafka管理実装の概要

Kafka管理APIにより、アプリケーションで管理タスク(トピックの作成、トピックの削除、トピックへのパーティションの追加など)を実行できるようになります。Oracle Database 20cは、次の管理APIをサポートしています。

  • メソッド

    • create(props)およびcreate(config): 渡されたパラメータを使用するKafkaAdminクラスのオブジェクトを作成します。ユーザーは、それ以降の操作に使用するデータベース・セッションを作成ます。クライアント・アプリケーションでは、oracle.hostoracle.portoracle.servicenameoracle.instancenameoracle.user、およびoracle.passwordを指定する必要があります。これらのOracle Databaseのプロパティは、データベース接続を設定するために使用されます。

    • close(): データベース・セッションと管理クライアントをクローズします。

    • deleteTopic: TEQを停止して削除します。

  • クラス: NewTopic: 新しいトピックの作成に使用するクラス。このクラスには、トランザクション・イベント・キューの作成に使用するパラメータが含まれています。

  • プロパティ

    • bootstrap.servers: データベース・インスタンスを実行しているマシンのIPアドレスとポート。

    • retention.ms: すべてのコンシューマ・グループまたはサブスクライバがメッセージをデキューした後で、キューにメッセージを保持しておく時間(ミリ秒単位)。

    • partitions: クラスNewTopicのパラメータ。新しいトランザクション・イベント・キューの作成に使用したパーティションの数。

例: 使用方法

例6-1 Producer.java

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.oracle.okafka.clients.admin.AdminClient;
import org.oracle.okafka.clients.admin.CreateTopicsOptions;
import org.oracle.okafka.clients.admin.NewTopic;
import org.oracle.okafka.clients.producer.KafkaProducer;
import org.oracle.okafka.clients.producer.ProducerRecord;
import org.oracle.okafka.common.KafkaFuture;

public class Producer {	
	
public static void main(String[] args) {			
		
		if(args.length != 1) {
			System.out.println("Please provide topic name to produce messages.");
			return ;
		}		
		String topic = args[0].trim(); 			
              KafkaProducer<String,String> prod = null;		
		Properties props = new Properties();
		
		props.put("oracle.instance.name", "kafka");
		props.put("oracle.service.name", "kafka.regress.rdbms.dev.us.oracle.com");	  
              props.put("oracle.user.name", "aq");
	       props.put("oracle.password", "aq");	    
              props.put("bootstrap.servers", "localhost:1521");
		props.put("batch.size", 200);
		props.put("linger.ms", 100);
		props.put("buffer.memory", 335544);
		props.put("key.serializer", "org.oracle.okafka.common.serialization.StringSerializer");
		props.put("value.serializer", "org.oracle.okafka.common.serialization.StringSerializer");	
		
		System.out.println("Creating producer now");		  
		prod=new KafkaProducer<String, String>(props);
		System.out.println("Producer created.");
		
		 try {
			 int i;	
			 for(i = 0; i < 10; i++)				 
			     prod.send(new ProducerRecord<String, String>(topic ,0, i+"000","This is new message"+i));
 		     System.out.println("Sent "+ i + "messages");	 
		 } catch(Exception ex) {			  
			 System.out.println("Failed to send messages:");
			 ex.printStackTrace();
		 }
		 finally {
			 prod.close();
		 }		
	}
}

例6-2 Consumer.java

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.oracle.okafka.clients.consumer.ConsumerRecord;
import org.oracle.okafka.clients.consumer.ConsumerRecords;
import org.oracle.okafka.clients.consumer.KafkaConsumer;
public class Consumer {	
	public static void main(String[] args) {		
	    Properties props = new Properties();	    
	    if(args.length != 1) {
			System.out.println("Please provide topic name to consume messages.");
			return ;
		}	  
	    String topic = args[0].trim(); 
           props.put("oracle.service.name", "kafka.regress.rdbms.dev.us.oracle.com");	    	     
           props.put("oracle.instance.name", "kafka");
	    props.put("oracle.user.name", "aq");
	    props.put("oracle.password", "aq");
	    props.put("bootstrap.servers", "localhost:1521");
           props.put("group.id", "kafka");
           props.put("enable.auto.commit", "true");
           props.put("auto.commit.interval.ms", "10000");
           props.put("key.deserializer",  "org.oracle.okafka.common.serialization.StringDeserializer");	      
           props.put("value.deserializer",    "org.oracle.okafka.common.serialization.StringDeserializer");  	    
           props.put("max.poll.records", 100);
      KafkaConsumer<String, String> consumer = null;
	    consumer = new KafkaConsumer<String, String>(props);
     consumer.subscribe(Arrays.asList(topic));
     ConsumerRecords<String, String> records = null;
	   try {
		   records = consumer.poll(Duration.ofMillis(1000));
	 	   for (ConsumerRecord<String, String> record : records) {		 	  	   
                    System.out.println("topic = , partition=  ,key= , value = \n"+ 		 	  	             
                                  record.topic()+ "  "+record.partition()+ "  "+record.key()+"  "+ record.value());                  
                    System.out.println(".......");
 	 	    }
	 	   consumer.commitSync();		 	  	    	 
	     }catch(Exception ex) {
	    	 ex.printStackTrace(); 
	     } finally {
	    	 consumer.close();
	     } 
	}    
}