26 パイプライン化されたデータベース操作のサポート

Javaアプリケーションで、前のコールの返答を待たずに複数のSQLリクエストを非同期的にサーバーに送信できるようになりました。

26.1 パイプライン処理の概要

パイプライン処理は、アプリケーションでレスポンスを待つ必要なく複数のリクエストを1つのサーバーに送信できる、ネットワーク通信の形式です。

パイプライン処理は基本的には、サーバーをビジー状態に保ち、アプリケーションでインターリービング・リクエストを適切に使用できるようにすることが目的となっています。アプリケーションでリクエストを送信し続けることができ、サーバーによってキューが構築されてそれらのリクエストが1つずつ処理されます。その後、サーバーによって、リクエストを受信したのと同じ順序で、クライアントにレスポンスが返されます。

パイプラインはレスポンスをあまり頻繁に読み取らないため、以前のSQLレスポンスからのデータを後続のリクエストへのバインド・データとして使用すると、依存関係が作成され、パイプラインが中断されます。そのため、アプリケーションによって、リクエストが独立していることが確認される必要があります。これは、それがパイプライン処理機能の必須条件であるためです。

パイプライン処理によって、アプリケーションに次の利点がもたらされます:

  • レスポンス時間の短縮とスループットの向上
  • コンテキストのスイッチングの減少によるスケーラビリティの向上

26.2 JDBCでのパイプライン処理のサポート

以前のリリースでは、JDBCドライバで、現在のコールが完了するまでは新しいデータベース・コールを開始できませんでした。Oracle Databaseリリース23ai以降では、JDBC Thinドライバで、パイプライン化されたデータベース操作がサポートされるようになりました。

パイプライン処理はレスポンスを待つことなく複数のリクエストを送信することで実行されるため、この機能を非同期プログラミング・モデルと言い換えることができます。この機能では、複数のSQL文の実行を、スレッドでそれらの文の結果が消費される前であっても開始できます。

Oracle Databaseの次の非同期プログラミング機能を使用して、JDBCアプリケーションにパイプライン処理を実装できます。

ノート:

リアクティブ(非同期)APIまたは標準JDBCバッチAPIをコールすると、パイプラインはデフォルトで有効になります。

26.3 リアクティブ・エクステンションを使用したパイプライン処理

Oracle Databaseリリース23ai以降では、リアクティブ・エクステンションAPIを使用して、パイプライン化されたデータベース操作を実行できます。

次のコードでは、リアクティブ・エクステンションAPIを使用してパイプライン処理を実行する方法の簡潔な例を示します:

次の例では、unwrap(OraclePreparedStatement.class)メソッドのコールを使用して、executeUpdateAsyncOracleメソッドとexecuteQueryAsyncOracleメソッドにアクセスします。これらのメソッドでは、リアクティブ・ストリーム仕様を実装するFlow.Publisherが返されます。対応するパブリッシャは、各SQL文の結果を受け取ると、サブスクライバにその結果を伝えます。


...
void pipelineExample(OracleConnection connection) throws SQLException {

  // Prepare statements to execute
  PreparedStatement delete = connection.prepareStatement(
    "DELETE FROM example WHERE id = 0");
  PreparedStatement insert = connection.prepareStatement(
    "INSERT INTO example (id, value) VALUES (1, 'x')");
  PreparedStatement select = connection.prepareStatement(
    "SELECT id, value FROM example WHERE id = 2");

  // Execute statements in a pipeline
  Flow.Publisher<Long> deletePublisher =
    delete.unwrap(OraclePreparedStatement.class)
      .executeUpdateAsyncOracle();
  Flow.Publisher<Long> insertPublisher =
    insert.unwrap(OraclePreparedStatement.class)
      .executeUpdateAsyncOracle();
  Flow.Publisher<OracleResultSet> selectPublisher =
    select.unwrap(OraclePreparedStatement.class)
      .executeQueryAsyncOracle();
...
}

26.4 リアクティブ・ストリームの収集のためのJavaライブラリによるパイプライン処理

