美文网首页
rxjava源码笔记

rxjava源码笔记

作者: 刘佳阔 | 来源:发表于2020-04-27 10:18 被阅读0次

title: rxjava源码解析-图床版
date: 2020-04-15 21:03:02
tags: [android工具源码]
typora-root-url: ./rxjava源码解析
typora-copy-images-to: ./rxjava源码解析


总结

rxjava 里广泛使用了 责任链模式.和观察者模式.只要抓住这个两个主线.理解起来就很容易了

image-20200415211539271

附一张网上的图.这里要明确的是,最简单的原理其实就是

Observable.subscribe(observer), 然后.observable(被观察者) 就可以得到observer(观察者)对象.接着就是在subscribe里. 被观察者主动调用观察者的onNext.onComplete或onError.

调用顺序如下

Observable.subscribe(observer)
Observable.subscribeActual(observer)
observer.onSubscribe(Disposable)
ObservableOnSubscribe.subscribe(observer)
observer.onNext()
observer.onComplete()|observer.onError()

可以看到.都是被观察者主动调用观察者的方法.然后被观察者同create方法.把要发送的数据抽象成一个类.ObservableOnSubscribe就是我们要实现来发送数据的被观察者.

接下来的各种操作符的原理.和上边类似.总是每次的都返回了一个新的观察者ObserverB,和一个新的被观察者.ObservableB,对外返回新的的ObservableB,而内部则用观察者ObserverB去观察原有的被观察这.再把请求处理完转换给原有的观察者.这里就相当于 原有的观察者和被观察者都被代理了.

image-20200416113431424

看图就比较清晰了.左下的被观察者ObservableB代理了原来的初始Observable的方法.然后在

在总结下 .原有ObservableA 和原有的SubceriberA 关系是ObservableA.subscribe(SubceriberA )

加一个操作符会产生新的ObservableB 和SubceriberB.同返回出去的是ObservableB.

这时候就变成 ObservableB.subscribe(SubceriberA ). 这方法的内部则又调用了ObservableA.subscribe(SubceriberB ), 这时候 就是SubceriberB 处理原有被观察者的onNext.onComplete了.而处理完成后.又执行 SubceriberA.onNext 和SubceriberA.onComplete.把处理后的结果在转发给原有的观察者.

这就形成了一个责任链.

分析

例子

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
        e.onNext(4);
    }
}).map(new Function<Integer, String>() {
    @Override
    public String apply(Integer integer) throws Exception {
        return integer + "--";
    }
})
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.newThread())
        .subscribe(
                new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(String s) {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                }
        );

创建数据源

ObservableOnSubscribe

先看这个类.这是我们发送数据源的实现类.内部只有一个函数subscribe( ObservableEmitter<T> e).我们通过这个函数.把被观察的数据通过 ObservableEmitter发送出去

ObservableEmitter的方法有 onNext,onError,onComplete,setDisposable,setCancellable,其实就是发送数据好取消数据..

Observable.create

RxJavaPlugins是一个hook类.用来可以观察到整个rxjava流程的任意步骤.我们可以直接忽略他.

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

可以看到直接创建了ObservableCreate.并传入上边的ObservableOnSubscribe.而ObservableCreate也是一个Observable类.也是被观察者. 到这里也就实现了新的被观察者,并且被返回给外部.

ObservableCreate类讲解

总的来说.所有的被观察这内部都会有一个观察者类, 而被观察者通过subscribeActual来把请求转向上层被观察者.响应由内部类的观察者接收. 内部的观察者处理完成后.再把响应转给下层的观察者. 这就实现了这个被观察者代理类 的代理功能呢.


public final class ObservableCreate<T> extends Observable<T> {
    1.保留原有的被观察者
public ObservableCreate(ObservableOnSubscribe<T> source) {
    this.source = source;
}
2.可以看到创建新的观察者.并由新的观察者CreateEmitter来观察原有的被观察者.
 protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        
        observer.onSubscribe(parent);
        3.这句最重要.这里又会调用source的 
        source.subscribe(parent);
    }
 4.第一次的观察者只是把请求转发出去.这里代码有删减   
