美文网首页
RxJava2 源码浅析

RxJava2 源码浅析

作者: 嗯哼嗯哼嗯哼嗯哼 | 来源:发表于2018-10-24 13:47 被阅读0次

    RxJava2 源码浅析

    ReactiveX

    历史:
    ReactiveX是Reactive Extensions的缩写,一般简写为Rx,最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,在2012年11月开源,Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流,Rx库支持.NET、JavaScript和C++,Rx近几年越来越流行了,现在已经支持几乎全部的流行编程语言了,Rx的大部分语言库由ReactiveX这个组织负责维护,比较流行的有RxJava/RxJS/Rx.NET,社区网站是 reactivex.io

    定义:
    ReactiveX.io给的定义是,Rx是一个使用可观察数据流进行异步编程的编程接口,ReactiveX结合了观察者模式、迭代器模式和函数式编程的精华。
    ReactiveX不仅仅是一个编程接口,它是一种编程思想的突破,它影响了许多其它的程序库和框架以及编程语言。

    filter操作符

    这就是数据流?

    RxJava2定义

    a library for composing asynchronous and event-based programs by using observable sequences.(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)

    大致流程

    上代码:

    Observable.
                create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                        emitter.onNext("test First");
                        Log.e("TAG", "subScribe test First");
                        emitter.onNext("test Second");
                        Log.e("TAG", "subScribe test Second");
                        emitter.onComplete();
                        Log.e("TAG", "subScribe onComplete");
                    }
                }).
                subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.e("TAG", "onSubscribe");
                        mDisposable = d;
                    }
    
                    @Override
                    public void onNext(String s) {
                        Log.e("TAG", "onNext");
                        Log.e("TAG", s);
                        if (s.equals("test First")) {
                            mDisposable.dispose();
                        }
                    }
    
                    @Override
                    public void onError(Throwable e) {
                        Log.e("TAG", "onError");
                    }
    
                    @Override
                    public void onComplete() {
                        Log.e("TAG", "onComplete");
                    }
                });
    

    刚开始学习RxJava 时,这段代码给我最直观的感受就是,这不就是自己调用自己吗。ObservableEmitter<String> emitter 这个就是下面的subscribe(new Observer<String>())。对吧,我觉得大家应该都是这样的感受吧...

    追踪一下源码:点击create()方法进去看一下:

    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
    
    
    public static <T> Observable<T> onAssembly(Observable<T> source) {
        Function<Observable, Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }
    看了代码onObservableAssembly为null,所以create方法之后这个对象被包装成new ObservableCreate<T>(source),source是外面传进来的。
    

    关键字:io.reactivex.internal.operators.observable.ObservableCreate

    继续看下一个操作符:subscribe(),点进去看一下

    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);//observer原样返回,没改动
    
            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
    
            subscribeActual(observer);//关键点
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);
    
            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }
    // RxJavaPlugins.onSubscribe
    
      public static <T> Observer<? super T> onSubscribe(Observable<T> source, Observer<? super T> observer) {
        BiFunction<Observable, Observer, Observer> f = onObservableSubscribe;   //f 为null
        if (f != null) {
            return apply(f, source, observer);
        }
        return observer;
    }
    
    上面说过了经过create()方法或这个对象已经是ObservableCreate了,那么最终会调用的就是subscribeActual(observer) 看一下ObservableCreate这个类的代码:
    
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
    
        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    
    
    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {
    CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }
    
        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }
    }
    

    看一下,这里会把observer包装成一个CreateEmitter对象,然后source是Observable.create(new ObservableOnSubscribe<String>())传进来的ObservableOnSubscribe对象。然后会调用observer.onSubscribe(parent);source.subscribe(parent);终于清晰了...可以回答上面的问题了,其实Observer和ObservableEmitter可以看成是一个对象,只是对observer做了个包装...

    Scheduler 线程变换(subscribeOn 和 observeOn)

    说到线程变换即线程间通信,因为我是学Android,所以第一印象就是Handler,然后就是Future。看了源码后发现RxJava用的是Future,ScheduledExecutorService,Runnable,二AndroidScheduler就是用Handler的,因为需要切换到Android中的UI线程。

    subscribeOn(Schedulers.newThread())

    点进去看一下:

    public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    
    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }
    
    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    
        s.onSubscribe(parent);
    
        parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
            @Override
            public void run() {
                source.subscribe(parent);
            }
        }));
    }
    }
    
    传进来的是Schedulers.newThread(),点击Schedulers.newThread() 点进去
    发现最终返回的是NewThreadScheduler
    

    关键字:io.reactivex.internal.schedulers.NewThreadScheduler
    io.reactivex.internal.schedulers.NewThreadWorker(真正做线程调度的类)

    发现有scheduler.scheduleDirect(new Runnable())点击进去,最终调用
    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
    final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
        w.schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    decoratedRun.run();
                } finally {
                    w.dispose();
                }
            }
        }, delay, unit);
    
        return w;
    }
    最终还是w.schedule(new Runnable()),w就是NewThreadWorker,找到这个类看一下schedule方法,最终会调用:
    
    
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    
        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }
    
        Future<?> f;
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            parent.remove(sr);
            RxJavaPlugins.onError(ex);
        }
    
        return sr;
    }
    

    看到future和executor了,这里就是线程切换

    observeOn(AndroidSchedulers.mainThread())
    ObservableObserveOn 最终调用的是HandlerScheduler和HandlerWorker
    
    HandlerWorker:
    
    @Override
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (delay < 0) throw new IllegalArgumentException("delay < 0: " + delay);
            if (unit == null) throw new NullPointerException("unit == null");
    
            if (disposed) {
                return Disposables.disposed();
            }
    
            run = RxJavaPlugins.onSchedule(run);
    
            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
    
            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.
    
            handler.sendMessageDelayed(message, unit.toMillis(delay));
    
            // Re-check disposed state for removing in case we were racing a call to dispose().
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }
    
            return scheduled;
        }
    

    发现是用Handler来做线程切换,Handler管理的Looper是Looper.getMainLooper(),所以把消息发送到了主线程。

    相关文章

      网友评论

          本文标题:RxJava2 源码浅析

          本文链接:https://www.haomeiwen.com/subject/rfottqtx.html