- java.lang.Object
-
- java.util.concurrent.Phaser
-
public class Phaser extends Object
可重复使用的同步屏障,功能类似于CyclicBarrier
和CountDownLatch
,但支持更灵活的使用。注册。 与其他障碍的情况不同, 登记在移相器上同步的各方数量可能会随时间而变化。 任务可以在任何时间(使用的方法来注册
register()
,bulkRegister(int)
,或构造建立各方的初始数的形式),和(使用任何抵达时任选注销arriveAndDeregister()
)。 与大多数基本同步结构一样,注册和注销仅影响内部计数; 他们没有建立任何进一步的内部簿记,因此任务无法查询他们是否已注册。 (但是,你可以通过继承这个类来引入这样的簿记。)同步。 像
CyclicBarrier
一样,可能会一再等待Phaser
。 方法arriveAndAwaitAdvance()
具有类似于CyclicBarrier.await
的效果。 每一代移相器都有一个相关的相位数。 阶段编号从零开始,并在所有各方到达移相器时前进,在达到Integer.MAX_VALUE
后Integer.MAX_VALUE
零。 使用阶段编号可以在到达移相器时以及在等待其他人时通过可由任何注册方调用的两种方法独立控制操作:- 到达。 方法
arrive()
和arriveAndDeregister()
记录到货。 这些方法不会阻止,但返回相关的到达阶段号 ; 也就是说,到达所应用的相位器的相数。 当给定阶段的最后一方到达时,执行可选动作并且阶段前进。 这些动作由触发相位超前的一方执行,并且通过重写方法onAdvance(int, int)
来安排,该方法还控制终止。 覆盖此方法与为CyclicBarrier
提供屏障操作类似,但更灵活。 - 等候。 方法
awaitAdvance(int)
需要指示到达阶段号的参数,并且当相位器前进到(或已经在)不同阶段时返回。 与使用CyclicBarrier
类似结构不同,方法awaitAdvance
继续等待,即使等待线程被中断。 可以使用可中断和超时版本,但在任务中断或超时等待时遇到的异常不会更改移相器的状态。 如有必要,您可以在调用forceTermination
之后,在这些异常的处理程序中执行任何相关的恢复。 在ForkJoinPool
中执行的任务也可以使用相位器 。 如果池的parallelismLevel可以容纳最大数量的同时阻塞方,则可确保进度。
终止。 移相器可以进入终止状态,可以使用方法
isTerminated()
进行检查。 在终止时,所有同步方法立即返回而不等待提前,如负返回值所示。 同样,终止时注册的尝试也没有效果。 调用onAdvance
返回true
时触发终止。 如果取消注册导致注册方的数量变为零,则默认实现返回true
。 如下所示,当相位器控制具有固定迭代次数的动作时,通常很方便地覆盖该方法以在当前相位数达到阈值时引起终止。 方法forceTermination()
也可用于突然释放等待线程并允许它们终止。分层。 可以对移相器进行分层 (即,以树结构构造)以减少争用。 然而,可以设置具有大量方的否则会经历大量同步争用成本的相位器,使得子组相位器共享共同的父级。 即使它产生更大的每操作开销,这也可以大大增加吞吐量。
在分层阶段的树中,自动管理子阶段与其父母的注册和注销。 只要子移相器的注册方数量变为非零(如
Phaser(Phaser,int)
构造函数register()
或bulkRegister(int)
中所述 ),子移相器就会向其父移植器注册。 每当注册方的数量因调用arriveAndDeregister()
而变为零时,子移相器将从其父级取消注册。监测。 虽然同步方法可以仅由注册方调用,但是相位器的当前状态可以由任何呼叫者监视。 在任何特定时刻共有
getRegisteredParties()
方,其中getArrivedParties()
已到达当前阶段(getPhase()
)。 当剩余的(getUnarrivedParties()
)方到达时,阶段进展。 这些方法返回的值可能反映瞬态,因此通常对同步控制无用。 方法toString()
以便于非正式监视的形式返回这些状态查询的快照。示例用法:
可以使用
Phaser
代替CountDownLatch
来控制为可变数量的方提供服务的一次性动作。 典型的习惯用法是将方法设置为首次注册,然后启动所有操作,然后取消注册,如下所示:void runTasks(List<Runnable> tasks) { Phaser startingGate = new Phaser(1); // "1" to register self // create and start threads for (Runnable task : tasks) { startingGate.register(); new Thread(() -> { startingGate.arriveAndAwaitAdvance(); task.run(); }).start(); } // deregister self to allow threads to proceed startingGate.arriveAndDeregister(); }
使一组线程重复执行给定迭代次数的操作的一种方法是覆盖
onAdvance
:void startTasks(List<Runnable> tasks, int iterations) { Phaser phaser = new Phaser() { protected boolean onAdvance(int phase, int registeredParties) { return phase >= iterations - 1 || registeredParties == 0; } }; phaser.register(); for (Runnable task : tasks) { phaser.register(); new Thread(() -> { do { task.run(); phaser.arriveAndAwaitAdvance(); } while (!phaser.isTerminated()); }).start(); } // allow threads to proceed; don't wait for them phaser.arriveAndDeregister(); }
// ... phaser.register(); while (!phaser.isTerminated()) phaser.arriveAndAwaitAdvance();
相关结构可用于等待上下文中的特定阶段编号,其中您确定阶段永远不会包围
Integer.MAX_VALUE
。 例如:void awaitPhase(Phaser phaser, int phase) { int p = phaser.register(); // assumes caller not already registered while (p < phase) { if (phaser.isTerminated()) // ... deal with unexpected termination else p = phaser.arriveAndAwaitAdvance(); } phaser.arriveAndDeregister(); }
要使用阶段树创建一组
n
任务,您可以使用以下形式的代码,假设具有构造函数的Task类接受在构造时注册的Phaser
。 在调用build(new Task[n], 0, n, new Phaser())
之后,可以启动这些任务,例如通过提交到池:void build(Task[] tasks, int lo, int hi, Phaser ph) { if (hi - lo > TASKS_PER_PHASER) { for (int i = lo; i < hi; i += TASKS_PER_PHASER) { int j = Math.min(i + TASKS_PER_PHASER, hi); build(tasks, i, j, new Phaser(ph)); } } else { for (int i = lo; i < hi; ++i) tasks[i] = new Task(ph); // assumes new Task(ph) performs ph.register() } }
TASKS_PER_PHASER
的最佳值主要取决于预期的同步速率。 低至4的值可能适用于极小的每相任务机构(因此高速率),或者对于极大的任务机构而言可能高达数百。实施说明 :此实施将最大参与方数限制为65535.尝试注册其他参与方会产生
IllegalStateException
。 但是,您可以而且应该创建分层相位器以适应任意大量的参与者。- 从以下版本开始:
- 1.7
- 到达。 方法
-
-
方法摘要
所有方法 实例方法 具体的方法 变量和类型 方法 描述 int
arrive()
到达这个移相器,而不是等待其他人到达。int
arriveAndAwaitAdvance()
到达这个移相器并等待其他人。int
arriveAndDeregister()
到达此移相器并从中取消注册而无需等待其他人到达。int
awaitAdvance(int phase)
等待该相位器的相位从给定相位值前进,如果当前相位不等于给定相位值或该相位器终止则立即返回。int
awaitAdvanceInterruptibly(int phase)
等待该相位器的相位从给定相位值前进,如果在等待时中断则抛出InterruptedException
,或者如果当前相位不等于给定相位值或该相位器终止则立即返回。int
awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)
等待此相位器的相位从给定相位值或给定超时前进到过去,如果在等待时中断则抛出InterruptedException
,或者如果当前相位不等于给定相位值或该相位器终止则立即返回。int
bulkRegister(int parties)
在此移相器中添加给定数量的新未分支方。void
forceTermination()
强制此移相器进入终止状态。int
getArrivedParties()
返回已到达此移相器当前阶段的已注册方的数量。Phaser
getParent()
返回此移相器的父级,如果没有,则返回null
。int
getPhase()
返回当前阶段号。int
getRegisteredParties()
返回此移相器注册的参与方数量。Phaser
getRoot()
返回此移相器的根祖先,如果它没有父移相器,则与此移相器相同。int
getUnarrivedParties()
返回尚未到达此移相器当前阶段的已注册方的数量。boolean
isTerminated()
如果此移相器已终止,则返回true
。protected boolean
onAdvance(int phase, int registeredParties)
可重写的方法,用于在即将发生相位超前时执行操作,并控制终止。int
register()
为此移相器添加了一个新的未受影响的聚会。String
toString()
返回标识此移相器的字符串及其状态。
-
-
-
构造方法详细信息
-
Phaser
public Phaser()
创建一个新的移相器,没有最初注册的参与方,没有父级,初始阶段号为0.使用此移相器的任何线程都需要首先注册它。
-
Phaser
public Phaser(int parties)
创建一个新的移相器,其中包含指定数量的已注册未获得的参与方,没有父级,初始阶段编号为0。- 参数
-
parties
- 进入下一阶段所需的参与方数量 - 异常
-
IllegalArgumentException
- 如果当事方小于零或大于支持的最大当事方数
-
Phaser
public Phaser(Phaser parent)
相当于Phaser(parent, 0)
。- 参数
-
parent
- 父移相器
-
Phaser
public Phaser(Phaser parent, int parties)
使用给定的父级和已注册的未获得方的数量创建新的移相器。 当给定父级非空且给定方数大于零时,此子移相器将向其父级注册。- 参数
-
parent
- 父移相器 -
parties
- 进入下一阶段所需的参与方数量 - 异常
-
IllegalArgumentException
- 如果当事方小于零或大于最大支持方数
-
-
方法详细信息
-
register
public int register()
为此移相器添加了一个新的未受影响的聚会。 如果正在进行onAdvance(int, int)
的调用,则此方法可能会在返回之前等待其完成。 如果此移相器具有父级,并且此移相器以前没有注册方,则此子移相器也在其父级中注册。 如果此移相器终止,则注册尝试无效,并返回负值。- 结果
- 该注册所适用的到达阶段号。 如果此值为负,则此移相器已终止,在这种情况下,注册无效。
- 异常
-
IllegalStateException
- 如果尝试注册超过支持的最大数量的方
-
bulkRegister
public int bulkRegister(int parties)
在此移相器中添加给定数量的新未分支方。 如果正在进行onAdvance(int, int)
的调用,则此方法可能会在返回之前等待其完成。 如果此移相器具有父级,并且给定数量的方大于零,并且此移相器之前没有注册方,则此子移相器也向其父级注册。 如果此移相器终止,则注册尝试无效,并返回负值。- 参数
-
parties
- 进入下一阶段所需的额外参与方数量 - 结果
- 该注册所适用的到达阶段号。 如果此值为负,则此移相器已终止,在这种情况下,注册无效。
- 异常
-
IllegalStateException
- 如果尝试注册超过最大支持的参与方数量 -
IllegalArgumentException
- 如果parties < 0
-
arrive
public int arrive()
到达这个移相器,而不是等待其他人到达。未注册方调用此方法是一个使用错误。 但是,此错误可能仅在此移相器的某些后续操作时导致
IllegalStateException
(如果有的话)。- 结果
- 到达阶段号,如果终止则为负值
- 异常
-
IllegalStateException
- 如果没有终止,未得到支付的政党数量将变为负数
-
arriveAndDeregister
public int arriveAndDeregister()
到达此移相器并从中取消注册而无需等待其他人到达。 撤销注册减少了在未来阶段推进所需的各方数量。 如果此移相器具有父级,并且取消注册导致此移相器具有零方,则此移相器也从其父级取消注册。未注册方调用此方法是一个使用错误。 但是,此错误可能仅在此移相器的某些后续操作时导致
IllegalStateException
,如果有的话。- 结果
- 到达阶段号,如果终止则为负值
- 异常
-
IllegalStateException
- 如果没有终止,登记或未获得当事人的人数将变为负数
-
arriveAndAwaitAdvance
public int arriveAndAwaitAdvance()
到达这个移相器并等待其他人。 相当于awaitAdvance(arrive())
。 如果您需要等待中断或超时,您可以使用awaitAdvance
方法的其他形式之一进行类似构造。 如果您需要在抵达时取消注册,请使用awaitAdvance(arriveAndDeregister())
。未注册方调用此方法是一个使用错误。 但是,此错误可能仅在此移相器上的某些后续操作时导致
IllegalStateException
(如果有的话)。- 结果
- 到达阶段号,或者(负) current phase如果终止
- 异常
-
IllegalStateException
- 如果没有终止,未得到支付的政党数量将变为负数
-
awaitAdvance
public int awaitAdvance(int phase)
等待该相位器的相位从给定相位值前进,如果当前相位不等于给定相位值或该相位器终止则立即返回。- 参数
-
phase
- 到达阶段号,如果终止则为负值; 此参数通常是先前调用arrive
或arriveAndDeregister
返回的值。 - 结果
- 下一个到达阶段号,或者如果是负数则为参数,或者如果终止 则为 (负) current phase
-
awaitAdvanceInterruptibly
public int awaitAdvanceInterruptibly(int phase) throws InterruptedException
等待该相位器的相位从给定相位值前进,如果在等待时中断则抛出InterruptedException
,或者如果当前相位不等于给定相位值或该相位器终止则立即返回。- 参数
-
phase
- 到达阶段号,如果终止则为负值; 此参数通常是先前调用arrive
或arriveAndDeregister
返回的值。 - 结果
- 下一个到达阶段号,或者如果它是否定的参数,或者如果终止 则为 (负) current phase
- 异常
-
InterruptedException
- 如果线程在等待时中断
-
awaitAdvanceInterruptibly
public int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
等待该相位器的相位从给定相位值或给定超时前进到过去,如果在等待时中断则抛出InterruptedException
,或者如果当前相位不等于给定相位值或该相位器终止则立即返回。- 参数
-
phase
- 到达阶段号,如果终止则为负值; 此参数通常是先前调用arrive
或arriveAndDeregister
返回的值。 -
timeout
- 放弃前等待多长时间,单位为unit
-
unit
-一个TimeUnit
确定如何解释timeout
参数 - 结果
- 下一个到达阶段号,或者如果是负数则为参数,或者如果终止 则为 (负) current phase
- 异常
-
InterruptedException
- 如果线程在等待时中断 -
TimeoutException
- 如果在等待时超时
-
forceTermination
public void forceTermination()
强制此移相器进入终止状态。 注册方的数量不受影响。 如果此移相器是分层相位器的成员,则该组中的所有移相器都将终止。 如果此移相器已终止,则此方法无效。 在一个或多个任务遇到意外异常后,此方法可用于协调恢复。
-
getPhase
public final int getPhase()
返回当前阶段号。 最大相数为Integer.MAX_VALUE
,之后重新启动为零。 在终止时,阶段编号为负,在这种情况下,终止之前的主要阶段可以通过getPhase() + Integer.MIN_VALUE
获得。- 结果
- 阶段号,如果终止则为负值
-
getRegisteredParties
public int getRegisteredParties()
返回此移相器注册的参与方数量。- 结果
- 派对的数量
-
getArrivedParties
public int getArrivedParties()
返回已到达此移相器当前阶段的已注册方的数量。 如果此移相器已终止,则返回的值无意义且任意。- 结果
- 到达的派对数量
-
getUnarrivedParties
public int getUnarrivedParties()
返回尚未到达此移相器当前阶段的已注册方的数量。 如果此移相器已终止,则返回的值无意义且任意。- 结果
- 没有派对的人数
-
getParent
public Phaser getParent()
返回此移相器的父级,如果没有,则返回null
。- 结果
-
此移相器的父级,如果没有,
null
-
getRoot
public Phaser getRoot()
返回此移相器的根祖先,如果它没有父移相器,则与此移相器相同。- 结果
- 这个移相器的根本祖先
-
isTerminated
public boolean isTerminated()
如果此移相器已终止,则返回true
。- 结果
-
true
此移相器是否已终止
-
onAdvance
protected boolean onAdvance(int phase, int registeredParties)
可重写的方法,用于在即将发生相位超前时执行操作,并控制终止。 在推进该移相器的一方到来时(当所有其他等待方都处于休眠状态时)调用该方法。 如果此方法返回true
,则此移相器将在提前设置为最终终止状态,随后对isTerminated()
的调用将返回true。 通过调用此方法抛出的任何(未经检查的)异常或错误将传播到尝试推进此移相器的一方,在这种情况下不会发生进展。该方法的参数提供了当前转换的主要相位器状态。 从
onAdvance
内onAdvance
此移相器的到达,注册和等待方法的效果未指定,不应依赖。如果此移相器是分层移相器的成员,则仅在每次前进时调用其根移相器
onAdvance
。支持最常见的用例,此方法的默认实现返回
true
当注册方的数量已变为零作为党调用的结果arriveAndDeregister
。 您可以禁用此行为,从而在将来注册时启用,通过重写此方法始终返回false
:Phaser phaser = new Phaser() { protected boolean onAdvance(int phase, int parties) { return false; } }
- 参数
-
phase
- 在此移相器前进之前,此方法的当前阶段编号 -
registeredParties
- 当前的注册用户数 - 结果
-
true
如果此移相器应该终止
-
-