22 JDBCリアクティブ・エクステンション

リアクティブ・エクステンションは、非同期データベース・アクセスを提供するためにJDBC標準を拡張する一連のメソッドです。

リアクティブ・エクステンションは、接続オブジェクトの作成、SQL文の実行、行のフェッチ、トランザクションのコミット、接続オブジェクトのロールバック、およびBFILE、BLOB、CLOBの読取りと書込みに対して非ブロッキング・メカニズムを使用します。

この章の内容は次のとおりです。

22.1 JDBCリアクティブ・エクステンションの概要

リアクティブ・エクステンションは、java.util.concurrent.Flowインタフェースによって定義された、パブリッシャおよびサブスクライバ・タイプを実装します。このインタフェースは、リアクティブ・ストリームの標準のJDK表現です。

リアクティブ・エクステンションでは、データベース操作をブロックしないために単一のJava NIOセレクタが使用されます。

JDBCリアクティブ・エクステンションの使用要件

JDBCリアクティブ・エクステンションを使用する場合は、次を実行する必要があります。

  • 接続の構築にJDBC Thinドライバのみを使用します
  • JDK 11およびojdbc11.jarを使用します
  • Oracle JDBCドライバ20c以降を使用します

22.2 リアクティブ・エクステンションによるアプリケーションの作成について

この項では、リアクティブ・エクステンションを使用してアプリケーションを作成するために従う必要があるステップについて説明します。

リアクティブ・エクステンションを使用してアプリケーションを作成するステップは、標準の方法でアプリケーションを作成する場合と同じです。ただし、リアクティブ・エクステンションの場合は、新しい非同期メソッドを使用します。この項では、次の各項で様々な非同期メソッドを使用する方法について説明します。

22.2.1 非同期メソッドを使用した接続のオープン

OracleConnectionBuilderインタフェースでは、接続を非同期に開くためのメソッドが提供されています。

OracleConnectionBuilder.buildConnectionPublisherOracleメソッドは、Flow.Publisher<OracleConnection>タイプを返します。パブリッシャは、サブスクライバに対して単一の接続を発行します。サブスクライバが要求を発行すると、パブリッシャは非同期に新しい接続を開きます。公開された接続は、ConnectionBuilder.buildメソッドを使用して構築できる接続と同じです。

次の例では、接続を非同期に開く方法を示しています。

  /**
   * Asynchronously opens a new connection
   * @param dataSource Datasource configured with a URL, User, and Password
   * @return A Publisher that emits a single connection
   * @throws SQLException If a database access error occurs before the
   * connection can be opened
   */
  Flow.Publisher<OracleConnection> openConnection(DataSource dataSource)
    throws SQLException {
    return dataSource.unwrap(OracleDataSource.class)
      .createConnectionBuilder()
      .buildConnectionPublisherOracle();
  }

22.2.2 非同期メソッドによるSQL文の実行

この項では、非同期メソッドでSQL文を実行する方法について説明します。

OraclePreparedStatementインタフェースでは、非同期SQL実行のためのメソッドが公開されています。それぞれの非同期メソッドが、対応するSQL実行の同期メソッドに類似した機能を実行します。この関係が、次の表に示されています。

表22-1 メソッドの比較

同期メソッド 非同期メソッド
boolean execute Flow.Publisher<Boolean> executeAsyncOracle
long executeLargeUpdate Flow.Publisher<Long> executeUpdateAsyncOracle
long[] executeLargeBatch Flow.Publisher<Long> executeBatchAsyncOracle
ResultSet executeQuery Flow.Publisher<OracleResultSet> executeQueryAsyncOracle

次の項では、非同期メソッドの詳細を説明します。

22.2.2.1 executeAsyncOracleメソッドによる標準SQL文の実行

この項では、標準のexecuteメソッドに相当するexecuteAsyncOracleメソッドについて説明します。

OraclePreparedStatement.executeAsyncOracleメソッドをコールすると、すべてのタイプのSQL文を実行できます。このコールは、Flow.Publisher<Boolean>タイプを返します。パブリッシャは、単一のBooleanを送信し、複数のサブスクライバをサポートします。Boolean値がTRUEの場合、SQL文によって行データが生成され、OraclePreparedStatement.getResultSetメソッドからアクセスできることを意味します。FALSEの場合、SQL文が更新件数を返したことを意味します。Booleanの結果は、executeメソッドによって返されるbooleanと意味的に同じです。
  /**
   * Asynchronously creates a new table by executing a DDL SQL statement
   * @param connection Connection to a database where the table is created
   * @return A Publisher that emits the result of executing DDL SQL
   * @throws SQLException If a database access error occurs before the DDL
   * SQL can be executed
   */
  Flow.Publisher<Boolean> createTable(Connection connection)
    throws SQLException {

    PreparedStatement createTableStatement =
      connection.prepareStatement(
        "CREATE TABLE employee_names (" +
          "id NUMBER PRIMARY KEY, " +
          "first_name VARCHAR(50), " +
          "last_name VARCHAR2(50))");

    Flow.Publisher<Boolean> createTablePublisher =
      createTableStatement.unwrap(OraclePreparedStatement.class)
        .executeAsyncOracle();

    createTablePublisher.subscribe(
      // This subscriber will close the PreparedStatement
      new Flow.Subscriber<Boolean>() {
        public void onSubscribe(Flow.Subscription subscription) {
          subscription.request(1L);
        }
        public void onNext(Boolean item) { }
        public void onError(Throwable throwable) { closeStatement(); }
        public void onComplete() { closeStatement(); }
        void closeStatement() {
          try { createTableStatement.close(); }
          catch (SQLException closeException) { log(closeException); }
        }
      });

    return createTablePublisher;
  }
