模块 java.base

类 Phaser

java.lang.Object
java.util.concurrent.Phaser

public class Phaser extends Object
一个可重用的同步屏障,在功能上类似于 CyclicBarrier CountDownLatch 但支持更灵活的使用。

登记。与其他障碍的情况不同,在移相器上同步的参与方数量registered可能会随时间变化。任务可以随时注册(使用方法 register() bulkRegister(int) 或建立初始参与方数量的构造函数形式),并可选择在任何到达时注销(使用 arriveAndDeregister() )。与大多数基本同步结构一样,注册和注销仅影响内部计数;它们不建立任何进一步的内部簿记,因此任务无法查询它们是否已注册。 (但是,您可以通过继承此类来引入此类簿记。)

同步。CyclicBarrier 一样,可以重复等待 Phaser。方法 arriveAndAwaitAdvance() 的效果类似于 CyclicBarrier.await 。每一代移相器都有一个相关联的相位编号。阶段编号从零开始,并在所有参与方到达移相器时递增,在到达 Integer.MAX_VALUE 后返回到零。阶段编号的使用可以通过任何注册方可以调用的两种方法,在到达阶段器和等待其他阶段时独立控制操作:

  • 到达。方法arrive() arriveAndDeregister() 记录到达。这些方法不会阻塞,但会返回关联的 arrival phase number ;也就是说,到达应用的相位器的相位编号。当给定阶段的最后一方到达时,执行可选操作并且阶段前进。这些动作由触发阶段提前的一方执行,并由覆盖方法 onAdvance(int, int) 安排,该方法也控制终止。覆盖此方法类似于,但比向 CyclicBarrier 提供屏障操作更灵活。
  • 等待。方法 awaitAdvance(int) 需要一个指示到达阶段编号的参数,并在相位器前进到(或已经处于)不同阶段时返回。与使用 CyclicBarrier 的类似构造不同,即使等待线程被中断,方法 awaitAdvance 也会继续等待。可中断和超时版本也可用,但在任务等待中断或超时时遇到的异常不会改变移相器的状态。如有必要,您可以在这些异常的处理程序中执行任何关联的恢复,通常是在调用 forceTermination 之后。在 ForkJoinPool 中执行的任务也可以使用移相器。如果池的并行级别可以容纳最大数量的同时被阻止的各方,则可以确保进展。

终止。移相器可能会进入 termination 状态,可以使用方法 isTerminated() 进行检查。终止后,所有同步方法立即返回而不等待提前,如负返回值所示。同样,尝试在终止时注册也没有效果。当 onAdvance 的调用返回 true 时触发终止。如果注销导致注册方的数量变为零,则默认实现返回 true。如下图所示,当相位器控制具有固定迭代次数的动作时,通常可以方便地覆盖此方法以在当前相位数达到阈值时导致终止。方法 forceTermination() 也可用于突然释放等待线程并允许它们终止。

分层。移相器可以是tiered(即,以树结构构建)以减少争用。具有大量参与方的移相器可能会被设置为让多组子移相器共享一个共同的父代,否则这些移相器将经历大量的同步争用成本。这可能会大大增加吞吐量,即使它会导致每个操作的开销更大。

在分层移相器树中,自动管理子移相器与其父移相器的注册和注销。每当子移相器的注册方数量变为非零时(在 Phaser(Phaser,int) 构造函数、register() bulkRegister(int) 中建立),子移相器就会向其父移相器注册。每当注册方的数量因调用 arriveAndDeregister() 而变为零时,子移相器就会从其父移相器中注销。

监控。虽然同步方法只能由注册方调用,但任何调用者都可以监视相位器的当前状态。在任何给定时刻,总共有 getRegisteredParties() 方,其中 getArrivedParties() 已经到达当前阶段 (getPhase() )。当剩余的 (getUnarrivedParties() ) 方到达时,阶段前进。这些方法返回的值可能反映瞬态,因此通常对同步控制没有用。方法 toString() 以方便非正式监控的形式返回这些状态查询的快照。

内存一致性影响:任何形式的到达方法之前的操作 发生在之前相应的阶段提前和 onAdvance 操作(如果存在),这反过来发生在之前阶段推进后的行动。

示例用法:

可以使用 Phaser 代替 CountDownLatch 来控制服务于可变数量的参与方的一次性操作。典型的习惯用法是将此设置为首先注册的方法,然后启动所有操作,然后取消注册,如下所示:

 
 void runTasks(List<Runnable> tasks) {
  Phaser startingGate = new Phaser(1); // "1" to register self
  // create and start threads
  for (Runnable task : tasks) {
   startingGate.register();
   new Thread(() -> {
    startingGate.arriveAndAwaitAdvance();
    task.run();
   }).start();
  }

  // deregister self to allow threads to proceed
  startingGate.arriveAndDeregister();
 } 

