美文网首页
RxJava的学习笔记(三)线程调度

RxJava的学习笔记(三)线程调度

作者: jltxseo | 来源:发表于2017-08-15 23:12 被阅读0次

    前言

    RxJava的学习笔记(一)基础概念
    RxJava的学习笔记(二)操作符
    RxJava的学习笔记(三)线程调度
    上一节笔记二中记录了RxJava2常用操作符的使用,在本节中继续学习RxJava2中最强大、最牛逼的地方,那就是可以在各个事件产生和传递的过程中能够自由的切换线程。
    要在Android中使用RxAndroid, 先继续添加Gradle配置:

     compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
    

    1、Scheduler

    Scheduler顾名思义被称为线程调度器。有了这个东西能够让RxJava中可以自由的在主线程和子线程之间切换,在RxJava中内置了以下Scheduler线程调度器:

    调度器类型 用途
    Schedulers.computation() 用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量
    Schedulers.from(executor) 使用指定的Executor线程池作为自定义的调度器
    Schedulers.single() RxJava2新增调度器,返回一个默认的,共享的,单线程支持的Scheduler实例,用于在同一个后台线程上强制执行的工作。
    Schedulers.immediate( ) RxJava1.0里面有该调度器,到2.0之后已被移除。在当前线程立即开始执行任务
    Schedulers.io( ) 用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io( )默认是一个CachedThreadScheduler,类似一个有线程缓存的新线程调度器
    Schedulers.newThread( ) 为每个任务创建一个新线程
    Schedulers.trampoline( ) 当其它排队的任务完成后,在当前线程排队开始执行
    AndroidSchedulers.mainThread() 此调度器为RxAndroid特有,顾名思义,运行在Android UI线程上

    2、Scheduler线程调度的实践

    在RxJava中可以利用subscribeOn() 结合 observeOn() 来实现线程控制,让事件的产生和消费发生在不同的线程。对于subscribeOn和observeOn方法的使用特点和对应的作用分下面几种情况:

    • 1、不进行线程Scheduler切换控制时都是在当前主线程里面运行。
      示例:
        public static void noScheduler() {
            Log.d(XqTag.TAG, "start.thread:" + Thread.currentThread());
            Observable.just("data")
                    .map(new Function<String, String>() {
                        @Override
                        public String apply(String s) throws Exception {
                            Log.d(XqTag.TAG, "map0.thread:" + Thread.currentThread());
                            return s + "-map0";
                        }
                    })
                    .subscribe(new Observer<String>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(XqTag.TAG, "onSubscribe.thread:" + Thread.currentThread());
                        }
    
                        @Override
                        public void onNext(String value) {
                            Log.d(XqTag.TAG, "onNext:" + value + ".thread:" + Thread.currentThread());
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(XqTag.TAG, "onError:" + e.toString() + ".thread:" + Thread.currentThread());
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(XqTag.TAG, "onComplete.thread:" + Thread.currentThread());
                        }
                    });
        }
    

    得到的结果:

    08-15 18:30:04.854 10161-10161/cn.jltx.rxjava.rx D/XQTAG: start.thread:Thread[main,5,main]
    08-15 18:30:04.863 10161-10161/cn.jltx.rxjava.rx D/XQTAG: onSubscribe.thread:Thread[main,5,main]
    08-15 18:30:04.863 10161-10161/cn.jltx.rxjava.rx D/XQTAG: map0.thread:Thread[main,5,main]
    08-15 18:30:04.863 10161-10161/cn.jltx.rxjava.rx D/XQTAG: onNext:data-map0.thread:Thread[main,5,main]
    08-15 18:30:04.863 10161-10161/cn.jltx.rxjava.rx D/XQTAG: onComplete.thread:Thread[main,5,main]
    
    • 2、subscribeOn无论放置在哪个位置,它的线程切换发生在该Observable的OnSubscribe 中,即在它通知上一级 OnSubscribe 时,这时事件还没有开始发送,因此 subscribeOn() 的线程控制可以从事件发出的开端就造成影响。当使用多个subscribeOn() 的时候,只有第一个 subscribeOn() 起作用。
      示例:
        public static void subscribeOnScheduler() {
            Log.d(XqTag.TAG, "start.thread:" + Thread.currentThread());
            Observable.just("data")
                    .subscribeOn(Schedulers.io())
                    .map(new Function<String, String>() {
                        @Override
                        public String apply(String s) throws Exception {
                            Log.d(XqTag.TAG, "map0.thread:" + Thread.currentThread());
                            return s + "-map0";
                        }
                    })
                    .subscribeOn(Schedulers.newThread())
                    .subscribe(new Observer<String>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(XqTag.TAG, "onSubscribe.thread:" + Thread.currentThread());
                        }
    
                        @Override
                        public void onNext(String value) {
                            Log.d(XqTag.TAG, "onNext:" + value + ".thread:" + Thread.currentThread());
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(XqTag.TAG, "onError:" + e.toString() + ".thread:" + Thread.currentThread());
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(XqTag.TAG, "onComplete.thread:" + Thread.currentThread());
                        }
                    });
        }
    

    得到的结果是:

    08-15 18:33:31.654 10161-10161/cn.jltx.rxjava.rx D/XQTAG: start.thread:Thread[main,5,main]
    08-15 18:33:31.671 10161-10161/cn.jltx.rxjava.rx D/XQTAG: onSubscribe.thread:Thread[main,5,main]
    08-15 18:33:31.695 10161-10499/cn.jltx.rxjava.rx D/XQTAG: map0.thread:Thread[RxCachedThreadScheduler-1,5,main]
    08-15 18:33:31.695 10161-10499/cn.jltx.rxjava.rx D/XQTAG: onNext:data-map0.thread:Thread[RxCachedThreadScheduler-1,5,main]
    08-15 18:33:31.695 10161-10499/cn.jltx.rxjava.rx D/XQTAG: onComplete.thread:Thread[RxCachedThreadScheduler-1,5,main]
    

    只有第一个subscribeOn()生效,而最后的subscribeOn就没有效果了。

    • 3、通过observeOn进行线程切换时,发生在自己内部构建的Observable被订阅者中,也就是在它即将给下一级的订阅者发送事件时,因此,ObserveOn()控制的是后面的线程,可以多个ObserveOn()一起使用。每使用一次ObserveOn(),它后面的线程就跟着变换一次。
    52eb2279jw1f2rxd1vl7xj20hd0hzq6e.jpg

    示例:

        public static void observeOnScheduler() {
            Log.d(XqTag.TAG, "start.thread:" + Thread.currentThread());
            Observable.just("data")
                    .observeOn(Schedulers.io())
                    .map(new Function<String, String>() {
                        @Override
                        public String apply(String s) throws Exception {
                            Log.d(XqTag.TAG, "map0.thread:" + Thread.currentThread());
                            return s + "-map0";
                        }
                    })
                    .observeOn(Schedulers.computation())
                    .observeOn(Schedulers.newThread())
                    .subscribe(new Observer<String>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(XqTag.TAG, "onSubscribe.thread:" + Thread.currentThread());
                        }
    
                        @Override
                        public void onNext(String value) {
                            Log.d(XqTag.TAG, "onNext:" + value + ".thread:" + Thread.currentThread());
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(XqTag.TAG, "onError:" + e.toString() + ".thread:" + Thread.currentThread());
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(XqTag.TAG, "onComplete.thread:" + Thread.currentThread());
                        }
                    });
        }
    

    得到的结果是:

    08-15 10:52:51.998 2498-2498/cn.jltx.rxjava.rx D/XQTAG: start.thread:Thread[main,5,main]
    08-15 10:52:52.003 2498-2498/cn.jltx.rxjava.rx D/XQTAG: onSubscribe.thread:Thread[main,5,main]
    08-15 10:52:52.004 2498-2577/cn.jltx.rxjava.rx D/XQTAG: map0.thread:Thread[RxCachedThreadScheduler-1,5,main]
    08-15 10:52:52.018 2498-2579/cn.jltx.rxjava.rx D/XQTAG: onNext:data-map0.thread:Thread[RxNewThreadScheduler-1,5,main]
    08-15 10:52:52.018 2498-2579/cn.jltx.rxjava.rx D/XQTAG: onComplete.thread:Thread[RxNewThreadScheduler-1,5,main]
    
    • 4、下游的onSubscrible的回调是在subscrible()订阅动作的时候就被调用了,因此不能先直接指定onSubscrible执行的线程,而只能在subscribe()被调用时的线程中。
    • 5、对于设置doOnSubscribe回调默认跟onSubscribe的回调规则那样在执行subscribe()订阅动作的时候被调用了,但是如果在 doOnSubscribe() 之后有 subscribeOn() ,它将执行在离它最近的 subscribeOn() 所指定的线程。
      示例:
        public static void subscribeAndobserveOnScheduler() {
            Log.d(XqTag.TAG, "start.thread:" + Thread.currentThread());
            Observable.just("data")
                    .observeOn(Schedulers.io())
                    .doOnSubscribe(new Consumer<Disposable>() {
                        @Override
                        public void accept(Disposable disposable) throws Exception {
                            Log.d(XqTag.TAG, "doOnSubscribe.thread:" + Thread.currentThread());
                        }
                    })
                    //doOnSubscribe后面设置了subscribeOn后执行离它最近的subscribeOn所指定的线程
                    .subscribeOn(Schedulers.newThread())
                    .subscribeOn(Schedulers.io())
                    .flatMap(new Function<String, ObservableSource<String>>() {
                        @Override
                        public ObservableSource<String> apply(String s) throws Exception {
                            Log.d(XqTag.TAG, "map0.thread:" + Thread.currentThread());
                            return Observable.just(s + "-map0");
                        }
                    })
                    .observeOn(Schedulers.computation())
                    .subscribe(new Observer<String>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(XqTag.TAG, "onSubscribe.thread:" + Thread.currentThread());
                        }
    
                        @Override
                        public void onNext(String value) {
                            Log.d(XqTag.TAG, "onNext:" + value + ".thread:" + Thread.currentThread());
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(XqTag.TAG, "onError:" + e.toString() + ".thread:" + Thread.currentThread());
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(XqTag.TAG, "onComplete.thread:" + Thread.currentThread());
                        }
                    });
        }
    

    得到结果是:

    08-15 10:58:41.261 2498-2498/cn.jltx.rxjava.rx D/XQTAG: start.thread:Thread[main,5,main]
    08-15 10:58:41.267 2498-2498/cn.jltx.rxjava.rx D/XQTAG: onSubscribe.thread:Thread[main,5,main]
    08-15 10:58:41.268 2498-2606/cn.jltx.rxjava.rx D/XQTAG: doOnSubscribe.thread:Thread[RxNewThreadScheduler-2,5,main]
    08-15 10:58:41.268 2498-2605/cn.jltx.rxjava.rx D/XQTAG: map0.thread:Thread[RxCachedThreadScheduler-2,5,main]
    08-15 10:58:41.270 2498-2607/cn.jltx.rxjava.rx D/XQTAG: onNext:data-map0.thread:Thread[RxComputationThreadPool-2,5,main]
    08-15 10:58:41.270 2498-2607/cn.jltx.rxjava.rx D/XQTAG: onComplete.thread:Thread[RxComputationThreadPool-2,5,main]
    
    • 6、subscribeOn和observeOn嵌套以及多个Observable联合使用时,一层一层的线程由父Observable到子Observable的切换,同时也可以在doOnSubscribe()、doOnNext()、doOnComplete()这些方法前后自由通过observeOn()切换这些方法回调的线程。
      示例:
        public static void subscribeAndobserveOnNestScheduler() {
            Log.d(XqTag.TAG, "start.thread:" + Thread.currentThread());
            Observable.just("data")
                    .observeOn(Schedulers.io())
                    .doOnSubscribe(new Consumer<Disposable>() {
                        @Override
                        public void accept(Disposable disposable) throws Exception {
                            Log.d(XqTag.TAG, "doOnSubscribe.thread:" + Thread.currentThread());
                        }
                    })
                    //doOnSubscribe后面设置了subscribeOn后执行离它最近的subscribeOn所指定的线程
                    .subscribeOn(Schedulers.newThread())
                    .subscribeOn(Schedulers.io())
                    .map(new Function<String, String>() {
                        @Override
                        public String apply(String s) throws Exception {
                            Log.d(XqTag.TAG, "map0.thread:" + Thread.currentThread());
                            return s + "-map0";
                        }
                    })
                    .observeOn(Schedulers.newThread())
                    .flatMap(new Function<String, ObservableSource<String>>() {
                        @Override
                        public ObservableSource<String> apply(String s) throws Exception {
                            Log.d(XqTag.TAG, "flatMap1.thread:" + Thread.currentThread());
                            Observable<String> flatMapObservable = Observable.just(s + "-flatMap1")
                                    .doOnSubscribe(new Consumer<Disposable>() {
                                        @Override
                                        public void accept(Disposable disposable) throws Exception {
                                            Log.d(XqTag.TAG, "flatMapObservable.doOnSubscribe.thread:" + Thread.currentThread());
                                        }
                                    })
                                    .subscribeOn(AndroidSchedulers.mainThread())
                                    .observeOn(Schedulers.io())
                                    .map(new Function<String, String>() {
                                        @Override
                                        public String apply(String s) throws Exception {
                                            Log.d(XqTag.TAG, "apply.map2.thread:" + Thread.currentThread());
                                            return s + "-map2";
                                        }
                                    })
                                    .observeOn(Schedulers.single())
                                    .doOnNext(new Consumer<String>() {
                                        @Override
                                        public void accept(String s) throws Exception {
                                            Log.d(XqTag.TAG, "doOnNext.accept" + s + ".map2.thread:" + Thread.currentThread());
                                        }
                                    })
                                    .observeOn(Schedulers.newThread())
                                    .doOnComplete(new Action() {
                                        @Override
                                        public void run() throws Exception {
                                            Log.d(XqTag.TAG, "doOnComplete.accept" + s + ".map2.thread:" + Thread.currentThread());
                                        }
                                    });
                            return flatMapObservable;
                        }
                    })
                    .observeOn(Schedulers.computation())
                    .subscribe(new Observer<String>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(XqTag.TAG, "onSubscribe.thread:" + Thread.currentThread());
                        }
    
                        @Override
                        public void onNext(String value) {
                            Log.d(XqTag.TAG, "onNext:" + value + ".thread:" + Thread.currentThread());
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(XqTag.TAG, "onError:" + e.toString() + ".thread:" + Thread.currentThread());
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(XqTag.TAG, "onComplete.thread:" + Thread.currentThread());
                        }
                    });
        }
    

    得到的结果是:

    08-15 11:08:09.277 2498-2498/cn.jltx.rxjava.rx D/XQTAG: start.thread:Thread[main,5,main]
    08-15 11:08:09.277 2498-2498/cn.jltx.rxjava.rx D/XQTAG: onSubscribe.thread:Thread[main,5,main]
    08-15 11:08:09.279 2498-2646/cn.jltx.rxjava.rx D/XQTAG: doOnSubscribe.thread:Thread[RxNewThreadScheduler-3,5,main]
    08-15 11:08:09.279 2498-2645/cn.jltx.rxjava.rx D/XQTAG: map0.thread:Thread[RxCachedThreadScheduler-3,5,main]
    08-15 11:08:09.280 2498-2647/cn.jltx.rxjava.rx D/XQTAG: flatMap1.thread:Thread[RxNewThreadScheduler-4,5,main]
    08-15 11:08:09.282 2498-2498/cn.jltx.rxjava.rx D/XQTAG: flatMapObservable.doOnSubscribe.thread:Thread[main,5,main]
    08-15 11:08:09.283 2498-2645/cn.jltx.rxjava.rx D/XQTAG: apply.map2.thread:Thread[RxCachedThreadScheduler-3,5,main]
    08-15 11:08:09.284 2498-2648/cn.jltx.rxjava.rx D/XQTAG: doOnNext.acceptdata-map0-flatMap1-map2.map2.thread:Thread[RxSingleScheduler-1,5,main]
    08-15 11:08:09.287 2498-2649/cn.jltx.rxjava.rx D/XQTAG: doOnComplete.acceptdata-map0.map2.thread:Thread[RxNewThreadScheduler-5,5,main]
    08-15 11:08:09.299 2498-2650/cn.jltx.rxjava.rx D/XQTAG: onNext:data-map0-flatMap1-map2.thread:Thread[RxComputationThreadPool-3,5,main]
    08-15 11:08:09.299 2498-2650/cn.jltx.rxjava.rx D/XQTAG: onComplete.thread:Thread[RxComputationThreadPool-3,5,main]
    

    写在最后

    每天坚持写点日志心得。
    最后附上源码:
    https://git.oschina.net/jltx/RxJavaRetrofitDemoPro

    相关文章

      网友评论

          本文标题:RxJava的学习笔记(三)线程调度

          本文链接:https://www.haomeiwen.com/subject/yuddrxtx.html