22.2.2.2 executeUpdateAsyncOracleメソッドによるDML文の実行

この項では、標準のexecuteLargeUpdateメソッドに相当するexecuteUpdateAsyncOracleメソッドについて説明します。

OraclePreparedStatement.executeUpdateAsyncOracleメソッドを使用して、単一(バッチ以外)のDML文を実行できます。このコールは、Flow.Publisher<Long>タイプを返します。返されたパブリッシャは、単一のLong値を送信します。このLong値は、DML文によって更新された行数または挿入される行数を示します。このLong値の結果は、標準のexecuteLargeUpdateメソッドから返されるlong値と意味的に同じです。
  /**
   * Asynchronously updates table data by executing a DML SQL statement
   * @param connection Connection to a database where the table data resides
   * @return A Publisher that emits the number of rows updated
   * @throws SQLException If a database access error occurs before the DML
   * SQL can be executed
   */
  Flow.Publisher<Long> updateData(Connection connection)
    throws SQLException {

    PreparedStatement updateStatement = connection.prepareStatement(
      "UPDATE employee_names SET " +
        "first_name = UPPER(first_name), " +
        "last_name = UPPER(last_name)");

    Flow.Publisher<Long> updatePublisher =
      updateStatement.unwrap(OraclePreparedStatement.class)
        .executeUpdateAsyncOracle();

    updatePublisher.subscribe(
      // This subscriber will close the PreparedStatement
      new Flow.Subscriber<Long>() {
        public void onSubscribe(Flow.Subscription subscription) {
          subscription.request(1L);
        }
        public void onNext(Long item) { }
        public void onError(Throwable throwable) { closeStatement(); }
        public void onComplete() { closeStatement(); }
        void closeStatement() {
          try { updateStatement.close(); }
          catch (SQLException closeException) { log(closeException); }
        }
      });

    return updatePublisher;
  }
22.2.2.3 executeBatchAsyncOracleメソッドによるバッチDML文の実行

この項では、標準のexecuteLargeBatchメソッドに相当するexecuteBatchAsyncOracleメソッドについて説明します。

OraclePreparedStatement.executeBatchAsyncOracleメソッドを使用して、バッチDML文を実行できます。このコールは、Flow.Publisher<Long>タイプを返します。返されたパブリッシャは、バッチ内の各文に対してLong値を送信します。Long値は、各DML文によって更新された行数を示します。これらのLong値の結果は、標準のexecuteLargeBatchメソッドから返されるlong[]値と意味的に同じです。
  /**
   * Asynchronously loads table data by executing a batch of DML SQL statements.
   * @param connection Connection to a database where the table data resides.
   * @return A Publisher which emits the number of rows updated.
   * @throws SQLException If a database access error occurs before the DML
   * SQL can be executed.
   */
  Flow.Publisher<Long> createData(
    Connection connection, Iterable<Employee> employeeData)
    throws SQLException {

    PreparedStatement batchStatement = connection.prepareStatement(
      "INSERT INTO employee_names (id, first_name, last_name) " +
        "VALUES (?, ?, ?)");

    for (Employee employee : employeeData) {
      batchStatement.setLong(1, employee.id());
      batchStatement.setString(2, employee.firstName());
      batchStatement.setString(3, employee.lastName());
      batchStatement.addBatch();
    }

    Flow.Publisher<Long> batchPublisher =
      batchStatement.unwrap(OraclePreparedStatement.class)
        .executeBatchAsyncOracle();

    batchPublisher.subscribe(
      // This subscriber will close the PreparedStatement
      new Flow.Subscriber<Long>() {
        public void onSubscribe(Flow.Subscription subscription) {
          subscription.request(Long.MAX_VALUE);
        }
        public void onNext(Long item) { }
        public void onError(Throwable throwable) { closeStatement(); }
        public void onComplete() { closeStatement(); }
        void closeStatement() {
          try { batchStatement.close(); }
          catch (SQLException closeException) { log(closeException); }
        }
      });

    return batchPublisher;
  }
22.2.2.4 executeQueryAsyncOracleメソッドによるSQL問合せの実行

この項では、標準のexecuteQueryメソッドに相当するexecuteQueryAsyncOracleメソッドについて説明します。

