模块 java.base

类 CountedCompleter<T>

java.lang.Object
java.util.concurrent.ForkJoinTask <T>
java.util.concurrent.CountedCompleter<T>
类型参数:
T - 完成者结果的类型
所有已实现的接口:
Serializable , Future<T>

public abstract class CountedCompleter<T> extends ForkJoinTask <T>
一个 ForkJoinTask ,在触发时执行完成操作,并且没有剩余的待处理操作。与其他形式的 ForkJoinTasks 相比,CountedCompleters 通常在存在子任务停顿和阻塞时更健壮,但编程起来不太直观。 CountedCompleter 的使用类似于其他基于完成的组件(例如 CompletionHandler )的使用,除了可能需要多个 pending 完成来触发完成操作 onCompletion(CountedCompleter) ,而不仅仅是一个。除非以其他方式初始化,否则 未决计数 从零开始,但可以使用方法 setPendingCount(int) addToPendingCount(int) compareAndSetPendingCount(int, int) 进行(原子地)更改。调用 tryComplete() 时,如果未决操作计数不为零,则递减;否则,完成动作被执行,如果这个完成者本身有一个完成者,这个过程继续它的完成者。与 Phaser Semaphore 等相关同步组件的情况一样,这些方法仅影响内部计数;他们不建立任何进一步的内部簿记。特别是,未维护待处理任务的身份。如下图所示,您可以创建子类,在需要时记录一些或所有未决任务或其结果。如下图所示,还提供了支持自定义完成遍历的实用方法。但是,由于 CountedCompleters 仅提供基本的同步机制,因此创建进一步的抽象子类可能很有用,这些子类维护链接、字段和适合一组相关用法的其他支持方法。

具体的 CountedCompleter 类必须定义方法 compute() ,在大多数情况下(如下图所示),在返回之前调用 tryComplete() 一次。该类还可以选择性地覆盖方法 onCompletion(CountedCompleter) 以在正常完成时执行操作,以及方法 onExceptionalCompletion(Throwable, CountedCompleter) 以在任何异常时执行操作。

CountedCompleters 通常不承担结果,在这种情况下,它们通常被声明为 CountedCompleter<Void> ,并且将始终返回 null 作为结果值。在其他情况下,您应该覆盖方法 getRawResult() 以提供来自 join(), invoke() 和相关方法的结果。通常,此方法应返回保存完成时结果的 CountedCompleter 对象的一个字段(或一个或多个字段的函数)的值。默认情况下,方法 setRawResult(T) 在 CountedCompleters 中不起作用。重写此方法以维护保存结果数据的其他对象或字段是可能的,但很少适用。

本身没有完成器的 CountedCompleter(即 getCompleter() 返回 null 的完成器)可以用作具有此附加功能的常规 ForkJoinTask。然而,任何完成者反过来又拥有另一个完成者仅作为其他计算的内部帮助者,因此它自己的任务状态(如 ForkJoinTask.isDone() 等方法中报告的那样)是任意的;此状态仅在显式调用 complete(T) ForkJoinTask.cancel(boolean) ForkJoinTask.completeExceptionally(Throwable) 或异常完成方法 compute 时更改。在任何异常完成时,异常可能会被转发给任务的完成者(及其完成者,等等),如果存在并且它还没有完成的话。同样,取消内部 CountedCompleter 只会对该完成者产生局部影响,因此通常用处不大。

示例用法。

并行递归分解。CountedCompleters 可以排列在树中,类似于那些经常与 RecursiveAction 一起使用的树,尽管设置它们所涉及的结构通常会有所不同。在这里,每个任务的完成者是其在计算树中的父级。尽管它们需要更多的簿记,但在对数组或集合的每个元素应用可能耗时的操作(无法进一步细分)时,CountedCompleters 可能是更好的选择;尤其是当某些元素的操作完成时间与其他元素明显不同时,这可能是因为内在变化(例如 I/O)或辅助效果(例如垃圾收集)。因为 CountedCompleters 提供了自己的延续,其他任务不需要阻塞等待来执行它们。

