11 UCP非同期拡張

Oracle Databaseリリース23ai以降、UCPでは非同期拡張が提供されます。非同期拡張は、非同期データベース・アクセスを提供するためにUCP標準を拡張する一連のメソッドです。

非同期拡張は、この拡張とともにUCPのすべての機能を使用できるように、ソース・コードの一部の変更とともにUCPに統合されます。この章では、UCPを非同期で使用するためにコードに対して加える必要がある変更について説明します。

ノート:

この機能は、JDK 11以降でサポートされています。

この章の構成は、次のとおりです。

11.1 UCP非同期拡張の概要

UCP非同期拡張では、接続オブジェクトの作成に非ブロッキング・メカニズムが使用されるため、アプリケーションはただちに流用する接続のCompletableFutureまたはPublisherを受け取ります。

これを実現するには、次を実行する必要があります。

  1. 接続ビルダーをインスタンス化します。
  2. CompletableFuture<Connection>またはPublisher<Connection>を使用して、UCPConnectionBuilderで非同期的に接続を作成します。

非同期拡張では、CompletableFuture<XAConnection>またはPublisher<XAConnection>を使用して、UCPXAConnectionBuilderでXA接続を流用することもできます。

非同期メソッドがコールされた場合、コール側スレッドで可能なかぎり多くの処理がネットワーク読取りまたは書込みへのブロッキングなしで実行されます。非同期メソッド・コールは、リクエストがネットワークに書き込まれた直後にレスポンスを待機せずに戻ります。ネットワーク・チャネルに対してI/Oの準備状況が検出されると、ポール側スレッドはイベントを処理するワーカー・スレッドを用意します。ワーカー・スレッドは、ネットワークから読み取った後、操作が完了したことをCompletableFutureまたはPublisherに通知します。通知されると、CompletableFutureまたはPublisherは、そのSubscribersごとにシグナルを送信するワーカー・スレッドを調整します。

java.util.concurrent.Executorインタフェースはワーカー・スレッドを管理しますが、デフォルトのExecutorjava.util.concurrent.ForkJoinPool.commonPoolメソッドです。アプリケーション・ソース・コードにexecutor()コードを実装しない場合、非同期流用操作はデフォルトのForkJoinPoolエグゼキュータで実行されます。任意のエグゼキュータを設定して非同期流用を実行するには、executor(executor)コールを使用できます。

11.2 例: UCP非同期拡張

この項では、UCP非同期拡張の使用方法を示すいくつかの例をいくつか示します。

例11-1 CompletableFuture<Connection>を使用した非同期接続の作成

...
final PoolDataSource pds = new PoolDataSourceImpl();
[//Initialize PoolDataSource object in the standard way]
 
final CompletionStage<Connection> connectionStage =
  pds.createConnectionBuilder()
    .user(<user name>)
    .password(<password>)
    .executor(executor)
    .buildAsyncOracle();
 
final CompletionStage<String> queryStage =
  connectionStage.thenApply(connection -> {
     [//Perform operations on the connection]
  });
...

例11-2 Publisher<Connection>を使用した非同期接続の作成

...
final PoolDataSource pds = new PoolDataSourceImpl();
[//Initialize PoolDataSource object in the standard way]
 
final Publisher<Connection> connectionPublisher =
  pds.createConnectionBuilder()
    .user(<user name>)
    .password(<password>)
    .executor(executor)
    .buildConnectionPublisherOracle();
 
[//Perform standard activities on the Publisher]
...

例11-3 UCPXAConnectionBuilderを使用したXA非同期接続の作成

...
final PoolXADataSource pds = new PoolXADataSourceImpl();
[//Initialize PoolXADataSource object in the standard way]
 
final CompletionStage<XAConnection> connectionStage =
  pds.createXAConnectionBuilder()
    .user(<user name>)
    .password(<password>)
    .executor(executor)
    .buildAsyncOracle();
 
final CompletionStage<String> queryStage =
  connectionStage.thenApply(xaConnection -> {
     [//Perform operations on the XAConnection]
  });
...

例11-4 Publisher<XAConnection>を使用したXA非同期接続の作成

...
final PoolXADataSource pds = new PoolXADataSourceImpl();
[//Initialize PoolXADataSource object in the standard way]
 
final Publisher<XAConnection> xaConnectionPublisher =
  pds.createXAConnectionBuilder()
    .user(<user name>)
    .password(<password>)
    .executor(executor)
    .buildConnectionPublisherOracle();
 
[//Perform standard activities on the Publisher]
...

11.3 非同期接続ラベリング

非同期モードでも、UCP接続ラベリング機能を利用できます。これを実現するには、新しいoracle.ucp.ConnectionLabelingCallback.configureAsync()メソッドのデフォルト・バージョンをオーバーライドする必要があります。

configureAsync: 構成メソッドの非同期バージョン

標準接続ラベリングの場合、アプリケーションでconfigureメソッドを使用します。非同期モードで接続ラベリングを使用するには、configureAsyncメソッドを使用する必要があります。configureAsyncメソッドの定義は次のとおりです。

default CompletionStage<Boolean> configureAsync(Properties requestedLabels, Object connection) {
  throw new NoSuchMethodError();

関連項目:

11.4 例: 非同期接続ラベリング

この項では、接続ラベリングでUCP非同期拡張を使用する方法の例を示します。

例11-5 接続ラベリングを使用した非同期接続の作成

package tests.ucp.async.labeling;

import oracle.ucp.ConnectionLabelingCallback;
import oracle.ucp.jdbc.PoolDataSource;
import oracle.ucp.jdbc.PoolDataSourceImpl;

import java.sql.Connection;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class AsyncLabelingExample {
  public static void main(String ... args) throws Exception {
    final PoolDataSource pds = new PoolDataSourceImpl();

    // Set the pool data source properties

    final ConnectionLabelingCallback labelingCallback = new ConnectionLabelingCallback() {
      @Override
      public int cost(Properties requestedLabels, Properties currentLabels) {
        // some cost manipulations, same as in synchronous case
        return 0; // or some other integer, depending on the cost computation logic
      }

      @Override
      public boolean configure(Properties requestedLabels, Object connection) {
        // some connection configuration manipulations for synchronous case,
        // not used in asynchronous case, so it can be skipped.
        return true;
      }

      @Override
      public CompletionStage<Boolean> configureAsync(Properties requestedLabels, Object connection) {
        final var cf = new CompletableFuture<Boolean>();

        // Perform some asynchronous connection configuration
        doSomeConfigAction((Connection)connection).whenComplete((p, e) -> {
          if (null == e) {
            cf.complete(p);
          } else {
            cf.completeExceptionally(e);
          }
        });

        return cf;
      }

      private CompletableFuture<Boolean> doSomeConfigAction(Connection conn) {
        final var cf = new CompletableFuture<Boolean>();

        // ...
        // configure the connection asynchronously
        // ... complete CompletableFuture with result or exception ...

        return cf;
      }
    };

    pds.registerConnectionLabelingCallback(labelingCallback);

    // some labeling code

  }
}