OraclePreparedStatement.executeQueryAsyncOracleメソッドを使用して、SQL問合せ文を実行できます。このコールは、Flow.Publisher<OracleResultSet>タイプを返します。返されたパブリッシャは、単一のOracleResultSet値を発行します。OracleResultSet値により、SQL問合せの結果の行データにアクセスできます。このOracleResultSetは、標準のexecuteQueryメソッドによって返されるResultSetと意味的に同じです。
  /**
   * Asynchronously reads table data by executing a SELECT SQL statement
   * @param connection Connection to a database where the table resides
   * @return A Publisher that emits the number of rows updated
   * @throws SQLException If a database access error occurs before the SELECT
   * SQL can be executed
   */
  Flow.Publisher<OracleResultSet> readData(Connection connection)
    throws SQLException {

    PreparedStatement queryStatement = connection.prepareStatement(
      "SELECT id, first_name, last_name FROM employee_names");

    Flow.Publisher<OracleResultSet> queryPublisher =
      queryStatement.unwrap(OraclePreparedStatement.class)
        .executeQueryAsyncOracle();

    // Close the PreparedStatement after the result set is consumed.
    queryStatement.closeOnCompletion();

    return queryPublisher;
  }

22.2.3 非同期メソッドによる行データのフェッチについて

この項では、非同期メソッドで行データをフェッチする方法について説明します。

OracleResultSetインタフェースでは、非同期行データ・フェッチのためのpublisherOracle(Function<OracleRow, T>)メソッドが公開されています。このメソッドの引数は、行データのマッピング関数です。マッピング関数は、ResultSetの各行に適用されます。このメソッドは、Flow.Publisher<T>タイプを返します。Tはマッピング関数の出力タイプです。マッピング関数の入力タイプはOracleRowです。OracleRowResultSetの単一行を表し、その行の列値にアクセスするためのメソッドを公開します。

次の例では、非同期メソッドで行データをフェッチする方法を示しています。

  /**
   * Asynchronously fetches table data by from a ResultSet.
   * @param resultSet ResultSet which fetches table data.
   * @return A Publisher which emits the fetched data as Employee objects.
   * @throws SQLException If a database access error occurs before table data is
   * fetched.
   */
  Flow.Publisher<Employee> fetchData(ResultSet resultSet)
    throws SQLException {
    // Before closing the ResultSet with publisherOracle(..), first obtain a
    // reference to the ResultSet's Statement. The Statement needs to be closed
    // after all data has been fetched.
    Statement resultSetStatement = resultSet.getStatement();

    Flow.Publisher<Employee> employeePublisher =
      resultSet.unwrap(OracleResultSet.class)
        .publisherOracle(oracleRow -> {
          try {
            return new Employee(
              oracleRow.getObject("id", Long.class),
              oracleRow.getObject("first_name", String.class),
              oracleRow.getObject("last_name", String.class));
          }
          catch (SQLException getObjectException) {
            // Unchecked exceptions thrown by a row mapping function will be
            // emitted to each Subscriber's onError method.
            throw new RuntimeException(getObjectException);
          }
        });

    employeePublisher.subscribe(
      // This subscriber will close the ResultSet's Statement
      new Flow.Subscriber<Employee>() {
        public void onSubscribe(Flow.Subscription subscription) {
          subscription.request(Long.MAX_VALUE);
        }
        public void onNext(Employee item) { }
        public void onError(Throwable throwable) { closeStatement(); }
        public void onComplete() { closeStatement(); }
        void closeStatement() {
          try { resultSetStatement.close(); }
          catch (SQLException closeException) { log(closeException); }
        }
      });

    return employeePublisher;
  }

マッピング関数が入力として受け取るOracleRowのインスタンスは、関数が戻った後に無効になります。OracleRowのアクセスをマッピング関数の範囲に制限することで、行データの格納に使用されるメモリーをドライバで効率的に管理できます。OracleRowの永続コピーが必要な場合は、OracleRow.cloneメソッドを使用して、元のOracleRowデータのコピーによってバッキングされるOracleRowの新しいインスタンスを作成できます。cloneメソッドにより返されるOracleRowは、マッピング関数の範囲外で有効なままであり、データベース接続が閉じた後もそのデータを保持します。

行マッピング関数は、null以外の値を返すか、未チェックの例外をスローする必要があります。マッピング関数が未チェックの例外をスローした場合、onErrorシグナルとして行データのサブスクライバに配信されます。行データ・パブリッシャは、複数のサブスクライバをサポートしています。複数のサブスクライバへの行データの送信は、次に示す特定のポリシーに従います。

  • サブスクライバは、onSubscribeシグナルを受信する前に送信された行データのonNextシグナルを受信しません。
  • 他のすべてのサブスクライバが要求を通知するまで、サブスクライバはonNextシグナルを受信しません。

次の表は、複数のサブスクライバで作業している場合のイベント・フローを示しています。

表22-2 複数のサブスクライバへの送信

