美文网首页
RxJava2系列第二篇---异步

RxJava2系列第二篇---异步

作者: sofarsogoo_932d | 来源:发表于2018-09-01 17:15 被阅读0次

    目录

    第一篇---基本使用
    第二篇---异步
    第一篇---操作符

    异步

    在该系列第一篇最开始,我们已经说了RxJava是一个异步编程框架,之所以这么说,就是因为它在线程的切换方面非常方便。
    介绍异步之前,我们先看看下面几个方法

    subscribe(Observer<? super T> observer)
    subscribe(Consumer<? super T> onNext)
    subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
    subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete)
    subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete, Consumer<? super Disposable> onSubscribe)
    

    在基本使用中,我们用的是第一个方法,但此方法需要重写所有的事件,但有的时候我们并不需要对所有的事件进行处理,因此就有了下面几种方法,看参数我们就知道每个方法分别处理的是什么事件,比如第二个方法,只处理next事件,第三个方法,只处理next和error事件等等。

    为了代码的简洁性,接下来我将使用Consumer作为观察者。

    Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.d("TAG","subscribe:"+Thread.currentThread().getName());
                emitter.onNext("1");
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d("TAG","accept:"+Thread.currentThread().getName());
            }
    });
    

    输出结果

    D/TAG: subscribe:main
    D/TAG: accept:main
    

    结果分析
    如果我将上述代码,放在一个子线程中去,发现结果线程的名字将不再是main。说明在哪创建上述代码,则上游和下游就会处于那个线程,并且它们处于同一个线程。

    如果我们要在子线程中发送交易,主线程更新UI,这种情况就满足不了我们的需求了。我们需要的是上线处于子线程,负责发送网络请求,下游处于主线程,负责更新UI。通过RxJava的线程调度器可以轻松实现上述需求。

    在Observable中有两个方法

    subscribeOn(Scheduler scheduler)  //指定上游所在的线程
    observeOn(Scheduler scheduler)  //指定下游所在的线程
    

    先来看看下面的代码

    Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.d("TAG", "subscribe:" + Thread.currentThread().getName());
                emitter.onNext("1");
            }
        });
        Consumer<String> consumer = new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d("TAG", "accept:" +s+":"+ Thread.currentThread().getName());
            }
        };
    //关注点
    observable.subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(consumer);
    

    输出结果

    D/TAG: subscribe:RxNewThreadScheduler-1
    D/TAG: accept:1:main
    

    我们来看看Schedulers和AndroidSchedulers
    这两个类并无继承关系,是相互独立的两个final类

    AndroidSchedulers

    /** A {@link Scheduler} which executes actions on the Android main thread.*/
    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }
    
    /** A {@link Scheduler} which executes actions on {@code looper}. */
    public static Scheduler from(Looper looper) {
        if (looper == null) throw new NullPointerException("looper == null");
        return new HandlerScheduler(new Handler(looper));
    }
    

    前者指定方法在主线程中执行
    后者指定方法在哪个线程执行,由Looper所在的线程决定

    Schedulers
    该调度器里面有下面几个主要方法

    //新的子线程
    public static Scheduler newThread() {
        return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
    }
    //计算密集型任务
    public static Scheduler computation() {
        return RxJavaPlugins.onComputationScheduler(COMPUTATION);
    }
    //io密集型任务
     public static Scheduler io() {
        return RxJavaPlugins.onIoScheduler(IO);
    }
    public static Scheduler trampoline() {
        return TRAMPOLINE;
    }
    public static Scheduler single() {
        return RxJavaPlugins.onSingleScheduler(SINGLE);
    }
    

    其他两个暂时没用到,就先不说明了。

    回到开始的异步代码
    修改关注点

    observable.subscribeOn(Schedulers.newThread())
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .observeOn(Schedulers.io())
                .subscribe(consumer);
    

    输出结果

    D/TAG: subscribe:RxNewThreadScheduler-1
    D/TAG: accept:1:RxCachedThreadScheduler-2
    

    结果分析
    修改后的代码指定了2次上游发送事件的线程,下游也指定了2次线程,通过输出结果,我们可以得出结论:上游线程只有第一次指定的有效,下游线程最终会切换至最后一个指定的线程。

    为了更加清晰的知道下游线程的切换过程,我们修改代码如下

    Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.d("TAG", "subscribe:" + Thread.currentThread().getName());
                emitter.onNext("1");
                emitter.onNext("2");
                emitter.onNext("3");
            }
        });
        Consumer<String> consumer = new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d("TAG", "accept:" +s+":"+ Thread.currentThread().getName());
            }
        };
        observable.subscribeOn(Schedulers.newThread())
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.d("TAG", "accept:" +s+":"+ Thread.currentThread().getName());
                    }
                })
                .observeOn(Schedulers.io())
                .doOnNext(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.d("TAG", "accept:" +s+":"+ Thread.currentThread().getName());
                    }
                })
                .subscribe(consumer);
    

    输出结果如下

    D/TAG: subscribe:RxNewThreadScheduler-1
    D/TAG: accept:1:main
    D/TAG: accept:2:main
    D/TAG: accept:3:main
    D/TAG: accept:1:RxCachedThreadScheduler-2
    D/TAG: accept:1:RxCachedThreadScheduler-2
    D/TAG: accept:2:RxCachedThreadScheduler-2
    D/TAG: accept:2:RxCachedThreadScheduler-2
    D/TAG: accept:3:RxCachedThreadScheduler-2
    D/TAG: accept:3:RxCachedThreadScheduler-2
    

    结果分析
    从输出结果,我们可以看出,每一个doOnNext都会接受到全部事件,并且每一个observeOn指定的是它下面的那个事件所处的线程。

    线程切换原理分析

    RxJava线程切换图.png

    来一段RxJava的调用链代码

    Observable.just(1)
        .map(new Function<Integer, String>() {
          @Override
          public String apply(Integer integer) throws Exception {
            LogUtil.d("rxjava", "map1: " + Thread.currentThread().getId());
            return integer.toString();
          }
        })
        .subscribeOn(Schedulers.newThread()) // s1
        .map(new Function<String, Integer>() {
          @Override
          public Integer apply(String s) throws Exception {
            LogUtil.d("rxjava", "map2: " + Thread.currentThread().getId());
            return s.hashCode();
          }
        })
        .observeOn(Schedulers.newThread()) // o1
        .map(new Function<Integer, String>() {
          @Override
          public String apply(Integer integer) throws Exception {
            LogUtil.d("rxjava", "map3: " + Thread.currentThread().getId());
            return integer.toString();
          }
        })
        .subscribeOn(Schedulers.newThread()) // s2
        .observeOn(Schedulers.newThread()) // o2
        .subscribe(new Consumer<String>() {
          @Override
          public void accept(String s) throws Exception {
            LogUtil.d("rxjava", "accept: " + Thread.currentThread().getId());
          }
        });
    
    结合图和代码来分析线程的切换过程

    这段代码中包含了很多操作符,每一个点后面的都是RxJava的操作符,如just,map,subscribe等等,对应图中的lift
    在这些操作符中,每调用一次操作符,都返回Observable,这就像Builder模式,只有subscribe返回的不是Observable,而是Disposable

    subscribe意味着RxJava调用链开始启动,对应图中的底端的actual-subscriber。

    自下而上找subscribeOn,每经过一个subscribeOn就切换一次线程(如果一个都没有,则线程默认为当前线程),直到到达顶端的Observable,对应图中的onSubscribe

    自上而下找observeOn(图中的Subscribe),同样是每经过一个observeOn就切换一次线程

    通过这个思路,大家想一想上述程序的打印结果
    just,map1和map2处于一个线程,并且是s1所指定的线程
    map3处于o1所处的线程
    accept处于o2所处的线程

    打印验证一下

    map1: 732
    map2: 732
    map3: 733
    accept: 734
    

    结果一致

    []

    相关文章

      网友评论

          本文标题:RxJava2系列第二篇---异步

          本文链接:https://www.haomeiwen.com/subject/hprewftx.html