例如,这里是实用程序方法的初始版本,它使用二分法递归分解将工作划分为单个部分(叶任务)。即使将工作拆分为单独的调用,基于树的技术通常也比直接分叉叶任务更可取,因为它们减少了线程间通信并改善了负载平衡。在递归的情况下,每对子任务中的第二个要完成的子任务会触发其父任务的完成(因为没有执行结果组合,所以方法 onCompletion 的默认无操作实现不会被覆盖)。实用程序方法设置根任务并调用它(此处,隐式使用 ForkJoinPool.commonPool() )。始终将挂起计数设置为子任务数并在返回前立即调用 tryComplete() 是直接且可靠的(但不是最优的)。

 
 public static <E> void forEach(E[] array, Consumer<E> action) {
  class Task extends CountedCompleter<Void> {
   final int lo, hi;
   Task(Task parent, int lo, int hi) {
    super(parent); this.lo = lo; this.hi = hi;
   }

   public void compute() {
    if (hi - lo >= 2) {
     int mid = (lo + hi) >>> 1;
     // must set pending count before fork
     setPendingCount(2);
     new Task(this, mid, hi).fork(); // right child
     new Task(this, lo, mid).fork(); // left child
    }
    else if (hi > lo)
     action.accept(array[lo]);
    tryComplete();
   }
  }
  new Task(null, 0, array.length).invoke();
 } 
这种设计可以通过注意在递归情况下进行改进,任务在分叉其右任务后无事可做,因此可以在返回前直接调用其左任务。 (这是尾递归移除的模拟。)此外,当任务中的最后一个动作是派生或调用子任务(“尾调用”)时,可以优化对 tryComplete() 的调用,但代价是待定计数看起来“减一”。
 
   public void compute() {
    if (hi - lo >= 2) {
     int mid = (lo + hi) >>> 1;
     setPendingCount(1); // looks off by one, but correct!
     new Task(this, mid, hi).fork(); // right child
     new Task(this, lo, mid).compute(); // direct invoke
    } else {
     if (hi > lo)
      action.accept(array[lo]);
     tryComplete();
    }
   } 
作为进一步的优化,请注意左侧任务甚至不需要存在。我们可以继续使用原始任务,并为每个分叉添加一个待处理计数,而不是创建一个新任务。此外,因为此树中没有任务实现 onCompletion(CountedCompleter) 方法,所以 tryComplete 可以替换为 propagateCompletion()
 
   public void compute() {
    int n = hi - lo;
    for (; n >= 2; n /= 2) {
     addToPendingCount(1);
     new Task(this, lo + n/2, lo + n).fork();
    }
    if (n > 0)
     action.accept(array[lo]);
    propagateCompletion();
   } 
当可以预先计算未决计数时,可以在构造函数中建立它们:
 
 public static <E> void forEach(E[] array, Consumer<E> action) {
  class Task extends CountedCompleter<Void> {
   final int lo, hi;
   Task(Task parent, int lo, int hi) {
    super(parent, 31 - Integer.numberOfLeadingZeros(hi - lo));
    this.lo = lo; this.hi = hi;
   }

   public void compute() {
    for (int n = hi - lo; n >= 2; n /= 2)
     new Task(this, lo + n/2, lo + n).fork();
    action.accept(array[lo]);
    propagateCompletion();
   }
  }
  if (array.length > 0)
   new Task(null, 0, array.length).invoke();
 } 
此类类的其他优化可能需要为叶步骤专门化类,细分为四个,而不是每次迭代两个,并使用自适应阈值而不是总是细分为单个元素。

搜索。CountedCompleters 树可以在数据结构的不同部分搜索值或属性,并在找到后立即在 AtomicReference 中报告结果。其他人可以轮询结果以避免不必要的工作。 (您还可以 cancel 其他任务,但让他们注意到结果已设置并且如果已设置则跳过进一步处理通常更简单和更有效。)再次使用使用完整分区的数组进行说明(同样,在实践中,叶任务几乎总是会处理不止一个元素):

 
 class Searcher<E> extends CountedCompleter<E> {
  final E[] array; final AtomicReference<E> result; final int lo, hi;
  Searcher(CountedCompleter<?> p, E[] array, AtomicReference<E> result, int lo, int hi) {
   super(p);
   this.array = array; this.result = result; this.lo = lo; this.hi = hi;
  }
  public E getRawResult() { return result.get(); }
  public void compute() { // similar to ForEach version 3
   int l = lo, h = hi;
   while (result.get() == null && h >= l) {
    if (h - l >= 2) {
     int mid = (l + h) >>> 1;
     addToPendingCount(1);
     new Searcher(this, array, result, mid, h).fork();
     h = mid;
    }
    else {
     E x = array[l];
     if (matches(x) && result.compareAndSet(null, x))
      quietlyCompleteRoot(); // root task is now joinable
     break;
    }
   }
   tryComplete(); // normally complete whether or not found
  }
  boolean matches(E e) { ... } // return true if found

  public static <E> E search(E[] array) {
    return new Searcher<E>(null, array, new AtomicReference<E>(), 0, array.length).invoke();
  }
 } 