時間 イベント 原因
0 SubscriberAがonSubscribeシグナルを受信します 行データ・パブリッシャのサブスクライブ(サブスクライバ)メソッドへのコールが、SubscriberAのサブスクリプションをリクエストしました
1 SubscriberAが1行をリクエストします SubscriberAがサブスクリプション上で要求を通知しました
2 SubscriberAがResultSetsの1行目のデータを受信します 行データ・パブリッシャは、SubscriberAによってリクエストされたデータの行をフェッチしました
3 SubscriberBがonSubscribeシグナルを受信します 行データ・パブリッシャのサブスクライブ(サブスクライバ)メソッドへのコールが、SubscriberBのサブスクリプションをリクエストしました
4 SubscriberAが1行をリクエストします SubscriberAがサブスクリプション上で要求を通知しました
5 SubscriberBが1行をリクエストします SubscriberBがサブスクリプション上で要求を通知しました
6 SubscriberAとSubscriberBの両方が、ResultSetの2行目のデータを受信します 行データ・パブリッシャは、両方のサブスクライバによってリクエストされたデータの行をフェッチしました
7 SubscriberAが1行をリクエストします SubscriberAがサブスクリプション上で要求を通知しました
8 行データは送信されません。 行データ・パブリッシャは、すべてのサブスクライバがリクエストするまで、次の行を送信しません。
9 SubscriberBが1行をリクエストします SubscriberBがサブスクリプション上で要求を通知しました
10 SubscriberAとSubscriberBの両方が、ResultSetの3行目のデータを受信します 行データ・パブリッシャは、両方のサブスクライバによってリクエストされたデータの行をフェッチしました

ノート:

SubscriberBは、最初の行のデータを受信しませんでした。これは、最初の行が送信された後にSubscriberBがサブスクライブしたためです。また、データが8秒で送信されませんでした。これは、すべてのサブスクライバが、送信される前に次の行をリクエストする必要があるためです。8秒でSubscriberAが次の行をリクエストしましたが、SubscriberBはそれまでリクエストを発行しませんでした。

22.2.4 非同期メソッドを使用したLOBデータの読取り

OracleBlobOracleBFileOracleClobおよびOracleNClobインタフェースでは、LOBデータの非同期読取りのためのpublisherOracle(long)メソッドが公開されています。

publisherOracle(long)メソッドの引数は、データが読み取られるLOBの位置です。OracleBlob.publisherOracle(long)およびOracleBFile.publisherOracle(long)メソッドは、Publisher< byte[]>タイプを返します。このパブリッシャは、LOBから読み取られたバイナリ・データのセグメントを送信します。OracleClob.publisherOracle(long)およびOracleNClob.publisherOracle(long)メソッドは、Publisher<String>タイプを返します。このパブリッシャは、LOBから読み取られた文字データのセグメントを送信します。

次の例では、LOBからバイナリ・データを非同期に読み取る方法を示しています。

  /**
   * Asynchronously reads binary data from a BLOB
   * @param connection Connection to a database where the BLOB resides
   * @param employeeId ID associated to the BLOB
   * @return A Publisher that emits binary data of a BLOB
   * @throws SQLException If a database access error occurs before the
   * BLOB can be read
   */
  Flow.Publisher<byte[]> readLOB(Connection connection, long employeeId)
    throws SQLException {
    PreparedStatement lobQueryStatement = connection.prepareStatement(
      "SELECT photo_bytes FROM employee_photos WHERE id = ?");
    lobQueryStatement.setLong(1, employeeId);

    ResultSet resultSet = lobQueryStatement.executeQuery();
    if (!resultSet.next())
      throw new SQLException("No photo found for employee ID " + employeeId);

    OracleBlob photoBlob =
      (OracleBlob)resultSet.unwrap(OracleResultSet.class).getBlob(1);
    Flow.Publisher<byte[]> photoPublisher = photoBlob.publisherOracle(1);

    photoPublisher.subscribe(
      // This subscriber will close the PreparedStatement and BLOB
      new Flow.Subscriber<byte[]>() {
        public void onSubscribe(Flow.Subscription subscription) {
          subscription.request(Long.MAX_VALUE);
        }
        public void onNext(byte[] item) { }
        public void onError(Throwable throwable) { freeResources(); }
        public void onComplete() { freeResources(); }
        void freeResources() {
          try { lobQueryStatement.close(); }
          catch (SQLException closeException) { log(closeException); }
          try { photoBlob.free(); }
          catch (SQLException freeException) { log(freeException); }
        }
      });
    return photoPublisher;
  }

LOBパブリッシャによって送信されるデータ・セグメントのサイズは構成できません。ドライバは、最適化されるセグメント・サイズを、データベースのDB_BLOCK_SIZEパラメータに従って選択します。

22.2.5 非同期メソッドを使用したLOBデータの書込み

OracleBlobOracleClobおよびOracleNClobインタフェースでは、LOBデータの非同期書込みのためのsubscriberOracle(long)メソッドが公開されています。

