構造化並行性

構造化並行性は、異なるスレッドで実行されている関連タスクのグループを1つの作業単位として処理することにより、エラーの処理と取消しを合理化し、信頼性と可観測性を向上させます。

構造化並行性では、タスク(作業単位)が複数の同時サブタスクに分割されます。これらのサブタスクは、タスクを続行する前に完了する必要があります。サブタスクはスコープ内にグループ化され、java.util.concurrentパッケージのStructuredTaskScopeクラスによって表されます。スコープ内でサブタスクを実行するには、値を返すメソッドを実行する分岐します。デフォルトでは、これはサブタスクを実行するスコープ内の新しい仮想スレッドを開始します。サブタスクを分岐した後、StructuredTaskScope::joinメソッドをコールしてサブタスクを結合します。その結果、スコープはすべての分岐されたサブタスクが完了するまで待機します。デフォルトでは、すべてのサブタスクが正常に完了した場合、joinメソッドはnullを返します。それ以外の場合は例外をスローします。これはスコープのデフォルトのポリシーです。ジョイナを指定すると、別のポリシーを指定できます。たとえば、すべてのサブタスクが正常に完了した場合、すべてのサブタスクのストリームを返すジョイナがあります。

サブタスクでは、独自のスコープを作成して独自のサブタスクを分岐できるため、スコープの階層を作成できます。サブタスクの存続期間は、その含まれるスコープの存続期間に制限されます。サブタスクのすべてのスレッドは、スコープが閉じられると必ず終了されます。jcmdコマンドを使用してスレッド・ダンプを生成することで、スコープのこの階層を確認できます。

ノート:

これはプレビュー機能です。プレビュー機能は、設計、仕様および実装が完了したが、永続的でない機能です。プレビュー機能は、将来のJava SEリリースで、異なる形式で存在することもあれば、まったく存在しないこともあります。プレビュー機能が含まれているコードをコンパイルして実行するには、追加のコマンド行オプションを指定する必要があります。『Preview Language and VM Features』を参照してください。

構造化並行性の背景情報は、JEP 505を参照してください。

StructuredTaskScopeクラスの基本的な使用方法

StructuredTaskScopeクラスを使用するには、次の一般的なステップに従います。

  1. try-with-resources文で静的openメソッドのいずれかをコールして、新しいStructuredTaskScopeを開きます。スコープを開くスレッドは、スコープの所有者です。
  2. サブタスクをCallableまたはRunnableのインスタンスとして定義します。
  3. tryブロック内で、StructuredTaskScope::forkを使用して独自のスレッドの各サブタスクをフォークします。
  4. StructuredTaskScope::joinをコールして、スコープのすべてのサブタスクを1つの単位として結合します。その結果、StructuredTaskScopeはすべてのサブタスクが完了するのを待機し、結果を返します。これにより、例外がスローされることがあります。
  5. StructuredTaskScope::joinの結果を処理します。
  6. 通常はtry-with-resources文を使用して暗黙的にスコープを閉じます。スコープがまだ取り消されていない場合は、スコープを取り消します。これにより、新しいスレッドがスコープ内で開始されなくなり、未完了のサブタスクを実行しているスレッドが中断されます。

次の図に、これらのステップを示します。join()メソッドが理由で、StructuredTaskScopeはすべてのサブタスクの実行が終了するまで待機する必要があることに注意してください。

図14-2 StructuredTaskScopeクラスの使用


この図については前の文で説明しています。

通常、StructuredTaskScopeクラスを使用するコードには次の構造があります。

Callable<String> task1 = () -> { return "Hello World"; };
Callable<Integer> task2 = () -> { return Interlingual(42); };

// Open a new StructuredTaskScope
try (var scope = StructuredTaskScope.open()) {
    // Fork subtasks
    Subtask<String> subtask1 = scope.fork(task1);
    Subtask<Integer> subtask2 = scope.fork(task2);

    // Join the scope's subtasks and propagate exceptions
    scope.join();
                            
    // Process the join method's results
    System.out.println("subtask1: " + subtask1.get());
    System.out.println("subtask2: " + subtask2.get());
                            
} catch (InterruptedException e) {
    System.out.println("InterruptedException");
}           

パラメータがゼロのopen()ファクトリ・メソッドは、デフォルト・ポリシーを実装するStructuredTaskScopeを作成および開きます。これは、すべてのサブタスクが正常に完了した場合はnullを返し、サブタスクが失敗した場合はStructuredTaskScope.FailedExceptionをスローします。StructuredTaskSope.Joinerをパラメータとして使用するopenファクトリ・メソッドのいずれかをコールして、別のポリシーを指定できます。