static final class CreateEmitter<T> extends implements ObservableEmitter<T>{

        final Observer<? super T> observer;
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;}

        @Override
        public void onNext(T t) {
                observer.onNext(t);
        }
        @Override
        public void onComplete() {
                    observer.onComplete();
        }
    } 
}    

可见.执行完create后.返回的Observable就是新创建的ObservableCreate

map函数,数据源转换

负责对数据源进行转换.map也是返回一个新的Observable

上边代码如下

map(new Function<Integer, String>() {
    @Override
    public String apply(Integer integer) throws Exception {
        return integer + "--";
    }
})
看方法内部,则是包装了转换的function. 返回ObservableMap 作为一个Observable
 public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

ObservableMap类讲解

ObservableMap继承AbstractObservableWithUpstream.在向上继承自Observable,同样是Observable的代理类

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;
        //1.保存上层被观察者.和本次的转换函数function
    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }
    2.同样讨论.新建观察者 来观察上层的Observable被观察者,然后再把请求转发给下次的Observable.也就是这个方法里的形参t
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }


    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;
            3.actual是下层观察者observer
        MapObserver(Observer actual, Function mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
        4.上层调用他的onNext后.把数据用 function.apply 先处理下.返回的结果在发给下次观察者.
         v = mapper.apply(t);
         actual.onNext(v);
        }
    }

看完一个讨论.在看就很明了了. 可以说.这就是一种 "欺上瞒下"的操作.

subscribeOn,切换数据发射线程

用来调度在哪个线程发送数据.返回的同样是个observable 的子类ObservableSubscribeOn.

我们可以想一下.这里既然是线程切换.而且是指定发射源.那么肯定就是在指定的线程向外执行上层的发射操作.

ObservableSubscribeOn

这也是继承AbstractObservableWithUpstream的被观察者.他内部进行现场的切换

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
        1.传入的源观察者.和现场调度类
    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }
        2.老规矩.创建新的观察者.这里看到 onSubscribe是在原来线程执行的
        public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);
    3.SubscribeTask是runnable.run方法执行source.subscribe(parent);
    这里通过scheduler进行了线程切换.把订阅的方法放在指定线程中执行.那么从这个被观察者向上的所有订阅过程.
    就都在指定线程中执行了.
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

    static final class SubscribeOnObserver<T>  implements Observer<T> {

        final Observer<? super T> actual;
        SubscribeOnObserver(Observer<? super T> actual) {
            this.actual = actual;
            this.s = new AtomicReference<Disposable>();
        }
            这里的观察者只是把请求转发给下层的观察者了.注意.这里仍然是在schdule指定的线程中向下层转发的.
            因为上层被观察者的subscribe方法发生在指定线程
        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }
        @Override
        public void onError(Throwable t) {
            actual.onError(t);
        }

        @Override
        public void onComplete() {
            actual.onComplete();
        }

    }
        在指定线程执行订阅过程
    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);
        }
    }
}

可以看到.很简单.就是把上层的observable.subscribe 放在指定线程执行. 那么从subscribe之后的方法.就都在指定线程执行了.

observeOn切换数据接收线程

指定在哪个线程接收观察的事件.也就是观察者运行在哪个线程.同样会返回一个observable的子类ObservableObserveOn.

ObservableObserveOn