Oracle Databaseリリース23ai以降では、リアクティブ・ストリームの収集のためのJavaライブラリを使用して、パイプライン化されたデータベース操作を実行できます。

次のコードでは、リアクティブ・ストリームの収集のためのJavaライブラリを使用してパイプライン処理を実行する方法の簡潔な例を示します:

次の例では、MonoおよびFluxパブリッシャのインスタンスが、それらのfromメソッドをコールすることで作成されます。このメソッドの引数はorg.reactivestreams.Publisherであり、これは、org.reactivestreams.FlowAdapters.toPublisherをコールすることで、Flow.Publisherに合わせて変化します。コールバック関数で結果を非同期的に消費するために、subscribeメソッドをコールします。複数行の結果を消費するため、Mono.flatMapManyのコールにより、単一ResultSetのストリームが複数行値のストリームに変換されます。

import oracle.jdbc.OracleConnection;
import oracle.jdbc.OraclePreparedStatement;
import oracle.jdbc.OracleResultSet;
import org.reactivestreams.FlowAdapters;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Flow;

public class ReactiveExample {

  void reactivePipelineExample(OracleConnection connection) throws SQLException {

    // Push a DELETE into the pipeline
    Mono.using(
        () -> connection.prepareStatement("DELETE FROM example WHERE id = 1"),
        preparedStatement -> Mono.from(publishUpdate(preparedStatement)),
        preparedStatement -> close(preparedStatement))
      .subscribe(deleteCount ->
        System.out.println(deleteCount + " rows deleted"));

    // Push an INSERT operation into the pipeline
    Mono.using(
        () -> connection.prepareStatement(
          "INSERT INTO example (id, value) VALUES (1, 'x')"),
        preparedStatement -> Mono.from(publishUpdate(preparedStatement)),
        preparedStatement -> close(preparedStatement))
      .subscribe(insertCount ->
        System.out.println(insertCount + " rows inserted"));

    // Push a SELECT into the pipeline
    Flux.using(
        () -> connection.prepareStatement(
          "SELECT id, value FROM example ORDER BY id"),
        preparedStatement ->
          Mono.from(publishQuery(preparedStatement))
            .flatMapMany(resultSet -> publishRows(resultSet)),
        preparedStatement -> close(preparedStatement))
      .subscribe(rowString ->
        System.out.println(rowString));
  }
  Publisher<Long> publishUpdate(PreparedStatement preparedStatement) {
    try {
      Flow.Publisher<Long> updatePublisher =
        preparedStatement.unwrap(OraclePreparedStatement.class)
          .executeUpdateAsyncOracle();

      return FlowAdapters.toPublisher(updatePublisher);
    }
    catch (SQLException sqlException) {
      return Mono.error(sqlException);
    }
  }

  Publisher<OracleResultSet> publishQuery(PreparedStatement preparedStatement) {
    try {
      Flow.Publisher<OracleResultSet> queryPublisher =
        preparedStatement.unwrap(OraclePreparedStatement.class)
          .executeQueryAsyncOracle();

      return FlowAdapters.toPublisher(queryPublisher);
    }
    catch (SQLException sqlException) {
      return Mono.error(sqlException);
    }
  }

  Publisher<String> publishRows(ResultSet resultSet) {
    try {
      Flow.Publisher<String> rowPublisher =
        resultSet.unwrap(OracleResultSet.class)
          .publisherOracle(row -> {
            try {
              return String.format("id: %d, value: %s\n",
                row.getObject("id", Long.class),
                row.getObject("value", String.class));
            }
            catch (SQLException sqlException) {
              throw new CompletionException(sqlException);
            }
          });

      return FlowAdapters.toPublisher(rowPublisher);
    }
    catch (SQLException sqlException) {
      return Flux.error(sqlException);
    }
  }

  void close(PreparedStatement preparedStatement) {
    try {
      preparedStatement.close();
    }
    catch (SQLException sqlException) {
      throw new RuntimeException(sqlException);
    }
  }

}