美文网首页
RxJava-Observable分析

RxJava-Observable分析

作者: 风雪围城 | 来源:发表于2017-06-25 18:16 被阅读0次

    背景

    看完Flowable的流控机制之后,对Observable的应对机制好奇 。希望在以后的应用过程中,不会犯一些不必要的错误。

    环境与目的

    RxJava版本信息如下:

    'io.reactivex.rxjava2:rxandroid:2.0.1'
    'io.reactivex.rxjava2:rxjava:2.1.0'
    

    目的:探究Observable实现异步事件流的原理。

    Obserable的构建过程

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
            ObjectHelper.requireNonNull(source, "source is null");
            return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
        }
    

    与Flowable构建过程类似,它也是通过ObservableCreate来构建Observable对象。而ObservableOnSubscribe实际上就是一个接口,接口中只有一个方法:

    void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
    

    用以通过发射器Emitte发射事件。

    subscribe过程

    该过程实际上是执行了subscribeActual,具体代码为:

        protected void subscribeActual(Observer<? super T> observer) {
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            //observer.onSubscribe会首先执行
            observer.onSubscribe(parent);
            try {
                //这个source是ObservableOnSubscribe对象
                //该过程实际为通过Emitter发射事件
                //parent就是Emitter,在该方法中为CreateEmitter
                source.subscribe(parent);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
    

    我们通过Emitter的onNext发射事件,在CreateEmitter中,具体为:

    public void onNext(T t) {
                if (t == null) {
                    onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                    return;
                }
                if (!isDisposed()) {
                    observer.onNext(t);
                }
            }
    

    可以看到,每次onNext发射事件之后,由于生产和消费是在同一个线程中,马上就通过oberver.onNext进行消费。
    即,每发射一个事件,该事件会立即被消费。
    以上,观察者和被观察者者默认都运行在主线程中。

    Observable涉及到不同运行线程时的构建流程

    subscribeOn(Schedulers.newThread())

    主要完成功能是将发射事件的过程置于一个新的线程中。
    该方法实际返回了一个ObservableSubscribeOn对象,当然,这个类是Observable的子类:

    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    

    其subscribeActual为:

    public void subscribeActual(final Observer<? super T> s) {
            //SubscribeOnObserver实现了对Observer的一层包装,将s包装成parent
            //(1)
            final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
            //s 实际就是我们自己定义的Observer
            //(2)
            s.onSubscribe(parent);         
            //(3) 重点在这里
            parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));  (3)
        }
    

    (1) 封装 s 为 parent 。
    (2) 对外暴露parent,实际上s就是我们自己定义的Observer,我们可以在其onSubscribe中获取到parent。
    (3) 首先我们来看一下scheduler。
    我使用了一个新线程Schedulers.newThread()。该scheduler的构建过程如下:

    //NEW_THREAD 是一个 Scheduler
    public static Scheduler newThread() {
            return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
        }
    
    //进一步查看NEW_THREAD
    --------->
    NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
    
    //通过NewThreadTask() 的call方法,返回了NEW_THREAD
    ------>
    public Scheduler call() throws Exception {
                return NewThreadHolder.DEFAULT;
            }
    
    //DEFAULT 这个实际上就是我们使用的scheduler的本尊
    ------>
    static final Scheduler DEFAULT = new NewThreadScheduler();
    

    来看看它的scheduleDirect方法:

      public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) {
            ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run));
            try {
                Future<?> f;
                if (delayTime <= 0L) {
                    //从线程池中拿出线程执行了任务
                    f = executor.submit(task);
                } else {
                    f = executor.schedule(task, delayTime, unit);
                }
                task.setFuture(f);
                return task;
            } catch (RejectedExecutionException ex) {
                RxJavaPlugins.onError(ex);
                return EmptyDisposable.INSTANCE;
            }
        }
    

    而SubscribeTask就是一个Runnable任务,执行:

    public void run() {
                source.subscribe(parent);
            }
    

    以上,就是发射事件能够运行在新线程的根本,同时,后续消费事件默认也运行再该线程中(如果没有主动将消费事件的运行设置在另外一个线程中)。
    我们应该注意到,对外暴露给我们的parent,是一个Disposable,它有一个dispose方法,该方法的具体实现在NewThreadWorker里:

     @Override
        public void dispose() {
            if (!disposed) {
                disposed = true;
                executor.shutdownNow();
            }
        }
    

    它关闭了整个线程池。

    observeOn(AndroidSchedulers.mainThread())

    主要作用是将消费线程置于新的独立线程中。
    该方法同样将返回一个ObservableObserveOn对象,实际执行过程:

    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
            ObjectHelper.requireNonNull(scheduler, "scheduler is null");
            ObjectHelper.verifyPositive(bufferSize, "bufferSize");
            return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
        }
    

    我们可以看到,和subscribeOn相比,这里就有了bufferSize的概念。bufferSize实际上是一个int常量128,不用关注。
    它的subscribeActual方法为:

    protected void subscribeActual(Observer<? super T> observer) {
            if (scheduler instanceof TrampolineScheduler) {
                source.subscribe(observer);
            } else {
                Scheduler.Worker w = scheduler.createWorker();
    
                source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
            }
        }
    

    这里可以看到,事件的消费,实际上是通过ObserveOnObserver对象执行的。该对象的onNext方法为:

    public void onNext(T t) {
                if (done) {
                    return;
                }
    
                if (sourceMode != QueueDisposable.ASYNC) {
                    //将事件先存到队列中
                    queue.offer(t);
                }
                //消费事件
                schedule();
            }
    

    相关文章

      网友评论

          本文标题:RxJava-Observable分析

          本文链接:https://www.haomeiwen.com/subject/jnygfxtx.html