同样还是继承AbstractObservableWithUpstream的一个observable

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    1.Scheduler 负责线程调度,
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    2.创建调度器.这里和subscribOn就不一样了.那里是整个source.subscribe()都在调度器指定的线程里执行.
    这里则是在原因线程subscribe.只是观察者传入的这个调度器
    protected void subscribeActual(Observer<? super T> observer) {
            Scheduler.Worker w = scheduler.createWorker();
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
        3.观察者.可以看到.实现了runnable接口
    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {

        final Observer<? super T> actual;
        final Scheduler.Worker worker;
            4.  存储上层观察者调用的 队列
        SimpleQueue<T> queue;

        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.actual = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }

        5.代码有删减. 这就是获取的queue,在原来线程执行onSubscribe
        public void onSubscribe(Disposable s) {
                queue = new SpscLinkedArrayQueue<T>(bufferSize);

            }
        }

        6.把上层发送下来的源信息加入队列.然后进行调度
        public void onNext(T t) {
                queue.offer(t);
            schedule();
        }

        @Override
        public void onError(Throwable t) {
         error = t;
         done = true
            schedule();
        }

        同样还是调度
        public void onComplete() {
         done = true;
            schedule();
        }
                7.worker是传进来的调度器.而我们这个observer本来就继承自runnable.
                这里其实就是加入线程池或者线程中执行runnable.
                所以就是在特定线程执行我们这类里run方法.我们跳过调度.直接看run.
                run里执行了 drainNormal
        void schedule() {
             worker.schedule(this);
        }
            8.简化逻辑.就是循环取出队列的源事件.调用下次observe进行处理.这是在我们指定的观察线程中执行
            也就是从这个observer以后的所有observer 都在这个observeOn指定的线程观察
        void drainNormal() {
            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = actual;

                for (;;) {
                        v = q.poll();
                    a.onNext(v);
                }
            }
        }

        public void run() {
            drainNormal();
        }

    }
}

可以看到区别了. subscribeOn 影响他的所有上层的事件发送在指定线程.而ObserverOn觉得他之后的观察者observer在指定的线程监听. 一个是影响上层.一个是影响下层

subscribe

最后的观察者.这个方法返回空.并且是整个观察责任链的中止.也可以是Observable发送事件的开始.

内部调用的subscribeActual是空方法.需要各个Observable来实现.因为这是最后方法.他的执行对象就是我们最后穿件的那个Observable.也就是 observeOn返回的observable.然后在层层向上调用source.subscribe(),来达到最顶层的数据源.

public final void subscribe(Observer<? super T> observer) {
        subscribeActual(observer);
    }
}

来一个网图.慢慢看就理解了.

img

线程调度

上文通过Scheduler 来指定线程.这里有两个类Schedulers和Scheduler.

Scheduler是真正的调度器. Schedulers则提供了各种实现好的调度器供选择.

简单看下shecule源码.主要方法就是start.shundown.scheduleDirect.用来调度请求在指定线程.有三个内部类Worker,DisposeTask,PeriodicDirectTask配合调度.

public abstract class Scheduler {
     public abstract Worker createWorker();
     public void start() {}
     public void shutdown() {}
     1.直接进行新任务调度.由worker来决定在哪个线程执行任务.
        public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);

        w.schedule(task, delay, unit);

        return task;
    }
    2.任务执行线程,按顺序的把任务在某线程执行
      public abstract static class Worker implements Disposable {
    3.看到.抽象方法.需要子类实现决定在哪个线程执行任务  
      public abstract Disposable schedule( Runnable run, long delay, TimeUnit unit);

      }
      
    4.代码省略.就是对传入的runnable的包装执行. 
    static final class DisposeTask implements Runnable, Disposable {
        final Runnable decoratedRun;
        final Worker w;

        Thread runner;

        DisposeTask(Runnable decoratedRun, Worker w) {
            this.decoratedRun = decoratedRun;
            this.w = w;
        }

        @Override
        public void run() {
                decoratedRun.run();
        }

    }
}

可以看到.schedule还是把任务用dispostTask包装后.交给Worker来执行了. 而worker的抽象方法schedule由子类实现.具体的线程执行过程.

我们以一个较为复杂的Schedulers.io()来看. 首先明确的是. Schedulers.io()是整个APP都共享的.而针对我们每次的一个observable.subscribe(observer) 只会产生一个worker.任务.加入到io调度中. 每一个数据发射监听.都是在worker.shedule中来执行.

IoScheduler 作为整个程序的一个调度测量.里边使用了工作池.用来复用worker