subscriberOracle(long)メソッドの引数は、データが書き込まれるLOBの位置です。OracleBlob.subscriberOracle(long)メソッドは、Subscriber<byte[]>タイプを返します。このサブスクライバは、LOBに書き込まれるバイナリ・データのセグメントを受信します。OracleClob.subscriberOracle(long)メソッドおよびOracleNClob.subscriberOracle(long)メソッドは、Subscriber<String>タイプを返します。これらのサブスクライバは、LOBに書き込まれる文字データのセグメントを受信します。

次の例では、LOBにバイナリ・データを非同期に書き込む方法を示しています。

  /**
   * Asynchronously writes binary data to a BLOB
   * @param connection Connection to a database where the BLOB resides
   * @param bytesPublisher Publisher that emits binary data
   * @return A CompletionStage that completes with a reference to the BLOB,
   * after all binary data is written.
   * @throws SQLException If a database access error occurs before the table data is
   * fetched
   */
  CompletionStage<Blob> writeLOB(
    Connection connection, Flow.Publisher<byte[]> bytesPublisher)
    throws SQLException {

    OracleBlob oracleBlob =
      (OracleBlob) connection.unwrap(OracleConnection.class).createBlob();

    // This future is completed after all bytes have been written to the BLOB
    CompletableFuture<Blob> writeFuture = new CompletableFuture<>();

    Flow.Subscriber<byte[]> blobSubscriber =
      oracleBlob.subscriberOracle(1L,
        // This Subscriber will receive a terminal signal when all byte[]'s
        // have been written to the BLOB.
        new Flow.Subscriber<Long>() {
          long totalWriteLength = 0;
          @Override
          public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE);
          }
          @Override
          public void onNext(Long writeLength) {
            totalWriteLength += writeLength;
            log(totalWriteLength + " bytes written.");
          }
          @Override
          public void onError(Throwable throwable) {
            writeFuture.completeExceptionally(throwable);
          }
          @Override
          public void onComplete() {
            writeFuture.complete(oracleBlob);
          }
        });

    bytesPublisher.subscribe(blobSubscriber);
    return writeFuture;
  }

OracleBlobOracleClobおよびOracleNClobインタフェースでは、単一引数形式のsubscriberOracle(long)メソッドと同じ機能を実行するsubscriberOracle(long, Subscriber<Long>)メソッドも公開されています。ただし、単一引数形式では、Subscriber<Long>タイプも受け入れます。Subscriber<Long>タイプは、データベースに対する書込み操作の結果をサブスクライバに受信するように通知します。非同期の書込み操作が完了するたびに、Subscriber<Long>タイプは、操作によって書き込まれたバイト数または文字数を示すonNextシグナルを受信します。非同期の書込み操作が失敗した場合、Subscriber<Long>タイプはonErrorシグナルを受信します。最後の書込み操作が完了した後、Subscriber<Long>onCompleteシグナルを受信します。

writeLOBメソッドによって返されるCompletionStage<Blob>が完了した後、結果のBLOBオブジェクトをinsertLOBメソッドに渡し、BLOBデータを表に格納できます。

次の例では、データを挿入する方法を示しています。

  /**
   * Asynchronously inserts BLOB data into a table by executing a DML SQL
   * statement
   * @param connection Connection to a database where the table data resides
   * @param employeeId ID related to the BLOB data
   * @param photoBlob Reference to BLOB data
   * @return A Publisher that emits the number of rows inserted (always 1)
   * @throws SQLException If a database access error occurs before the DML
   * SQL can be executed
   */
  Flow.Publisher<Long> insertLOB(
    Connection connection, long employeeId, Blob photoBlob)
    throws SQLException {

    PreparedStatement lobInsertStatement = connection.prepareStatement(
      "INSERT INTO employee_photos(id, photo_bytes) VALUES (? ,?)");
    lobInsertStatement.setLong(1, employeeId);
    lobInsertStatement.setBlob(2, photoBlob);

    Flow.Publisher<Long> insertPublisher =
      lobInsertStatement.unwrap(OraclePreparedStatement.class)
        .executeUpdateAsyncOracle();

    insertPublisher.subscribe(new Flow.Subscriber<Long>() {
      @Override
      public void onSubscribe(Flow.Subscription subscription) {
        subscription.request(1L);
      }
      @Override
      public void onNext(Long item) { }
      @Override
      public void onError(Throwable throwable) { releaseResources(); }
      @Override
      public void onComplete() { releaseResources(); }
      void releaseResources() {
        try { lobInsertStatement.close(); }
        catch (SQLException closeException) { log(closeException); }
        try { photoBlob.free(); }
        catch (SQLException freeException) { log(freeException); }
      }
    });
    return insertPublisher;
  }

22.2.6 非同期メソッドを使用したトランザクションのコミット

OracleConnectionインタフェースでは、非同期トランザクションの完了のためのcommitAsyncOracleおよびrollbackAsyncOracleメソッドが公開されています。

