24 リアクティブ・ストリームの収集のためのJavaライブラリのサポート

Oracle Databaseリリース21c以降には、Oracle Databaseへのデータの効率的なストリーミングを可能にする、リアクティブ・ストリームの収集(RSI)のサポートを提供するJavaライブラリがあります。新しいJavaライブラリを使用すると、Javaアプリケーションは、大規模なクライアント・グループからのデータを継続的に受信して収集できるようになります。

Oracle Databaseのダイレクト・パス・ロード方式をデータの挿入に使用すると、新しいJavaライブラリによって収集プロセスはブロックされることなく非常に高速なものになります。これは既存のUCP APIの拡張を使用しています。これにより、表パーティション、Oracle RAC接続アフィニティおよびシャーディングのサポートなどのデータベースの高可用性機能およびスケーラビリティ機能を収集プロセスに与えることができます。

24.1 リアクティブ・ストリームの収集のためのJavaライブラリの概要

リアクティブ・ストリームの収集(RSI)のJavaライブラリを使用すると、非ブロック方式でOracle Databaseへの効率的なデータ・ストリーミングが可能になります。

このライブラリは、多数のクライアントが表の行の形式で情報を永続化するためにデータベースを使用する場合で、データベースからの同期レスポンスを待機する際にブロックされないようにする場合に特に役立ちます。そのため、次のようなユースケースの場合、アプリケーションがストリーミング・データを非常に高速でデータベースに取り込み、Oracle Database表の行形式で永続化する必要があるときは、このライブラリを使用できます。

  • Internet of Things (IoT)センサー
  • 株式取引用の時系列データ
  • コール詳細レコード(CDR)
  • 地理空間アクティビティ
  • ビデオWebサイト
  • ソーシャル・メディア掲載

このライブラリを使用するには、次のJARファイルをCLASSPATHに追加する必要があります。

  • rsi.jar
  • ojdbc11.jar
  • ucp.jar
  • ons.jar

24.2 リアクティブ・ストリームの収集のためのJavaライブラリの機能

リアクティブ・ストリームの収集(RSI)のJavaライブラリは、データの挿入にOracle Databaseのダイレクト・パス・ロード方式を使用します。また、Oracle Universal Connection Pool (UCP)を使用して、表パーティション、Oracle RAC接続アフィニティ、シャーディングなどの高可用性およびスケーラビリティ機能もいくつか提供します。そのため、このライブラリには次のような機能の利点があります。

24.2.1 リアクティブ・ストリームの収集

これは、リアクティブ・ストリームの収集のJavaライブラリのコア機能であり、ロジックを処理するAPIが提供されます。

ライブラリには、Java.util.concurrent.Flowサブスクライバ・インタフェースが実装されます。サブスクライバ・インタフェースのメソッドを起動するには、Java.util.concurrent.Flowパブリッシャ・インタフェースを実装する必要があります。サブスクライバ・インタフェースには、次のメソッドがあります。

  • onSubscribe

    このメソッドは、サブスクライバ・インタフェースとパブリッシャ・インタフェースの初期関係を確立するために1回のみ起動します。

  • onNext

    このメソッドは、ライブラリが提供するサブスクライバ・インタフェースの実装に新しい行を作成するために起動します。

  • onError

    このメソッドは、収集プロセス中に発生する可能性があるエラーの場合に起動します。

  • onComplete

    このメソッドは、収集ジョブの完了時に起動します。

サブスクライバ・インタフェースは、Java.util.concurrent.Flowサブスクライバ・インタフェースのrequest(n)メソッドまたはcancelメソッドをコールして、さらにデータを受け入れることができるか、データの収集を停止する必要があるかを示します。

24.2.2 ダイレクト・パス・ロード

リアクティブ・ストリームの収集のJavaライブラリは、非ブロックのデータ収集にOracle Databaseのダイレクト・パス・ロード方式を使用します。この方法では、Oracleデータ・ブロックをフォーマットし、そのデータ・ブロックをデータベース・ファイルに直接書き込むため、SQLレイヤーのオーバーヘッドが大幅に削減されます。

ダイレクト・パス・ロード・メソッド・コール中、表内の既存データの後ろに挿入データが追加されます。このメソッドは、データをデータファイルに直接書き込み、バッファ・キャッシュを回避します。表内の空き領域の再利用は行わず、参照整合性制約は無視します。そのため、ダイレクト・パス・ロードは従来型の挿入よりもパフォーマンスが大幅に優れています。

24.2.3 ユニバーサル接続プール

リアクティブ・ストリームの収集(RSI)のJavaライブラリは、シャーディング・トポロジの知識、Oracle Real Application Cluster (Oracle RAC)データベースの高速アプリケーション通知(FAN)の認識など、様々な接続プーリングおよび管理アクティビティのためにユニバーサル接続プール(UCP)を使用します。

RSIライブラリは、UCPシャーディングAPIを使用して、指定されたシャーディング・キーのための適切な接続を確立します。RSI内の各レコードを一意のチャンクIDにマップしてから、一意のチャンクIDを使用してこれらのレコードをグループ化できます。RSIライブラリにデータベースに送信する十分なレコードがある場合は、UCPからチャンク固有の接続を借用してシャード・データベースにレコードを挿入します。

