構造化並行性

構造化並行性は、異なるスレッドで実行されている関連タスクのグループを1つの作業単位として処理することにより、エラーの処理と取消しを合理化し、信頼性と可観測性を向上させます。

構造化並行性APIのプリンシパル・クラスは、java.util.concurrentパッケージ内のStructuredTaskScopeです。このクラスを使用すると、同時サブタスクのグループを1単位として調整できます。StructuredTaskScopeインスタンスでは、各サブタスクをフォークし、個別のスレッドで実行します。その後、それらを1単位として結合します。その結果、StructuredTaskScopeにより、メイン・タスクの続行前にサブタスクが確実に完了するようになります。または、1つのサブタスクが成功したときにアプリケーションを続行するように指定できます。

ノート:

これはプレビュー機能です。プレビュー機能は、設計、仕様および実装が完了したが、永続的でない機能です。プレビュー機能は、将来のJava SEリリースで、異なる形式で存在することもあれば、まったく存在しないこともあります。プレビュー機能が含まれているコードをコンパイルして実行するには、追加のコマンド行オプションを指定する必要があります。『Preview Language and VM Features』を参照してください。

構造化並行性の背景情報は、JEP 453を参照してください。

StructuredTaskScopeクラスの基本的な使用方法

StructuredTaskScopeクラスを使用するには、次の一般的なステップに従います。

  1. StructuredTaskScopeを作成し、try-with-resources文とともに使用します。
  2. サブタスクをCallableのインスタンスとして定義します。
  3. tryブロック内で、StructuredTaskScope::forkを使用して独自のスレッドの各サブタスクをフォークします。
  4. StructuredTaskScope::joinを呼び出します。
  5. サブタスクからの結果を処理します。
  6. StructuredTaskScopeが停止していることを確認します。

次の図に、これらのステップを示します。join()メソッドが理由で、タスク・スコープはすべてのサブタスクの実行が終了するまで待機する必要があることに注意してください。その後、サブタスクの結果を処理できます。

図14-2 StructuredTaskScopeクラスの使用


この図については前の文で説明しています。

通常、StructuredTaskScopeクラスを使用するコードには次の構造があります。

    Callable<String> task1 = ...
    Callable<Integer> task2 = ...

    try (var scope = new StructuredTaskScope<Object>()) {

        Subtask<String> subtask1 = scope.fork(task1);
        Subtask<Integer> subtask2 = scope.fork(task2);

        scope.join();

        ... process results/exceptions ...

    } // close

StructuredTaskScopetry-with-resources文で定義されているため、tryブロックの最後にStructuredTaskScopeが停止し、タスク・スコープは未完了のサブタスクを実行するスレッドが完了するのを待機します。

StructuredTaskScopeクラスは、タスク・スコープを閉じることなく停止するshutdownメソッドを定義します。このメソッドは、スレッドを中断することによって、未完了のサブタスクをすべて取り消します。また、shutdownメソッドを使用すると、StructuredTaskScopeのサブクラスで、すべてのサブタスクの完了を必要としないポリシーを実装できます。「一般的なシャットダウン・ポリシー: ShutdownOnSuccessおよびShutdownOnFailure」の項では、StructuredTaskScopeの2つのサブクラスであるShutdownOnSuccessおよびShutdownOnFailureについて説明します。1つ目は、サブタスクが正常に完了するとすぐにタスク・スコープを停止するポリシーを実装し、2つ目はサブタスクが例外をスローするとすぐにタスク・スコープを停止します。

一般的なシャットダウン・ポリシー: ShutdownOnSuccessおよびShutdownOnFailure

StructuredTaskScopeクラスには、ShutdownOnFailureおよびShutdownOnSuccessという2つのサブクラスが含まれています。これらのサブクラスは、2つの一般的なシャットダウン・ポリシーを実装します。ShutdownOnFailureは、サブタスクのいずれかが失敗した場合にすべてのサブタスクを取り消し、ShutdownOnSuccessは、サブタスクのいずれかが成功した場合に残りのサブタスクをすべて取り消します。これらのシャットダウン・ポリシーは、短絡パターンの例です。短絡パターンを使用すると、結果が不要になったサブタスクをメイン・タスクで中断および取消しできるようにすることで、サブタスクの迅速な完了が促進されます。

