title: rxjava源码解析-图床版
date: 2020-04-15 21:03:02
tags: [android工具源码]
typora-root-url: ./rxjava源码解析
typora-copy-images-to: ./rxjava源码解析
总结
rxjava 里广泛使用了 责任链模式.和观察者模式.只要抓住这个两个主线.理解起来就很容易了
image-20200415211539271附一张网上的图.这里要明确的是,最简单的原理其实就是
Observable.subscribe(observer), 然后.observable(被观察者) 就可以得到observer(观察者)对象.接着就是在subscribe里. 被观察者主动调用观察者的onNext.onComplete或onError.
调用顺序如下
Observable.subscribe(observer)
Observable.subscribeActual(observer)
observer.onSubscribe(Disposable)
ObservableOnSubscribe.subscribe(observer)
observer.onNext()
observer.onComplete()|observer.onError()
可以看到.都是被观察者主动调用观察者的方法.然后被观察者同create方法.把要发送的数据抽象成一个类.ObservableOnSubscribe就是我们要实现来发送数据的被观察者.
接下来的各种操作符的原理.和上边类似.总是每次的都返回了一个新的观察者ObserverB,和一个新的被观察者.ObservableB,对外返回新的的ObservableB,而内部则用观察者ObserverB去观察原有的被观察这.再把请求处理完转换给原有的观察者.这里就相当于 原有的观察者和被观察者都被代理了.
image-20200416113431424看图就比较清晰了.左下的被观察者ObservableB代理了原来的初始Observable的方法.然后在
在总结下 .原有ObservableA 和原有的SubceriberA 关系是ObservableA.subscribe(SubceriberA )
加一个操作符会产生新的ObservableB 和SubceriberB.同返回出去的是ObservableB.
这时候就变成 ObservableB.subscribe(SubceriberA ). 这方法的内部则又调用了ObservableA.subscribe(SubceriberB ), 这时候 就是SubceriberB 处理原有被观察者的onNext.onComplete了.而处理完成后.又执行 SubceriberA.onNext 和SubceriberA.onComplete.把处理后的结果在转发给原有的观察者.
这就形成了一个责任链.
分析
例子
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
e.onNext(4);
}
}).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return integer + "--";
}
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.subscribe(
new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
}
);
创建数据源
ObservableOnSubscribe
先看这个类.这是我们发送数据源的实现类.内部只有一个函数subscribe( ObservableEmitter<T> e).我们通过这个函数.把被观察的数据通过 ObservableEmitter发送出去
ObservableEmitter的方法有 onNext,onError,onComplete,setDisposable,setCancellable,其实就是发送数据好取消数据..
Observable.create
RxJavaPlugins是一个hook类.用来可以观察到整个rxjava流程的任意步骤.我们可以直接忽略他.
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
可以看到直接创建了ObservableCreate.并传入上边的ObservableOnSubscribe.而ObservableCreate也是一个Observable类.也是被观察者. 到这里也就实现了新的被观察者,并且被返回给外部.
ObservableCreate类讲解
总的来说.所有的被观察这内部都会有一个观察者类, 而被观察者通过subscribeActual来把请求转向上层被观察者.响应由内部类的观察者接收. 内部的观察者处理完成后.再把响应转给下层的观察者. 这就实现了这个被观察者代理类 的代理功能呢.
public final class ObservableCreate<T> extends Observable<T> {
1.保留原有的被观察者
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
2.可以看到创建新的观察者.并由新的观察者CreateEmitter来观察原有的被观察者.
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
3.这句最重要.这里又会调用source的
source.subscribe(parent);
}
4.第一次的观察者只是把请求转发出去.这里代码有删减
static final class CreateEmitter<T> extends implements ObservableEmitter<T>{
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;}
@Override
public void onNext(T t) {
observer.onNext(t);
}
@Override
public void onComplete() {
observer.onComplete();
}
}
}
可见.执行完create后.返回的Observable就是新创建的ObservableCreate
map函数,数据源转换
负责对数据源进行转换.map也是返回一个新的Observable
上边代码如下
map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return integer + "--";
}
})
看方法内部,则是包装了转换的function. 返回ObservableMap 作为一个Observable
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
ObservableMap类讲解
ObservableMap继承AbstractObservableWithUpstream.在向上继承自Observable,同样是Observable的代理类
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
//1.保存上层被观察者.和本次的转换函数function
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
2.同样讨论.新建观察者 来观察上层的Observable被观察者,然后再把请求转发给下次的Observable.也就是这个方法里的形参t
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
3.actual是下层观察者observer
MapObserver(Observer actual, Function mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
4.上层调用他的onNext后.把数据用 function.apply 先处理下.返回的结果在发给下次观察者.
v = mapper.apply(t);
actual.onNext(v);
}
}
看完一个讨论.在看就很明了了. 可以说.这就是一种 "欺上瞒下"的操作.
subscribeOn,切换数据发射线程
用来调度在哪个线程发送数据.返回的同样是个observable 的子类ObservableSubscribeOn.
我们可以想一下.这里既然是线程切换.而且是指定发射源.那么肯定就是在指定的线程向外执行上层的发射操作.
ObservableSubscribeOn
这也是继承AbstractObservableWithUpstream的被观察者.他内部进行现场的切换
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
1.传入的源观察者.和现场调度类
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
2.老规矩.创建新的观察者.这里看到 onSubscribe是在原来线程执行的
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
3.SubscribeTask是runnable.run方法执行source.subscribe(parent);
这里通过scheduler进行了线程切换.把订阅的方法放在指定线程中执行.那么从这个被观察者向上的所有订阅过程.
就都在指定线程中执行了.
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
static final class SubscribeOnObserver<T> implements Observer<T> {
final Observer<? super T> actual;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
这里的观察者只是把请求转发给下层的观察者了.注意.这里仍然是在schdule指定的线程中向下层转发的.
因为上层被观察者的subscribe方法发生在指定线程
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
}
在指定线程执行订阅过程
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
}
可以看到.很简单.就是把上层的observable.subscribe 放在指定线程执行. 那么从subscribe之后的方法.就都在指定线程执行了.
observeOn切换数据接收线程
指定在哪个线程接收观察的事件.也就是观察者运行在哪个线程.同样会返回一个observable的子类ObservableObserveOn.
ObservableObserveOn
同样还是继承AbstractObservableWithUpstream的一个observable
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
1.Scheduler 负责线程调度,
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
2.创建调度器.这里和subscribOn就不一样了.那里是整个source.subscribe()都在调度器指定的线程里执行.
这里则是在原因线程subscribe.只是观察者传入的这个调度器
protected void subscribeActual(Observer<? super T> observer) {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
3.观察者.可以看到.实现了runnable接口
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
final Observer<? super T> actual;
final Scheduler.Worker worker;
4. 存储上层观察者调用的 队列
SimpleQueue<T> queue;
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.actual = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
5.代码有删减. 这就是获取的queue,在原来线程执行onSubscribe
public void onSubscribe(Disposable s) {
queue = new SpscLinkedArrayQueue<T>(bufferSize);
}
}
6.把上层发送下来的源信息加入队列.然后进行调度
public void onNext(T t) {
queue.offer(t);
schedule();
}
@Override
public void onError(Throwable t) {
error = t;
done = true
schedule();
}
同样还是调度
public void onComplete() {
done = true;
schedule();
}
7.worker是传进来的调度器.而我们这个observer本来就继承自runnable.
这里其实就是加入线程池或者线程中执行runnable.
所以就是在特定线程执行我们这类里run方法.我们跳过调度.直接看run.
run里执行了 drainNormal
void schedule() {
worker.schedule(this);
}
8.简化逻辑.就是循环取出队列的源事件.调用下次observe进行处理.这是在我们指定的观察线程中执行
也就是从这个observer以后的所有observer 都在这个observeOn指定的线程观察
void drainNormal() {
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
for (;;) {
v = q.poll();
a.onNext(v);
}
}
}
public void run() {
drainNormal();
}
}
}
可以看到区别了. subscribeOn 影响他的所有上层的事件发送在指定线程.而ObserverOn觉得他之后的观察者observer在指定的线程监听. 一个是影响上层.一个是影响下层
subscribe
最后的观察者.这个方法返回空.并且是整个观察责任链的中止.也可以是Observable发送事件的开始.
内部调用的subscribeActual是空方法.需要各个Observable来实现.因为这是最后方法.他的执行对象就是我们最后穿件的那个Observable.也就是 observeOn返回的observable.然后在层层向上调用source.subscribe(),来达到最顶层的数据源.
public final void subscribe(Observer<? super T> observer) {
subscribeActual(observer);
}
}
来一个网图.慢慢看就理解了.
img线程调度
上文通过Scheduler 来指定线程.这里有两个类Schedulers和Scheduler.
Scheduler是真正的调度器. Schedulers则提供了各种实现好的调度器供选择.
简单看下shecule源码.主要方法就是start.shundown.scheduleDirect.用来调度请求在指定线程.有三个内部类Worker,DisposeTask,PeriodicDirectTask配合调度.
public abstract class Scheduler {
public abstract Worker createWorker();
public void start() {}
public void shutdown() {}
1.直接进行新任务调度.由worker来决定在哪个线程执行任务.
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
2.任务执行线程,按顺序的把任务在某线程执行
public abstract static class Worker implements Disposable {
3.看到.抽象方法.需要子类实现决定在哪个线程执行任务
public abstract Disposable schedule( Runnable run, long delay, TimeUnit unit);
}
4.代码省略.就是对传入的runnable的包装执行.
static final class DisposeTask implements Runnable, Disposable {
final Runnable decoratedRun;
final Worker w;
Thread runner;
DisposeTask(Runnable decoratedRun, Worker w) {
this.decoratedRun = decoratedRun;
this.w = w;
}
@Override
public void run() {
decoratedRun.run();
}
}
}
可以看到.schedule还是把任务用dispostTask包装后.交给Worker来执行了. 而worker的抽象方法schedule由子类实现.具体的线程执行过程.
我们以一个较为复杂的Schedulers.io()来看. 首先明确的是. Schedulers.io()是整个APP都共享的.而针对我们每次的一个observable.subscribe(observer) 只会产生一个worker.任务.加入到io调度中. 每一个数据发射监听.都是在worker.shedule中来执行.
IoScheduler 作为整个程序的一个调度测量.里边使用了工作池.用来复用worker
public final class IoScheduler extends Scheduler {
创造工作线程池的工厂
static final RxThreadFactory WORKER_THREAD_FACTORY;
final ThreadFactory threadFactory;
保存复用worker 的对象池
final AtomicReference<CachedWorkerPool> pool;
默认对象池,数量为空.
static final CachedWorkerPool NONE;
1.创建schedule. pool是workder的对象复用池.默认为空
public IoScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<CachedWorkerPool>(NONE);
start();
}
2. 开始时初始化对象池
public void start() {
CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
}
3.结束时回收对象池. 重新置位空 然后对象池执行shutdown
public void shutdown() {
for (;;) {
CachedWorkerPool curr = pool.get();
if (curr == NONE) {
return;
}
if (pool.compareAndSet(curr, NONE)) {
curr.shutdown();
return;
}
}
}
创建 eventLoopWorker. 这个worker其实是一个包装类
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
对象池中worker梳理
public int size() {
return pool.get().allWorkers.size();
}
}
这个ioschedule中其实并没有太多东西.主要是一个worker的对象池. 更多的功能在他内部的几个内部类中.CachedWorkerPool 是真正的worker对象池.EventLoopWorker.对worker 包装.这个忽略不看ThreadWorker继承自NewThreadWorker,是真正执行observable观察事件的对象.
CachedWorkerPool
work对象池.一个ioschedule只有一个pool
static final class CachedWorkerPool implements Runnable {
1.保存worker的队列
private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
final CompositeDisposable allWorkers;
2.清理无用worker的线程池.他执行的对象就是CachedWorkerPool.并且只有一个对象.定期执行清理工作
private final ScheduledExecutorService evictorService;
3.创建线程池.传入this. 定期执行run方法进行清理
CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
ScheduledExecutorService evictor = null;
if (unit != null) {
evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
}
evictorService = evictor;
evictorTask = task;
}
4 run方法清理过去worker
public void run() {
evictExpiredWorkers();
}
5.很简单.每个worker有个过期时间.和现在对吧.超过了就从worker队列中删除
void evictExpiredWorkers() {
long currentTimestamp = now();
for (ThreadWorker threadWorker : expiringWorkerQueue) {
if (threadWorker.getExpirationTime() <= currentTimestamp) {
if (expiringWorkerQueue.remove(threadWorker)) {
allWorkers.remove(threadWorker);
}
}
}
}
6.从对象池中获取一个worker.有就返回,没有就创建一个ThreadWorker在返回
ThreadWorker get() {
while (!expiringWorkerQueue.isEmpty()) {
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}
}
核心功能就是缓存worker. 定亲清理过期的worker. 当然这个过期的worker是得先被release释放后的.
ThreadWorker和NewThreadWorker
继承NewThreadWorker.是真正的事件执行者.对NewThreadWorker的包装.加入了 一个过期时间.我们看他的父类
这里还要记住.我们写的每个observable这一套.对应一个Worker.
看NewThreadWorker源码
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
1内部初始化了一个线程池
private final ScheduledExecutorService executor;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
执行任务的真正的地方,这里默认最后一个参数是null,delaytime是0
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
Future<?> f;
可以看到是把run交给线程池去执行了.
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
return sr;
}
可以看到.每个worker都创建了一个线程池. 然后在schedule的时候.把任务交给线程池去完成.
这样在线程池中会调用传入的runnable的run方案, 从而实现在指定线程的调用.这就是上边observableOn和subscribeOn里的线程切换逻辑.
网友评论