使一组线程针对给定的迭代次数重复执行操作的一种方法是覆盖 onAdvance

 
 void startTasks(List<Runnable> tasks, int iterations) {
  Phaser phaser = new Phaser() {
   protected boolean onAdvance(int phase, int registeredParties) {
    return phase >= iterations - 1 || registeredParties == 0;
   }
  };
  phaser.register();
  for (Runnable task : tasks) {
   phaser.register();
   new Thread(() -> {
    do {
     task.run();
     phaser.arriveAndAwaitAdvance();
    } while (!phaser.isTerminated());
   }).start();
  }
  // allow threads to proceed; don't wait for them
  phaser.arriveAndDeregister();
 } 
如果主任务稍后必须等待终止,它可能会重新注册然后执行类似的循环:
 
  // ...
  phaser.register();
  while (!phaser.isTerminated())
   phaser.arriveAndAwaitAdvance(); 

在您确定阶段永远不会绕过 Integer.MAX_VALUE 的上下文中,相关结构可用于等待特定阶段编号。例如:

 
 void awaitPhase(Phaser phaser, int phase) {
  int p = phaser.register(); // assumes caller not already registered
  while (p < phase) {
   if (phaser.isTerminated())
    // ... deal with unexpected termination
   else
    p = phaser.arriveAndAwaitAdvance();
  }
  phaser.arriveAndDeregister();
 } 

要使用移相器树创建一组 n 任务,您可以使用以下形式的代码,假设一个 Task 类的构造函数接受它在构造时注册的 Phaser。在调用 build(new Task[n], 0, n, new Phaser()) 之后,这些任务就可以开始了,例如通过提交到一个池:

 
 void build(Task[] tasks, int lo, int hi, Phaser ph) {
  if (hi - lo > TASKS_PER_PHASER) {
   for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
    int j = Math.min(i + TASKS_PER_PHASER, hi);
    build(tasks, i, j, new Phaser(ph));
   }
  } else {
   for (int i = lo; i < hi; ++i)
    tasks[i] = new Task(ph);
    // assumes new Task(ph) performs ph.register()
  }
 } 
TASKS_PER_PHASER 的最佳值主要取决于预期的同步率。低至 4 的值可能适用于极小的每阶段任务主体(因此速率很高),或者高达数百适用于极大的任务主体。

实现说明:此实现将参与方的最大数量限制为 65535。尝试注册其他参与方会导致 IllegalStateException。但是,您可以而且应该创建分层移相器来容纳任意多的参与者。