commitAsyncOracleメソッドおよびrollbackAsyncOracleメソッドはどちらもFlow.Publisher<Void>タイプを返します。パブリッシャーは、<Void>タイプで示されているように、いかなるアイテムも送信しません。パブリッシャーは、コミットまたはロールバック操作が正常に完了したかどうかを示すために、単一のonCompleteまたはonErrorシグナルを送信します。

次の例では、トランザクションを非同期にコミットする方法を示しています。

  /**
   * Asynchronously commits a transaction
   * @param connection Connection to a database with an active transaction
   * @return A Publisher that emits a terminal signal when the transaction
   * has been committed
   * @throws SQLException If a database access error occurs before the
   * transaction can be committed
   */
  public Flow.Publisher<Void> commitTransaction(Connection connection)
    throws SQLException {
    return connection.unwrap(OracleConnection.class)
      .commitAsyncOracle();
  }

commitAsyncOracleおよびrollbackAsyncOracleメソッドは、Connection.commitおよびConnection.rollbackメソッドと同じ機能を実行します。

22.2.7 非同期メソッドを使用した接続のクローズ

OracleConnectionインタフェースでは、非同期接続を閉じるためのcloseAsyncOracleメソッドが公開されています。

closeAsyncOracleメソッドは、Flow.Publisher<Void>タイプを返します。パブリッシャーは、<Void>タイプで示されているように、いかなるアイテムも送信しません。パブリッシャは、接続が正常に閉じられたかどうかを示すために単一のonCompleteまたはonErrorシグナルを送信します。

次の例では、接続を非同期に閉じる方法を示しています。

  /**
   * Asynchronously closes a connection
   * @param connection Connection to be closed
   * @return A Publisher that emits a terminal signal when the connection
   * has been closed
   * @throws SQLException If a database access error occurs before the
   * connection can be closed
   */
  Flow.Publisher<Void> closeConnection(Connection connection)
    throws SQLException {
    return connection.unwrap(OracleConnection.class)
      .closeAsyncOracle();
  }

closeAsyncOracleメソッドは、Connection.closeメソッドと同じ機能を実行します。

22.3 非同期メソッドのスレッド・モデル

この項では、非同期メソッドのスレッド・モデルについて説明します。

非同期メソッドがコールされた場合、コール側スレッドで可能なかぎり多くの処理がネットワーク読取りへのブロッキングなしで実行されます。非同期メソッド・コールは、リクエストがネットワークに書き込まれた直後にレスポンスを待機せずに戻ります。オペレーティング・システムの書込みバッファの大きさが十分でないために完全なリクエストを格納できない場合、コール側スレッドはこのバッファがフラッシュされるまでブロックされる可能性があります。

書込みジョブが完了すると、ネットワーク・チャネルはI/Oの準備状況ポーリングのために登録されます。1つのスレッドが、同じJVM内のすべてのOracle JDBC接続のネットワーク・チャネルをポーリングします。I/Oポーリング・スレッドはoracle.net.nt.TcpMultiplexerという名前で、デーモン・スレッドとして構成されます。このポーリング・スレッドは、セレクタを使用してブロッキング操作を実行します。

ネットワーク・チャネルに対してI/Oの準備状況が検出されると、ポール側スレッドはイベントを処理するワーカー・スレッドを用意します。ワーカー・スレッドは、ネットワークから読み取った後、操作が完了したことをパブリッシャに通知します。通知されると、パブリッシャは、そのサブスクライバごとにシグナルを送信するワーカー・スレッドを調整します。

java.util.concurrent.Executorインタフェースは、ワーカー・スレッドを管理します。デフォルトのエグゼキュータはjava.util.concurrent.ForkJoinPool.commonPoolメソッドです。OracleConnectionBuilder.executorOracle(Executor)メソッドをコールして、代替エグゼキュータを指定できます。

22.4 Flow APIについて

java.util.concurrent.Flowタイプでは、リアクティブ・ストリームの作成に使用できる最低限の操作セットが定義されています。

Flow APIに対して、アプリケーション・コードを直接記述できます。ただし、次のリンクで指定されたリアクティブ・ストリーム仕様に従って低レベルの信号処理を実装する必要があります

https://github.com/reactive-streams/reactive-streams-jvm/blob/master/README.md

JDBCリアクティブ・エクステンションAPIでは、java.util.concurrent.Flow.Publisherおよびjava.util.concurrent.Flow.Subscriberを使用して、アプリケーションに対するFlow APIの低レベルのメカニズムを処理します。次のリンクで指定されたreactive-streams-jvmプロジェクトによって定義されたorg.reactivestreams.Publisherおよびorg.reactivestreams.Subscriberタイプを含むReactor、RxJava、Akka-Streamsなどの人気のライブラリ。

https://github.com/reactive-streams/reactive-streams-jvm/tree/master/api/src/main/java/org/reactivestreams

