类 StructuredTaskScope<T>

java.lang.Object
jdk.incubator.concurrent.StructuredTaskScope<T>
类型参数:
T - 在范围内执行的任务的结果类型
所有已实现的接口:
AutoCloseable
已知子类:
StructuredTaskScope.ShutdownOnFailure , StructuredTaskScope.ShutdownOnSuccess

public class StructuredTaskScope<T> extends Object implements AutoCloseable
structured concurrency 的基本 API。 StructuredTaskScope 支持任务拆分为多个并发子任务,在各自的线程中执行,以及子任务必须在主任务继续之前完成的情况。 StructuredTaskScope 可用于确保并发操作的生命周期受 syntax block 限制,就像结构化编程中的顺序操作一样。

基本用法

StructuredTaskScope 是使用其公共构造函数之一创建的。它定义了启动线程执行任务的fork 方法,等待所有线程完成的join 方法,以及关闭任务作用域的close 方法。该 API 旨在与 try-with-resources 构造一起使用。其意图是 block 中的代码使用 fork 方法派生线程来执行子任务,等待线程使用 join 方法完成,然后使用 process the results 。结果处理可能包括处理或重新抛出异常。
  try (var scope = new StructuredTaskScope<Object>()) {

    Future<Integer> future1 = scope.fork(task1);
    Future<String> future2 = scope.fork(task2);

    scope.join();

    ... process results/exceptions ...

  } // close
 
为确保正确使用,joinclose 方法只能由 owner(打开/创建任务范围的线程})调用,如果所有者在分叉后未调用 join 方法,close 方法将在关闭后抛出异常。

StructuredTaskScope 定义了 shutdown 方法来关闭任务范围而不关闭它。关闭对于子任务完成并出现结果(或异常)并且不再需要其他未完成的子任务的结果的情况很有用。如果子任务在所有者等待 join 方法时调用 shutdown,那么它将导致 join 唤醒,所有未完成的线程都将变为 interrupted 并阻止新线程在任务范围内启动。

具有常见案例策略的子类

定义了 StructuredTaskScope 的两个子类来实现常见情况的策略:
  1. ShutdownOnSuccess 捕获第一个结果并关闭任务范围以中断未完成的线程并唤醒所有者。此类适用于任何子任务的结果都将执行(“调用任何”)并且无需等待其他未完成任务的结果的情况。它定义了获取第一个结果或在所有子任务失败时抛出异常的方法。
  2. ShutdownOnFailure 捕获第一个异常并关闭任务范围。此类适用于需要所有子任务的结果(“全部调用”)的情况;如果任何子任务失败,则不再需要其他未完成子任务的结果。 If 定义了在任何子任务失败时抛出异常的方法。

以下是使用这两个类的两个示例。在这两种情况下,一对子任务被分叉以从两个 URL 位置“左”和“右”获取资源。第一个示例创建一个 ShutdownOnSuccess 对象来捕获第一个子任务正常完成的结果,通过关闭任务范围取消另一个。主任务在 join 中等待,直到任一子任务完成并返回结果或两个子任务都失败。它调用 result(Function) 方法来获取捕获的结果。如果两个子任务都失败,则此方法将抛出一个 WebApplicationException,其中一个子任务异常是原因。

  try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {

    scope.fork(() -> fetch(left));
    scope.fork(() -> fetch(right));

    scope.join();

    String result = scope.result(e -> new WebApplicationException(e));

    ...
  }
 
第二个示例创建一个 ShutdownOnFailure 对象来捕获第一个子任务失败的异常,通过关闭任务范围来取消另一个。主任务在 joinUntil(Instant) 中等待,直到两个子任务都完成并产生结果,要么失败,要么达到截止日期。当任一子任务失败时,它调用 throwIfFailed(Function) 以抛出异常。如果没有子任务失败,则此方法是空操作。主要任务使用 FutureresultNow() 方法来检索结果。
  Instant deadline = ...

  try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {

    Future<String> future1 = scope.fork(() -> query(left));
    Future<String> future2 = scope.fork(() -> query(right));

    scope.joinUntil(deadline);

    scope.throwIfFailed(e -> new WebApplicationException(e));

    // both subtasks completed successfully
    String result = Stream.of(future1, future2)
        .map(Future::resultNow)
        .collect(Collectors.joining(", ", "{ ", " }"));

    ...
  }
 

扩展 StructuredTaskScope

StructuredTaskScope 可以扩展,handleComplete 可以被覆盖,以实现 ShutdownOnSuccessShutdownOnFailure 实现的策略以外的策略。例如,可以重写该方法以收集完成结果的子任务的结果并忽略失败的子任务。它可能会在子任务失败时收集异常。它可能会调用 shutdown 方法关闭并导致 join 在出现某些情况时唤醒。

