Structured Concurrency

Structured concurrency treats groups of related tasks running in different threads as a single unit of work, thereby streamlining error handling and cancellation, improving reliability, and enhancing observability.

The principal class of the structured concurrency API is StructuredTaskScope in the java.util.concurrent package. This class enables you to coordinate a group of concurrent subtasks as a unit. With a StructuredTaskScope instance, you fork each subtask, which runs it in its own thread. Afterward, you join the subtasks as a unit. As a result, the StructuredTaskScope ensures that the subtasks are completed before the main task continues. Alternatively, you can specify that the application continues when one subtask succeeds.

Note:

This is a preview feature. A preview feature is a feature whose design, specification, and implementation are complete, but which is not permanent. A preview feature may exist in a different form or not at all in future Java SE releases. To compile and run code that contains preview features, you must specify additional command-line options. See Preview Language and VM Features.

For background information about structured concurrency, see JEP 453.

Basic Usage of the StructuredTaskScope Class

To use the StructuredTaskScope class, you follow these general steps:

  1. Create a StructuredTaskScope; use it with a try-with-resources statement.
  2. Define your subtasks as instances of Callable.
  3. Within the try block, fork each subtask in its own thread with StructuredTaskScope::fork.
  4. Call StructuredTaskScope::join.
  5. Handle the outcome from the subtasks.
  6. Ensure that the StructuredTaskScope is shut down.

The following figure illustrates these steps. Notice that the task scope must wait for all subtasks to finish execution because of the join() method. Afterward, it can handle the results of the subtasks.

Figure 14-2 Using the StructuredTaskScope Class


In general, code that uses the StructuredTaskScope class has the following structure:

    Callable<String> task1 = ...
    Callable<Integer> task2 = ...

    try (var scope = new StructuredTaskScope<Object>()) {

        Subtask<String> subtask1 = scope.fork(task1);
        Subtask<Integer> subtask2 = scope.fork(task2);

        scope.join();

        ... process results/exceptions ...

    } // close

Because the StructuredTaskScope was defined in a try-with-resources statement, at the end of the try block, the StructuredTaskScope is shut down, and the task scope waits for threads running any unfinished subtasks to complete.
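
As a concrete instance of this structure, the following sketch forks two trivial subtasks, joins them, and combines their results only if both succeeded. The class BasicScopeSketch and the methods fetchName and fetchAge are hypothetical names invented for illustration. The sketch assumes JDK 21 with preview features enabled at compile time and run time.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;

public class BasicScopeSketch {

    // Hypothetical subtasks, standing in for real work such as remote calls.
    static String fetchName() { return "duke"; }
    static Integer fetchAge() { return 28; }

    static String describe() throws InterruptedException {
        Callable<String> task1 = BasicScopeSketch::fetchName;
        Callable<Integer> task2 = BasicScopeSketch::fetchAge;

        try (var scope = new StructuredTaskScope<Object>()) {
            // Each fork starts the subtask in its own thread.
            Subtask<String> subtask1 = scope.fork(task1);
            Subtask<Integer> subtask2 = scope.fork(task2);

            // Wait for both subtasks to finish.
            scope.join();

            // Subtask::get may only be called on a subtask that succeeded.
            if (subtask1.state() == Subtask.State.SUCCESS
                    && subtask2.state() == Subtask.State.SUCCESS) {
                return subtask1.get() + " is " + subtask2.get();
            }
            return "at least one subtask did not succeed";
        } // close
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(describe());
    }
}
```

Checking Subtask::state before calling Subtask::get is one way to handle the outcome when the scope has no shutdown policy of its own; the policies described later offer more convenient alternatives.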

The StructuredTaskScope class defines the shutdown method to shut down a task scope without closing it. This method cancels all unfinished subtasks by interrupting their threads. In addition, the shutdown method enables subclasses of StructuredTaskScope to implement a policy that doesn't require all subtasks to finish. The section Common Shutdown Policies: ShutdownOnSuccess and ShutdownOnFailure describes two subclasses of StructuredTaskScope, ShutdownOnSuccess and ShutdownOnFailure. The first implements a policy that shuts down a task scope as soon as a subtask completes successfully, while the second shuts down a task scope as soon as a subtask throws an exception.

Common Shutdown Policies: ShutdownOnSuccess and ShutdownOnFailure

The StructuredTaskScope class contains two subclasses, ShutdownOnFailure and ShutdownOnSuccess. These subclasses implement two common shutdown policies. ShutdownOnFailure cancels all subtasks if one of them fails, while ShutdownOnSuccess cancels all remaining subtasks if one of them succeeds. These shutdown policies are examples of short-circuiting patterns. A short-circuiting pattern encourages subtasks to complete quickly by enabling the main task to interrupt and cancel subtasks whose outcomes are no longer needed.

The following example demonstrates the StructuredTaskScope.ShutdownOnFailure and StructuredTaskScope.ShutdownOnSuccess classes. Each task scope forks five subtasks that sleep for a random duration. However, if the duration is greater than a specified threshold, the subtask throws a TooSlowException instead. The handleShutdownOnFailure() method prints the total duration of all subtasks if none of them threw an exception. The handleShutdownOnSuccess() method prints the duration of the first subtask to complete successfully:

Figure 14-3 SCRandomTasks.java

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.StructuredTaskScope.*;
import java.util.concurrent.StructuredTaskScope.Subtask;
import java.util.function.*;
import java.util.stream.*;

public class SCRandomTasks {
    
    class TooSlowException extends Exception {
        public TooSlowException(String s) {
            super(s);
        }
    }
    
    public Integer randomTask(int maxDuration, int threshold) throws InterruptedException, TooSlowException {
        int t = new Random().nextInt(maxDuration);
        System.out.println("Duration: " + t);
        if (t > threshold) {
            throw new TooSlowException("Duration " + t + " greater than threshold " + threshold);
        }
        Thread.sleep(t);
        return Integer.valueOf(t);        
    }        
    
    void handleShutdownOnFailure() throws ExecutionException, InterruptedException {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            // Fork five subtasks; each returns its duration or throws TooSlowException.
            var subtasks = IntStream.range(0, 5)
                                    .mapToObj(i -> scope.fork(() -> randomTask(1000, 850)))
                                    .toList();
            scope.join()
                 .throwIfFailed();
            var totalDuration = subtasks.stream()
                                        .map(t -> t.get())
                                        .reduce(0, Integer::sum);
            System.out.println("Total duration: " + totalDuration);
        }
    }
    
    void handleShutdownOnSuccess() throws ExecutionException, InterruptedException {
        try (var scope = new StructuredTaskScope.ShutdownOnSuccess<Integer>()) {
            IntStream.range(0, 5)
                     .mapToObj(i -> scope.fork(() -> randomTask(1000, 850)))
                     .toList();
            scope.join();
            System.out.println("First task to finish: " + scope.result());
        }
    }    
    
    public static void main(String[] args) {
        var myApp = new SCRandomTasks();
        try {
            System.out.println("Running handleShutdownOnFailure...");
            myApp.handleShutdownOnFailure();
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
        try {
            System.out.println("Running handleShutdownOnSuccess...");
            myApp.handleShutdownOnSuccess();
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }      
    }
}

It prints output similar to the following:

Running handleShutdownOnFailure...
Duration: 359
Duration: 676
Duration: 322
Duration: 591
Duration: 315
Total duration: 2263
Running handleShutdownOnSuccess...
Duration: 480
Duration: 40
Duration: 868
Duration: 526
Duration: 532
First task to finish: 40

The StructuredTaskScope.ShutdownOnFailure class captures the first exception thrown by one of its subtasks, then invokes the shutdown method. This prevents any new subtasks from starting, interrupts all unfinished threads running other subtasks, and enables the application to continue running. To access the captured exception, call the ShutdownOnFailure::exception method. If you want to rethrow the exception instead, call the ShutdownOnFailure::throwIfFailed method, which this example demonstrates:

            scope.join()
                 .throwIfFailed();
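
If you would rather inspect the failure than rethrow it, the following sketch reads the captured exception through ShutdownOnFailure::exception, which returns an Optional. The class FailureInspectionSketch, its method, and the two subtasks are hypothetical and exist only for illustration; the sketch assumes JDK 21 with preview features enabled.

```java
import java.util.Optional;
import java.util.concurrent.StructuredTaskScope;

public class FailureInspectionSketch {

    static String firstFailureMessage() throws InterruptedException {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            // Hypothetical subtasks: one succeeds, one fails.
            scope.fork(() -> "ok");
            scope.fork(() -> { throw new IllegalStateException("boom"); });

            // join() must be called before the captured exception is read.
            scope.join();

            // Empty if no subtask failed; otherwise the first captured exception.
            Optional<Throwable> failure = scope.exception();
            return failure.map(Throwable::getMessage).orElse("no failure");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(firstFailureMessage());
    }
}
```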

The StructuredTaskScope.ShutdownOnSuccess class captures the result of the first subtask to be completed successfully, and like ShutdownOnFailure, invokes the shutdown method. To access the result of the subtask that completed successfully, call the ShutdownOnSuccess::result method, which this example demonstrates:

            System.out.println("First task to finish: " + scope.result());
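
The following sketch illustrates that ShutdownOnSuccess ignores failed subtasks as long as at least one subtask succeeds. The class FirstSuccessSketch and the "replica" subtasks are hypothetical, invented for illustration; the sketch assumes JDK 21 with preview features enabled.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.StructuredTaskScope;

public class FirstSuccessSketch {

    static String firstAvailable() throws InterruptedException, ExecutionException {
        try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
            // Hypothetical subtasks: the first fails immediately, the second succeeds.
            scope.fork(() -> { throw new IllegalStateException("replica A unavailable"); });
            scope.fork(() -> "replica B");

            scope.join();

            // result() returns the first successful result. It throws
            // ExecutionException only if no subtask completed successfully
            // and at least one failed.
            return scope.result();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(firstAvailable());
    }
}
```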

Implement Your Own StructuredTaskScope Policies

You can implement your own StructuredTaskScope policies that handle subtasks differently than ShutdownOnFailure and ShutdownOnSuccess. Do this by extending the StructuredTaskScope class.

The following example, CollectingScope, contains two methods that return two streams of subtasks of the same type: successfulTasks() returns a stream of successful subtasks, and failedTasks() returns a stream of subtasks that threw an exception:

Figure 14-4 CollectingScope.java

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;

public class CollectingScope<T> extends StructuredTaskScope<T> {
    private final Queue<Subtask<? extends T>> successSubtasks = new LinkedTransferQueue<>();
    private final Queue<Subtask<? extends T>> failedSubtasks = new LinkedTransferQueue<>();

    @Override
    protected void handleComplete(Subtask<? extends T> subtask) {
        if (subtask.state() == Subtask.State.SUCCESS) {
            successSubtasks.add(subtask);
        } else if (subtask.state() == Subtask.State.FAILED) {
            failedSubtasks.add(subtask);
        }
    }

    @Override
    public CollectingScope<T> join() throws InterruptedException {
        super.join();
        return this;
    }

    public Stream<Subtask<? extends T>> successfulTasks() {
        super.ensureOwnerAndJoined();
        return successSubtasks.stream();
    }
    
    public Stream<Subtask<? extends T>> failedTasks() {
        super.ensureOwnerAndJoined();
        return failedSubtasks.stream();
    }  
}

To use this class in the example SCRandomTasks as described in Common Shutdown Policies: ShutdownOnSuccess and ShutdownOnFailure, add the following method. It prints the total duration of the subtasks that completed successfully and the exceptions of the subtasks that threw exceptions.

    void handleBoth() throws InterruptedException {
        // Parameterizing the scope with Integer avoids raw types and casts.
        try (var scope = new CollectingScope<Integer>()) {
            // Fork five subtasks; each returns its duration or throws TooSlowException.
            var subtasks = IntStream.range(0, 5)
                                    .mapToObj(i -> scope.fork(() -> randomTask(1000, 500)))
                                    .toList();
            scope.join();

            // Sum the durations of the subtasks that completed successfully.
            var totalDuration = scope.successfulTasks()
                                     .mapToInt(st -> st.get())
                                     .reduce(0, Integer::sum);
            System.out.println("Total duration: " + totalDuration);

            // Print the message of each exception thrown by a failed subtask.
            scope.failedTasks()
                 .forEach(ft ->
                    System.out.println(ft.exception().getMessage()));
        }
    }

It prints output similar to the following:

Duration: 501
Duration: 211
Duration: 661
Duration: 903
Duration: 839
Total duration: 211
Duration 501 greater than threshold 500
Duration 661 greater than threshold 500
Duration 903 greater than threshold 500
Duration 839 greater than threshold 500

In the example CollectingScope, before successfulTasks() and failedTasks() return streams for successSubtasks and failedSubtasks, respectively, they call StructuredTaskScope::ensureOwnerAndJoined. This ensures that the example can access successSubtasks and failedSubtasks only if the current thread is the owner of the task scope and the task scope has joined the subtasks after they were forked.
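
The effect of this guard can be seen in a minimal sketch. CountingScope below is a hypothetical policy, smaller than CollectingScope, whose accessor calls ensureOwnerAndJoined; calling it after fork but before join is rejected with an IllegalStateException. All class and method names here are invented for illustration, and the sketch assumes JDK 21 with preview features enabled.

```java
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.atomic.AtomicInteger;

public class JoinGuardSketch {

    // Hypothetical policy: counts the subtasks that completed successfully.
    static class CountingScope<T> extends StructuredTaskScope<T> {
        private final AtomicInteger successes = new AtomicInteger();

        @Override
        protected void handleComplete(Subtask<? extends T> subtask) {
            if (subtask.state() == Subtask.State.SUCCESS) {
                successes.incrementAndGet();
            }
        }

        int successCount() {
            // Rejects callers other than the owner, and callers that
            // forked subtasks without joining them afterward.
            ensureOwnerAndJoined();
            return successes.get();
        }
    }

    static String probe() throws InterruptedException {
        try (var scope = new CountingScope<String>()) {
            scope.fork(() -> "a");
            scope.fork(() -> "b");

            String beforeJoin;
            try {
                scope.successCount();
                beforeJoin = "allowed";
            } catch (IllegalStateException e) {
                beforeJoin = "rejected";
            }

            scope.join();
            return beforeJoin + " before join, " + scope.successCount() + " after join";
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(probe());
    }
}
```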

Debugging StructuredTaskScope and Its Forked Subtasks with the jcmd Command

The jcmd tool can emit a thread dump in JSON format. This thread dump displays the threads running the forked subtasks of a StructuredTaskScope in an array, along with their stack traces.

Consider the following example that forks three subtasks. These subtasks repeatedly alternate between printing a message and sleeping for one second.

Figure 14-5 SCObservable.java

import java.util.*;
import java.util.function.*;
import java.util.concurrent.*;
import java.util.stream.*;

public class SCObservable {
    
    static Long sleepOneSecond(String s) throws InterruptedException {
        long pid = ProcessHandle.current().pid();
        for (int i = 0; i<60; i++)  {
            System.out.println("[" + pid + ", " + s + "]" + " Sleeping for 1s...");
            Thread.sleep(1000);
        }
        return Long.valueOf(pid);
    }
    
    void handle() throws ExecutionException, InterruptedException {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            Supplier<Long> task1 = scope.fork(() -> sleepOneSecond("task1"));
            Supplier<Long> task2 = scope.fork(() -> sleepOneSecond("task2"));
            Supplier<Long> task3 = scope.fork(() -> sleepOneSecond("task3"));
            scope.join()
                 .throwIfFailed();
        }
    }
    
    public static void main(String[] args) {
        try {
            var myApp = new SCObservable();
            myApp.handle();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

It prints output similar to the following:

[10852, task1] Sleeping for 1s...
[10852, task2] Sleeping for 1s...
[10852, task3] Sleeping for 1s...
[10852, task1] Sleeping for 1s...
...

While this example is running, you can create a thread dump by running the following command in a different console, where <pid> is the process ID of the running Java process:

jcmd <pid> Thread.dump_to_file -format=json <output_file>

The following excerpt from a sample thread dump output file shows the StructuredTaskScope with the threads of its forked subtasks in an array. The thread dump also shows the reference to the parent of the StructuredTaskScope so that the structure of the program can be reconstituted from the thread dump:

{
  "threadDump": {
    "processId": "10852",
    "time": "2023-06-22T13:59:05.156805300Z",
    "runtimeVersion": "21-ea+27-LTS-2343",
    "threadContainers": [
      {
        "container": "<root>",
        "parent": null,
        "owner": null,
        "threads": [
         {
           "tid": "1",
           "name": "main",
           "stack": [
              "java.base\/jdk.internal.misc.Unsafe.park(Native Method)",
              "java.base\/java.util.concurrent.locks.LockSupport.park(LockSupport.java:371)",
              "java.base\/jdk.internal.misc.ThreadFlock.awaitAll(ThreadFlock.java:315)",
              "java.base\/java.util.concurrent.StructuredTaskScope.implJoin(StructuredTaskScope.java:621)",
              "java.base\/java.util.concurrent.StructuredTaskScope.join(StructuredTaskScope.java:647)",
              "java.base\/java.util.concurrent.StructuredTaskScope$ShutdownOnFailure.join(StructuredTaskScope.java:1200)",
              "SCObservable.handle(SCObservable.java:22)",
              "SCObservable.main(SCObservable.java:30)"
           ]
         },
         
         ...
         
        ],
        "threadCount": "7"
      },

      ...

      {
        "container": "java.util.concurrent.StructuredTaskScope$ShutdownOnFailure@5674cd4d",
        "parent": "<root>",
        "owner": "1",
        "threads": [
         {
           "tid": "21",
           "name": "",
           "stack": [
              "java.base\/java.lang.VirtualThread.parkNanos(VirtualThread.java:631)",
              "java.base\/java.lang.VirtualThread.sleepNanos(VirtualThread.java:803)",
              "java.base\/java.lang.Thread.sleep(Thread.java:507)",
              "SCObservable.sleepOneSecond(SCObservable.java:12)",
              "SCObservable.lambda$handle$0(SCObservable.java:19)",
              "java.base\/java.util.concurrent.StructuredTaskScope$SubtaskImpl.run(StructuredTaskScope.java:883)",
              "java.base\/java.lang.VirtualThread.run(VirtualThread.java:311)"
           ]
         },
         {
           "tid": "23",
           "name": "",
           "stack": [
              ...
              "SCObservable.sleepOneSecond(SCObservable.java:12)",
              "SCObservable.lambda$handle$1(SCObservable.java:20)",
              ...
           ]
         },
         {
           "tid": "24",
           "name": "",
           "stack": [
              ...
              "SCObservable.sleepOneSecond(SCObservable.java:12)",
              "SCObservable.lambda$handle$2(SCObservable.java:21)",
              ...
           ]
         }
        ],
        "threadCount": "3"
      }
    ]
  }
}
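
To see the parent-child structure at a glance, you could scan the dump for its container entries. The following is only a rough sketch: it uses a regular expression rather than a JSON parser, and it assumes that each container object lists its "container", "parent", and "owner" fields in that order, as in the excerpt above. The class ThreadDumpOutline and its methods are hypothetical names, not part of the JDK.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ThreadDumpOutline {

    // Matches a "container" field followed directly by "parent" and "owner".
    // Assumption: the fields appear in this order, as in the sample excerpt.
    private static final Pattern CONTAINER = Pattern.compile(
        "\"container\"\\s*:\\s*\"([^\"]*)\"\\s*,\\s*"
        + "\"parent\"\\s*:\\s*(null|\"[^\"]*\")\\s*,\\s*"
        + "\"owner\"\\s*:\\s*(null|\"[^\"]*\")");

    // Returns one line per thread container: name, parent, and owner thread ID.
    static List<String> outline(String json) {
        List<String> lines = new ArrayList<>();
        Matcher m = CONTAINER.matcher(json);
        while (m.find()) {
            lines.add(m.group(1)
                + " <- parent " + unquote(m.group(2))
                + ", owner tid " + unquote(m.group(3)));
        }
        return lines;
    }

    // Strips surrounding double quotes; leaves the literal null untouched.
    private static String unquote(String s) {
        return s.startsWith("\"") ? s.substring(1, s.length() - 1) : s;
    }

    public static void main(String[] args) throws Exception {
        // args[0] is the output file written by jcmd.
        String json = Files.readString(Path.of(args[0]));
        outline(json).forEach(System.out::println);
    }
}
```

For a dump like the excerpt, each task scope would appear on its own line with "<root>" as its parent and the thread ID of the thread that owns it.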