RxJava进阶那些事

作者: 唠嗑008 | 来源:发表于2019-11-04 17:13 被阅读0次

    前言

    rxjava很多人都用过,但是你真的对它足够了解吗?不妨来看看文中的这些问题你能否全部答对。阅读本文,你可以有如下收获:
    1、熟悉RxJava主要操作符的应用和使用场景;
    2、对RxJava原理的核心逻辑有清楚的解释,给看不懂源码的你一个更清晰的思路;
    3、RxJava的原理、设计思想和最大卖点是什么?

    目前很多人都在用【retrofit+okhttp+rxjava】做网络请求,这似乎成了Android开发的标配了。但是你真的熟悉/了解这几个工具吗?先不用着急回答,我先抛出几个问题,如果你都掌握了,那本文大多数内容,你已经不用看了;如果你觉得自己回答的不是很好,那本文应该可以让你在网络请求的实战思维上有所收获。这里以RxJava为例:

    • Q1:你用过RxJava的哪些功能?除了如下网络请求异步操作,还有别的吗?(提示:操作符)
    public interface ApiService {
        @GET("/")
        Observable<Login> getData();
    }
    ApiService apiService = retrofit.create(ApiService.class);
    Observable<Login> observable =  apiService.getData();
    observable.subscribeOn(Schedulers.io())                
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Observer<Login>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.e(TAG, "onSubscribe: ");
                        }
    
                        @Override
                        public void onNext(Login login) {
                            Log.e(TAG, "onNext: ");
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.e(TAG, "onError: " + e.toString());
                        }
    
                        @Override
                        public void onComplete() {
                            Log.e(TAG, "onComplete: ");
                        }
                    });
    
    
    • Q2:你项目中是如何处理嵌套(连续)的网络请求的?比如说登陆成功了,再根据uid去获取用户相关信息。
    • Q3:你是如何处理多请求,单输出的?也就是说获取多个接口数据,然后合并结果做下一步数据处理
    • Q4:rxJava的线程调度器是如何实现线程切换的?
    • Q5:rxJava的原理是什么?与其他的异步通信机制HandlerAsyncTask有什么区别?

    关于rxjava,你最少且必要知道这些


    1、操作符分类与详解
    2、优点,使用场景
    3、线程调度器
    4、原理和设计模式

    关于1,2就不在这里细说了,推荐几篇非常优秀的系列文章。
    备注:上面的Q1~Q3的答案也在这里面

    RxJava2操作符详解
    RxJava2实战应用解析
    RxJava2线程切换原理(上)
    RxJava2线程切换原理(下)

    使用场景:

    • 单请求异步处理
    • 嵌套请求(flatmap)
    • 多请求合并处理(多输入,单输出)
    • 定时轮训
    • 错误重连

    线程调度器

    注意:这部分,我希望你是看过源码之后再来阅读,不然你会看懵的,因为这不是详细的源码解读,而是给那些看不懂源码,或者说看了之后很懵、找不到关键点的同学看的。

    这里涉及2个方法

    切换上游线程
    .subscribeOn(Schedulers.io()) 
    切换下游线程
    .observeOn(AndroidSchedulers.mainThread())
    

    先抛出几个问题

    • 线程切换是如何实现的?(提示:Thread,Handler)
    • subscribeOnobserveOn顺序变换有影响吗?observeOn放在其他操作符比如flatmap之前,线程有什么变化?subscribeOn在整个链式调用不同的位置有什么不同吗?(这里需要搞清楚)

    RxJava2的订阅原理是执行subscribe时调用Observable的各个子类的subscribeActual方法,这个方法最终会调用observer相关的订阅方法。

    这里我是不会通过完整的代码去告诉你上游是如何切换到子线程,下游最后又如何切换到主线程的,我只是想把切换的核心点、主干逻辑告诉你,这样你再去看源码的时候就不会迷茫了。

    subscribeOn(Schedulers.io())
    这里以常用的写法为例,把上游切换到子线程执行。
    注意:这里会吧subscribeOn()Schedulers.io()拆开来讲。

    首先subscribeOn()返回的ObservableSubscribeOn内部是将订阅操作subscribe放到一个Runnable执行

    //ObservableSubscribeOn.SubscribeTask
    
    @Override
        public void subscribeActual(final Observer<? super T> s) {
            final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    
            s.onSubscribe(parent);
          //重点在这里
            parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
        }
    
    final class SubscribeTask implements Runnable {
            private final SubscribeOnObserver<T> parent;
    
            SubscribeTask(SubscribeOnObserver<T> parent) {
                this.parent = parent;
            }
    
            @Override
            public void run() {
    //上游的Observable和observer的订阅
                source.subscribe(parent);
            }
        }
    

    scheduler.scheduleDirect()内部也仅仅是对Runnable的再次包装,并没有创建子线程的操作。

    Schedulers.io()内部会创建线程池,并把上面定义的Runnable放到子线程操作。Schedulers.io()返回一个IoScheduler,在它内部经过一系列调用,最终在NewThreadWorker创建了线程池,然后通过线程池中的子线程去执行上面订阅的Runnable,至此上游的线程切换就完成了。

    创建线程池
    IoScheduler--->createWorker()--->EventLoopWorker--->ThreadWorker--->NewThreadWorker 构造方法
    Runnable任务执行
    IoScheduler--->createWorker()--->EventLoopWorker--->schedule()--->threadWorker.scheduleActual()---NewThreadWorker.scheduleActual()

    observeOn(AndroidSchedulers.mainThread())
    切换下游线程

    首先observeOn()最终会调用ObservableObserveOn. subscribeActual。重点关注scheduler.createWorker(),这个Worker 对象实际上是在AndroidSchedulers.mainThread()内部的HandlerScheduler中生成的,并且持有主线程 handler 的引用。

    //ObservableObserveOn.Java
    @Override
        protected void subscribeActual(Observer<? super T> observer) {
            if (scheduler instanceof TrampolineScheduler) {
                source.subscribe(observer);
            } else {
                //重点在这里
                Scheduler.Worker w = scheduler.createWorker();
    
                source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
            }
        }
    

    接着看ObserveOnObserver,它内部会调用worker.schedule(this);这里实际上调用的是HandlerWorker.schedule(),这里面会有一个主线程的Handler对象,然后把特定的线程任务通过handler.sendMessageDelayed() 方法转移到主线中去执行。

    至此,关于线程切换的主要逻辑就讲完了。


    RxJava2原理总结

    • RxJava2是基于事件流的异步操作库,事件流是通过subscribe触发的(实际上是subscribeActual),每个Observable子类的subscribeActual实现逻辑不同。

    • 从下往上订阅,事件从上往下发射。

    • 我认为RxJava最大的卖点/亮点是事件驱动型编程,可以通过Observable 的操作符对事件在时间和空间维度进行重新组织,而观察者的逻辑几乎不需要修改。

    相关文章

      网友评论

        本文标题:RxJava进阶那些事

        本文链接:https://www.haomeiwen.com/subject/uhobmctx.html