24.3 リアクティブ・ストリームの収集(RSI)モードについて

現在のリリースで導入されている新しいDataLoadモードは、Javaライブラリをリアクティブ・ストリームの収集(RSI)に使用して、大規模INSERTバッチをデータベースに対して実行する場合に便利です。

Oracle Database 23cリリース以降では、ビジネス・ユース・ケースに応じて、リアクティブ・ストリームの収集(RSI)のJavaライブラリを次のモードで使用できます。

  • ストリーム・モード:挿入する行数が無限であるが、一度に大量の行を挿入する必要がないサーバーでRSIを使用する場合に、このデフォルト・モードを使用します。
  • DataLoadモード: データベースに一度に挿入される既知の大規模なレコード・リストがある場合は、DataLoadモードを使用します。

DataLoadモードとストリーム・モードの主な違いは、次のとおりです。

  • DataLoadモードでは、RSIインスタンスがクローズされるまで変更はコミットされません。デフォルトのストリーム・モードでは、変更は定期的にコミットされます。これは、大規模なINSERTバッチをデータベースに対して実行する場合に、スループットに悪影響を及ぼす可能性があります。
  • DataLoadモードでは、各ワーカー・スレッドに独自のJDBC接続があります。そのため、挿入タスクの実行に必要なJDBC接続の数を減らす作業はありません。この動作は、ワーカー・スレッドがJDBC接続のプールを共有するデフォルトのストリーム・モードとは異なります。

24.3.1 DataLoadモードの有効化

リアクティブ・ストリームの収集のためのJavaライブラリでは、ストリーム・モードがデフォルトで有効になっています。DataLoadモードを有効にするには、useDataLoadModeメソッドを使用する必要があります。

次のコード・スニペットに示すように、DataLoadモードを有効にします。
ReactiveStreamsIngestion.Builder rsiBuilder = ReactiveStreamsIngestion.builder()
    .useDataLoadMode()
    .username("<user_name>")
    .password("<password>")
    .url("jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=tcp)
(HOST=myhost.com)(PORT=5521))(CONNECT_DATA=(SERVICE_NAME=myservice.com)))")
    .table("customers")
    .columns (new String[] { "id", "name", "region" });
// Use try-with-resource statement to ensure that RSI instance is closed at the
// end of the statement.
try (ReactiveStreamsIngestion rsi = rsiBuilder.build()){
   // Publish Records.
}

24.4 コード・サンプル: リアクティブ・ストリームの収集のためのJavaライブラリ

この項では、リアクティブ・ストリームの収集ライブラリの使用方法を表すコード・サンプルを示します。

24.4.1 PushPublisher

これは、リアクティブ・ストリームの収集(RSI)ライブラリを使用する最も簡単な方法です。RSIライブラリはjava.util.concurrent.Flow.Subscriberインタフェースを実装し、アプリケーション内のJavaコードはjava.util.concurrent.Flow.Publisherインタフェースを実装します。

次の例に、パブリッシャを作成し、RSIライブラリがそのパブリッシャをサブスクライブするこの実装を示します。

例24-1 PushPublisher

package oracle.rsi.demos;
import java.sql.SQLException;
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import oracle.rsi.ReactiveStreamsIngestion;
import oracle.rsi.PushPublisher;

public class SimplePushPublisher {

  public static void main(String[] args) throws SQLException {

    ExecutorService workerThreadPool = Executors.newFixedThreadPool(2);

    ReactiveStreamsIngestion rsi = ReactiveStreamsIngestion
        .builder()
        .url(
            "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=tcp)
(HOST=example.com)(PORT=5521))(CONNECT_DATA=(SERVICE_NAME=myservice.com)))")
        .username(<user_name>)
        .password(<password>)
        .schema(<schema_name>)
        .executor(workerThreadPool)
        .bufferRows(10)
        .bufferInterval(Duration.ofSeconds(20))
        .table("customers")
        .columns(new String[] { "id", "name", "region" })
        .build();

    PushPublisher<Object[]> pushPublisher = ReactiveStreamsIngestion.pushPublisher();
    pushPublisher.subscribe(rsi.subscriber());

    //Ingests byte arrays using the accept method
    pushPublisher.accept(new Object[] { 1, "John Doe", "North" });
    pushPublisher.accept(new Object[] { 2, "Jane Doe", "North" });
    pushPublisher.accept(new Object[] { 3, "John Smith", "South" });

    try {
      pushPublisher.close();
    } catch (Exception e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }

    rsi.close();

    workerThreadPool.shutdown();

  }

}

24.4.2 Flow.Publisherの動的実装

次の例に、アプリケーションでFlow.Publisherインタフェースを実装してライブラリにサブスクライブする場合に、RSIライブラリを使用する方法を示します。

例24-2 Flow.Publisherの動的実装

package oracle.rsi.demos;
import java.sql.SQLException;
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.function.Consumer;

