RxJava2 源码学习

作者: 看我眼前007 | 来源:发表于2017-11-02 17:28 被阅读1152次
    基于 RxJava 2.1.6 RxJava github 地址

    为什么要用 RxJava

    简洁!简洁!简洁!(重要的事情说三遍)

    RxJava 最大的优点就是简洁。简洁的代码能让人心旷神怡,减少 bug 。

    RxJava 是一种新的编程模式 响应式编程

    响应式编程是一种基于异步数据流概念的编程模式。
    数据流就像一条河:它可以被观测,被过滤,被操作,或者为新的消费者与另外一条流合并为一条新的流。
    

    以上是 RxJava Essentials 中文翻译版 对响应式编程的介绍。使用 RxJava 可以让我们在 java 语言中体验什么是响应式编程。响应式编程有两个重要概念

    1. 基于异步
    2. 数据流
    

    如何使用 RxJava

    RxJava 的设计理念基于 观察者模式

    在 RxJava 中首先明白有两个对象观察者被观察者

    观察者和被观察者可以存在不同的线程之中,所以存在观测线程被观测线程*

    观察者和被观察者通过 subscribe 发生『订阅』关系。

    RxJava 提供一些列的链式调用,使用起来如下:

    Observable
        .create(...)
        .observeOn(...)
        .subscribeOn(...)
        .subscribe(...)    
    

    subscribe() 方法为链式调用的最后一层,create() 和 subscribe() 方式之前可以任意设置其他操作。

    上面提到『响应式编程』中的数据流就像是一条河。
    create() 方法可以比喻为河流的『上游发源地』,subscribe() 则为河流的『入海口』。
    在这两个方法之间我们可以添加观察、过滤等操作。
    

    在链式调用中增加一些数据处理

    Observable
        .create(...)
        .observeOn(...)
        .subscribeOn(...)
        .map(...)
        .filter(...)
        .subscribe(...) 
    

    在 RxJava2 中提供一系列可观测对象(也就是上面链式调用的 Observable 等同功能)

    • io.reactivex.Flowable
    • io.reactivex.Observable
    • io.reactivex.Single
    • io.reactivex.Completable
    • io.reactivex.Maybe

    这里我们写一个例子

        Observable.create((ObservableOnSubscribe<String>) e -> {
            for (int i = 0; i < 5; i++) {
                e.onNext(i + "");
            }
        })
                .observeOn(Schedulers.io())
                .subscribeOn(Schedulers.io())
                .map(s -> {
                    System.out.println("map:" + s);
                    return s + "_map";
                })
                .filter(o -> {
                    System.out.println("flat:" + o);
                    if (o.compareTo("3") < 0) {
                        return true;
                    }
                    return false;
                })
                .subscribe(o -> System.out.println("subscribe:" + o));
    

    输出结果如下:

    map:0
    flat:0_map
    subscribe:0_map
    map:1
    flat:1_map
    subscribe:1_map
    map:2
    flat:2_map
    subscribe:2_map
    map:3
    flat:3_map
    map:4
    flat:4_map
    

    上面的例子中,我们在被观察者中发射了 5 个数据源,观察者和被观察着都在同一个线程中,通过 map 对象给每个发射的对象拼接一个 『_map』字符串,通过 filter 过滤了比『3』字符串小的对象。

    所以最终在观察者中接收到了 3 次消息。

    源码分析 RxJava 中的核心代码

    订阅关系的产生

    创建被观察着

    『被观测者』是事件产生的一方,创建方式也有很多种。这里列举一下 Observable 创建的方式

    1. 通过 create() 方法创建

       public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
           ObjectHelper.requireNonNull(source, "source is null");
           return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
       }
      
    2. 通过 just() 方法创建

       public static <T> Observable<T> just(T item) {
           ObjectHelper.requireNonNull(item, "The item is null");
           return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
       }
      
    3. 通过 fromArray() 方法创建

       public static <T> Observable<T> fromArray(T... items) {
           ObjectHelper.requireNonNull(items, "items is null");
           if (items.length == 0) {
               return empty();
           } else
           if (items.length == 1) {
               return just(items[0]);
           }
           return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
       }
      

    其中 ObservableFromArray、ObservableJust、ObservableCreate 都是 Observable 的子类,而 Observable 本身是一个抽象。

    这些子类主要实现 Observe 的抽象方法

    protected abstract void subscribeActual(Observer<? super T> observer);
    

    再看一下 RxJavaPlugins.onAssembly() 方法

    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }
    

    这里需要说明一下 RxJavaPlugins.onAssembly()是一个 Hock,如果不做任何 hock 处理,RxJavaPlugins.onAssembly()会直接返回传入的对象。onObservableAssembly 静态成员变量为 null

    我们用 ObservableCreate 举例

    ObservableCreate 的构造方法需要传入一个 ObservableOnSubscribe 对象

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    

    并重载 Observable 的 subscribeActual()

    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
    
        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    

    subscribeActual() 方法传入了一个 Observer 对象并且包装到 CreateEmitter 对象中,然后调用
    observer.onSubscribe(parent);source.subscribe(parent);

    创建观察者

    观察者比较简单,需要实现 4 个方法

    new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
    
        }
    
        @Override
        public void onNext(String s) {
    
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onComplete() {
    
        }
    }
    
    订阅

    通常情况下,我们不需要重载 Observer 的每一个方法,RxJava 内部提供了另一个 LambdaObserver 把 Observer 的四个方法拆分为 4 个部分。

    Observable.subscribe() 可以只传入一个 Consumer 对象。

    内部会把 Consumer 包裹在 LambdaObserver 中,并且返回 LambdaObserver

        public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete, Consumer<? super Disposable> onSubscribe) {
        ObjectHelper.requireNonNull(onNext, "onNext is null");
        ObjectHelper.requireNonNull(onError, "onError is null");
        ObjectHelper.requireNonNull(onComplete, "onComplete is null");
        ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
    
        LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
    
        subscribe(ls);
    
        return ls;
    }
    

    然后调用接收 Observer 的 subscribe() 方法

    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);
    
            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
    
            subscribeActual(observer);
        } ……
    }
    

    这里调用了 Observable 的 subscribeActual(observer) 方法。
    这里就完成了 观察者被观察着 之间的订阅关系

    rxJava_01.png

    如下一段代码

        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                for (int i = 0; i < 5; i++) {
                    System.out.println("subscribe:" + i);
                    e.onNext(i + "");
                }
            }
        })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.println("accept:" + s);
                    }
                });
    

    调用的时序图如下


    rxJava_02.png

    线程调度原理分析

    上部分分析的订阅关系的创建,都是在当前线程之中。RxJava 可以指定 观察线程观察者线程

    observeOn 原理分析

    Observable 的 observeOn 方法有三个

    • Observable<T> observeOn(Scheduler scheduler)
    • Observable<T> observeOn(Scheduler scheduler, boolean delayError)
    • Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize)

    这三个方法中,前两个方法都会再次调用第三个方法

    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }
    

    我们看到这里创建了一个 ObservableObserveOn 对象,ObservableObserveOnObservable的子类。

    我们看下 ObservableObserveOnsubscribeActual

    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();
    
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    

    这里我们假设传入的 SchedulerSchedulers.io() 进一步跟踪分析会发现 Schedulers.io() 返回的是 IoScheduler
    所以会走上面代码的 else 分支。

    先忽略 scheduler.createWorker() 过程,先看下 ObserveOnObserver

    这里的 sourceObservableCreate ,而 source.subscribe() 会调用 ObservableCreate.subscribeActual(observer) 然后调用 ObserveOnObserver.onSubscribe() 方法

        @Override
        public void onSubscribe(Disposable s) {
            if (DisposableHelper.validate(this.s, s)) {
                this.s = s;
                if (s instanceof QueueDisposable) {
                    ……
                }
                queue = new SpscLinkedArrayQueue<T>(bufferSize);
                actual.onSubscribe(this);
            }
        }
    

    这里传入的 Disposable 是 CreateEmitter 对象,所以不会走 if 分支。

    然后创建了一个 SpscLinkedArrayQueue 对象,

    紧接着执行 actual.onSubscribe() 也就是 LambdaObserver.onSubscribe()

    后面应该执行的是 ObserveOnObserver.onNext() 方法

        public void onNext(T t) {
            if (done) {
                return;
            }
    
            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }
    

    看到会把 onNext(T t) 传入参数放入队列之中,然后执行 schedule

        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }
    

    worker.schedule() 接收的是一个 Runnable 对象,所以我们从这里可以看出 Observer 的 onNext()onComplete()onError() 等方法都是在线程之中执行。

    接下来我们看下线程的创建

    ObservableObserveOnsubscribeActual() 方法中的

    Scheduler.Worker w = scheduler.createWorker();
    

    Schedulers.io()的跟踪过程比较简单,最终会得到一个 IoScheduler

    public IoScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        this.pool = new AtomicReference<CachedWorkerPool>(NONE);
        start();
    }
    

    直接看 createWorker()

    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }
    

    再看 EventLoopWorker 的构造函数

        EventLoopWorker(CachedWorkerPool pool) {
            this.pool = pool;
            this.tasks = new CompositeDisposable();
            this.threadWorker = pool.get();
        }
    

    继续看下 CachedWorkerPool 的构造方法

        CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
            this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
            this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
            this.allWorkers = new CompositeDisposable();
            this.threadFactory = threadFactory;
    
            ScheduledExecutorService evictor = null;
            Future<?> task = null;
            if (unit != null) {
                evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
                task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
            }
            evictorService = evictor;
            evictorTask = task;
        }      
    

    终于我们找到了线程池相关的代码

     evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
     task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
    

    pool.get() 方法

        ThreadWorker get() {
            if (allWorkers.isDisposed()) {
                return SHUTDOWN_THREAD_WORKER;
            }
            while (!expiringWorkerQueue.isEmpty()) {
                ThreadWorker threadWorker = expiringWorkerQueue.poll();
                if (threadWorker != null) {
                    return threadWorker;
                }
            }
    
            // No cached worker found, so create a new one.
            ThreadWorker w = new ThreadWorker(threadFactory);
            allWorkers.add(w);
            return w;
        }
    

    这里返回了一个 ThreadWorker

    看下 ThreadWorker 的创建过程

        ThreadWorker(ThreadFactory threadFactory) {
            super(threadFactory);
            this.expirationTime = 0L;
        }
    

    看下父类的构造函数

    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }
    

    再跟下去

    public static ScheduledExecutorService create(ThreadFactory factory) {
        final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
        if (exec instanceof ScheduledThreadPoolExecutor) {
            ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
            POOLS.put(e, exec);
        }
        return exec;
    }
    

    这里又出现了一个线程池

    下面开始看 worker.schedule(this)

        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }
    
            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    

    继续跟踪下去

    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    
        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }
    
        Future<?> f;
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }
    
        return sr;
    }
    

    终于我们找到了执行线程的方法
    总结一下真个流程

    rxJava_03.png
    subscribeOn 流程分析

    subscribeOn 方法只有一个

    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
    

    和 observeOn 类似,这里把 Observable 包装到 ObservableSubscribeOn 对象中。

    直接看subscribeActual方法

    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    
        s.onSubscribe(parent);
    
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    

    这里把 Observe 包裹在 SubscribeOnObserver 中,并执行

        void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    

    这只设置了 Disposable 只能被执行一次。

    重点看下 scheduler.scheduleDirect(new SubscribeTask(parent))

    先看一些 SubscribeTask 类

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;
    
        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }
    
        @Override
        public void run() {
            source.subscribe(parent);
        }
    }
    

    这里看出 SubscribeTask 也是一个 Runnable 在 run 方法中执行 source.subscribe(parent),

    所以 SubscribeTask 是观察者线程的关键

    继续依照 IoScheduler 为例

    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();
    
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
        DisposeTask task = new DisposeTask(decoratedRun, w);
    
        w.schedule(task, delay, unit);
    
        return task;
    }
    

    这里和 subscribeOn 原理一样了,通过 Worker 对象把观察者的行为设置在线程之中。

    操作符原理分析

    RxJava 里面有很多操作符,这里找一个 map 操作进行分析。其他更复杂的操作不一一分析。

    先看一个例子

        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                for (int i = 0; i < 5; i++) {
                    System.out.println("subscribe:" + i);
                    e.onNext(i + "");
                }
            }
        })
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        System.out.println("map:" + s);
                        return s + "_map";
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.println("accept:" + s);
                    }
                });
    

    输出结果如下:

    subscribe:0
    map:0
    accept:0_map
    subscribe:1
    map:1
    accept:1_map
    subscribe:2
    map:2
    accept:2_map
    subscribe:3
    map:3
    accept:3_map
    subscribe:4
    map:4
    accept:4_map
    

    可以看 ObservableOnSubscribe 里面每次发送的数据都会到 Function.apply() 方法进行『过滤』,把每个发送的 String 转换一下后再发送给 Consumer

    看下 map() 方法

    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }
    

    根据上面的经验,我们直接看 ObservableMap 对象的 subscribeActual()

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
    

    然后继续看 MapObserver 对象的 onNext,其中 function 就是我们在 map() 方法中传入的 Function 对象。

    先看构造函数

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }
    

    在看 onNext()

        public void onNext(T t) {
            if (done) {
                return;
            }
    
            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }
    
            U v;
    
            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            actual.onNext(v);
        }
    

    可以看出,在执行 actual.onNext(v) 之前,会执行 mapper.apply(t) 从而完成转换。

    其他更复杂的操作符,基本都是在各种特定的 XXXObserver 中的 onNext(T t) 方法中做特殊处理。

    参考资料

    NotRxJava懒人专用指南

    RxJava 从入门到放弃再到不离不弃

    RxJava github 地址

    观察者模式

    RxJava系列6(从微观角度解读RxJava源码)

    Rxjava 2 源码分析

    Rxjava 2 源码分析 (2)

    RxJava Essentials 中文翻译版

    相关文章

      网友评论

        本文标题:RxJava2 源码学习

        本文链接:https://www.haomeiwen.com/subject/ihlapxtx.html