22 JDBCリアクティブ・エクステンション
リアクティブ・エクステンションは、非同期データベース・アクセスを提供するためにJDBC標準を拡張する一連のメソッドです。
リアクティブ・エクステンションは、接続オブジェクトの作成、SQL文の実行、行のフェッチ、トランザクションのコミット、接続オブジェクトのロールバック、およびBFILE、BLOB、CLOBの読取りと書込みに対して非ブロッキング・メカニズムを使用します。
この章の内容は次のとおりです。
22.1 JDBCリアクティブ・エクステンションの概要
リアクティブ・エクステンションは、java.util.concurrent.Flow
インタフェースによって定義された、パブリッシャおよびサブスクライバ・タイプを実装します。このインタフェースは、リアクティブ・ストリームの標準のJDK表現です。
リアクティブ・エクステンションでは、データベース操作をブロックしないために単一のJava NIOセレクタが使用されます。
JDBCリアクティブ・エクステンションの使用要件
JDBCリアクティブ・エクステンションを使用する場合は、次を実行する必要があります。
- 接続の構築にJDBC Thinドライバのみを使用します
- JDK 11および
ojdbc11.jar
を使用します - Oracle JDBCドライバ20c以降を使用します
22.2 リアクティブ・エクステンションによるアプリケーションの作成について
この項では、リアクティブ・エクステンションを使用してアプリケーションを作成するために従う必要があるステップについて説明します。
リアクティブ・エクステンションを使用してアプリケーションを作成するステップは、標準の方法でアプリケーションを作成する場合と同じです。ただし、リアクティブ・エクステンションの場合は、新しい非同期メソッドを使用します。この項では、次の各項で様々な非同期メソッドを使用する方法について説明します。
22.2.1 非同期メソッドを使用した接続のオープン
OracleConnectionBuilder
インタフェースでは、接続を非同期に開くためのメソッドが提供されています。
OracleConnectionBuilder.buildConnectionPublisherOracle
メソッドは、Flow.Publisher<OracleConnection>
タイプを返します。パブリッシャは、サブスクライバに対して単一の接続を発行します。サブスクライバが要求を発行すると、パブリッシャは非同期に新しい接続を開きます。公開された接続は、ConnectionBuilder.build
メソッドを使用して構築できる接続と同じです。
次の例では、接続を非同期に開く方法を示しています。
/**
* Asynchronously opens a new connection
* @param dataSource Datasource configured with a URL, User, and Password
* @return A Publisher that emits a single connection
* @throws SQLException If a database access error occurs before the
* connection can be opened
*/
Flow.Publisher<OracleConnection> openConnection(DataSource dataSource)
throws SQLException {
return dataSource.unwrap(OracleDataSource.class)
.createConnectionBuilder()
.buildConnectionPublisherOracle();
}
22.2.2 非同期メソッドによるSQL文の実行
この項では、非同期メソッドでSQL文を実行する方法について説明します。
OraclePreparedStatement
インタフェースでは、非同期SQL実行のためのメソッドが公開されています。それぞれの非同期メソッドが、対応するSQL実行の同期メソッドに類似した機能を実行します。この関係が、次の表に示されています。
表22-1 メソッドの比較
同期メソッド | 非同期メソッド |
---|---|
boolean execute |
Flow.Publisher<Boolean> executeAsyncOracle |
long executeLargeUpdate |
Flow.Publisher<Long> executeUpdateAsyncOracle |
long[] executeLargeBatch |
Flow.Publisher<Long> executeBatchAsyncOracle |
ResultSet executeQuery |
Flow.Publisher<OracleResultSet> executeQueryAsyncOracle |
次の項では、非同期メソッドの詳細を説明します。
22.2.2.1 executeAsyncOracleメソッドによる標準SQL文の実行
この項では、標準のexecute
メソッドに相当するexecuteAsyncOracle
メソッドについて説明します。
OraclePreparedStatement.executeAsyncOracle
メソッドをコールすると、すべてのタイプのSQL文を実行できます。このコールは、Flow.Publisher<Boolean>
タイプを返します。パブリッシャは、単一のBooleanを送信し、複数のサブスクライバをサポートします。Boolean値がTRUE
の場合、SQL文によって行データが生成され、OraclePreparedStatement.getResultSet
メソッドからアクセスできることを意味します。FALSE
の場合、SQL文が更新件数を返したことを意味します。Booleanの結果は、execute
メソッドによって返されるboolean
と意味的に同じです。
/**
* Asynchronously creates a new table by executing a DDL SQL statement
* @param connection Connection to a database where the table is created
* @return A Publisher that emits the result of executing DDL SQL
* @throws SQLException If a database access error occurs before the DDL
* SQL can be executed
*/
Flow.Publisher<Boolean> createTable(Connection connection)
throws SQLException {
PreparedStatement createTableStatement =
connection.prepareStatement(
"CREATE TABLE employee_names (" +
"id NUMBER PRIMARY KEY, " +
"first_name VARCHAR(50), " +
"last_name VARCHAR2(50))");
Flow.Publisher<Boolean> createTablePublisher =
createTableStatement.unwrap(OraclePreparedStatement.class)
.executeAsyncOracle();
createTablePublisher.subscribe(
// This subscriber will close the PreparedStatement
new Flow.Subscriber<Boolean>() {
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(1L);
}
public void onNext(Boolean item) { }
public void onError(Throwable throwable) { closeStatement(); }
public void onComplete() { closeStatement(); }
void closeStatement() {
try { createTableStatement.close(); }
catch (SQLException closeException) { log(closeException); }
}
});
return createTablePublisher;
}
22.2.2.2 executeUpdateAsyncOracleメソッドによるDML文の実行
この項では、標準のexecuteLargeUpdate
メソッドに相当するexecuteUpdateAsyncOracle
メソッドについて説明します。
OraclePreparedStatement.executeUpdateAsyncOracle
メソッドを使用して、単一(バッチ以外)のDML文を実行できます。このコールは、Flow.Publisher<Long>
タイプを返します。返されたパブリッシャは、単一のLong
値を送信します。このLong
値は、DML文によって更新された行数または挿入される行数を示します。このLong
値の結果は、標準のexecuteLargeUpdate
メソッドから返されるlong
値と意味的に同じです。
/**
* Asynchronously updates table data by executing a DML SQL statement
* @param connection Connection to a database where the table data resides
* @return A Publisher that emits the number of rows updated
* @throws SQLException If a database access error occurs before the DML
* SQL can be executed
*/
Flow.Publisher<Long> updateData(Connection connection)
throws SQLException {
PreparedStatement updateStatement = connection.prepareStatement(
"UPDATE employee_names SET " +
"first_name = UPPER(first_name), " +
"last_name = UPPER(last_name)");
Flow.Publisher<Long> updatePublisher =
updateStatement.unwrap(OraclePreparedStatement.class)
.executeUpdateAsyncOracle();
updatePublisher.subscribe(
// This subscriber will close the PreparedStatement
new Flow.Subscriber<Long>() {
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(1L);
}
public void onNext(Long item) { }
public void onError(Throwable throwable) { closeStatement(); }
public void onComplete() { closeStatement(); }
void closeStatement() {
try { updateStatement.close(); }
catch (SQLException closeException) { log(closeException); }
}
});
return updatePublisher;
}
22.2.2.3 executeBatchAsyncOracleメソッドによるバッチDML文の実行
この項では、標準のexecuteLargeBatch
メソッドに相当するexecuteBatchAsyncOracle
メソッドについて説明します。
OraclePreparedStatement.executeBatchAsyncOracle
メソッドを使用して、バッチDML文を実行できます。このコールは、Flow.Publisher<Long>
タイプを返します。返されたパブリッシャは、バッチ内の各文に対してLong
値を送信します。Long
値は、各DML文によって更新された行数を示します。これらのLong
値の結果は、標準のexecuteLargeBatch
メソッドから返されるlong[]
値と意味的に同じです。
/**
* Asynchronously loads table data by executing a batch of DML SQL statements.
* @param connection Connection to a database where the table data resides.
* @return A Publisher which emits the number of rows updated.
* @throws SQLException If a database access error occurs before the DML
* SQL can be executed.
*/
Flow.Publisher<Long> createData(
Connection connection, Iterable<Employee> employeeData)
throws SQLException {
PreparedStatement batchStatement = connection.prepareStatement(
"INSERT INTO employee_names (id, first_name, last_name) " +
"VALUES (?, ?, ?)");
for (Employee employee : employeeData) {
batchStatement.setLong(1, employee.id());
batchStatement.setString(2, employee.firstName());
batchStatement.setString(3, employee.lastName());
batchStatement.addBatch();
}
Flow.Publisher<Long> batchPublisher =
batchStatement.unwrap(OraclePreparedStatement.class)
.executeBatchAsyncOracle();
batchPublisher.subscribe(
// This subscriber will close the PreparedStatement
new Flow.Subscriber<Long>() {
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}
public void onNext(Long item) { }
public void onError(Throwable throwable) { closeStatement(); }
public void onComplete() { closeStatement(); }
void closeStatement() {
try { batchStatement.close(); }
catch (SQLException closeException) { log(closeException); }
}
});
return batchPublisher;
}
22.2.2.4 executeQueryAsyncOracleメソッドによるSQL問合せの実行
この項では、標準のexecuteQuery
メソッドに相当するexecuteQueryAsyncOracle
メソッドについて説明します。
OraclePreparedStatement.executeQueryAsyncOracle
メソッドを使用して、SQL問合せ文を実行できます。このコールは、Flow.Publisher<OracleResultSet>
タイプを返します。返されたパブリッシャは、単一のOracleResultSet
値を発行します。OracleResultSet
値により、SQL問合せの結果の行データにアクセスできます。このOracleResultSet
は、標準のexecuteQuery
メソッドによって返されるResultSet
と意味的に同じです。
/**
* Asynchronously reads table data by executing a SELECT SQL statement
* @param connection Connection to a database where the table resides
* @return A Publisher that emits the number of rows updated
* @throws SQLException If a database access error occurs before the SELECT
* SQL can be executed
*/
Flow.Publisher<OracleResultSet> readData(Connection connection)
throws SQLException {
PreparedStatement queryStatement = connection.prepareStatement(
"SELECT id, first_name, last_name FROM employee_names");
Flow.Publisher<OracleResultSet> queryPublisher =
queryStatement.unwrap(OraclePreparedStatement.class)
.executeQueryAsyncOracle();
// Close the PreparedStatement after the result set is consumed.
queryStatement.closeOnCompletion();
return queryPublisher;
}
22.2.3 非同期メソッドによる行データのフェッチについて
この項では、非同期メソッドで行データをフェッチする方法について説明します。
OracleResultSet
インタフェースでは、非同期行データ・フェッチのためのpublisherOracle(Function<OracleRow, T>)
メソッドが公開されています。このメソッドの引数は、行データのマッピング関数です。マッピング関数は、ResultSet
の各行に適用されます。このメソッドは、Flow.Publisher<T>
タイプを返します。Tはマッピング関数の出力タイプです。マッピング関数の入力タイプはOracleRow
です。OracleRow
はResultSet
の単一行を表し、その行の列値にアクセスするためのメソッドを公開します。
次の例では、非同期メソッドで行データをフェッチする方法を示しています。
/**
* Asynchronously fetches table data by from a ResultSet.
* @param resultSet ResultSet which fetches table data.
* @return A Publisher which emits the fetched data as Employee objects.
* @throws SQLException If a database access error occurs before table data is
* fetched.
*/
Flow.Publisher<Employee> fetchData(ResultSet resultSet)
throws SQLException {
// Before closing the ResultSet with publisherOracle(..), first obtain a
// reference to the ResultSet's Statement. The Statement needs to be closed
// after all data has been fetched.
Statement resultSetStatement = resultSet.getStatement();
Flow.Publisher<Employee> employeePublisher =
resultSet.unwrap(OracleResultSet.class)
.publisherOracle(oracleRow -> {
try {
return new Employee(
oracleRow.getObject("id", Long.class),
oracleRow.getObject("first_name", String.class),
oracleRow.getObject("last_name", String.class));
}
catch (SQLException getObjectException) {
// Unchecked exceptions thrown by a row mapping function will be
// emitted to each Subscriber's onError method.
throw new RuntimeException(getObjectException);
}
});
employeePublisher.subscribe(
// This subscriber will close the ResultSet's Statement
new Flow.Subscriber<Employee>() {
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}
public void onNext(Employee item) { }
public void onError(Throwable throwable) { closeStatement(); }
public void onComplete() { closeStatement(); }
void closeStatement() {
try { resultSetStatement.close(); }
catch (SQLException closeException) { log(closeException); }
}
});
return employeePublisher;
}
マッピング関数が入力として受け取るOracleRow
のインスタンスは、関数が戻った後に無効になります。OracleRow
のアクセスをマッピング関数の範囲に制限することで、行データの格納に使用されるメモリーをドライバで効率的に管理できます。OracleRow
の永続コピーが必要な場合は、OracleRow.clone
メソッドを使用して、元のOracleRow
データのコピーによってバッキングされるOracleRow
の新しいインスタンスを作成できます。clone
メソッドにより返されるOracleRow
は、マッピング関数の範囲外で有効なままであり、データベース接続が閉じた後もそのデータを保持します。
行マッピング関数は、null以外の値を返すか、未チェックの例外をスローする必要があります。マッピング関数が未チェックの例外をスローした場合、onError
シグナルとして行データのサブスクライバに配信されます。行データ・パブリッシャは、複数のサブスクライバをサポートしています。複数のサブスクライバへの行データの送信は、次に示す特定のポリシーに従います。
- サブスクライバは、
onSubscribe
シグナルを受信する前に送信された行データのonNext
シグナルを受信しません。 - 他のすべてのサブスクライバが要求を通知するまで、サブスクライバは
onNext
シグナルを受信しません。
次の表は、複数のサブスクライバで作業している場合のイベント・フローを示しています。
表22-2 複数のサブスクライバへの送信
時間 | イベント | 原因 |
---|---|---|
0 | SubscriberAがonSubscribe シグナルを受信します
|
行データ・パブリッシャのサブスクライブ(サブスクライバ)メソッドへのコールが、SubscriberAのサブスクリプションをリクエストしました |
1 | SubscriberAが1行をリクエストします | SubscriberAがサブスクリプション上で要求を通知しました |
2 | SubscriberAがResultSetsの1行目のデータを受信します | 行データ・パブリッシャは、SubscriberAによってリクエストされたデータの行をフェッチしました |
3 | SubscriberBがonSubscribe シグナルを受信します
|
行データ・パブリッシャのサブスクライブ(サブスクライバ)メソッドへのコールが、SubscriberBのサブスクリプションをリクエストしました |
4 | SubscriberAが1行をリクエストします | SubscriberAがサブスクリプション上で要求を通知しました |
5 | SubscriberBが1行をリクエストします | SubscriberBがサブスクリプション上で要求を通知しました |
6 | SubscriberAとSubscriberBの両方が、ResultSetの2行目のデータを受信します | 行データ・パブリッシャは、両方のサブスクライバによってリクエストされたデータの行をフェッチしました |
7 | SubscriberAが1行をリクエストします | SubscriberAがサブスクリプション上で要求を通知しました |
8 | 行データは送信されません。 | 行データ・パブリッシャは、すべて のサブスクライバがリクエストするまで、次の行を送信しません。
|
9 | SubscriberBが1行をリクエストします | SubscriberBがサブスクリプション上で要求を通知しました |
10 | SubscriberAとSubscriberBの両方が、ResultSetの3行目のデータを受信します | 行データ・パブリッシャは、両方のサブスクライバによってリクエストされたデータの行をフェッチしました |
ノート:
SubscriberBは、最初の行のデータを受信しませんでした。これは、最初の行が送信された後にSubscriberBがサブスクライブしたためです。また、データが8秒で送信されませんでした。これは、すべてのサブスクライバが、送信される前に次の行をリクエストする必要があるためです。8秒でSubscriberAが次の行をリクエストしましたが、SubscriberBはそれまでリクエストを発行しませんでした。22.2.4 非同期メソッドを使用したLOBデータの読取り
OracleBlob
、OracleBFile
、OracleClob
およびOracleNClob
インタフェースでは、LOBデータの非同期読取りのためのpublisherOracle(long)
メソッドが公開されています。
publisherOracle(long)
メソッドの引数は、データが読み取られるLOBの位置です。OracleBlob.publisherOracle(long)
およびOracleBFile.publisherOracle(long)
メソッドは、Publisher< byte[]>タイプを返します。このパブリッシャは、LOBから読み取られたバイナリ・データのセグメントを送信します。OracleClob.publisherOracle(long)
およびOracleNClob.publisherOracle(long)
メソッドは、Publisher<String>タイプを返します。このパブリッシャは、LOBから読み取られた文字データのセグメントを送信します。
次の例では、LOBからバイナリ・データを非同期に読み取る方法を示しています。
/**
* Asynchronously reads binary data from a BLOB
* @param connection Connection to a database where the BLOB resides
* @param employeeId ID associated to the BLOB
* @return A Publisher that emits binary data of a BLOB
* @throws SQLException If a database access error occurs before the
* BLOB can be read
*/
Flow.Publisher<byte[]> readLOB(Connection connection, long employeeId)
throws SQLException {
PreparedStatement lobQueryStatement = connection.prepareStatement(
"SELECT photo_bytes FROM employee_photos WHERE id = ?");
lobQueryStatement.setLong(1, employeeId);
ResultSet resultSet = lobQueryStatement.executeQuery();
if (!resultSet.next())
throw new SQLException("No photo found for employee ID " + employeeId);
OracleBlob photoBlob =
(OracleBlob)resultSet.unwrap(OracleResultSet.class).getBlob(1);
Flow.Publisher<byte[]> photoPublisher = photoBlob.publisherOracle(1);
photoPublisher.subscribe(
// This subscriber will close the PreparedStatement and BLOB
new Flow.Subscriber<byte[]>() {
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}
public void onNext(byte[] item) { }
public void onError(Throwable throwable) { freeResources(); }
public void onComplete() { freeResources(); }
void freeResources() {
try { lobQueryStatement.close(); }
catch (SQLException closeException) { log(closeException); }
try { photoBlob.free(); }
catch (SQLException freeException) { log(freeException); }
}
});
return photoPublisher;
}
LOBパブリッシャによって送信されるデータ・セグメントのサイズは構成できません。ドライバは、最適化されるセグメント・サイズを、データベースのDB_BLOCK_SIZE
パラメータに従って選択します。
22.2.5 非同期メソッドを使用したLOBデータの書込み
OracleBlob
、OracleClob
およびOracleNClob
インタフェースでは、LOBデータの非同期書込みのためのsubscriberOracle(long)
メソッドが公開されています。
subscriberOracle(long)
メソッドの引数は、データが書き込まれるLOBの位置です。OracleBlob.subscriberOracle(long)
メソッドは、Subscriber<byte[]>
タイプを返します。このサブスクライバは、LOBに書き込まれるバイナリ・データのセグメントを受信します。OracleClob.subscriberOracle(long)
メソッドおよびOracleNClob.subscriberOracle(long)
メソッドは、Subscriber<String>
タイプを返します。これらのサブスクライバは、LOBに書き込まれる文字データのセグメントを受信します。
次の例では、LOBにバイナリ・データを非同期に書き込む方法を示しています。
/**
* Asynchronously writes binary data to a BLOB
* @param connection Connection to a database where the BLOB resides
* @param bytesPublisher Publisher that emits binary data
* @return A CompletionStage that completes with a reference to the BLOB,
* after all binary data is written.
* @throws SQLException If a database access error occurs before the table data is
* fetched
*/
CompletionStage<Blob> writeLOB(
Connection connection, Flow.Publisher<byte[]> bytesPublisher)
throws SQLException {
OracleBlob oracleBlob =
(OracleBlob) connection.unwrap(OracleConnection.class).createBlob();
// This future is completed after all bytes have been written to the BLOB
CompletableFuture<Blob> writeFuture = new CompletableFuture<>();
Flow.Subscriber<byte[]> blobSubscriber =
oracleBlob.subscriberOracle(1L,
// This Subscriber will receive a terminal signal when all byte[]'s
// have been written to the BLOB.
new Flow.Subscriber<Long>() {
long totalWriteLength = 0;
@Override
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Long writeLength) {
totalWriteLength += writeLength;
log(totalWriteLength + " bytes written.");
}
@Override
public void onError(Throwable throwable) {
writeFuture.completeExceptionally(throwable);
}
@Override
public void onComplete() {
writeFuture.complete(oracleBlob);
}
});
bytesPublisher.subscribe(blobSubscriber);
return writeFuture;
}
OracleBlob
、OracleClob
およびOracleNClob
インタフェースでは、単一引数形式のsubscriberOracle(long)
メソッドと同じ機能を実行するsubscriberOracle(long, Subscriber<Long>)
メソッドも公開されています。ただし、単一引数形式では、Subscriber<Long>
タイプも受け入れます。Subscriber<Long>
タイプは、データベースに対する書込み操作の結果をサブスクライバに受信するように通知します。非同期の書込み操作が完了するたびに、Subscriber<Long>
タイプは、操作によって書き込まれたバイト数または文字数を示すonNext
シグナルを受信します。非同期の書込み操作が失敗した場合、Subscriber<Long>
タイプはonError
シグナルを受信します。最後の書込み操作が完了した後、Subscriber<Long>
はonComplete
シグナルを受信します。
writeLOB
メソッドによって返されるCompletionStage<Blob>
が完了した後、結果のBLOBオブジェクトをinsertLOB
メソッドに渡し、BLOBデータを表に格納できます。
次の例では、データを挿入する方法を示しています。
/**
* Asynchronously inserts BLOB data into a table by executing a DML SQL
* statement
* @param connection Connection to a database where the table data resides
* @param employeeId ID related to the BLOB data
* @param photoBlob Reference to BLOB data
* @return A Publisher that emits the number of rows inserted (always 1)
* @throws SQLException If a database access error occurs before the DML
* SQL can be executed
*/
Flow.Publisher<Long> insertLOB(
Connection connection, long employeeId, Blob photoBlob)
throws SQLException {
PreparedStatement lobInsertStatement = connection.prepareStatement(
"INSERT INTO employee_photos(id, photo_bytes) VALUES (? ,?)");
lobInsertStatement.setLong(1, employeeId);
lobInsertStatement.setBlob(2, photoBlob);
Flow.Publisher<Long> insertPublisher =
lobInsertStatement.unwrap(OraclePreparedStatement.class)
.executeUpdateAsyncOracle();
insertPublisher.subscribe(new Flow.Subscriber<Long>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(1L);
}
@Override
public void onNext(Long item) { }
@Override
public void onError(Throwable throwable) { releaseResources(); }
@Override
public void onComplete() { releaseResources(); }
void releaseResources() {
try { lobInsertStatement.close(); }
catch (SQLException closeException) { log(closeException); }
try { photoBlob.free(); }
catch (SQLException freeException) { log(freeException); }
}
});
return insertPublisher;
}
22.2.6 非同期メソッドを使用したトランザクションのコミット
OracleConnection
インタフェースでは、非同期トランザクションの完了のためのcommitAsyncOracle
およびrollbackAsyncOracle
メソッドが公開されています。
commitAsyncOracle
メソッドおよびrollbackAsyncOracle
メソッドはどちらもFlow.Publisher<Void>
タイプを返します。パブリッシャーは、<Void>
タイプで示されているように、いかなるアイテムも送信しません。パブリッシャーは、コミットまたはロールバック操作が正常に完了したかどうかを示すために、単一のonComplete
またはonError
シグナルを送信します。
次の例では、トランザクションを非同期にコミットする方法を示しています。
/**
* Asynchronously commits a transaction
* @param connection Connection to a database with an active transaction
* @return A Publisher that emits a terminal signal when the transaction
* has been committed
* @throws SQLException If a database access error occurs before the
* transaction can be committed
*/
public Flow.Publisher<Void> commitTransaction(Connection connection)
throws SQLException {
return connection.unwrap(OracleConnection.class)
.commitAsyncOracle();
}
commitAsyncOracle
およびrollbackAsyncOracle
メソッドは、Connection.commit
およびConnection.rollback
メソッドと同じ機能を実行します。
22.2.7 非同期メソッドを使用した接続のクローズ
OracleConnection
インタフェースでは、非同期接続を閉じるためのcloseAsyncOracle
メソッドが公開されています。
closeAsyncOracle
メソッドは、Flow.Publisher<Void>
タイプを返します。パブリッシャーは、<Void>
タイプで示されているように、いかなるアイテムも送信しません。パブリッシャは、接続が正常に閉じられたかどうかを示すために単一のonComplete
またはonError
シグナルを送信します。
次の例では、接続を非同期に閉じる方法を示しています。
/**
* Asynchronously closes a connection
* @param connection Connection to be closed
* @return A Publisher that emits a terminal signal when the connection
* has been closed
* @throws SQLException If a database access error occurs before the
* connection can be closed
*/
Flow.Publisher<Void> closeConnection(Connection connection)
throws SQLException {
return connection.unwrap(OracleConnection.class)
.closeAsyncOracle();
}
closeAsyncOracle
メソッドは、Connection.close
メソッドと同じ機能を実行します。
22.3 非同期メソッドのスレッド・モデル
この項では、非同期メソッドのスレッド・モデルについて説明します。
非同期メソッドがコールされた場合、コール側スレッドで可能なかぎり多くの処理がネットワーク読取りへのブロッキングなしで実行されます。非同期メソッド・コールは、リクエストがネットワークに書き込まれた直後にレスポンスを待機せずに戻ります。オペレーティング・システムの書込みバッファの大きさが十分でないために完全なリクエストを格納できない場合、コール側スレッドはこのバッファがフラッシュされるまでブロックされる可能性があります。
書込みジョブが完了すると、ネットワーク・チャネルはI/Oの準備状況ポーリングのために登録されます。1つのスレッドが、同じJVM内のすべてのOracle JDBC接続のネットワーク・チャネルをポーリングします。I/Oポーリング・スレッドはoracle.net.nt.TcpMultiplexer
という名前で、デーモン・スレッドとして構成されます。このポーリング・スレッドは、セレクタを使用してブロッキング操作を実行します。
ネットワーク・チャネルに対してI/Oの準備状況が検出されると、ポール側スレッドはイベントを処理するワーカー・スレッドを用意します。ワーカー・スレッドは、ネットワークから読み取った後、操作が完了したことをパブリッシャに通知します。通知されると、パブリッシャは、そのサブスクライバごとにシグナルを送信するワーカー・スレッドを調整します。
java.util.concurrent.Executor
インタフェースは、ワーカー・スレッドを管理します。デフォルトのエグゼキュータはjava.util.concurrent.ForkJoinPool.commonPool
メソッドです。OracleConnectionBuilder.executorOracle(Executor)
メソッドをコールして、代替エグゼキュータを指定できます。
22.4 Flow APIについて
java.util.concurrent.Flow
タイプでは、リアクティブ・ストリームの作成に使用できる最低限の操作セットが定義されています。
Flow APIに対して、アプリケーション・コードを直接記述できます。ただし、次のリンクで指定されたリアクティブ・ストリーム仕様に従って低レベルの信号処理を実装する必要があります
https://github.com/reactive-streams/reactive-streams-jvm/blob/master/README.md
JDBCリアクティブ・エクステンションAPIでは、java.util.concurrent.Flow.Publisher
およびjava.util.concurrent.Flow.Subscriber
を使用して、アプリケーションに対するFlow APIの低レベルのメカニズムを処理します。次のリンクで指定されたreactive-streams-jvm
プロジェクトによって定義されたorg.reactivestreams.Publisher
およびorg.reactivestreams.Subscriber
タイプを含むReactor、RxJava、Akka-Streamsなどの人気のライブラリ。
org.reactivestreams.Publisher
タイプとorg.reactivestreams.Subscriber
タイプ、およびそのjava.util.concurrent.Flow
の対応部分は同じインタフェースを宣言しますが、Javaコンパイラはそれらをまだ個別のタイプとみなす必要があります。次のリンクで指定されているorg.reactivestreams.FlowAdapters
クラスを使用して、Flowタイプとorg.reactivestreams
タイプとの間を変換できます。
Oracleでは、JDBCドライバによって公開されているFlowタイプとインタフェースをとるときに、リアクティブ・ストリーム・ライブラリを使用することをお薦めしています。
22.5 FlowAdaptersクラスの使用方法
この項で説明するライブラリは、Flow APIについての項で説明するorg.reactivestreams
タイプとインタフェースをとります。
FlowAdapters
クラスがFlow.Publisher
タイプをorg.reactivestreams
タイプに変換する方法を示します。
例22-1 org.reactivestreamsタイプへの変換
public static org.reactivestreams.Publisher<ResultSet> publishQuery(
PreparedStatement queryStatement) {
try {
Flow.Publisher<OracleResultSet> queryPublisher =
queryStatement.unwrap(OraclePreparedStatement.class)
.executeQueryAsyncOracle();
return FlowAdapters.toPublisher(queryPublisher);
}
catch (SQLException sqlException) {
return createErrorPublisher(sqlException);
}
}
public static <T> org.reactivestreams.Publisher<T> publishRows(
ResultSet resultSet, Function<OracleRow, T> rowMappingFunction) {
try {
Flow.Publisher<T> rowPublisher =
resultSet.unwrap(OracleResultSet.class)
.publisherOracle(rowMappingFunction);
return FlowAdapters.toPublisher(rowPublisher);
}
catch (SQLException sqlException) {
return createErrorPublisher(sqlException);
}
}
22.6 リアクタ・ライブラリによる行データのストリーミング
リアクタ・ライブラリは、多数のアイテムのストリームを表すFluxタイプと、単一のアイテムのみのストリームを表すMonoタイプを定義します。
private Publisher<Employee> queryAllEmployees(Connection connection) {
return Flux.using(
// Prepare a SQL statement.
() -> connection.prepareStatement("SELECT * FROM emp"),
// Execute the PreparedStatement.
preparedStatement ->
// Create a Mono which emits one ResultSet.
Mono.from(publishQuery(preparedStatement))
// Flat map the ResultSet to a Flux which emits many Rows. Each row
// is mapped to an Employee object.
.flatMapMany(resultSet ->
publishRows(resultSet, row -> mapRowToEmployee(row))),
// Close the PreparedStatement after emitting the last Employee object
prepareStatement -> {
try {
prepareStatement.close();
}
catch (SQLException sqlException) {
throw new RuntimeException(sqlException);
}
});
}
using
ファクトリ・メソッドは、明示的に解放する必要があるリソースに依存するFluxを作成します。この場合、リソースは、close
メソッドをコールして解放されるPreparedStatement
インスタンスです。
最初のラムダ引数は、PreparedStatement
インスタンスを作成します。このラムダは、Fluxがアイテムの送信を開始する前に実行されます。
2番目のラムダ引数は、PreparedStatement
インスタンスを使用して、Employeeオブジェクトにマップされる行データのストリームを作成します。最初に、PreparedStatement
インスタンスはpublishQuery
メソッドへのコールにより実行されます。問合せ実行パブリッシャは単一のResultSet
を送信するため、Monoに適合します。問合せの実行が完了すると、Monoは、flatMapMany
メソッドで指定されたラムダにResultSet
を送信します。このラムダは、OracleRow
オブジェクトをEmployeeオブジェクトにマップする関数を使用してpublishRows
メソッドをコールします。この結果、flatMapMany
メソッド・コールでEmployeeオブジェクトのFlux
が返され、各EmployeeがResultSet
オブジェクトの行からマップされます。
3番目のラムダ引数は、PreparedStatement
インスタンスを閉じます。このラムダは、Fluxが最後のアイテムを送信した後に実行されます。
22.7 RxJavaライブラリによる行データのストリーミング
RxJavaライブラリは、多数のアイテムのストリームを表すFlowableタイプと、単一のアイテムのみのストリームを表すSingleタイプを定義します。
private Publisher<Employee> queryAllEmployees(Connection connection) {
return Flowable.using(
// Prepare a SQL statement
() -> connection.prepareStatement("SELECT * FROM emp"),
// Execute the PreparedStatement
queryStatement ->
// Create a Single which emits one ResultSet
Single.fromPublisher(publishQuery(queryStatement))
// Flat map the ResultSet to a Flowable which emits many rows, where
// each row is mapped to an Employee object
.flatMapPublisher(resultSet ->
publishRows(resultSet, oracleRow -> mapRowToEmployee(oracleRow))),
// Close the PreparedStatement after emitting the last Employee object
PreparedStatement::close
);
}
using
ファクトリ・メソッドは、明示的に解放する必要があるリソースに依存するFlowableを作成します。この場合、リソースは、close
メソッドをコールして解放されるPreparedStatement
インスタンスです。
最初のラムダ引数は、PreparedStatement
インスタンスを作成します。このラムダは、Flowableがアイテムの送信を開始する前に実行されます。
2番目のラムダ引数は、PreparedStatement
インスタンスを使用して、Employeeオブジェクトにマップされる行データのストリームを作成します。最初に、PreparedStatement
インスタンスはpublishQuery
メソッド・コールにより実行されます。問合せ実行パブリッシャは単一のResultSet
オブジェクトを送信するため、Singleに適合します。問合せの実行が完了すると、Singleは、flatMapPublisher
メソッドで指定されたラムダにResultSet
オブジェクトを送信します。このラムダは、OracleRow
オブジェクトをEmployeeオブジェクトにマップする関数を使用してpublishRows
メソッドをコールします。この結果、flatMapPublisher
メソッド・コールでEmployeeオブジェクトのFlowableが返され、各EmployeeがResultSet
の行からマップされます。
3番目のメソッド・ハンドル引数は、PreparedStatement
インスタンスを閉じます。このラムダは、Flowableが最後のアイテムを送信した後に実行されます。
22.8 Akka Streamsライブラリによる行データのストリーミング
Akka Streamsライブラリは、アイテムのストリームを表すSourceタイプを定義します。
private Source<Employee, NotUsed> queryAllEmployees(Connection connection) {
final PreparedStatement queryStatement;
try {
queryStatement = connection.prepareStatement("SELECT * FROM emp");
}
catch (SQLException prepareStatementFailure) {
return Source.failed(prepareStatementFailure);
}
// Create a Source which emits one ResultSet
return Source.fromPublisher(publishQuery(queryStatement))
// Flat map the ResultSet to a Source which emits many Rows, where each
// Row is mapped to an Employee object
.flatMapConcat(resultSet -> {
Publisher<Employee> employeePublisher =
publishRows(resultSet, oracleRow -> mapRowToEmployee(oracleRow));
return Source.fromPublisher(employeePublisher);
})
// This Sink closes the PreparedStatement when the Source terminates
.alsoTo(Sink.onComplete(result -> queryStatement.close()));
}
PreparedStatement
インスタンスが、Employeeオブジェクトにマップされる行データのストリームを作成するために使用されます。最初に、PreparedStatement
インスタンスはpublishQuery
メソッド・コールにより実行されます。問合せ実行パブリッシャは、単一のResultSet
オブジェクトを送信します。このパブリッシャは、Sourceに適合します。問合せが完了すると、Sourceは、flatMapConcat
メソッドで指定されたラムダにResultSet
オブジェクトを送信します。
このラムダは、OracleRow
オブジェクトをEmployeeオブジェクトにマップする関数を使用してpublishRows
メソッドをコールします。この結果、flatMapConcat
メソッド・コールでEmployeeオブジェクトのSourceが返され、各EmployeeがResultSet
オブジェクトの行からマップされます。
PreparedStatement
インスタンスは、明示的に閉じる必要があるリソースです。これは、alsoTo
メソッド・コールによって処理されます。このコールでは、SourceがonComplete
またはonError
シグナルを送信したときに、PreparedStatement
インスタンスを閉じるSinkが指定されます。
22.9 JDBCリアクティブ・エクステンションの制限
この項では、JDBCリアクティブ・エクステンションの制限事項について説明します。
JDBCリアクティブ・エクステンションには、次の制限事項があります。
- 接続が、同時に複数の非同期操作を受け入れません。非同期メソッド・コールが、現在進行中の別の操作がある場合にコール側スレッドをブロックします。
- 非同期メソッドにアクセスするには、タイプをキャストしてアクセスするのではなく、
java.sql.Wrapper.unwrap
メソッドを使用する必要があります。これにより、接続プールで非同期メソッドを使用する場合など、プロキシOracle JDBCクラスで使用する際に非同期メソッドが正しく動作します。 - OracleNetレイヤーのセッションの確立は、I/Oバウンド操作のブロックです。
- ネットワークから大きいレスポンスを読み取るには、I/Oバウンド操作のブロックが必要になる場合があります。読取り操作のブロックは、TCP受信バッファ・サイズより大きいレスポンスをドライバが読み取った場合に発生することがあります。
- 非同期SQL実行では、スクロール可能なResultSetタイプも機密のResultSetタイプもサポートされていません。