サブタスクを開始するには、fork(Callable)またはfork(Runnable)メソッドをコールします。これにより、サブタスク(デフォルトでは仮想スレッド)を実行するためのスレッドが開始されます。

スコープの所有者スレッドは、スコープ内からjoinメソッドをコールする必要があります。joinメソッドは、このスコープで開始されたすべてのサブタスクが完了するか、スコープが取り消されるまで待機します。デフォルトのポリシーに従って、サブタスクが失敗した場合、joinメソッドは例外をスローし、スコープは取り消されます。すべてのサブタスクが成功した場合、通常、joinメソッドが完了し、nullが返されます。ジョイナを使用してStructuredTaskScopeを開くと、joinメソッドは別のタイプの値を返すことができます。

スコープのブロックが結合前に終了すると、スコープは取り消され、所有者はすべてのサブタスクが終了するまでcloseメソッドで待機してから例外をスローします。

結合後、スコープの所有者は、forkメソッドから返されたサブタスク・オブジェクトを使用してサブタスクの結果を処理できます。たとえば、Subtask::getメソッドをコールして、正常に完了したサブタスクの結果を取得します。このメソッドは、結合前にコールされた場合、例外をスローすることに注意してください。

ジョイナ

ジョイナとはStructuredTaskScopeとともに使用されるオブジェクトで、サブタスクの完了を処理し、サブタスクの完了をjoinメソッドで待機しているスコープ所有者に対して結果を生成します。ジョイナによっては、joinメソッドが結果、要素のストリームまたはその他のオブジェクトを返す場合があります。

StructuredTaskScope.Joinerインタフェースは、一般的に使用されるポリシーのジョイナを作成する次の静的メソッドを定義します:

表14-2 ポリシーの静的メソッド

静的メソッド StructuredTaskScope::joinメソッドの結果
allSuccessfulOrThrow
  • すべてが正常に完了したすべてのサブタスクのストリームを返します
  • サブタスクが失敗した場合、例外をスローします
anySuccessfulResultOrThrow
  • 最初に成功したサブタスクの結果を返します
  • すべてのサブタスクが失敗した場合、例外をスローします
awaitAllSuccessfulorThrow
  • すべてのサブタスクが完了したら、nullを返します
  • 失敗した最初のサブタスクから例外をスローします
awaitAll
  • すべてのサブタスクが完了したら、nullを返します
  • 例外をスローしません

次の例では、allSuccessfulOrThrowによって返されたジョイナを使用してStructuredTaskScopeを開きます。StructuredTaskScopeは5つのサブタスクを分岐し、それぞれがrandomTaskを実行します。randomTaskメソッドは、パラメータとして最大期間としきい値を取ります。randomTaskメソッドは、期間をランダムに生成します。この期間がしきい値より大きい場合は、TooSlowExceptionがスローされます。

class TooSlowException extends Exception {
    public TooSlowException(String s) {
        super(s);
    }
}

Integer randomTask(int maxDuration, int threshold) throws InterruptedException, TooSlowException {
    int t = new Random().nextInt(maxDuration);
    if (t > threshold) {
        throw new TooSlowException("Duration " + t + " greater than threshold " + threshold);
    }
    Thread.sleep(t);
    System.out.println("Duration: " + t);
    return Integer.valueOf(t);        
} 

void runConcurrentlyRandomTasks() {

    List<Callable<Integer>> subtasks = IntStream.range(0, 5)
                            .mapToObj(i -> (Callable<Integer>) () -> randomTask(1000, 900))
                            .toList();
                                
    try (var scope = StructuredTaskScope.open(Joiner.<Integer>allSuccessfulOrThrow())) {
        subtasks.forEach(scope::fork);
        scope.join().forEach(e -> System.out.println("Result: " + e.get()));
    } catch (InterruptedException e) {
        System.out.println("InterruptedException");         
    } catch (StructuredTaskScope.FailedException e) {
        Throwable cause = e.getCause();
        System.out.println("FailedException: " + cause.getClass().getSimpleName() + ": " + cause.getMessage());
    } 
}

例では、5つのサブタスクすべてが例外をスローしない場合、次のような出力が表示されます:

Duration: 312
Duration: 635
Duration: 672
Duration: 816
Duration: 891
Result: 635
Result: 891
Result: 672
Result: 816
Result: 312

例では、1つのサブタスクが例外をスローした場合、次のような出力が表示されます:

