美文网首页
Rxjava2源码分析

Rxjava2源码分析

作者: 快乐的橙橙宝 | 来源:发表于2021-04-01 15:04 被阅读0次

    分析目的

    1. Observable发出数据和Observer接收数据
    2. 如何实现线程调度和操作符原理

    文章仅分析Observable不分析带背压的Flowable

    Observable创建和订阅

    一个常见例子

     Observable.create((ObservableOnSubscribe<String>) emitter -> {
                emitter.onNext("test");
                emitter.onComplete();
            }).map(s -> s + "111")
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Observer<String>() {
                        @Override
                        public void onSubscribe(@NonNull Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(@NonNull String s) {
                            LPLogger.e("onNext:" + s);
                        }
    
                        @Override
                        public void onError(@NonNull Throwable e) {
                            e.printStackTrace();
                            LPLogger.e("onError:" + e.getMessage());
                        }
    
                        @Override
                        public void onComplete() {
                            LPLogger.e("onComplete");
                        }
                    });
        }
    

    以上常见例子展示了Observable 创建,订阅和调度以及转换操作符的整个过程下面分析各个过程

    Observable创建

    Observable.create()方法参数ObservableOnSubscribe

    public interface ObservableOnSubscribe<T> {
        void subscribe(ObservableEmitter<T> e) throws Exception;
    }  
    

    该接口方法subscribe(ObservableEmitter)参数ObservableEmitter实现了onNext(),onError()onComplete()即我们使用发送数据的地方

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
    

    其中RxJavaPlugins.onAssembly()方法是hook方法,默认返回原值即ObservableCreate(source)是一个Observable,实现了subscribeActual()此方法是订阅真正执行的方法,先不用关注
    只需要注意Observable.create()方法传入参数ObservableOnSubscribe返回ObservableCreate即可

    Observable订阅

    先不看线程调度和操作符转换处理仅看最简单的部分subscribe()

    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);
            //..
            subscribeActual(observer);
        }
    

    实际是调用的subscribeActual(observer),即前面创建的ObservableCreate里面的subscribeActual()

    protected void subscribeActual(Observer<? super T> observer) {
         //实现自ObservableEmitter
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        //这里是外界的回调
        observer.onSubscribe(parent);
        try {
            //这里source即create传入的ObservableOnSubscribe
            //parent即前面传入的ObservableEmitter,即emitter.onNext("test")中的emitter
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    

    看下CreateEmitter源码

    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {
    
        private static final long serialVersionUID = -3434801548987643227L;
    
        final Observer<? super T> observer;
    
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }
    
        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("xxx"));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }
    
        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }
    
        @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("xxx");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }
    
        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }
    
    1. 以上onNext(T)调用了observer.onNext(t)即让订阅者接收到发送端数据
      我们可以看到subscribeActual(observer)中的方法已经将ObservableObserver联系起来
    2. 同时注意到CreateEmitter是继承自Disposable即我们可以使用回调中的onSubscribe(Disposable d)中的Disposable去结束Observable发送,当我们subscribe(Consumer)获取的返回值同理
    3. 从上面代码可以得出我们之前记住的一些结论如onError()onComplete()只能调用一次,onSubscribeonNext()之前执行等

    线程调度

    subcribeOn

    subscribeOn(Schedulers.io())返回值类似于create(),返回的是ObservableSubscribeOn<T>(this, scheduler)ObservableCreate<T>(source)多一个线程处理,典型的装饰器模式应用
    源码也和ObservableCreate类似

    public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
        final Scheduler scheduler;
    
        public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
            super(source);
            this.scheduler = scheduler;
        }
    
        @Override
        public void subscribeActual(final Observer<? super T> observer) {
            //和CreateEmitter一样也是包装Observer,最终调用Observer.onNext之类方法
            final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
            observer.onSubscribe(parent);
            //线程切换SubscribeTask是Runnable最终执行的还是source.subscribe(parent);
            //source.subscribe(parent)执行后会执行到ObservableCreate的subscribeActual()
            parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
        }
    

    继续看

    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        //实际是一个Dispose并设置到ObservableSubscribeOn方便管理任务
        DisposeTask task = new DisposeTask(decoratedRun, w);
        w.schedule(task, delay, unit);
        return task;
    }
    

    执行调用如下不一一展开:
    w.schedule(task, delay, unit)->IoScheduler.EventLoopWorker.schedule->NewThreadWorker.scheduleActual()->ScheduledRunnable.call()->ScheduledRunnable.run()->DisposeTask.run()->new SubscribeTask(parent)
    即在使用线程池执行了source.subscribe(parent)

    observeOn

    observeOn(AndroidSchedulers.mainThread())中创建的是ObservableObserveOn

    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            //创建worker
            Scheduler.Worker w = scheduler.createWorker();
            //还是和source.subscribe(parent)一致
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    

    查看ObserveOnObserver是继承Runnable

    public void onNext(T t) {
        if (done) {
            return;
        }
        //先把值存储到队列中,然后切换线程处理
        if (sourceMode != QueueDisposable.ASYNC) {
            queue.offer(t);
        }
        schedule();
    }
    
    void schedule() {
        if (getAndIncrement() == 0) {
            worker.schedule(this);
        }
    }
    //run方法运行到这里
    void drainNormal() {
        int missed = 1;
    
        final SimpleQueue<T> q = queue;
        final Observer<? super T> a = downstream;
    
        for (;;) {
            if (checkTerminated(done, q.isEmpty(), a)) {
                return;
            }
    
            for (;;) {
                boolean d = done;
                T v;
    
                try {
                    v = q.poll();
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    disposed = true;
                    upstream.dispose();
                    q.clear();
                    a.onError(ex);
                    worker.dispose();
                    return;
                }
                boolean empty = v == null;
    
                if (checkTerminated(d, empty, a)) {
                    return;
                }
    
                if (empty) {
                    break;
                }
                //切到线程后再执行onNext
                a.onNext(v);
            }
    
            missed = addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }
    
    1. ObserveOnObserveronNext()中把发送值存到队列,然后调用schedule()
    2. 调用的是worker.schedule(this);和前面分析subcribeOn一样直接查看run(),此时已经完成线程切换
    3. run()中调用的是drainNormal(),从1中存储的队列中取出值发送
    切换线程分析

    AndroidSchedulers.mainThread()实现

    private static final class MainHolder {
        static final Scheduler DEFAULT
            = new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
    }
    
    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });
    
    /** A {@link Scheduler} which executes actions on the Android main thread. */
    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }
    

    继续看HandlerScheduler

    public Worker createWorker() {
    return new HandlerWorker(handler, async);
    }
    
    private static final class HandlerWorker extends Worker {
    private final Handler handler;
    private final boolean async;
    
    private volatile boolean disposed;
    
    HandlerWorker(Handler handler, boolean async) {
        this.handler = handler;
        this.async = async;
    }
    
    public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
        if (run == null) throw new NullPointerException("run == null");
        if (unit == null) throw new NullPointerException("unit == null");
    
        if (disposed) {
            return Disposables.disposed();
        }
    
        run = RxJavaPlugins.onSchedule(run);
        
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
        //设置了runnable()后续发送到主线程会执行run()
        Message message = Message.obtain(handler, scheduled);
        message.obj = this; // Used as token for batch disposal of this worker's runnables.
    
        if (async) {
            message.setAsynchronous(true);
        }
    
        handler.sendMessageDelayed(message, unit.toMillis(delay));
    
        // Re-check disposed state for removing in case we were racing a call to dispose().
        if (disposed) {
            handler.removeCallbacks(scheduled);
            return Disposables.disposed();
        }
    
        return scheduled;
    }
    

    ObservableObserveOn中的subscribeActual()创建的worker就是HandlerWorker

    1. AndroidSchedulers.mainThread() 创建了一个带主线程HandlerHandlerScheduler
    2. schedule()中通过handler.sendMessageDelayed(msg,delay)发送消息到主线程,因为message设置了Runnable(),消息发送到主线程后会调用message.callback.run()从而调用schedule()中的参数run(),即实际完成切换线程回调到ObserveOnObserverrun()
    操作符原理

    仅分析下map操作符,其它操作符类似
    直接看ObservableMap源码

    public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
        final Function<? super T, ? extends U> function;
    
        public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
            super(source);
            this.function = function;
        }
    
        @Override
        public void subscribeActual(Observer<? super U> t) {
            source.subscribe(new MapObserver<T, U>(t, function));
        }
    
        static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
            final Function<? super T, ? extends U> mapper;
    
            MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
                super(actual);
                this.mapper = mapper;
            }
    
            @Override
            public void onNext(T t) {
                if (done) {
                    return;
                }
    
                if (sourceMode != NONE) {
                    downstream.onNext(null);
                    return;
                }
    
                U v;
    
                try {
                    v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
                } catch (Throwable ex) {
                    fail(ex);
                    return;
                }
                downstream.onNext(v);
            }
    
            @Override
            public int requestFusion(int mode) {
                return transitiveBoundaryFusion(mode);
            }
    
            @Nullable
            @Override
            public U poll() throws Exception {
                T t = qd.poll();
                return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
            }
        }
    }
    
    1. 可以看到subscribeActual()直接是source.subscribe(parent)类型,我们直接看MapObserveronNext()
    2. 非聚合模式sourceMode的值是NONE,相当于Observer.onNext(mapper.apply(t))mapper.apply(t)则是我们写的lambda表达式的返回值即s + "111",由此可以看出map是直接把值返回
    总结
    1. subscribeActual()方法中实际完成订阅,subscribe订阅后各个操作符才完成订阅,即订阅是自下而上进行的
    2. 线程操作是通过线程池和Handler完成切换

    相关文章

      网友评论

          本文标题:Rxjava2源码分析

          本文链接:https://www.haomeiwen.com/subject/zfptkltx.html