美文网首页
Rxjava解析

Rxjava解析

作者: taijielan | 来源:发表于2019-04-12 17:06 被阅读0次

Rxjava git:地址

RxAndroid git:地址

至于为什么提到RxAndroid,切换线程时将会用到。
RxJava中文文档
先说Rxjava 的好处,Rxjava的好处是异步调用,那么Android中已经有Handler,AsynTask了,为什么还需要Rxjava呢?首先是Android中进行耗时操作不能在UI线程,当进行网络请求的时候,需要另开线程,当更新UI时又需要切换到UI线程,首先引起的是代码可阅读性不是强,但是Rxjava通过操作符,实现链式编程,用户可以关注业务逻辑的实现,不用去管线程调度的问题,保持代码的可读性,再有就是Rxjava提供各种操作符,可以将请求过程中的数据更新,过滤成为自己想要的数据。
Rxjava操作符,如何使用可以看下官方文档,下面来看下Rxjava实现。
首先Rxjava中必须要的Observable(被观察者)observer(观察者),因为只有观察者订阅被观察者,被观察者中的事件才会发送发送出来。
看一个Rxjava的简单实现。

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("111");
            }
        }).subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("info","onSubscribe");
            }
            @Override
            public void onNext(Object o) {
                Log.e("info","onNext");
            }
            @Override
            public void onError(Throwable e) {
                Log.e("info","onError");
            }
            @Override
            public void onComplete() {
                Log.e("info","onComplete");
            }
        });

首先来看下Observer对象 ,如果其中onError,onComplete这俩个方法不能同时执行,如果俩个方法已经执行了了,则不能调用onNext方法了,也就是当前ObservableObserver已经无订阅关系了。
再来看下 为什么要先说只有观察者订阅被观察者之后才能发送消息呢?看下subscribe方法,是将ObservableObserver连接起来,看下其中的实现

    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
    
       ...
          //关键的实现
            subscribeActual(observer);
        ...
    }

发现调用了subscribeActual(observer)方法,这是一个抽象方法。然后再看下Observable.create()方法中

 @CheckReturnValue
    @NonNull
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

获取一个Observable 的真正实例ObservableCreate,看下它的subscribeActual(observer)方法

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);//1
        observer.onSubscribe(parent);//2
        try {
            source.subscribe(parent);//3
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

CreateEmitterextends Disposable这个类的主要作用是让观察者与被观察者建立联系,也通过此将联系中断。看到上面的2,我们知道系统会默认回调Disposable 对象给我们,如果当前Disposable调用dispose(),将会可以手动释放掉当前的联系。
1处将发射器与观察者关联起来,在发射器中存放一个观察者的实例。
3处是将发射器与被观察者关联起来,在被观察中通过发射器发送一个消息,通过发射器转发给关联转发器的观察者对象,所以emitter.onNext("111");那么我们将会在观察者中的onNext(Object o)方法中收到一个消息中CreateEmitter中的接收。

   @Override
        public void onNext(T t) {
           ...
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

上面就是一个简单的Rxjava调用已经接收,以及处理数据流的流程。
如果我们想取消订阅
Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
Log.e("info","--subscribe");
emitter.onNext("11");
// emitter.onComplete();
// emitter.onNext("22");
// emitter.onNext("22");
// emitter.onNext("33");
}
}).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
disposable =d;
//关键代码
disposable.dispose();

        }

...
}
disposable.dispose() 直接取消订阅即可,我们看下里面如何取消订阅的 disposable其实就是前面的parent的,也就是CreateEmitter实例,CreateEmitter继承自AtomicReference<Disposable>AtomicReference<Disposable>这个类是对对象的原子操作。disposable.dispose()调用

 @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

执行的是DisposableHelper.dispose(this);方法

   public static boolean dispose(AtomicReference<Disposable> field) {
        Disposable current = field.get();
        Disposable d = DISPOSED;
    // 如果当前的对象不为DISPOSED
        if (current != d) {
    //设置当前为状态为DISPOSED,并且返回原来的值
            current = field.getAndSet(d);
            if (current != d) {
    // 如果前面的原子引用不为null,则废弃前面的。
                if (current != null) {
                    current.dispose();
                }
                return true;
            }
        }
        return false;
    }