自从:
1.7
  • 构造方法总结

    构造方法
    构造方法
    描述
    创建一个没有初始注册方、没有父级且初始阶段编号为 0 的新移相器。
    Phaser(int parties)
    创建一个新的移相器,具有给定数量的已注册未到达方,没有父级,初始阶段号为 0。
    Phaser(Phaser parent)
    相当于 Phaser(parent, 0)
    Phaser(Phaser parent, int parties)
    使用给定的父级和已注册未到达方的数量创建一个新的移相器。
  • 方法总结

    修饰符和类型
    方法
    描述
    int
    到达此移相器,无需等待其他人到达。
    int
    到达此移相器并等待其他人。
    int
    到达这个移相器并从中注销而不等待其他人到达。
    int
    awaitAdvance(int phase)
    等待此移相器的相位从给定的相位值前进,如果当前相位不等于给定的相位值或此移相器终止,则立即返回。
    int
    等待此移相器的相位从给定的相位值前进,如果在等待期间被中断则抛出InterruptedException,或者如果当前相位不等于给定的相位值或此移相器终止则立即返回。
    int
    awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)
    等待此移相器的相位从给定的相位值前进或给定的超时时间过去,如果在等待时被中断则抛出 InterruptedException,或者如果当前相位不等于给定的相位值或此移相器终止则立即返回。
    int
    bulkRegister(int parties)
    将给定数量的新未到达方添加到此移相器。
    void
    强制此移相器进入终止状态。
    int
    返回已到达此移相器当前阶段的注册方数量。
    返回此移相器的父级,如果没有则返回 null
    final int
    返回当前阶段编号。
    int
    返回在此移相器上注册的参与方数量。
    返回此移相器的根祖先,如果它没有父级,则与此移相器相同。
    int
    返回尚未到达此移相器当前阶段的注册方的数量。
    boolean
    如果此移相器已终止,则返回 true
    protected boolean
    onAdvance(int phase, int registeredParties)
    在即将到来的阶段提前时执行操作并控制终止的可重写方法。
    int
    向此移相器添加一个新的未到达方。
    返回标识此移相器及其状态的字符串。

    在类 java.lang.Object 中声明的方法

    clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
  • 构造方法详细信息

    • Phaser

      public Phaser()
      创建一个没有初始注册方、没有父级和初始阶段编号 0 的新移相器。使用此移相器的任何线程都需要先注册它。
    • Phaser

      public Phaser(int parties)
      创建一个新的移相器,具有给定数量的已注册未到达方,没有父级,初始阶段号为 0。
      参数:
      parties - 进入下一阶段所需的参与方数量
      抛出:
      IllegalArgumentException - 如果参与方小于零或大于支持的最大参与方数量
    • Phaser

      public Phaser(Phaser  parent)
      相当于 Phaser(parent, 0)
      参数:
      parent - 父移相器
    • Phaser

      public Phaser(Phaser  parent, int parties)
      使用给定的父级和已注册未到达方的数量创建一个新的移相器。当给定的父项不为空且给定的参与方数量大于零时,此子移相器将向其父项注册。
      参数:
      parent - 父移相器
      parties - 进入下一阶段所需的参与方数量
      抛出:
      IllegalArgumentException - 如果参与方小于零或大于支持的最大参与方数量
  • 方法详情

    • register

      public int register()
      向此移相器添加一个新的未到达方。如果正在进行对 onAdvance(int, int) 的调用,则此方法可能会在返回之前等待其完成。如果这个移相器有一个父对象,并且这个移相器之前没有注册方,那么这个子移相器也会向它的父对象注册。如果此移相器终止,则注册尝试无效,并返回负值。
      返回:
      此注册所应用的到达阶段编号。如果此值为负,则此移相器已终止,在这种情况下注册无效。
      抛出:
      IllegalStateException - 如果尝试注册超过支持的最大参与方数量
    • bulkRegister

      public int bulkRegister(int parties)
      将给定数量的新未到达方添加到此移相器。如果正在进行对 onAdvance(int, int) 的调用,则此方法可能会在返回之前等待其完成。如果这个移相器有一个父代,并且给定的参与方数量大于零,并且这个移相器之前没有注册过的派对,那么这个子代移相器也会向它的父代注册。如果此移相器终止,则注册尝试无效,并返回负值。
      参数:
      parties - 进入下一阶段所需的附加方数量
      返回:
      此注册所应用的到达阶段编号。如果此值为负,则此移相器已终止,在这种情况下注册无效。
      抛出:
      IllegalStateException - 如果尝试注册超过支持的最大参与方数量
      IllegalArgumentException - 如果 parties < 0
    • arrive

      public int arrive()
      到达此移相器,无需等待其他人到达。

      未注册方调用该方法属于使用错误。但是,此错误可能会导致 IllegalStateException 仅在此移相器上进行某些后续操作(如果有的话)。

      返回:
      到达阶段编号,如果终止则为负值
      抛出:
      IllegalStateException - 如果不终止,未到达方的数量将变为负数
    • arriveAndDeregister

      public int arriveAndDeregister()
      到达这个移相器并从中注销而不等待其他人到达。取消注册减少了在未来阶段推进所需的各方数量。如果此移相器有父级,并且注销导致此移相器有零方,则此移相器也从其父级注销。

      未注册方调用该方法属于使用错误。但是,此错误可能会导致 IllegalStateException 仅在此移相器上进行某些后续操作(如果有的话)。

      返回:
      到达阶段编号,如果终止则为负值
      抛出:
      IllegalStateException - 如果不终止,注册或未到达的人数将变为负数
    • arriveAndAwaitAdvance

      public int arriveAndAwaitAdvance()
      到达此移相器并等待其他人。等效于 awaitAdvance(arrive()) 。如果您需要等待中断或超时,您可以使用 awaitAdvance 方法的其他形式之一通过类似的构造来安排它。相反,如果您需要在抵达时注销,请使用 awaitAdvance(arriveAndDeregister())

      未注册方调用该方法属于使用错误。但是,此错误可能会导致 IllegalStateException 仅在此移相器上进行某些后续操作(如果有的话)。

      返回:
      到达阶段编号,或(负)当前阶段如果终止
      抛出:
      IllegalStateException - 如果不终止,未到达方的数量将变为负数
    • awaitAdvance

      public int awaitAdvance(int phase)
      等待此移相器的相位从给定的相位值前进,如果当前相位不等于给定的相位值或此移相器终止,则立即返回。
      参数:
      phase - 到达阶段编号,如果终止则为负值;此参数通常是先前调用 arrivearriveAndDeregister 返回的值。
      返回:
      下一个到达阶段编号,如果它是负数,则为参数,如果终止,则为(负数)当前阶段
    • awaitAdvanceInterruptibly

      public int awaitAdvanceInterruptibly(int phase) throws InterruptedException
      等待此移相器的相位从给定的相位值前进,如果在等待期间被中断则抛出InterruptedException,或者如果当前相位不等于给定的相位值或此移相器终止则立即返回。
      参数:
      phase - 到达阶段编号,如果终止则为负值;此参数通常是先前调用 arrivearriveAndDeregister 返回的值。
      返回:
      下一个到达阶段编号,如果它是负数,则为参数,如果终止,则为(负数)当前阶段
      抛出:
      InterruptedException - 如果线程在等待时中断
    • awaitAdvanceInterruptibly

      public int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit  unit) throws InterruptedException , TimeoutException
      等待此移相器的相位从给定的相位值前进或给定的超时时间过去,如果在等待时被中断则抛出 InterruptedException,或者如果当前相位不等于给定的相位值或此移相器终止则立即返回。
      参数:
      phase - 到达阶段编号,如果终止则为负值;此参数通常是先前调用 arrivearriveAndDeregister 返回的值。
      timeout - 放弃前等待多长时间,以 unit 为单位
      unit - 一个 TimeUnit 决定如何解释 timeout 参数
      返回:
      下一个到达阶段编号,如果它是负数,则为参数,如果终止,则为(负数)当前阶段
      抛出:
      InterruptedException - 如果线程在等待时中断
      TimeoutException - 如果等待时超时
    • forceTermination

      public void forceTermination()
      强制此移相器进入终止状态。注册方的计数不受影响。如果此移相器是分层移相器组的成员,则终止该组中的所有移相器。如果此移相器已终止,则此方法无效。此方法可用于在一个或多个任务遇到意外异常后协调恢复。
    • getPhase

      public final int getPhase()
      返回当前阶段编号。最大相数为 Integer.MAX_VALUE ,之后它从零重新开始。终止时,阶段编号为负,在这种情况下,终止前的主要阶段可以通过 getPhase() + Integer.MIN_VALUE 获得。
      返回:
      阶段编号,如果终止则为负值
    • getRegisteredParties

      public int getRegisteredParties()
      返回在此移相器上注册的参与方数量。
      返回:
      当事人数量
    • getArrivedParties

      public int getArrivedParties()
      返回已到达此移相器当前阶段的注册方数量。如果此移相器已终止,则返回值是无意义且任意的。
      返回:
      到场人数
    • getUnarrivedParties

      public int getUnarrivedParties()
      返回尚未到达此移相器当前阶段的注册方的数量。如果此移相器已终止,则返回值是无意义且任意的。
      返回:
      未到场人数
    • getParent

      public Phaser  getParent()
      返回此移相器的父级,如果没有则返回 null
      返回:
      此移相器的父级,如果没有则为 null
    • getRoot

      public Phaser  getRoot()
      返回此移相器的根祖先,如果它没有父级,则与此移相器相同。
      返回:
      这个移相器的根祖先
    • isTerminated

      public boolean isTerminated()
      如果此移相器已终止,则返回 true
      返回:
      true 如果这个移相器已经终止
    • onAdvance

      protected boolean onAdvance(int phase, int registeredParties)
      在即将到来的阶段提前时执行操作并控制终止的可重写方法。当推进此移相器的一方到达时调用此方法(当所有其他等待方都处于休眠状态时)。如果此方法返回 true ,则此移相器将在前进时设置为最终终止状态,随后对 isTerminated() 的调用将返回 true。调用此方法抛出的任何(未经检查的)异常或错误都会传播到尝试推进此移相器的一方,在这种情况下不会发生推进。

      此方法的参数提供当前过渡的移相器状态。从 onAdvance 中调用此移相器的到达、注册和等待方法的效果未指定,不应依赖。

      如果此相位器是一组分层相位器的成员,则每次前进时仅为其根相位器调用 onAdvance

      为了支持最常见的用例,此方法的默认实现会在注册方的数量因一方调用 arriveAndDeregister 而变为零时返回 true。您可以通过覆盖此方法以始终返回 false 来禁用此行为,从而在以后的注册中启用继续:

       
       Phaser phaser = new Phaser() {
        protected boolean onAdvance(int phase, int parties) { return false; }
       }; 
      参数:
      phase - 在此相位器前进之前进入此方法的当前阶段编号
      registeredParties - 当前注册方数量
      返回:
      true 如果这个移相器应该终止
    • toString

      public String  toString()
      返回标识此移相器及其状态的字符串。括号中的状态包括字符串 "phase = " 后跟阶段编号,"parties = " 后跟注册方数量, "arrived = " 后跟到达方数量。
      重写:
      toString 在类 Object
      返回:
      标识此移相器及其状态的字符串