次の例は、StructuredTaskScope.ShutdownOnFailureおよびStructuredTaskScope.ShutdownOnSuccessクラスを示しています。各タスク・スコープは、ランダムな時間にわたってスリープする5つのサブタスクをフォークします。ただし、期間が指定のしきい値より大きい場合、サブタスクはTooSlowExceptionをスローします。handleShutDownOnFailure()メソッドは、いずれのサブタスクも例外をスローしなかった場合に、すべてのサブタスクの合計継続時間を出力します。handleShutDownOnSuccess()メソッドは、最初に完了したサブタスクの継続時間を出力します。

図14-3 SCRandomTasks.java

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.StructuredTaskScope.*;
import java.util.concurrent.StructuredTaskScope.Subtask;
import java.util.function.*;
import java.util.stream.*;

public class SCRandomTasks {
    
    class TooSlowException extends Exception {
        public TooSlowException(String s) {
            super(s);
        }
    }
    
    public Integer randomTask(int maxDuration, int threshold) throws InterruptedException, TooSlowException {
        int t = new Random().nextInt(maxDuration);
        System.out.println("Duration: " + t);
        if (t > threshold) {
            throw new TooSlowException("Duration " + t + " greater than threshold " + threshold);
        }
        Thread.sleep(t);
        return Integer.valueOf(t);        
    }        
    
    void handleShutdownOnFailure() throws ExecutionException, InterruptedException {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            // var t = new SCRandomTasks();
            var subtasks = IntStream.range(0, 5)
                                    .mapToObj(i -> scope.fork(() -> randomTask(1000, 850)))
                                    .toList();
            scope.join()
                 .throwIfFailed();
            var totalDuration = subtasks.stream()
                                        .map(t -> t.get())
                                        .reduce(0, Integer::sum);
            System.out.println("Total duration: " + totalDuration);
        }
    }
    
    void handleShutdownOnSuccess() throws ExecutionException, InterruptedException {
        try (var scope = new StructuredTaskScope.ShutdownOnSuccess()) {
            IntStream.range(0, 5)
                     .mapToObj(i -> scope.fork(() -> randomTask(1000, 850)))
                     .toList();
            scope.join();
            System.out.println("First task to finish: " + scope.result());
        }
    }    
    
    public static void main(String[] args) {
        var myApp = new SCRandomTasks();
        try {
            System.out.println("Running handleShutdownOnFailure...");
            myApp.handleShutdownOnFailure();
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
        try {
            System.out.println("Running handleShutdownOnSuccess...");
            myApp.handleShutdownOnSuccess();
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }      
    }
}

次のような出力が表示されます:

Running handleShutdownOnFailure...
Duration: 359
Duration: 676
Duration: 322
Duration: 591
Duration: 315
Total duration: 2263
Running handleShutdownOnSuccess...
Duration: 480
Duration: 40
Duration: 868
Duration: 526
Duration: 532
First task to finish: 40

StructuredTaskScope.ShutdownOnFailureクラスは、サブタスクの1つによってスローされた最初の例外を取得し、shutdownメソッドを呼び出します。これにより、新しいサブタスクが開始されなくなり、他のサブタスクを実行している未完了のスレッドがすべて中断され、アプリケーションの実行が続行されます。取得された例外にアクセスするには、ShutdownOnFailure::exceptionメソッドを呼び出します。かわりに例外を再スローする場合は、次の例に示すShutdownOnFailure::throwIfFailedメソッドを呼び出します。

            scope.join()
                 .throwIfFailed();

StructuredTaskScope.ShutdownOnSuccessクラスは、正常に完了する最初のサブタスクの結果を取得し、ShutdownOnFailureと同様にshutdownメソッドを呼び出します。正常に完了したサブタスクの結果にアクセスするには、次の例に示すShutdownOnSuccess::resultメソッドを呼び出します。

            System.out.println("First task to finish: " + scope.result());

独自のStructuredTaskScopeポリシーの実装