在这个例子中,以及其他任务除了对 compareAndSet 没有其他影响之外的共同结果,tryComplete 的尾随无条件调用可以有条件(if (result.get() == null) tryComplete(); )因为一旦根任务完成就不需要进一步的簿记来管理完成.

记录子任务。组合多个子任务结果的 CountedCompleter 任务通常需要在方法 onCompletion(CountedCompleter) 中访问这些结果。如以下类所示(执行简化形式的 map-reduce,其中映射和归约都是 E 类型),在分而治之设计中执行此操作的一种方法是让每个子任务记录其兄弟,以便它可以在方法 onCompletion 中访问。这种技术适用于组合左右结果的顺序无关紧要的归约;有序减少需要明确的左/右指定。上述示例中看到的其他流线型的变体也可能适用。

 
 class MyMapper<E> { E apply(E v) { ... } }
 class MyReducer<E> { E apply(E x, E y) { ... } }
 class MapReducer<E> extends CountedCompleter<E> {
  final E[] array; final MyMapper<E> mapper;
  final MyReducer<E> reducer; final int lo, hi;
  MapReducer<E> sibling;
  E result;
  MapReducer(CountedCompleter<?> p, E[] array, MyMapper<E> mapper,
       MyReducer<E> reducer, int lo, int hi) {
   super(p);
   this.array = array; this.mapper = mapper;
   this.reducer = reducer; this.lo = lo; this.hi = hi;
  }
  public void compute() {
   if (hi - lo >= 2) {
    int mid = (lo + hi) >>> 1;
    MapReducer<E> left = new MapReducer(this, array, mapper, reducer, lo, mid);
    MapReducer<E> right = new MapReducer(this, array, mapper, reducer, mid, hi);
    left.sibling = right;
    right.sibling = left;
    setPendingCount(1); // only right is pending
    right.fork();
    left.compute();   // directly execute left
   }
   else {
    if (hi > lo)
      result = mapper.apply(array[lo]);
    tryComplete();
   }
  }
  public void onCompletion(CountedCompleter<?> caller) {
   if (caller != this) {
    MapReducer<E> child = (MapReducer<E>)caller;
    MapReducer<E> sib = child.sibling;
    if (sib == null || sib.result == null)
     result = child.result;
    else
     result = reducer.apply(child.result, sib.result);
   }
  }
  public E getRawResult() { return result; }

  public static <E> E mapReduce(E[] array, MyMapper<E> mapper, MyReducer<E> reducer) {
   return new MapReducer<E>(null, array, mapper, reducer,
               0, array.length).invoke();
  }
 } 
此处,方法 onCompletion 采用许多结合结果的完成设计所共有的形式。在挂起计数为零或变为零的两个不同上下文中,每个任务触发一次此回调样式方法:(1) 由任务本身触发,如果在调用 tryComplete 时其挂起计数为零,或(2) 在其任何子任务完成时将待处理计数递减为零。 caller 参数区分大小写。大多数情况下,当调用者是 this 时,无需执行任何操作。否则,可以使用调用者参数(通常通过强制转换)来提供要组合的值(和/或指向其他值的链接)。假设正确使用挂起计数,onCompletion 内的操作会在任务及其子任务完成时发生(一次)。此方法中不需要额外的同步来确保访问此任务或其他已完成任务的字段的线程安全。