子类通常会定义方法,以便为在 join 方法之后执行的代码提供可用的结果、状态或其他结果。收集结果并忽略失败的子任务的子类可以定义一个返回结果集合的方法。实现子任务失败时关闭策略的子类可以定义一个方法来检索第一个失败的子任务的异常。

以下是一个 StructuredTaskScope 实现的示例,它收集成功完成的子任务的结果。它定义了方法results() 由主要任务用来检索结果。

  class MyScope<T> extends StructuredTaskScope<T> {
    private final Queue<T> results = new ConcurrentLinkedQueue<>();

    MyScope() {
      super(null, Thread.ofVirtual().factory());
    }

    @Override
    protected void handleComplete(Future<T> future) {
      if (future.state() == Future.State.SUCCESS) {
        T result = future.resultNow();
        results.add(result);
      }
    }

    // Returns a stream of results from the subtasks that completed successfully
    public Stream<T> results() {
      return results.stream();
    }
  }
 

树结构

任务范围形成一棵树,其中在打开新任务范围时隐式建立父子关系:
  • 当在任务范围内启动的线程打开它自己的任务范围时,就建立了父子关系。在任务范围“A”中启动并打开任务范围“B”的线程建立父子关系,其中任务范围“A”是任务范围“B”的父级。
  • 通过嵌套建立父子关系。如果线程打开任务作用域“B”,然后打开任务作用域“C”(在关闭“B”之前),则封闭任务作用域“B”是嵌套任务作用域“C”的父级。
descendants任务作用域的子任务作用域是它作为父任务作用域的子任务作用域,加上子任务作用域的子任务作用域,递归地。

树结构支持:

  • 跨线程继承作用域值
  • 月子检查。方法描述中的短语“任务范围中包含的线程”表示在任务范围或后代范围中启动的线程。

以下示例演示了作用域值的继承。作用域值 USERNAME 绑定到值“duke”。创建一个 StructuredTaskScope 并调用其 fork 方法以启动一个线程来执行 childTask。线程继承创建任务作用域时捕获的作用域值bindingschildTask 中的代码使用范围值的值,因此读取值“duke”。

  private static final ScopedValue<String> USERNAME = ScopedValue.newInstance();

  ScopedValue.where(USERNAME, "duke", () -> {
    try (var scope = new StructuredTaskScope<String>()) {

      scope.fork(() -> childTask());
      ...
     }
  });

  ...

  String childTask() {
    String name = USERNAME.get();  // "duke"
    ...
  }
 

StructuredTaskScope此时没有定义公开树结构的API。

除非另有说明,否则将 null 参数传递给此类中的构造函数或方法将导致抛出 NullPointerException

记忆一致性效应

Callable 任务的 分叉 之前的任务范围内的所有者线程或包含的线程中的操作 发生在之前 该任务采取的任何操作,这反过来发生在之前任务结果通过其 Future 检索,或者发生在之前在任务范围的 joining 之后的线程中执行的任何操作。