ShutdownOnFailureおよびShutdownOnSuccessとは異なる方法でサブタスクを処理する独自のStructuredTaskScopeポリシーを実装できます。これを行うには、StructuredTaskScopeクラスを拡張します。

次の例のCollectingScopeには、同じタイプのサブタスクの2つのストリームを返す2つのメソッドが含まれています。successfulTasks()は成功したサブタスクのストリームを返し、failedTasks()は例外をスローしたサブタスクのストリームを返します。

図14-4 CollectingScope.java

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;

public class CollectingScope<T> extends StructuredTaskScope<T> {
    private final Queue<Subtask<? extends T>> successSubtasks = new LinkedTransferQueue<>();
    private final Queue<Subtask<? extends T>> failedSubtasks = new LinkedTransferQueue<>();

    @Override
    protected void handleComplete(Subtask<? extends T> subtask) {
        if (subtask.state() == Subtask.State.SUCCESS) {
            successSubtasks.add(subtask);
        } else if (subtask.state() == Subtask.State.FAILED) {
            failedSubtasks.add(subtask);
        }
    }

    @Override
    public CollectingScope<T> join() throws InterruptedException {
        super.join();
        return this;
    }

    public Stream<Subtask<? extends T>> successfulTasks() {
        super.ensureOwnerAndJoined();
        return successSubtasks.stream();
    }
    
    public Stream<Subtask<? extends T>> failedTasks() {
        super.ensureOwnerAndJoined();
        return failedSubtasks.stream();
    }  
}

「一般的なシャットダウン・ポリシー: ShutdownOnSuccessおよびShutdownOnFailure」の説明に従ってこのクラスをSCRanndomTasksの例で使用するには、次のメソッドを追加します。正常に完了したサブタスクの合計継続時間と、例外をスローしたサブタスクの例外が出力されます。

    void handleBoth() throws InterruptedException {
        try (var scope = new CollectingScope())  {
            // var t = new SCRandomTasks();
            var subtasks = IntStream.range(0, 5)
                                    .mapToObj(i -> scope.fork(() -> randomTask(1000, 500)))
                                    .toList();
            scope.join();

            var totalDuration = scope.successfulTasks()
                                     .mapToInt(st -> (Integer)((Subtask)st).get())
                                     .reduce(0, Integer::sum);
            System.out.println("Total duration: " + totalDuration);
            
            scope.failedTasks()
                 .forEach(ft ->
                    System.out.println(((Exception)((Subtask)ft).exception()).getMessage()));
        }        
    }

次のような出力が表示されます:

Duration: 501
Duration: 211
Duration: 661
Duration: 903
Duration: 839
Total duration: 211
Duration 501 greater than threshold 500
Duration 661 greater than threshold 500
Duration 903 greater than threshold 500
Duration 839 greater than threshold 500

CollectingScopeの例では、successfulTasks()およびfailedTasks()がそれぞれsuccessSubtasksおよびfailedSubtasksのストリームを返す前に、StructuredTaskScope::ensureOwnerAndJoinedを呼び出します。これにより、この例は、現在のスレッドがタスク・スコープの所有者であり、タスク・スコープがフォークされた後にサブタスクに参加した場合にのみ、successSubtasksおよびfailedSubtasksにアクセスできることが保証されます。

jcmdコマンドを使用したStructuredTaskScopeおよびそのフォークしたサブタスクのデバッグ

jcmdツールは、JSON形式でスレッド・ダンプを発行できます。このスレッド・ダンプは、配列内のStructuredTaskScopeのフォークされたサブタスクを実行しているスレッドとそのスタック・トレースを表示します。

3つのサブタスクをフォークする次の例を考えてみます。これらのサブタスクは、メッセージの出力と1秒間のスリープを交互に繰り返し行います。

図14-5 SCObervable.java

import java.util.*;
import java.util.function.*;
import java.util.concurrent.*;
import java.util.stream.*;

public class SCObservable {
    
    static Long sleepOneSecond(String s) throws InterruptedException {
        long pid = ProcessHandle.current().pid();
        for (int i = 0; i<60; i++)  {
            System.out.println("[" + pid + ", " + s + "]" + " Sleeping for 1s...");
            Thread.sleep(1000);
        }
        return Long.valueOf(pid);
    }
    