完成遍历。如果使用 onCompletion 处理完成不适用或不方便,您可以使用方法 firstComplete() nextComplete() 创建自定义遍历。例如,要定义一个仅以第三个 ForEach 示例的形式拆分右侧任务的 MapReducer,完成项必须沿着未用尽的子任务链接协作减少,这可以按如下方式完成:

 
 class MapReducer<E> extends CountedCompleter<E> { // version 2
  final E[] array; final MyMapper<E> mapper;
  final MyReducer<E> reducer; final int lo, hi;
  MapReducer<E> forks, next; // record subtask forks in list
  E result;
  MapReducer(CountedCompleter<?> p, E[] array, MyMapper<E> mapper,
       MyReducer<E> reducer, int lo, int hi, MapReducer<E> next) {
   super(p);
   this.array = array; this.mapper = mapper;
   this.reducer = reducer; this.lo = lo; this.hi = hi;
   this.next = next;
  }
  public void compute() {
   int l = lo, h = hi;
   while (h - l >= 2) {
    int mid = (l + h) >>> 1;
    addToPendingCount(1);
    (forks = new MapReducer(this, array, mapper, reducer, mid, h, forks)).fork();
    h = mid;
   }
   if (h > l)
    result = mapper.apply(array[l]);
   // process completions by reducing along and advancing subtask links
   for (CountedCompleter<?> c = firstComplete(); c != null; c = c.nextComplete()) {
    for (MapReducer t = (MapReducer)c, s = t.forks; s != null; s = t.forks = s.next)
     t.result = reducer.apply(t.result, s.result);
   }
  }
  public E getRawResult() { return result; }

  public static <E> E mapReduce(E[] array, MyMapper<E> mapper, MyReducer<E> reducer) {
   return new MapReducer<E>(null, array, mapper, reducer,
               0, array.length, null).invoke();
  }
 } 

触发器。一些 CountedCompleters 本身从不分叉,而是在其他设计中充当一些管道;包括那些完成一个或多个异步任务会触发另一个异步任务的任务。例如:

 
 class HeaderBuilder extends CountedCompleter<...> { ... }
 class BodyBuilder extends CountedCompleter<...> { ... }
 class PacketSender extends CountedCompleter<...> {
  PacketSender(...) { super(null, 1); ... } // trigger on second completion
  public void compute() { } // never called
  public void onCompletion(CountedCompleter<?> caller) { sendPacket(); }
 }
 // sample use:
 PacketSender p = new PacketSender();
 new HeaderBuilder(p, ...).fork();
 new BodyBuilder(p, ...).fork(); 
