美文网首页
rxjava源码笔记

rxjava源码笔记

作者: 刘佳阔 | 来源:发表于2020-04-27 10:18 被阅读0次

    title: rxjava源码解析-图床版
    date: 2020-04-15 21:03:02
    tags: [android工具源码]
    typora-root-url: ./rxjava源码解析
    typora-copy-images-to: ./rxjava源码解析


    总结

    rxjava 里广泛使用了 责任链模式.和观察者模式.只要抓住这个两个主线.理解起来就很容易了

    image-20200415211539271

    附一张网上的图.这里要明确的是,最简单的原理其实就是

    Observable.subscribe(observer), 然后.observable(被观察者) 就可以得到observer(观察者)对象.接着就是在subscribe里. 被观察者主动调用观察者的onNext.onComplete或onError.

    调用顺序如下

    Observable.subscribe(observer)
    Observable.subscribeActual(observer)
    observer.onSubscribe(Disposable)
    ObservableOnSubscribe.subscribe(observer)
    observer.onNext()
    observer.onComplete()|observer.onError()
    

    可以看到.都是被观察者主动调用观察者的方法.然后被观察者同create方法.把要发送的数据抽象成一个类.ObservableOnSubscribe就是我们要实现来发送数据的被观察者.

    接下来的各种操作符的原理.和上边类似.总是每次的都返回了一个新的观察者ObserverB,和一个新的被观察者.ObservableB,对外返回新的的ObservableB,而内部则用观察者ObserverB去观察原有的被观察这.再把请求处理完转换给原有的观察者.这里就相当于 原有的观察者和被观察者都被代理了.

    image-20200416113431424

    看图就比较清晰了.左下的被观察者ObservableB代理了原来的初始Observable的方法.然后在

    在总结下 .原有ObservableA 和原有的SubceriberA 关系是ObservableA.subscribe(SubceriberA )

    加一个操作符会产生新的ObservableB 和SubceriberB.同返回出去的是ObservableB.

    这时候就变成 ObservableB.subscribe(SubceriberA ). 这方法的内部则又调用了ObservableA.subscribe(SubceriberB ), 这时候 就是SubceriberB 处理原有被观察者的onNext.onComplete了.而处理完成后.又执行 SubceriberA.onNext 和SubceriberA.onComplete.把处理后的结果在转发给原有的观察者.

    这就形成了一个责任链.

    分析

    例子

    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onComplete();
            e.onNext(4);
        }
    }).map(new Function<Integer, String>() {
        @Override
        public String apply(Integer integer) throws Exception {
            return integer + "--";
        }
    })
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.newThread())
            .subscribe(
                    new Observer<String>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(String s) {
    
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
    
                        }
                    }
            );
    

    创建数据源

    ObservableOnSubscribe

    先看这个类.这是我们发送数据源的实现类.内部只有一个函数subscribe( ObservableEmitter<T> e).我们通过这个函数.把被观察的数据通过 ObservableEmitter发送出去

    ObservableEmitter的方法有 onNext,onError,onComplete,setDisposable,setCancellable,其实就是发送数据好取消数据..

    Observable.create

    RxJavaPlugins是一个hook类.用来可以观察到整个rxjava流程的任意步骤.我们可以直接忽略他.

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
    

    可以看到直接创建了ObservableCreate.并传入上边的ObservableOnSubscribe.而ObservableCreate也是一个Observable类.也是被观察者. 到这里也就实现了新的被观察者,并且被返回给外部.

    ObservableCreate类讲解

    总的来说.所有的被观察这内部都会有一个观察者类, 而被观察者通过subscribeActual来把请求转向上层被观察者.响应由内部类的观察者接收. 内部的观察者处理完成后.再把响应转给下层的观察者. 这就实现了这个被观察者代理类 的代理功能呢.

    
    public final class ObservableCreate<T> extends Observable<T> {
        1.保留原有的被观察者
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    2.可以看到创建新的观察者.并由新的观察者CreateEmitter来观察原有的被观察者.
     protected void subscribeActual(Observer<? super T> observer) {
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            
            observer.onSubscribe(parent);
            3.这句最重要.这里又会调用source的 
            source.subscribe(parent);
        }
     4.第一次的观察者只是把请求转发出去.这里代码有删减   
    static final class CreateEmitter<T> extends implements ObservableEmitter<T>{
    
            final Observer<? super T> observer;
            CreateEmitter(Observer<? super T> observer) {
                this.observer = observer;}
    
            @Override
            public void onNext(T t) {
                    observer.onNext(t);
            }
            @Override
            public void onComplete() {
                        observer.onComplete();
            }
        } 
    }    
    

    可见.执行完create后.返回的Observable就是新创建的ObservableCreate

    map函数,数据源转换

    负责对数据源进行转换.map也是返回一个新的Observable

    上边代码如下

    map(new Function<Integer, String>() {
        @Override
        public String apply(Integer integer) throws Exception {
            return integer + "--";
        }
    })
    看方法内部,则是包装了转换的function. 返回ObservableMap 作为一个Observable
     public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
            return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
        }
    
    

    ObservableMap类讲解

    ObservableMap继承AbstractObservableWithUpstream.在向上继承自Observable,同样是Observable的代理类

    public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
        final Function<? super T, ? extends U> function;
            //1.保存上层被观察者.和本次的转换函数function
        public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
            super(source);
            this.function = function;
        }
        2.同样讨论.新建观察者 来观察上层的Observable被观察者,然后再把请求转发给下次的Observable.也就是这个方法里的形参t
        public void subscribeActual(Observer<? super U> t) {
            source.subscribe(new MapObserver<T, U>(t, function));
        }
    
    
        static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
            final Function<? super T, ? extends U> mapper;
                3.actual是下层观察者observer
            MapObserver(Observer actual, Function mapper) {
                super(actual);
                this.mapper = mapper;
            }
    
            @Override
            public void onNext(T t) {
            4.上层调用他的onNext后.把数据用 function.apply 先处理下.返回的结果在发给下次观察者.
             v = mapper.apply(t);
             actual.onNext(v);
            }
        }
    

    看完一个讨论.在看就很明了了. 可以说.这就是一种 "欺上瞒下"的操作.

    subscribeOn,切换数据发射线程

    用来调度在哪个线程发送数据.返回的同样是个observable 的子类ObservableSubscribeOn.

    我们可以想一下.这里既然是线程切换.而且是指定发射源.那么肯定就是在指定的线程向外执行上层的发射操作.

    ObservableSubscribeOn

    这也是继承AbstractObservableWithUpstream的被观察者.他内部进行现场的切换

    public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
        final Scheduler scheduler;
            1.传入的源观察者.和现场调度类
        public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
            super(source);
            this.scheduler = scheduler;
        }
            2.老规矩.创建新的观察者.这里看到 onSubscribe是在原来线程执行的
            public void subscribeActual(final Observer<? super T> s) {
            final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    
            s.onSubscribe(parent);
        3.SubscribeTask是runnable.run方法执行source.subscribe(parent);
        这里通过scheduler进行了线程切换.把订阅的方法放在指定线程中执行.那么从这个被观察者向上的所有订阅过程.
        就都在指定线程中执行了.
            parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
        }
    
        static final class SubscribeOnObserver<T>  implements Observer<T> {
    
            final Observer<? super T> actual;
            SubscribeOnObserver(Observer<? super T> actual) {
                this.actual = actual;
                this.s = new AtomicReference<Disposable>();
            }
                这里的观察者只是把请求转发给下层的观察者了.注意.这里仍然是在schdule指定的线程中向下层转发的.
                因为上层被观察者的subscribe方法发生在指定线程
            @Override
            public void onNext(T t) {
                actual.onNext(t);
            }
            @Override
            public void onError(Throwable t) {
                actual.onError(t);
            }
    
            @Override
            public void onComplete() {
                actual.onComplete();
            }
    
        }
            在指定线程执行订阅过程
        final class SubscribeTask implements Runnable {
            private final SubscribeOnObserver<T> parent;
    
            SubscribeTask(SubscribeOnObserver<T> parent) {
                this.parent = parent;
            }
    
            @Override
            public void run() {
                source.subscribe(parent);
            }
        }
    }
    

    可以看到.很简单.就是把上层的observable.subscribe 放在指定线程执行. 那么从subscribe之后的方法.就都在指定线程执行了.

    observeOn切换数据接收线程

    指定在哪个线程接收观察的事件.也就是观察者运行在哪个线程.同样会返回一个observable的子类ObservableObserveOn.

    ObservableObserveOn

    同样还是继承AbstractObservableWithUpstream的一个observable

    public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
        final Scheduler scheduler;
        final boolean delayError;
        final int bufferSize;
        1.Scheduler 负责线程调度,
        public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
            super(source);
            this.scheduler = scheduler;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
    
        2.创建调度器.这里和subscribOn就不一样了.那里是整个source.subscribe()都在调度器指定的线程里执行.
        这里则是在原因线程subscribe.只是观察者传入的这个调度器
        protected void subscribeActual(Observer<? super T> observer) {
                Scheduler.Worker w = scheduler.createWorker();
                source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
            3.观察者.可以看到.实现了runnable接口
        static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
        implements Observer<T>, Runnable {
    
            final Observer<? super T> actual;
            final Scheduler.Worker worker;
                4.  存储上层观察者调用的 队列
            SimpleQueue<T> queue;
    
            ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
                this.actual = actual;
                this.worker = worker;
                this.delayError = delayError;
                this.bufferSize = bufferSize;
            }
    
            5.代码有删减. 这就是获取的queue,在原来线程执行onSubscribe
            public void onSubscribe(Disposable s) {
                    queue = new SpscLinkedArrayQueue<T>(bufferSize);
    
                }
            }
    
            6.把上层发送下来的源信息加入队列.然后进行调度
            public void onNext(T t) {
                    queue.offer(t);
                schedule();
            }
    
            @Override
            public void onError(Throwable t) {
             error = t;
             done = true
                schedule();
            }
    
            同样还是调度
            public void onComplete() {
             done = true;
                schedule();
            }
                    7.worker是传进来的调度器.而我们这个observer本来就继承自runnable.
                    这里其实就是加入线程池或者线程中执行runnable.
                    所以就是在特定线程执行我们这类里run方法.我们跳过调度.直接看run.
                    run里执行了 drainNormal
            void schedule() {
                 worker.schedule(this);
            }
                8.简化逻辑.就是循环取出队列的源事件.调用下次observe进行处理.这是在我们指定的观察线程中执行
                也就是从这个observer以后的所有observer 都在这个observeOn指定的线程观察
            void drainNormal() {
                final SimpleQueue<T> q = queue;
                final Observer<? super T> a = actual;
    
                    for (;;) {
                            v = q.poll();
                        a.onNext(v);
                    }
                }
            }
    
            public void run() {
                drainNormal();
            }
    
        }
    }
    

    可以看到区别了. subscribeOn 影响他的所有上层的事件发送在指定线程.而ObserverOn觉得他之后的观察者observer在指定的线程监听. 一个是影响上层.一个是影响下层

    subscribe

    最后的观察者.这个方法返回空.并且是整个观察责任链的中止.也可以是Observable发送事件的开始.

    内部调用的subscribeActual是空方法.需要各个Observable来实现.因为这是最后方法.他的执行对象就是我们最后穿件的那个Observable.也就是 observeOn返回的observable.然后在层层向上调用source.subscribe(),来达到最顶层的数据源.

    public final void subscribe(Observer<? super T> observer) {
            subscribeActual(observer);
        }
    }
    

    来一个网图.慢慢看就理解了.

    img

    线程调度

    上文通过Scheduler 来指定线程.这里有两个类Schedulers和Scheduler.

    Scheduler是真正的调度器. Schedulers则提供了各种实现好的调度器供选择.

    简单看下shecule源码.主要方法就是start.shundown.scheduleDirect.用来调度请求在指定线程.有三个内部类Worker,DisposeTask,PeriodicDirectTask配合调度.

    public abstract class Scheduler {
         public abstract Worker createWorker();
         public void start() {}
         public void shutdown() {}
         1.直接进行新任务调度.由worker来决定在哪个线程执行任务.
            public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
            final Worker w = createWorker();
    
            final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
            DisposeTask task = new DisposeTask(decoratedRun, w);
    
            w.schedule(task, delay, unit);
    
            return task;
        }
        2.任务执行线程,按顺序的把任务在某线程执行
          public abstract static class Worker implements Disposable {
        3.看到.抽象方法.需要子类实现决定在哪个线程执行任务  
          public abstract Disposable schedule( Runnable run, long delay, TimeUnit unit);
    
          }
          
        4.代码省略.就是对传入的runnable的包装执行. 
        static final class DisposeTask implements Runnable, Disposable {
            final Runnable decoratedRun;
            final Worker w;
    
            Thread runner;
    
            DisposeTask(Runnable decoratedRun, Worker w) {
                this.decoratedRun = decoratedRun;
                this.w = w;
            }
    
            @Override
            public void run() {
                    decoratedRun.run();
            }
    
        }
    }
    

    可以看到.schedule还是把任务用dispostTask包装后.交给Worker来执行了. 而worker的抽象方法schedule由子类实现.具体的线程执行过程.

    我们以一个较为复杂的Schedulers.io()来看. 首先明确的是. Schedulers.io()是整个APP都共享的.而针对我们每次的一个observable.subscribe(observer) 只会产生一个worker.任务.加入到io调度中. 每一个数据发射监听.都是在worker.shedule中来执行.

    IoScheduler 作为整个程序的一个调度测量.里边使用了工作池.用来复用worker

    public final class IoScheduler extends Scheduler {
        创造工作线程池的工厂
            static final RxThreadFactory WORKER_THREAD_FACTORY;
        final ThreadFactory threadFactory;
        保存复用worker 的对象池
        final AtomicReference<CachedWorkerPool> pool;
        默认对象池,数量为空.
        static final CachedWorkerPool NONE;
        1.创建schedule. pool是workder的对象复用池.默认为空
        public IoScheduler(ThreadFactory threadFactory) {
            this.threadFactory = threadFactory;
            this.pool = new AtomicReference<CachedWorkerPool>(NONE);
            start();
        }
    
        2. 开始时初始化对象池
        public void start() {
            CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
            if (!pool.compareAndSet(NONE, update)) {
                update.shutdown();
            }
        }
        3.结束时回收对象池. 重新置位空 然后对象池执行shutdown
        public void shutdown() {
            for (;;) {
                CachedWorkerPool curr = pool.get();
                if (curr == NONE) {
                    return;
                }
                if (pool.compareAndSet(curr, NONE)) {
                    curr.shutdown();
                    return;
                }
            }
        }
            创建 eventLoopWorker. 这个worker其实是一个包装类
        public Worker createWorker() {
            return new EventLoopWorker(pool.get());
        }
            对象池中worker梳理
        public int size() {
            return pool.get().allWorkers.size();
        }
    }
    

    这个ioschedule中其实并没有太多东西.主要是一个worker的对象池. 更多的功能在他内部的几个内部类中.CachedWorkerPool 是真正的worker对象池.EventLoopWorker.对worker 包装.这个忽略不看ThreadWorker继承自NewThreadWorker,是真正执行observable观察事件的对象.

    CachedWorkerPool

    work对象池.一个ioschedule只有一个pool

    static final class CachedWorkerPool implements Runnable {
            1.保存worker的队列
        private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
        final CompositeDisposable allWorkers;
        2.清理无用worker的线程池.他执行的对象就是CachedWorkerPool.并且只有一个对象.定期执行清理工作
        private final ScheduledExecutorService evictorService;
            3.创建线程池.传入this. 定期执行run方法进行清理
     CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
                
                ScheduledExecutorService evictor = null;
                if (unit != null) {
                    evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
                    task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
                }
                evictorService = evictor;
                evictorTask = task;
            }
                4 run方法清理过去worker
            public void run() {
                evictExpiredWorkers();
            }
                    5.很简单.每个worker有个过期时间.和现在对吧.超过了就从worker队列中删除
                   void evictExpiredWorkers() {
                    long currentTimestamp = now();
    
                    for (ThreadWorker threadWorker : expiringWorkerQueue) {
                        if (threadWorker.getExpirationTime() <= currentTimestamp) {
                            if (expiringWorkerQueue.remove(threadWorker)) {
                                allWorkers.remove(threadWorker);
                            }
                        } 
                    }
            }
            6.从对象池中获取一个worker.有就返回,没有就创建一个ThreadWorker在返回
             ThreadWorker get() {
                while (!expiringWorkerQueue.isEmpty()) {
                    ThreadWorker threadWorker = expiringWorkerQueue.poll();
                    if (threadWorker != null) {
                        return threadWorker;
                    }
                }
                ThreadWorker w = new ThreadWorker(threadFactory);
                allWorkers.add(w);
                return w;
            }        
    }
    

    核心功能就是缓存worker. 定亲清理过期的worker. 当然这个过期的worker是得先被release释放后的.

    ThreadWorker和NewThreadWorker

    继承NewThreadWorker.是真正的事件执行者.对NewThreadWorker的包装.加入了 一个过期时间.我们看他的父类

    这里还要记住.我们写的每个observable这一套.对应一个Worker.

    看NewThreadWorker源码

    public class NewThreadWorker extends Scheduler.Worker implements Disposable {
        1内部初始化了一个线程池
        private final ScheduledExecutorService executor;
    
        public NewThreadWorker(ThreadFactory threadFactory) {
            executor = SchedulerPoolFactory.create(threadFactory);
        }
        
        执行任务的真正的地方,这里默认最后一个参数是null,delaytime是0
           public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
            Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
            ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
     
            Future<?> f;
            可以看到是把run交给线程池去执行了.
                if (delayTime <= 0) {
                    f = executor.submit((Callable<Object>)sr);
                } else {
                    f = executor.schedule((Callable<Object>)sr, delayTime, unit);
                }
                sr.setFuture(f);
    
            return sr;
        }
    

    可以看到.每个worker都创建了一个线程池. 然后在schedule的时候.把任务交给线程池去完成.

    这样在线程池中会调用传入的runnable的run方案, 从而实现在指定线程的调用.这就是上边observableOn和subscribeOn里的线程切换逻辑.

    相关文章

      网友评论

          本文标题:rxjava源码笔记

          本文链接:https://www.haomeiwen.com/subject/zaaewhtx.html