FailedException: TooSlowException: Duration 966 greater than threshold 900

この例では、StructuredTaskScopeallSuccessfulOrThrowメソッドによって返されたジョイナで開かれたため、そのjoinメソッドはサブタスクのストリームを返します(すべてのサブタスクが正常に完了した場合)。

ヒント:

同じタイプの一連のサブタスクを分岐する場合は、次のパターンを使用できます:
<T> List<T> runConcurrently(Collection<Callable<T>> tasks) throws InterruptedException {
    try (var scope = StructuredTaskScope.open(Joiner.<T>allSuccessfulOrThrow())) {
        tasks.forEach(scope::fork);
        return scope.join().map(Subtask::get).toList();
    }
}

StructuredTaskScope::joinが返す値の処理方法は、ジョイナによって異なります。たとえば、anySuccessfulResultOrThrowメソッドによって返されたジョイナを使用して開かれたStructuredTaskScopejoinメソッドは、最初に成功したサブタスクの結果を返します:

<T> T race(Collection<Callable<T>> tasks) throws InterruptedException {
    try (var scope = StructuredTaskScope.open(Joiner.<T>anySuccessfulResultOrThrow())) {
        tasks.forEach(scope::fork);
        return scope.join();
    }
}

カスタム・ジョイナ

StructuredTaskScope.Joiner<T,R>インタフェースを実装することで独自のカスタム・ジョイナを作成できます:

public static interface Joiner<T, R> {
    public default boolean onFork(Subtask<? extends T> subtask);
    public default boolean onComplete(Subtask<? extends T> subtask);
    public R result() throws Throwable;
}

パラメータTは、スコープ内で実行されるサブタスクの結果タイプで、Rjoinメソッドの結果タイプです。

onForkメソッドは、サブタスクの分岐時に呼び出されます。onCompleteメソッドは、サブタスクの完了で呼び出されます。

onForkおよびonCompleteメソッドは、スコープを取り消す必要があるかどうかを示すboolean値を返します。

resultメソッドが呼び出され、すべてのサブタスクが完了したらjoinメソッドの結果を生成するか、スコープが取り消された場合は例外をスローします。

次の例のCollectingJoinerは、正常に完了したサブタスクの結果を収集し、失敗したサブタスクを無視するジョイナです。

class CollectingJoiner<T> implements Joiner<T, Stream<T>> {
                
    private final Queue<T> results = new ConcurrentLinkedQueue<>();
                    
    public boolean onComplete(Subtask<? extends T> subtask) {
        if (subtask.state() == Subtask.State.SUCCESS) {
            results.add(subtask.get());
        }
        return false;
    }
        
    public Stream<T> result() {
        return results.stream();
    }
}

onCompleteメソッドは複数のスレッドによって同時に呼び出すことができます。したがって、CollectingJoinerはスレッド・セーフであり、成功したサブタスクの結果をConccurrentLinkedQueueに格納します。メソッドSubtask::stateは、タイプStructuredTaskScope.Subtask.Stateの次の値のいずれかを返すことができます。

  • FAILED: サブタスクが例外で失敗しました。
  • SUCCESS: サブタスクが正常に完了しました。
  • UNAVAILABLE: サブタスクの結果または例外は使用できません。この状態は、サブタスクが分岐されたが、まだ完了していないこと、スコープが取り消された後に完了したこと、またはスコープが取り消された後に分岐されたことを示します。

resultメソッドは、成功したサブタスク結果のストリームを返します。

次の例では、このカスタム・ポリシーを使用します:

<T> List<T> allSuccessful(List<Callable<T>> tasks) throws InterruptedException {
    try (var scope = StructuredTaskScope.open(new CollectingJoiner<T>())) {
        tasks.forEach(scope::fork);
        return scope.join().toList();
    }
}

void testCollectingJoiner() {
    List<Callable<Integer>> subtasks = IntStream
        .range(0, 10)
        .mapToObj(i -> (Callable<Integer>) () -> randomTask(1000, 300))
        .collect(Collectors.toList());

    try {                                
        allSuccessful(subtasks).stream().forEach(r -> System.out.println("Result: " + r));
    } catch (InterruptedException e) {
        Throwable cause = e.getCause();
        System.out.println("FailedException: " + cause.getClass().getSimpleName() + ": " + cause.getMessage());            
    }
}

次のような出力が表示されます:

Duration: 122
Duration: 238
Result: 122
Result: 238

StructuredTaskScopeの構成

