理解线程模型
Reactive Streams和Reactive扩展的一个共同目标是通过信号回调这种方式不再武断的遵循线程习惯。它会在现在和某个时刻T执行是Streams的所有。非同步信号也可以保存Subscriber的并发访问(无共享),但是信号和请求可以在两个不对称的线程上执行。
默认情况下,Stream被分配了一个SynchronousDispatcher,并且将会通过Stream.getDispatcher()来通知它直接子级。
多种多样的
Stream工厂,Broadcaster,Stream.dispatchOn和终端的xxxOn方法可能会修改默认的SynchronousDispatcher
Reactor Stream中三大主要可用线程切换的基本了解:
Stream.dispatchOn的作用是Stream下唯一可用的方法用于在给定的dispatcher上分发onError,onComplete和onNext信号。- Processor的行为不支持并发分发,例如
WorkQueueDispatcher。 - request和cancel将会在dispatcher上执行,如果它的上下文准备完毕。否则它将会在当前dispatch执行完毕后执行。
- Processor的行为不支持并发分发,例如
Stream.subscribeOn将只会在已经通过的dispatcher上执行。- 由于唯一一次通过的Dispatcher被称之为
onSubscribe,任何dispatcher可以使用并发分发器,类似WorkQueueDispatcher。 - 第一次请求可能仍然在
onSubscribe线程中执行,如Stream.consume()操作的实例。
- 由于唯一一次通过的Dispatcher被称之为
Stream.process附属的Processor实例也会影响线程。类似RingBufferProcessor的Processor将会使用它自己管理的线程来执行Subscriber。- 如果上下文准备完毕,请求和取消将会在同一个processor上执行。
RingBufferWorkProcessor仅最多分发onNext信号到一个Subscriber上,除非它在执行过程中被中断(重播给一个新的Subscriber)。
常规订阅是由onSubscribe开始请求数据,subscribeOn是一种有效的工具来放大Stream,尤其是无界的。如果一个订阅者向onSubscribe请求Long.MAX_VALUE条数据,它将会成为唯一一个执行请求者,并且它会在分配给这个订阅者的diapatcher上运行。这是无界Stream的默认消费行为。
在无限请求的线程中跳转
Streams.range(1, 100).dispatchOn(Environment.sharedDispatcher()) (2).subscribeOn(Environment.workDispatcher()) (1).consume(); (3)
- 分配一个
onSubscribe的工作队列dispatcher。 - 分配
onNext,onError,onComplete信号的dispatcher。 - 使用
Subscription.request(Long.MAX)来消费StreamonSubscribe
Figure 12. 无界消费者的subscribeOn和dispatchOn/process
然而,当多于1个请求将会变的复杂时,subscribeOn会变的比较无用,例如在限定Stream.capacity(n)的分步消费中。唯一执行请求的线程可能运行在分配给subscribeOn的第一个dispatcher上。
在有限请求的线程中跳转
Streams.range(1, 100).process(RingBufferProcessor.create()) (2).subscribeOn(Environment.workDispatcher()) (1).capacity(1); (3).consume(); (4)
- 分配一个
onSubscribe的工作队列dispatcher。请注意它被放置在subscribeOn将运行在用户的ringBuffer线程后,我们希望把它改成工作调度(dispatcher)。 - 分配一个用于处理
onNext,onError,onComplete的异步Processor。类似dispatchOn的行为。 - 分配
Stream的容量为1,以便下游做适应。 - 通过
Subscription.request(1)在每次onNext时消费数据。
Figure 13. 有界消费者(N < Long.MAX)的subscribeOn和dispatchOn/process