再 onNext之前先判断当前的状态

 public void onNext(T t) {
           ...
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

看下isDisposed ()方法

  @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

也是调用的DisposableHelper中的方法

 /**
     * Checks if the given Disposable is the common {@link #DISPOSED} enum value.
     * @param d the disposable to check
     * @return true if d is {@link #DISPOSED}
     */
    public static boolean isDisposed(Disposable d) {
        return d == DISPOSED;
    }
下面看下Rxjava的线程切换模型。

*Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler

  • Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
  • Schedulers.io(): I/O操作(读写文件、读写数据库、网络信息交互等)所使用的Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread()更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
  • Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
  • Android 还有一个专用的AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。
Observable.create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                        emitter.onNext("111");
                        Log.e("info","---->"+Thread.currentThread());
                    }
                }).flatMap(new Function<String, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(String s) throws Exception {
                        Log.e("info","---->"+Thread.currentThread()); //4
                        return null;
                    }
                }).subscribeOn(Schedulers.io())
                        .observeOn(AndroidSchedulers.mainThread())
                        .subscribe(new Observer<Object>() {
                            @Override
                            public void onSubscribe(Disposable d) {

                                Log.e("info","--onSubscribe" +Thread.currentThread()); //5
                            }

                            @Override
                            public void onNext(Object o) {
                                Log.e("info","--onNext"+Thread.currentThread());
                            }

                            @Override
                            public void onError(Throwable e) {
                                Log.e("info","--onError"+Thread.currentThread());
                            }

                            @Override
                            public void onComplete() {
                                Log.e("info","--onComplete"+Thread.currentThread());
                            }
                        });
            }

Rxjava的订阅流程是自下而上传递的。也即是说先会执行离Observable.subscribe(observer)最近的Obsevable,然后再向上传递。
subscribeOn()表示订阅的线程,observeOn ()表示观察的线程,
subscribeOn表示被观察者中的需要执行操作执行所在的线程,并非指的是当前线程。
先看下subscribeOn()方法,

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
 ...

    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);//1
        observer.onSubscribe(parent);//2
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));//3
    }
...
}

具体实现在ObservableSubscribeOn类中

  • 首先从上面2,5可以确认Observer.onSubscribe()是在当前线程执行的。
  • 通过查看日志4 可以确认订阅流程是在新开的线程中。
    下面分析代3:
  • 首先3中的schedulerIoScheduler
  • parent.setDisposable()方法设置返回的Disposable对象,这个Disposable对象实例是一个DisposeTask对象。
    由外而内的分析,
    parent.setDisposable()方法的调用
 void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }

具体的调用是

 public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {
        ObjectHelper.requireNonNull(d, "d is null");
        if (!field.compareAndSet(null, d)) {
            d.dispose();
            if (field.get() != DISPOSED) {
                reportDisposableSet();
            }
            return false;
        }
        return true;
    }

默认field字段是无值的,将DisposeTask 传递给SubscribeOnObserver,上面的主要目的是设置 为SubscribeOnObserver默认值,DisposeTask 可以关联到订阅的线程,这样的话,如果取消订阅就可以将正在订阅的任务废弃掉。
事实也是这样,当调用废弃线程时的操作。

 @Override
        public void dispose() {
            if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
                ((NewThreadWorker)w).shutdown();
            } else {
                w.dispose();
            }
        }

w是 EventLoopWorker实例,所以这个地方调用的是EventLoopWorker.dispose()
现在知道dispose ()的真实调用地址了,
现在知道如何取消ObserverObservable的订阅关系了,那么Observable是如何切换到其他线程的?
我们知道Observable是从下往上传递的,而Observable 中的回调是从上往下传递的。可能听起来很蛋疼,以上面的例子来说

 Observable.create(new ObservableOnSubscribe<Object>() {
               ....
            }).flatMap(new Function<Object, ObservableSource<?>>() {
            ...
             }).subscribeOn(Schedulers.io())
          ...
            .subscribe(Observer<? super T> observer)
             ....

首先这个执行顺序是从subscribe ()调用开始 ,这个很关键,前面说了,如果没有订阅,那么被观察者中的将不会执行,只有在该方法执行的情况下,订阅者与被订阅者才会产生联系。在subscribe ()方法中,

  @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
          ...
            subscribeActual(observer);
      ...
    }

实现的是抽象方法是subscribeActual (),具体是实现是在子类中,不同子类的实现不同,每个子类中都有上一个ObservableSource的对象source,然后在子类的subscribeActual ()方法中调用上一个Observable(也就是source)的subscribe()方法以此向上传递。到这里也就明白,为什么在subscribeOn(Schedulers.io())中线程切换之后其他Observable中的调用就都是在新的线程了。
如何切换到新线程?实现在scheduler.scheduleDirect(new SubscribeTask(parent))方法中。
SubscribeTask实现Runnable接口,在run方法中

 @Override
        public void run() {
            source.subscribe(parent);
        }

这个source就是上一个ObservableSource对象,parent猜也能猜到是一个Observer对象,现在的任务是要将new SubscribeTask(parent)在一个新线程中执行,就可实现订阅在新线程执行的操作。由此可知scheduler.scheduleDirect必定开了一个新线程,scheduler实现是IoScheduler,具体的调用是在EventLoopWorker类中的父类NewThreadWorker,通过线程池调用。

    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    ...
  ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
//decoratedRun 就是前面的 SubscribeTask
     ....
        Future<?> f;
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
         ...
        }
        return sr;
    }

启动新线程的订阅的过程分心完成。
至于如何切回到主线程,其实是通过Handler,通过主线程的handler来实现切换。

相关文章

网友评论

      本文标题:Rxjava解析

      本文链接:https://www.haomeiwen.com/subject/vonqwqtx.html