26 パイプライン化されたデータベース操作のサポート
Javaアプリケーションで、前のコールの返答を待たずに複数のSQLリクエストを非同期的にサーバーに送信できるようになりました。
この章では、次の各トピックで、パイプライン処理について詳しく説明します:
26.1 パイプライン処理の概要
パイプライン処理は、アプリケーションでレスポンスを待つ必要なく複数のリクエストを1つのサーバーに送信できる、ネットワーク通信の形式です。
パイプライン処理は基本的には、サーバーをビジー状態に保ち、アプリケーションでインターリービング・リクエストを適切に使用できるようにすることが目的となっています。アプリケーションでリクエストを送信し続けることができ、サーバーによってキューが構築されてそれらのリクエストが1つずつ処理されます。その後、サーバーによって、リクエストを受信したのと同じ順序で、クライアントにレスポンスが返されます。
パイプラインはレスポンスをあまり頻繁に読み取らないため、以前のSQLレスポンスからのデータを後続のリクエストへのバインド・データとして使用すると、依存関係が作成され、パイプラインが中断されます。そのため、アプリケーションによって、リクエストが独立していることが確認される必要があります。これは、それがパイプライン処理機能の必須条件であるためです。
パイプライン処理によって、アプリケーションに次の利点がもたらされます:
- レスポンス時間の短縮とスループットの向上
- コンテキストのスイッチングの減少によるスケーラビリティの向上
26.2 JDBCでのパイプライン処理のサポート
以前のリリースでは、JDBCドライバで、現在のコールが完了するまでは新しいデータベース・コールを開始できませんでした。Oracle Databaseリリース23ai以降では、JDBC Thinドライバで、パイプライン化されたデータベース操作がサポートされるようになりました。
パイプライン処理はレスポンスを待つことなく複数のリクエストを送信することで実行されるため、この機能を非同期プログラミング・モデルと言い換えることができます。この機能では、複数のSQL文の実行を、スレッドでそれらの文の結果が消費される前であっても開始できます。
Oracle Databaseの次の非同期プログラミング機能を使用して、JDBCアプリケーションにパイプライン処理を実装できます。
ノート:
リアクティブ(非同期)APIまたは標準JDBCバッチAPIをコールすると、パイプラインはデフォルトで有効になります。26.3 リアクティブ・エクステンションを使用したパイプライン処理
Oracle Databaseリリース23ai以降では、リアクティブ・エクステンションAPIを使用して、パイプライン化されたデータベース操作を実行できます。
次の例では、unwrap(OraclePreparedStatement.class)
メソッドのコールを使用して、executeUpdateAsyncOracle
メソッドとexecuteQueryAsyncOracle
メソッドにアクセスします。これらのメソッドでは、リアクティブ・ストリーム仕様を実装するFlow.Publisher
が返されます。対応するパブリッシャは、各SQL文の結果を受け取ると、サブスクライバにその結果を伝えます。
...
void pipelineExample(OracleConnection connection) throws SQLException {
// Prepare statements to execute
PreparedStatement delete = connection.prepareStatement(
"DELETE FROM example WHERE id = 0");
PreparedStatement insert = connection.prepareStatement(
"INSERT INTO example (id, value) VALUES (1, 'x')");
PreparedStatement select = connection.prepareStatement(
"SELECT id, value FROM example WHERE id = 2");
// Execute statements in a pipeline
Flow.Publisher<Long> deletePublisher =
delete.unwrap(OraclePreparedStatement.class)
.executeUpdateAsyncOracle();
Flow.Publisher<Long> insertPublisher =
insert.unwrap(OraclePreparedStatement.class)
.executeUpdateAsyncOracle();
Flow.Publisher<OracleResultSet> selectPublisher =
select.unwrap(OraclePreparedStatement.class)
.executeQueryAsyncOracle();
...
}
26.4 リアクティブ・ストリームの収集のためのJavaライブラリによるパイプライン処理
Oracle Databaseリリース23ai以降では、リアクティブ・ストリームの収集のためのJavaライブラリを使用して、パイプライン化されたデータベース操作を実行できます。
次の例では、Mono
およびFlux
パブリッシャのインスタンスが、それらのfrom
メソッドをコールすることで作成されます。このメソッドの引数はorg.reactivestreams.Publisher
であり、これは、org.reactivestreams.FlowAdapters.toPublisher
をコールすることで、Flow.Publisher
に合わせて変化します。コールバック関数で結果を非同期的に消費するために、subscribeメソッドをコールします。複数行の結果を消費するため、Mono.flatMapMany
のコールにより、単一ResultSet
のストリームが複数行値のストリームに変換されます。
import oracle.jdbc.OracleConnection;
import oracle.jdbc.OraclePreparedStatement;
import oracle.jdbc.OracleResultSet;
import org.reactivestreams.FlowAdapters;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Flow;
public class ReactiveExample {
void reactivePipelineExample(OracleConnection connection) throws SQLException {
// Push a DELETE into the pipeline
Mono.using(
() -> connection.prepareStatement("DELETE FROM example WHERE id = 1"),
preparedStatement -> Mono.from(publishUpdate(preparedStatement)),
preparedStatement -> close(preparedStatement))
.subscribe(deleteCount ->
System.out.println(deleteCount + " rows deleted"));
// Push an INSERT operation into the pipeline
Mono.using(
() -> connection.prepareStatement(
"INSERT INTO example (id, value) VALUES (1, 'x')"),
preparedStatement -> Mono.from(publishUpdate(preparedStatement)),
preparedStatement -> close(preparedStatement))
.subscribe(insertCount ->
System.out.println(insertCount + " rows inserted"));
// Push a SELECT into the pipeline
Flux.using(
() -> connection.prepareStatement(
"SELECT id, value FROM example ORDER BY id"),
preparedStatement ->
Mono.from(publishQuery(preparedStatement))
.flatMapMany(resultSet -> publishRows(resultSet)),
preparedStatement -> close(preparedStatement))
.subscribe(rowString ->
System.out.println(rowString));
}
Publisher<Long> publishUpdate(PreparedStatement preparedStatement) {
try {
Flow.Publisher<Long> updatePublisher =
preparedStatement.unwrap(OraclePreparedStatement.class)
.executeUpdateAsyncOracle();
return FlowAdapters.toPublisher(updatePublisher);
}
catch (SQLException sqlException) {
return Mono.error(sqlException);
}
}
Publisher<OracleResultSet> publishQuery(PreparedStatement preparedStatement) {
try {
Flow.Publisher<OracleResultSet> queryPublisher =
preparedStatement.unwrap(OraclePreparedStatement.class)
.executeQueryAsyncOracle();
return FlowAdapters.toPublisher(queryPublisher);
}
catch (SQLException sqlException) {
return Mono.error(sqlException);
}
}
Publisher<String> publishRows(ResultSet resultSet) {
try {
Flow.Publisher<String> rowPublisher =
resultSet.unwrap(OracleResultSet.class)
.publisherOracle(row -> {
try {
return String.format("id: %d, value: %s\n",
row.getObject("id", Long.class),
row.getObject("value", String.class));
}
catch (SQLException sqlException) {
throw new CompletionException(sqlException);
}
});
return FlowAdapters.toPublisher(rowPublisher);
}
catch (SQLException sqlException) {
return Flux.error(sqlException);
}
}
void close(PreparedStatement preparedStatement) {
try {
preparedStatement.close();
}
catch (SQLException sqlException) {
throw new RuntimeException(sqlException);
}
}
}