- java.lang.Object
-
- java.util.concurrent.ForkJoinTask<T>
-
- java.util.concurrent.CountedCompleter<T>
-
- 实现的所有接口
-
Serializable
,Future<T>
public abstract class CountedCompleter<T> extends ForkJoinTask<T>
ForkJoinTask
,触发时执行完成操作,并且没有剩余的待处理操作。 与其他形式的ForkJoinTasks相比,CountedCompleters在子任务停顿和阻塞的情况下通常更强大,但编程不太直观。 CountedCompleter的使用类似于其他基于完成的组件(例如CompletionHandler
)的使用,除了可能需要多个未决的完成来触发完成操作onCompletion(CountedCompleter)
,而不仅仅是一个。 除非另有初始化, pending count开始于零,但也可以是(原子),使用方法改变setPendingCount(int)
,addToPendingCount(int)
,和compareAndSetPendingCount(int, int)
。 在调用tryComplete()
时 ,如果挂起的操作计数非零,则递减; 否则,执行完成动作,并且如果该完成者本身具有完成者,则该过程继续其完成者。 与相关同步组件(如Phaser
和Semaphore
)的情况一样 ,这些方法仅影响内部计数; 他们没有建立任何进一步的内部簿记。 特别是,未维护待处理任务的身份。 如下所示,您可以创建在需要时记录部分或全部待处理任务或其结果的子类。 如下所示,还提供了支持完成遍历的定制的实用方法。 但是,由于CountedCompleters仅提供基本同步机制,因此创建进一步的抽象子类可能很有用,这些子类维护适用于一组相关用法的链接,字段和其他支持方法。具体的CountedCompleter类必须定义方法
compute()
,在大多数情况下(如下图所示),在返回之前调用tryComplete()
。 该类还可以可选地覆盖方法onCompletion(CountedCompleter)
以在正常完成时执行动作,并且方法onExceptionalCompletion(Throwable, CountedCompleter)
可以在任何异常时执行动作。CountedCompleters通常不承担结果,在这种情况下,它们通常被声明为
CountedCompleter<Void>
,并且将始终返回null
作为结果值。 在其他情况下,您应覆盖方法getRawResult()
以提供join(), invoke()
和相关方法的结果。 通常,此方法应返回CountedCompleter对象的字段值(或一个或多个字段的函数),该对象在完成时保存结果。 默认情况下,方法setRawResult(T)
在CountedCompleters中不起作用。 有可能(但很少适用)覆盖此方法以维护保存结果数据的其他对象或字段。一个不具有完成者的CountedCompleter (即,
getCompleter()
返回null
)可以用作具有此附加功能的常规ForkJoinTask。 但是,任何具有另一个完成者的完成者仅用作其他计算的内部帮助者,因此其自身的任务状态(如Future.isDone()
等方法中所报告的 )是任意的; 这种状况只有在明确调用改变complete(T)
,ForkJoinTask.cancel(boolean)
,ForkJoinTask.completeExceptionally(Throwable)
或方法的特殊结束后compute
。 在任何异常完成时,异常可以被转发到任务的完成者(及其完成者,等等),如果存在并且尚未完成。 同样,取消内部的CountedCompleter只会对该完成者产生局部影响,因此通常不常用。示例用法。
并行递归分解。 CountedCompleters可以安排在类似于
RecursiveAction
经常使用的树中,尽管设置它们的结构通常会有所不同。 这里,每个任务的完成者是计算树中的父。 尽管它们需要更多的簿记,但在对阵列或集合的每个元素应用可能耗时的操作(无法进一步细分)时,CountedCompleters可能是更好的选择; 特别是当某些元素的操作完成时间与其他元素完全不同时,或者由于内在的变化(例如I / O)或者诸如垃圾收集之类的辅助效果。 由于CountedCompleters提供了自己的延续,因此其他任务无需阻止等待执行它们。例如,这是一个实用程序方法的初始版本,它使用二分二递归分解将工作分成单个部分(叶子任务)。 即使工作被分成单独的调用,基于树的技术通常比直接分支叶子任务更可取,因为它们减少了线程间的通信并改善了负载平衡。 在递归的情况下,完成每对子任务的第二个触发其父级的完成(因为没有执行结果组合,方法
onCompletion
的默认无操作实现不被覆盖)。 实用程序方法设置root任务并调用它(这里隐式使用ForkJoinPool.commonPool()
)。 始终将待处理计数设置为子任务数并在返回之前立即调用tryComplete()
是简单且可靠的(但不是最佳的)。public static <E> void forEach(E[] array, Consumer<E> action) { class Task extends CountedCompleter<Void> { final int lo, hi; Task(Task parent, int lo, int hi) { super(parent); this.lo = lo; this.hi = hi; } public void compute() { if (hi - lo >= 2) { int mid = (lo + hi) >>> 1; // must set pending count before fork setPendingCount(2); new Task(this, mid, hi).fork(); // right child new Task(this, lo, mid).fork(); // left child } else if (hi > lo) action.accept(array[lo]); tryComplete(); } } new Task(null, 0, array.length).invoke(); }
tryComplete()
的调用,但代价是待计数看起来“一个接一个”。public void compute() { if (hi - lo >= 2) { int mid = (lo + hi) >>> 1; setPendingCount(1); // looks off by one, but correct! new Task(this, mid, hi).fork(); // right child new Task(this, lo, mid).compute(); // direct invoke } else { if (hi > lo) action.accept(array[lo]); tryComplete(); } }
onCompletion(CountedCompleter)
方法,因此tryComplete
可以替换为propagateCompletion()
。public void compute() { int n = hi - lo; for (; n >= 2; n /= 2) { addToPendingCount(1); new Task(this, lo + n/2, lo + n).fork(); } if (n > 0) action.accept(array[lo]); propagateCompletion(); }
public static <E> void forEach(E[] array, Consumer<E> action) { class Task extends CountedCompleter<Void> { final int lo, hi; Task(Task parent, int lo, int hi) { super(parent, 31 - Integer.numberOfLeadingZeros(hi - lo)); this.lo = lo; this.hi = hi; } public void compute() { for (int n = hi - lo; n >= 2; n /= 2) new Task(this, lo + n/2, lo + n).fork(); action.accept(array[lo]); propagateCompletion(); } } if (array.length > 0) new Task(null, 0, array.length).invoke(); }
搜索。 CountedCompleters树可以在数据结构的不同部分中搜索值或属性,并在找到结果后立即在
AtomicReference
中报告结果。 其他人可以轮询结果以避免不必要的工作。 (你还可以另外cancel其他任务,但通常更简单,更高效,让他们注意到结果已设置,如果是这样,则跳过进一步处理。)再次使用完全分区的数组进行说明(再次,在实践中,叶任务将几乎总是处理多个元素):class Searcher<E> extends CountedCompleter<E> { final E[] array; final AtomicReference<E> result; final int lo, hi; Searcher(CountedCompleter<?> p, E[] array, AtomicReference<E> result, int lo, int hi) { super(p); this.array = array; this.result = result; this.lo = lo; this.hi = hi; } public E getRawResult() { return result.get(); } public void compute() { // similar to ForEach version 3 int l = lo, h = hi; while (result.get() == null && h >= l) { if (h - l >= 2) { int mid = (l + h) >>> 1; addToPendingCount(1); new Searcher(this, array, result, mid, h).fork(); h = mid; } else { E x = array[l]; if (matches(x) && result.compareAndSet(null, x)) quietlyCompleteRoot(); // root task is now joinable break; } } tryComplete(); // normally complete whether or not found } boolean matches(E e) { ... } // return true if found public static <E> E search(E[] array) { return new Searcher<E>(null, array, new AtomicReference<E>(), 0, array.length).invoke(); } }
compareAndSet
一个共同的结果,后无条件调用tryComplete
可以是有条件的(if (result.get() == null) tryComplete();
),因为没有进一步的簿记需要一次根本任务完成管理落成。记录子任务。 将多个子任务的结果组合在一起的CountedCompleter任务通常需要在方法
onCompletion(CountedCompleter)
中访问这些结果。 如下面的类所示(执行map-reduce的简化形式,其中映射和缩减都是E
类型),在分而治之的设计中执行此操作的一种方法是将每个子任务记录为其兄弟,以便它可以可以通过方法onCompletion
访问。 这种技术适用于左右结果组合顺序无关紧要的减少; 有序减少需要明确的左/右指定。 在上述示例中看到的其他流线型的变体也可以适用。class MyMapper<E> { E apply(E v) { ... } } class MyReducer<E> { E apply(E x, E y) { ... } } class MapReducer<E> extends CountedCompleter<E> { final E[] array; final MyMapper<E> mapper; final MyReducer<E> reducer; final int lo, hi; MapReducer<E> sibling; E result; MapReducer(CountedCompleter<?> p, E[] array, MyMapper<E> mapper, MyReducer<E> reducer, int lo, int hi) { super(p); this.array = array; this.mapper = mapper; this.reducer = reducer; this.lo = lo; this.hi = hi; } public void compute() { if (hi - lo >= 2) { int mid = (lo + hi) >>> 1; MapReducer<E> left = new MapReducer(this, array, mapper, reducer, lo, mid); MapReducer<E> right = new MapReducer(this, array, mapper, reducer, mid, hi); left.sibling = right; right.sibling = left; setPendingCount(1); // only right is pending right.fork(); left.compute(); // directly execute left } else { if (hi > lo) result = mapper.apply(array[lo]); tryComplete(); } } public void onCompletion(CountedCompleter<?> caller) { if (caller != this) { MapReducer<E> child = (MapReducer<E>)caller; MapReducer<E> sib = child.sibling; if (sib == null || sib.result == null) result = child.result; else result = reducer.apply(child.result, sib.result); } } public E getRawResult() { return result; } public static <E> E mapReduce(E[] array, MyMapper<E> mapper, MyReducer<E> reducer) { return new MapReducer<E>(null, array, mapper, reducer, 0, array.length).invoke(); } }
onCompletion
采用与组合结果的许多完成设计共同的形式。 这个回调式方法在每个任务中被触发一次,在两个不同的上下文中,其中挂起的计数是或变为零:(1)由任务本身,如果其挂起的计数在调用tryComplete
为零,或者(2)当它们完成并将未决计数减少到零时,通过它的任何子任务。caller
参数区分了案例。 通常,当呼叫者是this
,不需要采取任何措施。 否则,可以使用调用者参数(通常通过强制转换)来提供要组合的值(和/或指向其他值的链接)。 假设正确使用挂起计数,onCompletion
内的操作在完成任务及其子任务时发生(一次)。 此方法中不需要其他同步,以确保访问此任务的字段或其他已完成任务的线程安全性。完成遍历 。 如果使用
onCompletion
处理完成不适用或不方便,则可以使用方法firstComplete()
和nextComplete()
创建自定义遍历。 例如,要定义仅以第三个ForEach示例的形式拆分右侧任务的MapReducer,完成必须协同地减少未使用的子任务链接,这可以按如下方式完成:class MapReducer<E> extends CountedCompleter<E> { // version 2 final E[] array; final MyMapper<E> mapper; final MyReducer<E> reducer; final int lo, hi; MapReducer<E> forks, next; // record subtask forks in list E result; MapReducer(CountedCompleter<?> p, E[] array, MyMapper<E> mapper, MyReducer<E> reducer, int lo, int hi, MapReducer<E> next) { super(p); this.array = array; this.mapper = mapper; this.reducer = reducer; this.lo = lo; this.hi = hi; this.next = next; } public void compute() { int l = lo, h = hi; while (h - l >= 2) { int mid = (l + h) >>> 1; addToPendingCount(1); (forks = new MapReducer(this, array, mapper, reducer, mid, h, forks)).fork(); h = mid; } if (h > l) result = mapper.apply(array[l]); // process completions by reducing along and advancing subtask links for (CountedCompleter<?> c = firstComplete(); c != null; c = c.nextComplete()) { for (MapReducer t = (MapReducer)c, s = t.forks; s != null; s = t.forks = s.next) t.result = reducer.apply(t.result, s.result); } } public E getRawResult() { return result; } public static <E> E mapReduce(E[] array, MyMapper<E> mapper, MyReducer<E> reducer) { return new MapReducer<E>(null, array, mapper, reducer, 0, array.length, null).invoke(); } }
触发。 一些CountedCompleters本身从不分叉,而是在其他设计中用作管道; 包括完成一个或多个异步任务触发另一个异步任务的那些。 例如:
class HeaderBuilder extends CountedCompleter<...> { ... } class BodyBuilder extends CountedCompleter<...> { ... } class PacketSender extends CountedCompleter<...> { PacketSender(...) { super(null, 1); ... } // trigger on second completion public void compute() { } // never called public void onCompletion(CountedCompleter<?> caller) { sendPacket(); } } // sample use: PacketSender p = new PacketSender(); new HeaderBuilder(p, ...).fork(); new BodyBuilder(p, ...).fork();
- 从以下版本开始:
- 1.8
- 另请参见:
- Serialized Form
-
-
构造方法摘要
构造方法 变量 构造器 描述 protected
CountedCompleter()
创建一个新的CountedCompleter,没有完成者,初始挂起计数为零。protected
CountedCompleter(CountedCompleter<?> completer)
使用给定的完成符创建一个新的CountedCompleter,初始挂起计数为零。protected
CountedCompleter(CountedCompleter<?> completer, int initialPendingCount)
使用给定的完成者和初始挂起计数创建一个新的CountedCompleter。
-
方法摘要
所有方法 实例方法 抽象方法 具体的方法 变量和类型 方法 描述 void
addToPendingCount(int delta)
将(原子地)给定值添加到挂起计数中。boolean
compareAndSetPendingCount(int expected, int count)
仅在当前保持给定预期值时,将待处理计数(原子地)设置为给定计数。void
complete(T rawResult)
abstract void
compute()
该任务执行的主要计算。int
decrementPendingCountUnlessZero()
如果挂起的计数非零,则(原子地)递减它。protected boolean
exec()
实现CountedCompleters的执行约定。CountedCompleter<?>
firstComplete()
如果此任务的挂起计数为零,则返回此任务; 否则减少其待处理的计数并返回null
。CountedCompleter<?>
getCompleter()
返回在此任务的构造函数中建立的完成null
如果没有,则返回null
。int
getPendingCount()
返回当前挂起的计数。T
getRawResult()
返回计算结果。CountedCompleter<?>
getRoot()
返回当前计算的根; 即,如果没有完成者,则执行此任务,否则完成者的根。void
helpComplete(int maxTasks)
如果此任务尚未完成,则尝试最多处理此任务在完成路径上的给定数量的其他未处理任务(如果已知存在)。CountedCompleter<?>
nextComplete()
如果此任务没有完成者,则调用ForkJoinTask.quietlyComplete()
并返回null
。void
onCompletion(CountedCompleter<?> caller)
调用方法tryComplete()
并且挂起计数为零时,或者在调用无条件方法complete(T)
时执行操作。boolean
onExceptionalCompletion(Throwable ex, CountedCompleter<?> caller)
当方法执行一个动作ForkJoinTask.completeExceptionally(Throwable)
调用或方法compute()
抛出一个异常,而这个任务尚未否则正常完成。void
propagateCompletion()
等效于tryComplete()
,但不沿完成路径调用onCompletion(CountedCompleter)
:如果挂起计数非零,则递减计数; 否则,类似地尝试完成此任务的完成,如果存在,则将此任务标记为完成。void
quietlyCompleteRoot()
相当于getRoot().quietlyComplete()
。void
setPendingCount(int count)
将挂起计数设置为给定值。protected void
setRawResult(T t)
结果承载CountedCompleters的方法可以选择用于帮助维护结果数据。void
tryComplete()
如果挂起的计数非零,则递减计数; 否则调用onCompletion(CountedCompleter)
,然后类似地尝试完成此任务的完成,如果存在,则将此任务标记为完成。-
声明方法的类 java.util.concurrent.ForkJoinTask
adapt, adapt, adapt, cancel, compareAndSetForkJoinTaskTag, completeExceptionally, fork, get, get, getException, getForkJoinTaskTag, getPool, getQueuedTaskCount, getSurplusQueuedTaskCount, helpQuiesce, inForkJoinPool, invoke, invokeAll, invokeAll, invokeAll, isCompletedAbnormally, isCompletedNormally, join, peekNextLocalTask, pollNextLocalTask, pollSubmission, pollTask, quietlyComplete, quietlyInvoke, quietlyJoin, reinitialize, setForkJoinTaskTag, tryUnfork
-
声明方法的类 java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
声明方法的接口 java.util.concurrent.Future
isCancelled, isDone
-
-
-
-
构造方法详细信息
-
CountedCompleter
protected CountedCompleter(CountedCompleter<?> completer, int initialPendingCount)
使用给定的完成者和初始挂起计数创建一个新的CountedCompleter。- 参数
-
completer
- 此任务的完成者,或null
如果没有) -
initialPendingCount
- 初始待处理计数
-
CountedCompleter
protected CountedCompleter(CountedCompleter<?> completer)
使用给定的完成符创建一个新的CountedCompleter,初始挂起计数为零。- 参数
-
completer
- 此任务的完成者,如果没有,null
-
CountedCompleter
protected CountedCompleter()
创建一个新的CountedCompleter,没有完成者,初始挂起计数为零。
-
-
方法详细信息
-
compute
public abstract void compute()
该任务执行的主要计算。
-
onCompletion
public void onCompletion(CountedCompleter<?> caller)
调用方法tryComplete()
并且挂起计数为零时,或者调用无条件方法complete(T)
时执行操作。 默认情况下,此方法不执行任何操作 您可以通过检查给定调用者参数的标识来区分个案。 如果不等于this
,那么它通常是一个子任务,可能包含要组合的结果(和/或其他结果的链接)。- 参数
-
caller
- 调用此方法的任务(可能是此任务本身)
-
onExceptionalCompletion
public boolean onExceptionalCompletion(Throwable ex, CountedCompleter<?> caller)
当方法执行一个动作ForkJoinTask.completeExceptionally(Throwable)
调用或方法compute()
抛出一个异常,而这个任务尚未否则正常完成。 在进入此方法时,此任务ForkJoinTask.isCompletedAbnormally()
。 此方法的返回值控制进一步传播:如果true
并且此任务具有尚未完成的完成true
,则该完成true
也会异常完成,但与此完成true
相同。 除了返回true
之外,此方法的默认实现不执行任何true
。- 参数
-
ex
- 例外 -
caller
- 调用此方法的任务(可能是此任务本身) - 结果
-
true
如果此异常应传播到此任务的完成者(如果存在)
-
getCompleter
public final CountedCompleter<?> getCompleter()
返回在此任务的构造函数中建立的完成null
如果没有,则返回null
。- 结果
- 完成者
-
getPendingCount
public final int getPendingCount()
返回当前挂起的计数。- 结果
- 当前待处理的计数
-
setPendingCount
public final void setPendingCount(int count)
将挂起计数设置为给定值。- 参数
-
count
- 伯爵
-
addToPendingCount
public final void addToPendingCount(int delta)
将(原子地)给定值添加到挂起计数中。- 参数
-
delta
- 要添加的值
-
compareAndSetPendingCount
public final boolean compareAndSetPendingCount(int expected, int count)
仅在当前保持给定预期值时,将待处理计数(原子地)设置为给定计数。- 参数
-
expected
- 预期值 -
count
- 新值 - 结果
-
true
如果成功
-
decrementPendingCountUnlessZero
public final int decrementPendingCountUnlessZero()
如果挂起的计数非零,则(原子地)递减它。- 结果
- 进入此方法时的初始(未减少)挂起计数
-
getRoot
public final CountedCompleter<?> getRoot()
返回当前计算的根; 即,如果没有完成者,则执行此任务,否则完成者的根。- 结果
- 当前计算的根源
-
tryComplete
public final void tryComplete()
如果挂起的计数非零,则递减计数; 否则调用onCompletion(CountedCompleter)
,然后类似地尝试完成此任务的完成,如果存在,则将此任务标记为完成。
-
propagateCompletion
public final void propagateCompletion()
等效于tryComplete()
但不沿完成路径调用onCompletion(CountedCompleter)
:如果挂起计数非零,则递减计数; 否则,类似地尝试完成此任务的完成,如果存在,则将此任务标记为完成。 在onCompletion
不应该或不需要为每个完成onCompletion
调用onCompletion
情况下,该方法可能是有用的。
-
complete
public void complete(T rawResult)
无论挂起计数如何,调用onCompletion(CountedCompleter)
,将此任务标记为完成,并在此任务的完成符上进一步触发tryComplete()
(如果存在)。 给定rawResult作为参数传递给setRawResult(T)
调用之前onCompletion(CountedCompleter)
或纪念这一任务已完成; 它的值仅对覆盖setRawResult
类有意义。 此方法不会修改挂起计数。一旦获得任何一个(相对于所有)几个子任务结果,当强制完成时,该方法可能是有用的。 但是,在未覆盖
setRawResult
的常见(和推荐)情况下,可以使用quietlyCompleteRoot()
更简单地获得此效果。- 重写:
-
complete
类ForkJoinTask<T>
- 参数
-
rawResult
- 原始结果
-
firstComplete
public final CountedCompleter<?> firstComplete()
- 结果
-
此任务,如果挂起计数为零,
null
-
nextComplete
public final CountedCompleter<?> nextComplete()
如果此任务没有完成者,则调用ForkJoinTask.quietlyComplete()
并返回null
。 或者,如果完成者的待处理计数非零,则递减计数并返回null
。 否则,返回完成者。 此方法可用作同类任务层次结构的完成遍历循环的一部分:for (CountedCompleter<?> c = firstComplete(); c != null; c = c.nextComplete()) { // ... process c ... }
- 结果
-
完成者,如果没有,
null
-
quietlyCompleteRoot
public final void quietlyCompleteRoot()
相当于getRoot().quietlyComplete()
。
-
helpComplete
public final void helpComplete(int maxTasks)
如果此任务尚未完成,则尝试最多处理此任务在完成路径上的给定数量的其他未处理任务(如果已知存在)。- 参数
-
maxTasks
- 要处理的最大任务数。 如果小于或等于零,则不处理任务。
-
exec
protected final boolean exec()
实现CountedCompleters的执行约定。- Specified by:
-
exec
在类ForkJoinTask<T>
- 结果
-
true
如果已知此任务已正常完成
-
getRawResult
public T getRawResult()
返回计算结果。 默认情况下,返回null
,这适用于Void
操作,但在其他情况下应该被覆盖,几乎总是返回完成时保存结果的字段的字段或函数。- Specified by:
-
getRawResult
在类ForkJoinTask<T>
- 结果
- 计算的结果
-
setRawResult
protected void setRawResult(T t)
结果承载CountedCompleters的方法可以选择用于帮助维护结果数据。 默认情况下,什么也不做。 不建议覆盖。 但是,如果重写此方法以更新现有对象或字段,则通常必须将其定义为线程安全的。- Specified by:
-
setRawResult
在ForkJoinTask<T>
类 - 参数
-
t
- 该值
-
-