- java.lang.Object
-
- java.util.concurrent.SubmissionPublisher<T>
-
- 参数类型
-
T
- 已发布的项目类型
- 实现的所有接口
-
AutoCloseable
,Flow.Publisher<T>
public class SubmissionPublisher<T> extends Object implements Flow.Publisher<T>, AutoCloseable
一个Flow.Publisher
,它将提交的(非空)项目异步发送给当前订阅者,直到它被关闭。 除非遇到丢弃或异常,否则每个当前订户以相同的顺序接收新提交的项目。 使用SubmissionPublisher允许项目生成器充当兼容性reactive-streams Publishers依赖于丢弃处理和/或阻止流控制。SubmissionPublisher使用其构造函数中提供的
Executor
传递给订阅者。 Executor的最佳选择取决于预期的用途。 如果提交的项目的生成器在不同的线程中运行,并且可以估计订户的数量,请考虑使用Executors.newFixedThreadPool(int)
。 否则请考虑使用默认值,通常为ForkJoinPool.commonPool()
。缓冲允许生产者和消费者以不同的速率进行瞬时操作。 每个订户使用独立的缓冲区。 缓冲区在首次使用时创建,并根据需要扩展到给定的最大值。 (强制执行的容量可以四舍五入到最接近的2的幂和/或由此实现支持的最大值限制。)
request
的调用不直接导致缓冲区扩展,但如果未填充的请求超过最大容量,则风险饱和。 默认值Flow.defaultBufferSize()
可以为基于预期费率,资源和使用选择容量提供有用的起点。可以在多个源之间共享单个SubmissionPublisher。 在发布项目之前的源线程中的动作或在每个订户的相应访问之后发出信号happen-before动作。 但是报告的滞后和需求估计是为监控而设计的,而不是用于同步控制,可能反映了过时或过时的不准确观点。
发布方法支持有关缓冲区饱和时要执行的操作的不同策略。 方法
submit
阻塞,直到资源可用。 这是最简单的,但反应最少。offer
方法可能会丢弃项目(立即或有限超时),但提供插入处理程序然后重试的机会。如果任何Subscriber方法抛出异常,则其订阅将被取消。 如果处理程序是作为一个构造函数的参数,它会被取消之前在方法中的异常调用
onNext
,但在方法例外onSubscribe
,onError
和onComplete
不入账或取消之前办理。 如果提供的Executor在尝试执行任务时抛出RejectedExecutionException
(或任何其他RuntimeException或Error),或者drop处理程序在处理已删除的项时抛出异常,则会重新抛出异常。 在这些情况下,并非所有订阅者都已发布已发布的项目。 在这些情况下,通常是closeExceptionally
的良好做法。方法
consume(Consumer)
简化了对通用情况的支持,其中订户的唯一动作是使用提供的功能请求和处理所有项目。此类还可以作为生成项的子类的便捷基础,并使用此类中的方法发布它们。 例如,这是一个定期发布供应商生成的项目的类。 (实际上,您可以添加独立启动和停止生成的方法,在发布者之间共享Executor等,或者使用SubmissionPublisher作为组件而不是超类。)
class PeriodicPublisher<T> extends SubmissionPublisher<T> { final ScheduledFuture<?> periodicTask; final ScheduledExecutorService scheduler; PeriodicPublisher(Executor executor, int maxBufferCapacity, Supplier<? extends T> supplier, long period, TimeUnit unit) { super(executor, maxBufferCapacity); scheduler = new ScheduledThreadPoolExecutor(1); periodicTask = scheduler.scheduleAtFixedRate( () -> submit(supplier.get()), 0, period, unit); } public void close() { periodicTask.cancel(false); scheduler.shutdown(); super.close(); } }
以下是
Flow.Processor
实现的示例。 它为发布者提供了单步请求,以简化说明。 更自适应的版本可以使用从submit
返回的滞后估计以及其他实用方法来监视流量。class TransformProcessor<S,T> extends SubmissionPublisher<T> implements Flow.Processor<S,T> { final Function<? super S, ? extends T> function; Flow.Subscription subscription; TransformProcessor(Executor executor, int maxBufferCapacity, Function<? super S, ? extends T> function) { super(executor, maxBufferCapacity); this.function = function; } public void onSubscribe(Flow.Subscription subscription) { (this.subscription = subscription).request(1); } public void onNext(S item) { subscription.request(1); submit(function.apply(item)); } public void onError(Throwable ex) { closeExceptionally(ex); } public void onComplete() { close(); } }
- 从以下版本开始:
- 9
-
-
构造方法摘要
构造方法 构造器 描述 SubmissionPublisher()
使用ForkJoinPool.commonPool()
创建一个新的SubmissionPublisher,用于异步传递给订阅者(除非它不支持至少两个的并行级别,在这种情况下,创建一个新的Thread来运行每个任务),最大缓冲区容量为Flow.defaultBufferSize()
,并且没有方法onNext
中的订阅者异常处理程序。SubmissionPublisher(Executor executor, int maxBufferCapacity)
使用给定的Executor创建一个新的SubmissionPublisher,用于向订阅者进行异步传递,具有给定的每个订阅者的最大缓冲区大小,并且没有方法onNext
中的订阅者异常处理程序。SubmissionPublisher(Executor executor, int maxBufferCapacity, BiConsumer<? super Flow.Subscriber<? super T>,? super Throwable> handler)
使用给定的Executor创建一个新的SubmissionPublisher,用于向订阅者进行异步传递,并为每个订阅者提供给定的最大缓冲区大小,如果为非null,则在任何Subscriber在方法onNext
中引发异常时调用给定的处理程序。
-
方法摘要
所有方法 实例方法 具体的方法 变量和类型 方法 描述 void
close()
除非已经关闭,否则onComplete
向当前订户发出信号,并且不允许随后的发布尝试。void
closeExceptionally(Throwable error)
除非已经关闭,否则问题onError
向给定错误的当前订户发出信号,并且不允许随后的发布尝试。CompletableFuture<Void>
consume(Consumer<? super T> consumer)
使用给定的Consumer函数处理所有已发布的项目。int
estimateMaximumLag()
返回所有当前订阅者生成但尚未消费的最大项目数的估计值。long
estimateMinimumDemand()
返回所有当前订户中所请求的最小项目数(通过request
)但尚未生成的估计值。Throwable
getClosedException()
返回与closeExceptionally
关联的异常,如果未关闭或正常关闭,则返回null。Executor
getExecutor()
返回用于异步传递的Executor。int
getMaxBufferCapacity()
返回每用户缓冲区容量的最大值。int
getNumberOfSubscribers()
返回当前订阅者的数量。List<Flow.Subscriber<? super T>>
getSubscribers()
返回当前订阅者列表以进行监视和跟踪,而不是在订阅者上调用Flow.Subscriber
方法。boolean
hasSubscribers()
如果此发布者有任何订阅者,则返回true。boolean
isClosed()
如果此发布者不接受提交,则返回true。boolean
isSubscribed(Flow.Subscriber<? super T> subscriber)
如果当前订阅了给定订阅服务器,则返回true。int
offer(T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)
如果可能,通过异步调用其onNext
方法将给定项目发布给每个当前订阅者,阻止任何订阅的资源不可用,直到指定的超时或直到调用者线程被中断,此时给定的处理程序(如果非-null)被调用,如果返回true,则重试一次。int
offer(T item, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)
如果可能,通过异步调用其onNext
方法将每个当前订阅者发布给定项。int
submit(T item)
通过异步调用其onNext
方法将给定项发布到每个当前订户,在任何订户的资源不可用时不间断地阻止。void
subscribe(Flow.Subscriber<? super T> subscriber)
除非已订阅,否则添加给定的订阅者。
-
-
-
构造方法详细信息
-
SubmissionPublisher
public SubmissionPublisher(Executor executor, int maxBufferCapacity, BiConsumer<? super Flow.Subscriber<? super T>,? super Throwable> handler)
使用给定的Executor创建一个新的SubmissionPublisher,用于向订阅者进行异步传递,具有给定的每个订阅者的最大缓冲区大小,如果为非null,则在任何Subscriber在方法onNext
中抛出异常时调用给定的处理程序。- 参数
-
executor
- 用于异步传递的执行程序,支持创建至少一个独立线程 -
maxBufferCapacity
- 每个用户缓冲区的最大容量(强制容量可以四舍五入到最接近的2的幂和/或由此实现支持的最大值限制;方法getMaxBufferCapacity()
返回实际值) -
handler
- 如果为非null,则在方法onNext
抛出异常时调用的过程 - 异常
-
NullPointerException
- 如果executor为null -
IllegalArgumentException
- 如果maxBufferCapacity不是正数
-
SubmissionPublisher
public SubmissionPublisher(Executor executor, int maxBufferCapacity)
使用给定的Executor创建一个新的SubmissionPublisher,用于向订阅者进行异步传递,具有给定的每个订阅者的最大缓冲区大小,并且没有方法onNext
中的订阅者异常处理程序。- 参数
-
executor
- 用于异步传递的执行程序,支持创建至少一个独立线程 -
maxBufferCapacity
- 每个用户缓冲区的最大容量(强制容量可以四舍五入到最接近的2的幂和/或由此实现支持的最大值限制;方法getMaxBufferCapacity()
返回实际值) - 异常
-
NullPointerException
- 如果executor为null -
IllegalArgumentException
- 如果maxBufferCapacity不是正数
-
SubmissionPublisher
public SubmissionPublisher()
使用ForkJoinPool.commonPool()
创建一个新的SubmissionPublisher,用于异步传递给订阅者(除非它不支持至少两个并行级别,在这种情况下,创建一个新的Thread来运行每个任务),最大缓冲区容量为Flow.defaultBufferSize()
,并且没有方法onNext
中的订阅者异常处理程序。
-
-
方法详细信息
-
subscribe
public void subscribe(Flow.Subscriber<? super T> subscriber)
除非已订阅,否则添加给定的订阅者。 如果已经认购,认购的onError
方法被调用,在与现有的订阅IllegalStateException
。 否则,成功后,将使用新的Flow.Subscription
异步调用订阅者的onSubscribe
方法。 如果onSubscribe
引发异常,则取消订阅。 否则,如果此SubmissionPublisher异常关闭,则调用订户的onError
方法以及相应的异常,或者如果无异常关闭,则调用订户的onComplete
方法。 订户可以使通过调用接收项目request
新订阅的方法,并且可以通过调用其退订cancel
方法。- Specified by:
-
subscribe
在界面Flow.Publisher<T>
- 参数
-
subscriber
- 订户 - 异常
-
NullPointerException
- 如果订户为空
-
submit
public int submit(T item)
通过异步调用其onNext
方法将给定项发布到每个当前订户,在任何订户的资源不可用时不间断地阻止。 此方法返回所有当前订户中的最大滞后(已提交但尚未消耗的项目数)的估计值。 如果有任何订阅者,则此值至少为1(对此提交的项目进行计算),否则为零。如果此发布者的Executor在尝试异步通知订阅者时抛出RejectedExecutionException(或任何其他RuntimeException或Error),则会重新抛出此异常,在这种情况下,并非所有订阅者都已发出此项。
- 参数
-
item
- 要发布的(非null)项目 - 结果
- 用户估计的最大延迟
- 异常
-
IllegalStateException
- 如果关闭 -
NullPointerException
- 如果item为null -
RejectedExecutionException
- 如果由Executor抛出
-
offer
public int offer(T item, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)
如果可能,通过异步调用其onNext
方法将给定项发布给每个当前订阅者。 如果超出资源限制,则可以由一个或多个订户删除该项,在这种情况下,调用给定的处理程序(如果非空),如果它返回true,则重试一次。 调用处理程序时,其他线程对此类中方法的其他调用将被阻止。 除非确保恢复,否则选项通常仅限于记录错误和/或向订户发出onError
信号。此方法返回状态指示符:如果为负,则表示(负)丢弃数(尝试向订阅者发出项目失败)。 否则,它是对所有当前订户中的最大滞后(已提交但尚未消耗的项目数)的估计。 如果有任何订阅者,则此值至少为1(对此提交的项目进行计算),否则为零。
如果此发布者的Executor在尝试异步通知订阅者时抛出RejectedExecutionException(或任何其他RuntimeException或Error),或者drop handler在处理已删除的项时抛出异常,则会重新抛出此异常。
- 参数
-
item
- 要发布的(非null)项目 -
onDrop
- 如果为非null,则处理程序在下降到订阅者时调用,具有订阅者和项目的参数; 如果它返回true,则重新尝试(一次) - 结果
- 如果为负,则为(负)滴数; 否则估计最大滞后
- 异常
-
IllegalStateException
- 如果关闭 -
NullPointerException
- 如果item为null -
RejectedExecutionException
- 如果由Executor抛出
-
offer
public int offer(T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)
如果可能,通过异步调用其onNext
方法,阻止任何订阅的资源不可用,直到指定的超时或直到调用者线程被中断,此时给定的处理程序(如果可能),将给定项目发布给每个当前订阅者-null)被调用,如果返回true,则重试一次。 (丢弃处理程序可以通过检查当前线程是否被中断来区分超时和中断。)在调用处理程序时,其他线程对此类中方法的其他调用被阻止。 除非确保恢复,否则选项通常仅限于记录错误和/或向订户发出onError
信号。此方法返回状态指示符:如果为负,则表示(负)丢弃数(尝试向订阅者发出项目失败)。 否则,它是对所有当前订户中的最大滞后(已提交但尚未消耗的项目数)的估计。 如果有任何订阅者,则此值至少为1(对此提交的项目进行计算),否则为零。
如果此发布者的Executor在尝试异步通知订阅者时抛出RejectedExecutionException(或任何其他RuntimeException或Error),或者drop handler在处理已删除的项时抛出异常,则会重新抛出此异常。
- 参数
-
item
- 要发布的(非null)项目 -
timeout
- 在放弃之前等待任何用户的资源需要多长时间,单位为unit
-
unit
-一个TimeUnit
确定如何解释timeout
参数 -
onDrop
- 如果为非null,则处理程序在使用订阅者和项目的参数下降到订阅者时调用; 如果它返回true,则重新尝试(一次) - 结果
- 如果为负,则为(负)滴数; 否则估计最大滞后
- 异常
-
IllegalStateException
- 如果关闭 -
NullPointerException
- 如果item为null -
RejectedExecutionException
- 如果由Executor抛出
-
close
public void close()
除非已经关闭,否则问题onComplete
向当前订户发出信号,并且不允许随后的发布尝试。 返回时,这种方法并不能保证所有的用户都尚未完成。- Specified by:
-
close
在界面AutoCloseable
-
closeExceptionally
public void closeExceptionally(Throwable error)
- 参数
-
error
- 发送给订阅者的onError
参数 - 异常
-
NullPointerException
- 如果错误为空
-
isClosed
public boolean isClosed()
如果此发布者不接受提交,则返回true。- 结果
- 如果关闭则为true
-
getClosedException
public Throwable getClosedException()
返回与closeExceptionally
关联的异常,如果未关闭或正常关闭,则返回null。- 结果
- 异常,如果没有则为null
-
hasSubscribers
public boolean hasSubscribers()
如果此发布者有任何订阅者,则返回true。- 结果
- 如果此发布者有任何订阅者,则为true
-
getNumberOfSubscribers
public int getNumberOfSubscribers()
返回当前订阅者的数量。- 结果
- 当前订阅者的数量
-
getExecutor
public Executor getExecutor()
返回用于异步传递的Executor。- 结果
- Executor用于异步传递
-
getMaxBufferCapacity
public int getMaxBufferCapacity()
返回每用户缓冲区容量的最大值。- 结果
- 最大每用户缓冲容量
-
getSubscribers
public List<Flow.Subscriber<? super T>> getSubscribers()
返回当前订阅者列表以进行监视和跟踪,而不是在订阅者上调用Flow.Subscriber
方法。- 结果
- 当前订阅者列表
-
isSubscribed
public boolean isSubscribed(Flow.Subscriber<? super T> subscriber)
如果当前订阅了给定订阅服务器,则返回true。- 参数
-
subscriber
- 订户 - 结果
- 如果当前订阅,则为true
- 异常
-
NullPointerException
- 如果订户为空
-
estimateMinimumDemand
public long estimateMinimumDemand()
返回所有当前订户中所请求的最小项目数(通过request
)但尚未生成的估计值。- 结果
- 估计,如果没有订户,则为零
-
estimateMaximumLag
public int estimateMaximumLag()
返回所有当前订阅者生成但尚未消费的最大项目数的估计值。- 结果
- 估计
-
consume
public CompletableFuture<Void> consume(Consumer<? super T> consumer)
使用给定的Consumer函数处理所有已发布的项目。 返回当此发布者发出onComplete
信号时正常完成的CompletableFuture,或者在发生任何错误时异常完成,或者消费者抛出异常,或者返回的CompletableFuture被取消,在这种情况下不再处理其他项目。- 参数
-
consumer
- 应用于每个onNext项的函数 - 结果
- 一个CompletableFuture,当发布者发出完整信号时,通常会完成,特别是在出现任何错误或取消时
- 异常
-
NullPointerException
- 如果消费者为空
-
-