Java 语言规范:
17.4.5 先于顺序发生
自从:
19
  • 构造方法详细信息

    • StructuredTaskScope

      public StructuredTaskScope(String  name, ThreadFactory  factory)
      创建具有给定名称和线程工厂的结构化任务范围。出于监视和管理的目的,可选择命名任务范围。当任务为 forked 时,线程工厂用于 create 线程。任务范围由当前线程拥有。

      此方法捕获当前线程的 作用域值 绑定,以供在任务范围内创建的线程继承。类描述中的 树结构 部分详细说明了如何为继承作用域值绑定而隐式建立父子关系。

      参数:
      name - 任务范围的名称,可以为空
      factory - 线程工厂
    • StructuredTaskScope

      public StructuredTaskScope()
      创建一个未命名的结构化任务范围,该范围创建虚拟线程。任务范围由当前线程拥有。

      此构造函数等效于调用名称为 null 的 2-arg 构造函数和创建虚拟线程的线程工厂。

      抛出:
      UnsupportedOperationException - 如果未启用预览功能
  • 方法详情

    • handleComplete

      protected void handleComplete(Future <T > future)
      在范围关闭之前任务完成时调用。

      handleComplete 方法应该是线程安全的。它可以被多个线程同时调用。

      实现要求:
      默认实现什么都不做。
      参数:
      future - 完成的任务
    • fork

      public <U extends T > Future <U> fork(Callable <? extends U> task)
      启动一个新线程来运行给定的任务。

      使用任务范围的 ThreadFactory 创建新线程。它继承了当前线程的 作用域值 绑定。绑定必须与创建任务范围时捕获的绑定相匹配。

      如果任务在任务范围为 shutdown 之前完成,则调用 handleComplete 方法来使用已完成的任务。 handleComplete 方法在任务完成并出现结果或异常时运行。如果 Futurecancel 方法用于在任务范围关闭之前取消任务,则 handleComplete 方法由调用 cancel 的线程运行。如果任务范围在任务完成或取消的同时或大约同时关闭,则可能会或可能不会调用 handleComplete 方法。

      如果此任务范围是 shutdown(或正在关闭过程中),则 fork 返回一个 Future 表示未运行的 cancelled 任务。

      此方法只能由任务范围所有者或任务范围中包含的线程调用。返回的 Future 对象的 cancel 方法也仅限于任务范围所有者或任务范围中包含的线程。如果从另一个线程调用,cancel 方法将抛出 WrongThreadException 。返回的 Future 对象上的所有其他方法(例如 get )不受限制。

      类型参数:
      U - 结果类型
      参数:
      task - 要运行的任务
      返回:
      一个未来
      抛出:
      IllegalStateException - 如果此任务范围已关闭
      WrongThreadException - 如果当前线程不是所有者或包含在任务范围内的线程
      StructureViolationException - 如果当前作用域值绑定与创建任务作用域时不同
      RejectedExecutionException - 如果线程工厂拒绝创建线程来运行任务
    • join

      public StructuredTaskScope <T > join() throws InterruptedException
      等待所有线程完成或任务范围关闭。此方法等待任务范围内启动的所有线程完成执行(任务和 handleComplete 方法),调用 shutdown 方法关闭任务范围,或者当前线程为 interrupted

      此方法只能由任务范围所有者调用。

      返回:
      此任务范围
      抛出:
      IllegalStateException - 如果此任务范围已关闭
      WrongThreadException - 如果当前线程不是所有者
      InterruptedException - 如果在等待时被打断
    • joinUntil

      public StructuredTaskScope <T > joinUntil(Instant  deadline) throws InterruptedException , TimeoutException
      等待所有线程完成或任务范围关闭,直到给定的截止日期。此方法等待任务作用域中启动的所有线程完成执行(任务和 handleComplete 方法),调用 shutdown 方法关闭任务作用域,当前线程为 interrupted ,或者达到截止日期。

      此方法只能由任务范围所有者调用。

      参数:
      deadline - 截止日期
      返回:
      此任务范围
      抛出:
      IllegalStateException - 如果此任务范围已关闭
      WrongThreadException - 如果当前线程不是所有者
      InterruptedException - 如果在等待时被打断
      TimeoutException - 如果在等待时达到截止日期
    • shutdown

      public void shutdown()
      关闭任务范围而不关闭它。关闭任务范围会阻止新线程启动,中断所有未完成的线程,并导致 join 方法唤醒。关闭对于不再需要未完成的子任务的结果的情况很有用。

      更具体地说,这个方法:

      • 取消 具有线程的任务 等待 结果使等待线程唤醒。
      • 中断 任务范围内所有未完成的线程(当前线程除外)。
      • 如果所有者正在 join() joinUntil(Instant) 中等待,则唤醒所有者。如果所有者没有等待,那么它对 joinjoinUntil 的下一次调用将立即返回。

      当此方法完成时,所有任务的 Future 对象将是 done ,正常或异常。可能仍有线程未完成,因为它们正在执行未响应(或及时响应)线程中断的代码。此方法不等待这些线程。当所有者调用 close 方法关闭任务范围时,它将等待剩余线程完成。

      此方法只能由任务范围所有者或任务范围中包含的线程调用。

      抛出:
      IllegalStateException - 如果此任务范围已关闭
      WrongThreadException - 如果当前线程不是所有者或包含在任务范围内的线程
    • close

      public void close()
      关闭此任务范围。

      此方法首先关闭任务作用域(就像通过调用 shutdown 方法一样)。然后等待执行任何未完成任务的线程完成。如果中断,则此方法将继续等待线程完成,然后再完成中断状态设置。

      此方法只能由任务范围所有者调用。如果任务范围已经关闭,则调用此方法的所有者无效。

      StructuredTaskScope 旨在用于 structured manner。如果在关闭嵌套任务作用域之前调用此方法来关闭任务作用域,则它会关闭每个嵌套任务作用域的基础构造(以与创建它们的顺序相反的顺序),关闭此任务作用域,然后抛出 StructureViolationException 。同样,如果在使用 作用域值 绑定执行时调用此方法关闭任务范围,并且任务范围是在绑定范围值之前创建的,则在关闭任务范围后抛出 StructureViolationException。如果一个线程在没有先关闭它拥有的任务范围的情况下终止,那么终止将导致其每个打开的任务范围的基础构造被关闭。关闭以创建任务范围的相反顺序执行。因此,当所有者必须等待在这些任务范围中分叉的线程完成时,线程终止可能会延迟。

      指定者:
      close 在接口 AutoCloseable
      抛出:
      IllegalStateException - 如果所有者在分叉后未调用连接,则在关闭任务范围后抛出
      WrongThreadException - 如果当前线程不是所有者
      StructureViolationException - 如果检测到结构违规