StructuredTaskScope::openメソッドの1つは、ジョイナに加えて、StructuredTaskScope.Configオブジェクトをパラメータとして受け入れます。このオブジェクトでは、次のことができます:

  • 監視および管理の目的でのスコープの名前の設定
  • スコープのタイムアウトの設定
  • スコープのforkメソッドがスレッドの作成に使用するスレッド・ファクトリを設定

次の例では、タイムアウト200ミリ秒を指定する構成オブジェクトを使用してStructuredTaskScopeを開きます。これは、StructuredTaskScopeで分岐されたサブタスクが200ミリ秒以内に完了しない場合、TimeoutExceptionがスローされることを意味します:

void runConcurrentlyConfiguredRandomTasks() {
    var subtasks = IntStream.range(0, 5)
                            .mapToObj(i -> (Callable<Integer>) () -> randomTask(1000, 900))
                            .collect(Collectors.toList());
                                
    try (var scope = StructuredTaskScope.open(Joiner.<Integer>allSuccessfulOrThrow(),
                                              cf -> cf.withTimeout(Duration.ofMillis(200)))) {
        subtasks.forEach(scope::fork);
        Stream<Subtask<Integer>> s = scope.join();
        s.forEach(r -> System.out.println("Result: " + r.get()));
    } catch (InterruptedException e) {
        System.out.println("InterruptedException");
    } catch (StructuredTaskScope.TimeoutException e) {
        System.out.println("TimeoutException");
    } catch (StructuredTaskScope.FailedException e) {
        Throwable cause = e.getCause();
        System.out.println("FailedException: " + cause.getClass().getSimpleName() + ": " + cause.getMessage());
    }
}

スコープ階層および可観測性

サブタスクは、独自のStructuredTaskScopeを作成して独自のサブタスクを分岐できるため、スコープの階層を作成できます。サブタスクの存続期間は、その含まれるスコープの存続期間に制限されます。サブタスクのすべてのスレッドは、スコープが閉じられると必ず終了されます。jcmdコマンドを使用してスレッド・ダンプを生成することで、スコープのこの階層を確認できます。

次の例では、RandomTaskScopeRandomTaskScopeInsideSubtaskおよびRandomTaskSubscopeという名前の3つのスコープがあります。RandomTaskScopeInsideSubtaskという名前のスコープは、サブタスク内で開かれたスコープです。RandomTaskSubscopeという名前のスコープは、RandomTaskScopeという名前のスコープ内で開かれるスコープです。これらの3つのスコープは、名前およびスレッド・ファクトリを指定するStructuredTaskScope.Configオブジェクトで開かれます。このスレッド・ファクトリは、一意の名前を持つ仮想スレッドを作成します。jcmdコマンドは、スレッド・ダンプの生成時にこれらのスコープおよび仮想スレッド名を使用します。

public class SCObservable {
    
    ThreadFactory factory = Thread.ofVirtual().name("RandomTask-", 0).factory();
    
    static String sleepOneSecond(String s) throws InterruptedException {
        long pid = ProcessHandle.current().pid();
        String threadName = null;
        for (int i = 0; i < 20; i++)  {
            threadName = Thread.currentThread().getName();
            System.out.println("PID: " + pid + ", name: " + s + ", thread name: " + Thread.currentThread().getName());
            Thread.sleep(1000);
        }
        return threadName;
    }
    
    void handle() throws InterruptedException {
        try (var scope = StructuredTaskScope.open(StructuredTaskScope.Joiner.<String>allSuccessfulOrThrow(),
                                                   cf -> cf.withThreadFactory(factory)
                                                           .withName("RandomTaskScope"))) {
            Supplier<String> task0 = scope.fork(() -> sleepOneSecond("task0"));
            Supplier<String> task1 = scope.fork(() -> sleepOneSecond("task1"));
            
            Callable<String> t = () -> {
                String results = "Result in RandomTaskScopeInsideSubtask: ";
                try (var subtaskscope = StructuredTaskScope.open(
                        StructuredTaskScope.Joiner.<String>allSuccessfulOrThrow(),
                        cf -> cf.withThreadFactory(factory)
                                .withName("RandomTaskScopeInsideSubtask"))) {
                    Supplier<String> task2a = subtaskscope.fork(() -> sleepOneSecond("task2a"));
                    Supplier<String> task2b = subtaskscope.fork(() -> sleepOneSecond("task2b"));
                    results += subtaskscope.join().map(Subtask::get).collect(Collectors.joining(", "));
                }
                return results;
            };
            
            Supplier<String> task2 = scope.fork(t);
            
            try (var childscope = StructuredTaskScope.open(
                    StructuredTaskScope.Joiner.<String>allSuccessfulOrThrow(),
                    cf -> cf.withThreadFactory(factory)
                            .withName("RamdomTaskSubscope"))) {
                Supplier<String> task2a = childscope.fork(() -> sleepOneSecond("task3a"));
                Supplier<String> task2b = childscope.fork(() -> sleepOneSecond("task3b"));
                childscope.join().forEach(r -> System.out.println("Result in RamdomTaskSubscope: " + r.get()));
            }
            scope.join().forEach(r -> System.out.println("Result in RandomTaskScope: " + r.get()));
        }
    }
    