org.reactivestreams.Publisherタイプとorg.reactivestreams.Subscriberタイプ、およびそのjava.util.concurrent.Flowの対応部分は同じインタフェースを宣言しますが、Javaコンパイラはそれらをまだ個別のタイプとみなす必要があります。次のリンクで指定されているorg.reactivestreams.FlowAdaptersクラスを使用して、Flowタイプとorg.reactivestreamsタイプとの間を変換できます。

https://github.com/reactive-streams/reactive-streams-jvm/tree/master/api/src/main/java9/org/reactivestreams

Oracleでは、JDBCドライバによって公開されているFlowタイプとインタフェースをとるときに、リアクティブ・ストリーム・ライブラリを使用することをお薦めしています。

22.5 FlowAdaptersクラスの使用方法

この項で説明するライブラリは、Flow APIについての項で説明するorg.reactivestreamsタイプとインタフェースをとります。

次のコードの抜粋に、FlowAdaptersクラスがFlow.Publisherタイプをorg.reactivestreamsタイプに変換する方法を示します。

例22-1 org.reactivestreamsタイプへの変換

public static org.reactivestreams.Publisher<ResultSet> publishQuery(
  PreparedStatement queryStatement) {
  try {
    Flow.Publisher<OracleResultSet> queryPublisher =
      queryStatement.unwrap(OraclePreparedStatement.class)
        .executeQueryAsyncOracle();

    return FlowAdapters.toPublisher(queryPublisher);
  }
  catch (SQLException sqlException) {
    return createErrorPublisher(sqlException);
  }
}

public static <T> org.reactivestreams.Publisher<T> publishRows(
  ResultSet resultSet, Function<OracleRow, T> rowMappingFunction) {
  try {
    Flow.Publisher<T> rowPublisher =
      resultSet.unwrap(OracleResultSet.class)
        .publisherOracle(rowMappingFunction);

    return FlowAdapters.toPublisher(rowPublisher);
  }
  catch (SQLException sqlException) {
    return createErrorPublisher(sqlException);
  }
}

22.6 リアクタ・ライブラリによる行データのストリーミング

リアクタ・ライブラリは、多数のアイテムのストリームを表すFluxタイプと、単一のアイテムのみのストリームを表すMonoタイプを定義します。

次の例では、リアクティブ・エクステンションを使用して行データのFluxを作成する方法を示しています。
      private Publisher<Employee> queryAllEmployees(Connection connection) {

      return Flux.using(

        // Prepare a SQL statement.

        () -> connection.prepareStatement("SELECT * FROM emp"),


        // Execute the PreparedStatement.

        preparedStatement ->

          // Create a Mono which emits one ResultSet.

          Mono.from(publishQuery(preparedStatement))

            // Flat map the ResultSet to a Flux which emits many Rows. Each row

            // is mapped to an Employee object.

            .flatMapMany(resultSet ->

              publishRows(resultSet, row -> mapRowToEmployee(row))),

        

        // Close the PreparedStatement after emitting the last Employee object

        prepareStatement -> {

          try {

            prepareStatement.close();

          }

          catch (SQLException sqlException) {

            throw new RuntimeException(sqlException);

          }

        });

    }

usingファクトリ・メソッドは、明示的に解放する必要があるリソースに依存するFluxを作成します。この場合、リソースは、closeメソッドをコールして解放されるPreparedStatementインスタンスです。

最初のラムダ引数は、PreparedStatementインスタンスを作成します。このラムダは、Fluxがアイテムの送信を開始する前に実行されます。

2番目のラムダ引数は、PreparedStatementインスタンスを使用して、Employeeオブジェクトにマップされる行データのストリームを作成します。最初に、PreparedStatementインスタンスはpublishQueryメソッドへのコールにより実行されます。問合せ実行パブリッシャは単一のResultSetを送信するため、Monoに適合します。問合せの実行が完了すると、Monoは、flatMapManyメソッドで指定されたラムダにResultSetを送信します。このラムダは、OracleRowオブジェクトをEmployeeオブジェクトにマップする関数を使用してpublishRowsメソッドをコールします。この結果、flatMapManyメソッド・コールでEmployeeオブジェクトのFluxが返され、各EmployeeResultSetオブジェクトの行からマップされます。

3番目のラムダ引数は、PreparedStatementインスタンスを閉じます。このラムダは、Fluxが最後のアイテムを送信した後に実行されます。

22.7 RxJavaライブラリによる行データのストリーミング

RxJavaライブラリは、多数のアイテムのストリームを表すFlowableタイプと、単一のアイテムのみのストリームを表すSingleタイプを定義します。

次の例では、リアクティブ・エクステンションを使用して行データのFlowableを作成する方法を示しています。
    private Publisher<Employee> queryAllEmployees(Connection connection) {

      return Flowable.using(

        // Prepare a SQL statement

        () -> connection.prepareStatement("SELECT * FROM emp"),


        // Execute the PreparedStatement

        queryStatement ->

          // Create a Single which emits one ResultSet

          Single.fromPublisher(publishQuery(queryStatement))

            // Flat map the ResultSet to a Flowable which emits many rows, where

            // each row is mapped to an Employee object

            .flatMapPublisher(resultSet ->

              publishRows(resultSet, oracleRow -> mapRowToEmployee(oracleRow))),


        // Close the PreparedStatement after emitting the last Employee object

        PreparedStatement::close

      );

    }

