带着疑问分析RxJava1.x原理:
事件流源头(observable)怎么发出数据
响应者(subscriber)怎么收到数据
怎么对事件流进行操作(operator/transformer)
整个过程的调度(scheduler)
响应式编程
响应式编程是一种基于异步数据流概念的编程模式。响应式编程依赖事件,事件可以被等待,可以触发过程,也可以触发其他事件。
Rx借助可观测的序列提供一种简单的方式来创建异步的,基于事件驱动的程序。
操作符
- 转换类操作符
map
flatMap
concatMap
flatMapIterable
switchMap
scan
groupBy
- 过滤类操作符
filter
take
takeLast
taskUntil
debounce
distinct
distinctUntilchanged
skip
skipLast
如果在最后一个事件的等待时间内重新发出了事件,则以该事件作为最后一个事件。直到最后一个事件过了等待时间后才返回。具体例子可看下幅图:
debounce distinct
- 组合类操作符
merge
zip
join
combineLatest
and/when/then
switch
startSwitch
源码解析
Observable/Subscriber
Observable和Subject是两个“生产”实体,Observer和Subscriber是两个“消费”实体。
Observable.create()方法构造了一个被观察者Observable对象,同时将new出来的OnSubscribe赋值给了该Observable的成员变量onSubscribe。
Subscriber 继承了 Subscription,用于取消订阅。
public abstract class Subscriber<T> implements Observer<T>, Subscription
事件传递流程
如果传入的是Action
,则先封装成Subscriber
。对传入的Subscriber
进行包装,包装为 SafeSubscriber
,SafeSubscriber
是subscriber
的一个代理,对subscriber的一系列方法做了严格的安全校验。保证onCompleted()和onError()只会有一个被执行且只执行一次,一旦它们其中方法被调用过后onNext()就不再执行了。
onStart()
就是在我们调用 subscribe()
的线程执行的。
obsevable.subscribe(observer)
的显式调用流程
obsevable.subscribe(observer)
的内部调用流程
操作符流程
map操作符流程Schedulers执行线程
执行线程线程执行的内部调用过程
subscribeOn 影响它上面的调用执行时所在的线程。
observeOn 影响它下面的调用执行时所在的线程。
subscribeOn与操作符的原理一致,创造一个新的Observable用于进行干预操作,并通过线程池executor最终实现了线程切换。当不指定observeOn时,SubscriberOn()对上下游的线程都有影响。
observeOn切换线程是通过lift来实现。Lift的功能是做包装,将上游对下游的on***()事件传给包装好的Operator。
Operator继承了Function,主要是控制上下游事件发送的速率,最终将上游的事件发送给内部静态类ObserveOnSubscriber(继承了Action)。具体的处理操作会将封装好的Action发送到线程池中。每个observeOn都会对它所管辖的下游Observalbe生效。
通过schedule()将新观察者ObserveOnSubscriber发送给subscriberOne的所有事件换到了recursiveScheduler所对应的线程。subscriberOne的onNext()/onCompleted()/onError()方法丢到了recursiveScheduler对应的线程中执行。recursiveScheduler是一个Worker,在执行schedule()时创建了一个Runnable,在run()方法中调用了observeOnSubscriber.call()。
整个流程传递ScheduledAction是Runnable,将上游Observable.call()事件和Subscriber.onNext/onError/onCompleted事件都封装成Action事件放入ScheduledAction这个实际的Runnable方法中,并交由Worker的schedule()方法处理。
由于指定了Thread(io/newThread/mainThread),内部会先将ThreadFactory创建的线程放入只有一个核心进程的ScheduledExecutorService线程池中。在scheduler()方法被调用时执行该Runnable。
Scheduler管理Work,Work内部通过线程池ScheduledExecutorService执行call()方法中封装的Runnable对象。
backpressure
backpressure主要通过Producer实现。原理是让subscriber向observable主动请求数据,通过producer成为observable和subscriber的数据通信的协调桥梁。
大多数异步操作符,比如observeOn
会有一个限定大小的Buffer,
在内部,Observable通过给Subscriber调用setProducer方法,方便Subscriber之后通过记录onNext()调用频率(即上游下发事件速率),调用Observable.request(n)方法,控制上游Observable发送事件的速率。
hook
在众多节点(创建Observable,获取Scheduler等)时,通过hook可进行任意想要的操作,记录、修饰、甚至抛出异常。
通过RxJavaPlugins及RxJavaHook类对关心的节点(hook point)插桩,让我们可以控制(manipulate)程序在这些节点的行为。
为什么subscribeOn 只有第一次调用生效?
subscribeOn 的作用域就是调用前序列中所有的 Todo List 任务清单(Observable.OnSubscribe),当我们执行 subscribe() 时,这些任务清单就会执行在 subscribeOn 指定的工作线程,而第二个 subscribeOn 早就没有任务可做了,所以无法生效。
网友评论