    public static void main(String[] args) {
        try {
            var myApp = new SCObservable();
            myApp.handle();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

次のような出力が表示されます:

PID: 13560, name: task3b, thread name: RandomTask-5
PID: 13560, name: task0, thread name: RandomTask-0
PID: 13560, name: task1, thread name: RandomTask-1
PID: 13560, name: task2b, thread name: RandomTask-6
PID: 13560, name: task3a, thread name: RandomTask-3
PID: 13560, name: task2a, thread name: RandomTask-4
PID: 13560, name: task0, thread name: RandomTask-0
PID: 13560, name: task1, thread name: RandomTask-1
PID: 13560, name: task3b, thread name: RandomTask-5
PID: 13560, name: task2a, thread name: RandomTask-4
PID: 13560, name: task2b, thread name: RandomTask-6
PID: 13560, name: task3a, thread name: RandomTask-3
...
Result in RamdomTaskSubscope: RandomTask-3
Result in RamdomTaskSubscope: RandomTask-5
Result in RandomTaskScope: RandomTask-0
Result in RandomTaskScope: RandomTask-1
Result in RandomTaskScope: Result in RandomTaskScopeInsideSubtask: RandomTask-4, RandomTask-6

SCObservableの実行中に、次のコマンドを実行して、JSON形式でスレッド・ダンプを作成できます:

jcmd <PID> Thread.dump_to_file -format=json <file>

スレッド・ダンプは次のようになります。この例のサブタスク階層をより適切に説明するために、仮想スレッドの名前およびスコープを表すJSONオブジェクトに関する情報のみが含まれています:

{
  "threadDump": {
    "processId": "13560",
    "time": "2025-06-27T20:31:26.138549300Z",
    "runtimeVersion": "25-ea+27-LTS-3363",
    "threadContainers": [
      {
        "container": "<root>", "parent": null, "owner": null,
        "threads": [
          { "tid": "3", "name": "main" },
          ... other threads omitted ...
        ],
        "threadCount": "8"
      },
      ... ForkJoinPool containers omitted ...
      {
        "container": "RandomTaskScope\/jdk.internal.misc.ThreadFlock$ThreadContainerImpl@44c794fd",
        "parent": "<root>",
        "owner": "3",
        "threads": [
          { "tid": "36", "virtual": true, "name": "RandomTask-0" },
          { "tid": "38", "virtual": true, "name": "RandomTask-1" },
          { "tid": "41", "virtual": true, "name": "RandomTask-2" }
        ],
        "threadCount": "3"
      },
      {
        "container": "RamdomTaskSubscope\/jdk.internal.misc.ThreadFlock$ThreadContainerImpl@44af33ca",
        "parent": "RandomTaskScope\/jdk.internal.misc.ThreadFlock$ThreadContainerImpl@44c794fd",
        "owner": "3",
        "threads": [
          { "tid": "43", "virtual": true, "name": "RandomTask-3" },
          { "tid": "46", "virtual": true, "name": "RandomTask-5" }
        ],
        "threadCount": "2"
      },
      {
        "container": "RandomTaskScopeInsideSubtask\/jdk.internal.misc.ThreadFlock$ThreadContainerImpl@3c3727de",
        "parent": "RandomTaskScope\/jdk.internal.misc.ThreadFlock$ThreadContainerImpl@44c794fd",
        "owner": "41",
        "threads": [
          { "tid": "48", "virtual": true, "name": "RandomTask-6" },
          { "tid": "44", "virtual": true, "name": "RandomTask-4" }
        ],
        "threadCount": "2"
      }
    ]
  }
}

各スコープのJSONオブジェクトには、スコープ内で分岐されたスレッドの配列が含まれます。スコープのJSONオブジェクトには、プログラムの構造をスレッド・ダンプから再構成できるように、その親への参照もあります。