美文网首页
RxJava1.x源码解析

RxJava1.x源码解析

作者: liaowenhao | 来源:发表于2017-09-07 00:26 被阅读106次

带着疑问分析RxJava1.x原理:
事件流源头(observable)怎么发出数据
响应者(subscriber)怎么收到数据
怎么对事件流进行操作(operator/transformer)
整个过程的调度(scheduler)

响应式编程

响应式编程是一种基于异步数据流概念的编程模式。响应式编程依赖事件,事件可以被等待,可以触发过程,也可以触发其他事件。
Rx借助可观测的序列提供一种简单的方式来创建异步的,基于事件驱动的程序。

操作符

  • 转换类操作符
    map flatMap concatMap flatMapIterable switchMap scan groupBy
flatMap concatMap
  • 过滤类操作符
    filter take takeLast taskUntil debounce distinct distinctUntilchanged skip skipLast
debounce

如果在最后一个事件的等待时间内重新发出了事件,则以该事件作为最后一个事件。直到最后一个事件过了等待时间后才返回。具体例子可看下幅图:


debounce distinct
  • 组合类操作符
    merge zip join combineLatest and/when/then switch startSwitch
merge,无序 concat,有序 zip

源码解析

Observable/Subscriber

Observable和Subject是两个“生产”实体,Observer和Subscriber是两个“消费”实体。

Observable.create()方法构造了一个被观察者Observable对象,同时将new出来的OnSubscribe赋值给了该Observable的成员变量onSubscribe。

Subscriber 继承了 Subscription,用于取消订阅。

public abstract class Subscriber<T> implements Observer<T>, Subscription

事件传递流程

如果传入的是Action,则先封装成Subscriber。对传入的Subscriber进行包装,包装为 SafeSubscriber,SafeSubscribersubscriber的一个代理,对subscriber的一系列方法做了严格的安全校验。保证onCompleted()和onError()只会有一个被执行且只执行一次,一旦它们其中方法被调用过后onNext()就不再执行了。

onStart() 就是在我们调用 subscribe() 的线程执行的。

obsevable.subscribe(observer)的显式调用流程

显式调用流程

obsevable.subscribe(observer)的内部调用流程

subscribe内部调用流程

操作符流程

map操作符流程

Schedulers执行线程

执行线程

线程执行的内部调用过程

subscribeOn 影响它上面的调用执行时所在的线程。

observeOn 影响它下面的调用执行时所在的线程。

subscribeOn与操作符的原理一致,创造一个新的Observable用于进行干预操作,并通过线程池executor最终实现了线程切换。当不指定observeOn时,SubscriberOn()对上下游的线程都有影响。

observeOn切换线程是通过lift来实现。Lift的功能是做包装,将上游对下游的on***()事件传给包装好的Operator。
Operator继承了Function,主要是控制上下游事件发送的速率,最终将上游的事件发送给内部静态类ObserveOnSubscriber(继承了Action)。具体的处理操作会将封装好的Action发送到线程池中。每个observeOn都会对它所管辖的下游Observalbe生效。

通过schedule()将新观察者ObserveOnSubscriber发送给subscriberOne的所有事件换到了recursiveScheduler所对应的线程。subscriberOne的onNext()/onCompleted()/onError()方法丢到了recursiveScheduler对应的线程中执行。recursiveScheduler是一个Worker,在执行schedule()时创建了一个Runnable,在run()方法中调用了observeOnSubscriber.call()。

整个流程传递

ScheduledAction是Runnable,将上游Observable.call()事件和Subscriber.onNext/onError/onCompleted事件都封装成Action事件放入ScheduledAction这个实际的Runnable方法中,并交由Worker的schedule()方法处理。

由于指定了Thread(io/newThread/mainThread),内部会先将ThreadFactory创建的线程放入只有一个核心进程的ScheduledExecutorService线程池中。在scheduler()方法被调用时执行该Runnable。

Scheduler管理Work,Work内部通过线程池ScheduledExecutorService执行call()方法中封装的Runnable对象。

backpressure

backpressure主要通过Producer实现。原理是让subscriber向observable主动请求数据,通过producer成为observable和subscriber的数据通信的协调桥梁。

大多数异步操作符,比如observeOn会有一个限定大小的Buffer,

在内部,Observable通过给Subscriber调用setProducer方法,方便Subscriber之后通过记录onNext()调用频率(即上游下发事件速率),调用Observable.request(n)方法,控制上游Observable发送事件的速率。

hook

在众多节点(创建Observable,获取Scheduler等)时,通过hook可进行任意想要的操作,记录、修饰、甚至抛出异常。
通过RxJavaPlugins及RxJavaHook类对关心的节点(hook point)插桩,让我们可以控制(manipulate)程序在这些节点的行为。

为什么subscribeOn 只有第一次调用生效?

subscribeOn 的作用域就是调用前序列中所有的 Todo List 任务清单(Observable.OnSubscribe),当我们执行 subscribe() 时,这些任务清单就会执行在 subscribeOn 指定的工作线程,而第二个 subscribeOn 早就没有任务可做了,所以无法生效。

参考

RxJava系列6-张磊
拆轮子系列:拆 RxJava
RxJava 线程切换源码的一些体会和思考

相关文章

网友评论

      本文标题:RxJava1.x源码解析

      本文链接:https://www.haomeiwen.com/subject/snjbjxtx.html