    void handle() throws ExecutionException, InterruptedException {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            Supplier<Long> task1 = scope.fork(() -> sleepOneSecond("task1"));
            Supplier<Long> task2 = scope.fork(() -> sleepOneSecond("task2"));
            Supplier<Long> task3 = scope.fork(() -> sleepOneSecond("task3"));
            scope.join()
                 .throwIfFailed();
        }
    }
    
    public static void main(String[] args) {
        try {
            var myApp = new SCObservable();
            myApp.handle();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
次のような出力が表示されます:
[10852, task1] Sleeping for 1s...
[10852, task2] Sleeping for 1s...
[10852, task3] Sleeping for 1s...
[10852, task1] Sleeping for 1s...
...

この例の実行中に、別のコンソールで次のコマンドを実行してスレッド・ダンプを作成できます。ここで、<pid>は実行中のJavaプロセスのプロセスIDです:

jcmd <pid> Thread.dump_to_file -format=json <output_file>

サンプル・スレッド・ダンプ出力ファイルの次の抜粋は、配列内のフォークされたサブタスクのスレッドを含むStructuredTaskScopeを示しています。スレッド・ダンプには、StructuredTaskScopeの親への参照も表示されるため、プログラムの構造をスレッド・ダンプから再構成できます。

{
  "threadDump": {
    "processId": "10852",
    "time": "2023-06-22T13:59:05.156805300Z",
    "runtimeVersion": "21-ea+27-LTS-2343",
    "threadContainers": [
      {
        "container": "<root>",
        "parent": null,
        "owner": null,
        "threads": [
         {
           "tid": "1",
           "name": "main",
           "stack": [
              "java.base\/jdk.internal.misc.Unsafe.park(Native Method)",
              "java.base\/java.util.concurrent.locks.LockSupport.park(LockSupport.java:371)",
              "java.base\/jdk.internal.misc.ThreadFlock.awaitAll(ThreadFlock.java:315)",
              "java.base\/java.util.concurrent.StructuredTaskScope.implJoin(StructuredTaskScope.java:621)",
              "java.base\/java.util.concurrent.StructuredTaskScope.join(StructuredTaskScope.java:647)",
              "java.base\/java.util.concurrent.StructuredTaskScope$ShutdownOnFailure.join(StructuredTaskScope.java:1200)",
              "SCObservable.handle(SCObservable.java:22)",
              "SCObservable.main(SCObservable.java:30)"
           ]
         },
         
         ...
         
        ],
        "threadCount": "7"
      },

      ...

      {
        "container": "java.util.concurrent.StructuredTaskScope$ShutdownOnFailure@5674cd4d",
        "parent": "<root>",
        "owner": "1",
        "threads": [
         {
           "tid": "21",
           "name": "",
           "stack": [
              "java.base\/java.lang.VirtualThread.parkNanos(VirtualThread.java:631)",
              "java.base\/java.lang.VirtualThread.sleepNanos(VirtualThread.java:803)",
              "java.base\/java.lang.Thread.sleep(Thread.java:507)",
              "SCObservable.sleepOneSecond(SCObservable.java:12)",
              "SCObservable.lambda$handle$0(SCObservable.java:19)",
              "java.base\/java.util.concurrent.StructuredTaskScope$SubtaskImpl.run(StructuredTaskScope.java:883)",
              "java.base\/java.lang.VirtualThread.run(VirtualThread.java:311)"
           ]
         },
         {
           "tid": "23",
           "name": "",
           "stack": [
              ...
              "SCObservable.sleepOneSecond(SCObservable.java:12)",
              "SCObservable.lambda$handle$1(SCObservable.java:20)",
              ...
           ]
         },
         {
           "tid": "24",
           "name": "",
           "stack": [
              ...
              "SCObservable.sleepOneSecond(SCObservable.java:12)",
              "SCObservable.lambda$handle$2(SCObservable.java:21)",
              ...
           ]
         }
        ],
        "threadCount": "3"
      }
    ]
  }
}