usingファクトリ・メソッドは、明示的に解放する必要があるリソースに依存するFlowableを作成します。この場合、リソースは、closeメソッドをコールして解放されるPreparedStatementインスタンスです。

最初のラムダ引数は、PreparedStatementインスタンスを作成します。このラムダは、Flowableがアイテムの送信を開始する前に実行されます。

2番目のラムダ引数は、PreparedStatementインスタンスを使用して、Employeeオブジェクトにマップされる行データのストリームを作成します。最初に、PreparedStatementインスタンスはpublishQueryメソッド・コールにより実行されます。問合せ実行パブリッシャは単一のResultSetオブジェクトを送信するため、Singleに適合します。問合せの実行が完了すると、Singleは、flatMapPublisherメソッドで指定されたラムダにResultSetオブジェクトを送信します。このラムダは、OracleRowオブジェクトをEmployeeオブジェクトにマップする関数を使用してpublishRowsメソッドをコールします。この結果、flatMapPublisherメソッド・コールでEmployeeオブジェクトのFlowableが返され、各EmployeeResultSetの行からマップされます。

3番目のメソッド・ハンドル引数は、PreparedStatementインスタンスを閉じます。このラムダは、Flowableが最後のアイテムを送信した後に実行されます。

22.8 Akka Streamsライブラリによる行データのストリーミング

Akka Streamsライブラリは、アイテムのストリームを表すSourceタイプを定義します。

次の例では、リアクティブ・エクステンションを使用して行データのSourceを作成する方法を示しています。
    private Source<Employee, NotUsed> queryAllEmployees(Connection connection) {


      final PreparedStatement queryStatement;

      try {

        queryStatement = connection.prepareStatement("SELECT * FROM emp");

      }

      catch (SQLException prepareStatementFailure) {

        return Source.failed(prepareStatementFailure);

      }


      // Create a Source which emits one ResultSet

      return Source.fromPublisher(publishQuery(queryStatement))

        // Flat map the ResultSet to a Source which emits many Rows, where each

        // Row is mapped to an Employee object

        .flatMapConcat(resultSet -> {

          Publisher<Employee> employeePublisher =

            publishRows(resultSet, oracleRow -> mapRowToEmployee(oracleRow));

          return Source.fromPublisher(employeePublisher);

        })

        // This Sink closes the PreparedStatement when the Source terminates

        .alsoTo(Sink.onComplete(result -> queryStatement.close()));

    }

PreparedStatementインスタンスが、Employeeオブジェクトにマップされる行データのストリームを作成するために使用されます。最初に、PreparedStatementインスタンスはpublishQueryメソッド・コールにより実行されます。問合せ実行パブリッシャは、単一のResultSetオブジェクトを送信します。このパブリッシャは、Sourceに適合します。問合せが完了すると、Sourceは、flatMapConcatメソッドで指定されたラムダにResultSetオブジェクトを送信します。

このラムダは、OracleRowオブジェクトをEmployeeオブジェクトにマップする関数を使用してpublishRowsメソッドをコールします。この結果、flatMapConcatメソッド・コールでEmployeeオブジェクトのSourceが返され、各EmployeeResultSetオブジェクトの行からマップされます。

PreparedStatementインスタンスは、明示的に閉じる必要があるリソースです。これは、alsoToメソッド・コールによって処理されます。このコールでは、SourceonCompleteまたはonErrorシグナルを送信したときに、PreparedStatementインスタンスを閉じるSinkが指定されます。

22.9 JDBCリアクティブ・エクステンションの制限

この項では、JDBCリアクティブ・エクステンションの制限事項について説明します。

JDBCリアクティブ・エクステンションには、次の制限事項があります。

  • 接続が、同時に複数の非同期操作を受け入れません。非同期メソッド・コールが、現在進行中の別の操作がある場合にコール側スレッドをブロックします。
  • 非同期メソッドにアクセスするには、タイプをキャストしてアクセスするのではなく、java.sql.Wrapper.unwrapメソッドを使用する必要があります。これにより、接続プールで非同期メソッドを使用する場合など、プロキシOracle JDBCクラスで使用する際に非同期メソッドが正しく動作します。
  • OracleNetレイヤーのセッションの確立は、I/Oバウンド操作のブロックです。
  • ネットワークから大きいレスポンスを読み取るには、I/Oバウンド操作のブロックが必要になる場合があります。読取り操作のブロックは、TCP受信バッファ・サイズより大きいレスポンスをドライバが読み取った場合に発生することがあります。
  • 非同期SQL実行では、スクロール可能なResultSetタイプも機密のResultSetタイプもサポートされていません。