public final class IoScheduler extends Scheduler {
    创造工作线程池的工厂
        static final RxThreadFactory WORKER_THREAD_FACTORY;
    final ThreadFactory threadFactory;
    保存复用worker 的对象池
    final AtomicReference<CachedWorkerPool> pool;
    默认对象池,数量为空.
    static final CachedWorkerPool NONE;
    1.创建schedule. pool是workder的对象复用池.默认为空
    public IoScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        this.pool = new AtomicReference<CachedWorkerPool>(NONE);
        start();
    }

    2. 开始时初始化对象池
    public void start() {
        CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
        if (!pool.compareAndSet(NONE, update)) {
            update.shutdown();
        }
    }
    3.结束时回收对象池. 重新置位空 然后对象池执行shutdown
    public void shutdown() {
        for (;;) {
            CachedWorkerPool curr = pool.get();
            if (curr == NONE) {
                return;
            }
            if (pool.compareAndSet(curr, NONE)) {
                curr.shutdown();
                return;
            }
        }
    }
        创建 eventLoopWorker. 这个worker其实是一个包装类
    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }
        对象池中worker梳理
    public int size() {
        return pool.get().allWorkers.size();
    }
}

这个ioschedule中其实并没有太多东西.主要是一个worker的对象池. 更多的功能在他内部的几个内部类中.CachedWorkerPool 是真正的worker对象池.EventLoopWorker.对worker 包装.这个忽略不看ThreadWorker继承自NewThreadWorker,是真正执行observable观察事件的对象.

CachedWorkerPool

work对象池.一个ioschedule只有一个pool

static final class CachedWorkerPool implements Runnable {
        1.保存worker的队列
    private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
    final CompositeDisposable allWorkers;
    2.清理无用worker的线程池.他执行的对象就是CachedWorkerPool.并且只有一个对象.定期执行清理工作
    private final ScheduledExecutorService evictorService;
        3.创建线程池.传入this. 定期执行run方法进行清理
 CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
            
            ScheduledExecutorService evictor = null;
            if (unit != null) {
                evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
                task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
            }
            evictorService = evictor;
            evictorTask = task;
        }
            4 run方法清理过去worker
        public void run() {
            evictExpiredWorkers();
        }
                5.很简单.每个worker有个过期时间.和现在对吧.超过了就从worker队列中删除
               void evictExpiredWorkers() {
                long currentTimestamp = now();

                for (ThreadWorker threadWorker : expiringWorkerQueue) {
                    if (threadWorker.getExpirationTime() <= currentTimestamp) {
                        if (expiringWorkerQueue.remove(threadWorker)) {
                            allWorkers.remove(threadWorker);
                        }
                    } 
                }
        }
        6.从对象池中获取一个worker.有就返回,没有就创建一个ThreadWorker在返回
         ThreadWorker get() {
            while (!expiringWorkerQueue.isEmpty()) {
                ThreadWorker threadWorker = expiringWorkerQueue.poll();
                if (threadWorker != null) {
                    return threadWorker;
                }
            }
            ThreadWorker w = new ThreadWorker(threadFactory);
            allWorkers.add(w);
            return w;
        }        
}

核心功能就是缓存worker. 定亲清理过期的worker. 当然这个过期的worker是得先被release释放后的.

ThreadWorker和NewThreadWorker

继承NewThreadWorker.是真正的事件执行者.对NewThreadWorker的包装.加入了 一个过期时间.我们看他的父类

这里还要记住.我们写的每个observable这一套.对应一个Worker.

看NewThreadWorker源码

public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    1内部初始化了一个线程池
    private final ScheduledExecutorService executor;

    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }
    
    执行任务的真正的地方,这里默认最后一个参数是null,delaytime是0
       public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
 
        Future<?> f;
        可以看到是把run交给线程池去执行了.
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);

        return sr;
    }

可以看到.每个worker都创建了一个线程池. 然后在schedule的时候.把任务交给线程池去完成.

这样在线程池中会调用传入的runnable的run方案, 从而实现在指定线程的调用.这就是上边observableOn和subscribeOn里的线程切换逻辑.

相关文章

网友评论

      本文标题:rxjava源码笔记

      本文链接:https://www.haomeiwen.com/subject/zaaewhtx.html