自从:
1.8
参见:
  • 构造方法详细信息

    • CountedCompleter

      protected CountedCompleter(CountedCompleter <?> completer, int initialPendingCount)
      使用给定的完成者和初始未决计数创建一个新的 CountedCompleter。
      参数:
      completer - 此任务的完成者,如果没有则为 null
      initialPendingCount - 初始未决计数
    • CountedCompleter

      protected CountedCompleter(CountedCompleter <?> completer)
      创建一个新的 CountedCompleter,它具有给定的完成者和初始未决计数为零。
      参数:
      completer - 此任务的完成者,如果没有则为 null
    • CountedCompleter

      protected CountedCompleter()
      创建一个没有完成者且初始未决计数为零的新 CountedCompleter。
  • 方法详情

    • compute

      public abstract void compute()
      此任务执行的主要计算。
    • onCompletion

      public void onCompletion(CountedCompleter <?> caller)
      在调用方法 tryComplete() 并且挂起计数为零时,或者在调用无条件方法 complete(T) 时执行操作。默认情况下,此方法不执行任何操作。您可以通过检查给定调用方参数的身份来区分情况。如果不等于 this ,那么它通常是一个子任务,可能包含要组合的结果(和/或指向其他结果的链接)。
      参数:
      caller - 调用此方法的任务(可能是此任务本身)
    • onExceptionalCompletion

      public boolean onExceptionalCompletion(Throwable  ex, CountedCompleter <?> caller)
      当调用方法 ForkJoinTask.completeExceptionally(Throwable) 或方法 compute() 抛出异常时执行操作,并且此任务尚未正常完成。在进入此方法时,此任务 ForkJoinTask.isCompletedAbnormally() 。此方法的返回值控制进一步的传播:如果true和此任务有一个尚未完成的完成者,则该完成者也异常完成,与此完成者具有相同的异常。此方法的默认实现除了 return true 外什么都不做。
      参数:
      ex - 异常
      caller - 调用此方法的任务(可能是此任务本身)
      返回:
      true 如果这个异常应该传播到这个任务的完成者,如果存在的话
    • getCompleter

      public final CountedCompleter <?> getCompleter()
      返回在此任务的构造函数中建立的完成者,如果没有则返回 null
      返回:
      完成者
    • getPendingCount

      public final int getPendingCount()
      返回当前挂起的计数。
      返回:
      当前未决计数
    • setPendingCount

      public final void setPendingCount(int count)
      将挂起计数设置为给定值。
      参数:
      count - 计数
    • addToPendingCount

      public final void addToPendingCount(int delta)
      将给定值(原子地)添加到待定计数。
      参数:
      delta - 要添加的值
    • compareAndSetPendingCount

      public final boolean compareAndSetPendingCount(int expected, int count)
      仅当当前持有给定的预期值时,才将挂起的计数(原子地)设置为给定的计数。
      参数:
      expected - 期望值
      count - 新值
      返回:
      true 如果成功
    • decrementPendingCountUnlessZero

      public final int decrementPendingCountUnlessZero()
      如果挂起计数不为零,则(原子地)递减它。
      返回:
      进入此方法时保持的初始(未递减)挂起计数
    • getRoot

      public final CountedCompleter <?> getRoot()
      返回当前计算的根;即,如果这个任务没有完成者,则它是完成者的根。
      返回:
      当前计算的根
    • tryComplete

      public final void tryComplete()
      如果挂起计数不为零,则递减计数;否则调用 onCompletion(CountedCompleter) 然后类似地尝试完成此任务的完成者,如果存在,则将此任务标记为完成。
    • propagateCompletion

      public final void propagateCompletion()
      等同于 tryComplete() 但不沿完成路径调用 onCompletion(CountedCompleter) :如果挂起计数非零,则递减计数;否则,类似地尝试完成此任务的完成者,如果存在,则将此任务标记为已完成。在不应或不需要为计算中的每个完成者调用 onCompletion 的情况下,此方法可能很有用。
    • complete

      public void complete(T  rawResult)
      无论挂起计数如何,调用 onCompletion(CountedCompleter) ,将此任务标记为完成并进一步触发 tryComplete() 此任务的完成者(如果存在)。在调用 onCompletion(CountedCompleter) 或将此任务标记为完成之前,给定的 rawResult 用作 setRawResult(T) 的参数;它的值仅对覆盖 setRawResult 的类有意义。此方法不会修改挂起计数。

      一旦获得多个子任务结果中的任何一个(相对于全部)就强制完成时,此方法可能很有用。但是,在未覆盖 setRawResult 的常见(和推荐)情况下,可以使用 quietlyCompleteRoot() 更简单地获得此效果。

      重写:
      complete 在类 ForkJoinTask<T>
      参数:
      rawResult - 原始结果
    • firstComplete

      public final CountedCompleter <?> firstComplete()
      如果此任务的待处理计数为零,则返回此任务;否则递减其未决计数并返回 null 。此方法旨在在完成遍历循环中与 nextComplete() 一起使用。
      返回:
      这个任务,如果挂起计数为零,否则null
    • nextComplete

      public final CountedCompleter <?> nextComplete()
      如果此任务没有完成者,则调用 ForkJoinTask.quietlyComplete() 并返回 null。或者,如果完成者的待定计数不为零,则递减该待定计数并返回 null 。否则,返回完成者。此方法可用作同类任务层次结构的完成遍历循环的一部分:
       
       for (CountedCompleter<?> c = firstComplete();
         c != null;
         c = c.nextComplete()) {
        // ... process c ...
       } 
      返回:
      完成者,或者 null 如果没有
    • quietlyCompleteRoot

      public final void quietlyCompleteRoot()
      相当于 getRoot().quietlyComplete()
    • helpComplete

      public final void helpComplete(int maxTasks)
      如果此任务尚未完成,则尝试处理至多给定数量的其他未处理任务(如果已知存在的话)。
      参数:
      maxTasks - 要处理的最大任务数。如果小于或等于零,则不处理任何任务。
    • exec

      protected final boolean exec()
      实现 CountedCompleters 的执行约定。
      指定者:
      exec 在类 ForkJoinTask<T>
      返回:
      true 如果已知此任务已正常完成
    • getRawResult

      public T  getRawResult()
      返回计算结果。默认情况下,返回 null ,这适用于 Void 操作,但在其他情况下应该被覆盖,几乎总是返回一个字段或字段的函数,该字段在完成时保存结果。
      指定者:
      getRawResult 在类 ForkJoinTask<T>
      返回:
      计算结果
    • setRawResult

      protected void setRawResult(T  t)
      一种方法,承载结果的 CountedCompleters 可以选择使用它来帮助维护结果数据。默认情况下,不执行任何操作。不建议重写。但是,如果重写此方法以更新现有对象或字段,则通常必须将其定义为线程安全的。
      指定者:
      setRawResult 在类 ForkJoinTask<T>
      参数:
      t - 值