import oracle.rsi.ReactiveStreamsIngestion;

public class SimpleFlowPublisher {

  public static void main(String[] args) throws SQLException {

    ExecutorService workerThreadPool = Executors.newFixedThreadPool(2);

    ReactiveStreamsIngestion rsi = ReactiveStreamsIngestion
        .builder()
        .url(
            "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=tcp)(HOST=example.com)(PORT=5521))(CONNECT_DATA=(SERVICE_NAME=myservice.com)))")
        .username(<user_name>)
        .password(<password>)
        .schema(<schema_name>)
        .executor(workerThreadPool)
        .bufferRows(1)
        .bufferInterval(Duration.ofMinutes(60))
        .table("customers")
        .columns(new String[] { "id", "name", "region" })
        .build();

    SimpleObjectPublisher<Object[]> publisher = new SimpleObjectPublisher<Object[]>();
    publisher.subscribe(rsi.subscriber());

    SimpleObjectPublisher<Object[]> anotherPublisher = new SimpleObjectPublisher<Object[]>();
    anotherPublisher.subscribe(rsi.subscriber());
    
    publisher.accept(new Object[] { 1, "John Doe", "North" });
    publisher.accept(new Object[] { 2, "Jane Doe", "North" });
    publisher.accept(new Object[] { 3, "John Smith", "South" });

    anotherPublisher.accept(new Object[] { 4, "John Doe", "North" });
    anotherPublisher.accept(new Object[] { 5, "Jane Doe", "North" });
    anotherPublisher.accept(new Object[] { 6, "John Smith", "South" });
    
    rsi.close();

    workerThreadPool.shutdown();

  }

}

class SimpleObjectPublisher<T> implements Publisher<T>, Consumer<T> {

  Subscriber<? super T> subscriber;
  
  Subscription subscription = new SimpleObjectSubscription(); 
 
 //Data streaming starts

  @Override
  public void subscribe(Subscriber<? super T> subscriber) {
    this.subscriber = subscriber;
    this.subscriber.onSubscribe(subscription);
  }
  
  @Override
  public void accept(T t) {
    subscriber.onNext(t);
  }
  
}

 // You must provide this subscription
class SimpleObjectSubscription implements Subscription {

  @Override
  public void request(long n) {
    System.out.println("Library requesting: " + n + " records");
  }

  @Override
  public void cancel() {
    // TODO Auto-generated method stub
  }
  
}

24.4.3 Flow.Publisherのサードパーティ実装

次の例に、サードパーティがFlow.Publisherインタフェースを実装する場合に、RSIライブラリを使用する方法を示します。

次の例では、標準のJDK SubmissionPublisherインタフェースがサードパーティ・インタフェースです。

例24-3 Flow.Publisherのサードパーティ実装

package oracle.rsi.demos;
import java.sql.SQLException;
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SubmissionPublisher;

import oracle.rsi.ReactiveStreamsIngestion;

public class SimpleSubmissionPublisher {

  public static void main(String[] args) throws SQLException {

    ExecutorService workerThreadPool = Executors.newFixedThreadPool(2);

    ReactiveStreamsIngestion rsi = ReactiveStreamsIngestion
        .builder()
        .url(
            "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=tcp)
(HOST=example.com)(PORT=5521))(CONNECT_DATA=(SERVICE_NAME=myservice.com)))")
        .username(<user_name>)
        .password(<password>)
        .schema(<schema_name>)
        .executor(workerThreadPool)
        .bufferRows(10)
        .bufferInterval(Duration.ofSeconds(20))
        .table("customers")
        .columns(new String[] { "id", "name", "region" })
        .build();

    SubmissionPublisher<Object[]> publisher = new SubmissionPublisher<>();
    publisher.subscribe(rsi.subscriber());
    
    publisher.submit(new Object[] { 1, "John Doe", "North" });
    publisher.submit(new Object[] { 2, "Jane Doe", "North" });
    publisher.submit(new Object[] { 3, "John Smith", "South" });
    
    while (publisher.estimateMaximumLag() > 0);
    
    try {
      publisher.close();
    } catch (Exception e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }

    rsi.close();

    workerThreadPool.shutdown();

  }

}

24.5 リアクティブ・ストリームの収集のためのJavaライブラリの制限事項

データ・ストリームのリアクティブ・ストリームの収集では、予期しないクラッシュまたはエラーが発生すると、データの一部がデータベースに収集されないことがあります。これは、データベースにレコードを格納しようとするときに一部のレコードが失われる可能性があるためです。

データベースのクラッシュによる潜在的なデータ損失とは別に、このライブラリには次の制限事項があります。

  • データベース・トリガーはサポートされません。
  • 参照整合性はチェックされません。
  • クラスタ表はサポートされません。
  • リモート・オブジェクトのロードはサポートされません。
  • VARRAY列のロードはサポートされません。
  • ストリームを含むLONGデータ型はサポートされません。
  • LONGデータ型がデータベース表の最後の列であると想定します。
  • LOBデータ型の列の前に、すべてのパーティション列が表示されると想定します。