美文网首页Android开发积累
RxJava<第十三篇>:线程控制(切换/调度)

RxJava<第十三篇>:线程控制(切换/调度)

作者: NoBugException | 来源:发表于2019-03-21 22:14 被阅读35次

RxJava的线程控制主要设计到两种操作符:subscribeOnobserveOn

subscribeOn:如果多次调用,则只有第一次调用有效;
observeOn:如果多次调用,每次有可以切换线程。

(1)默认情况下
    Observable.just("A")
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.d("aaa", "threadName:"+Thread.currentThread().getName());
                }
            });

打印日志:

图片.png

默认情况下被观察者和观察者是运行在主线程的,如果阻塞50秒(耗时操作)

    Observable.just("A")
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Thread.sleep(50000);
                    Log.d("aaa", "threadName:"+Thread.currentThread().getName());
                }
            });

这样会阻塞主线程。

这时,我们就需要用到线程控制的知识了。

(2)Scheduler的种类
  • Schedulers.io( ):
    用于IO密集型的操作,例如读写SD卡文件,查询数据库,访问网络等,具有线程缓存机制,在此调度器接收到任务后,先检查线程缓存池中,是否有空闲的线程,如果有,则复用,如果没有则创建新的线程,并加入到线程池中,如果每次都没有空闲线程使用,可以无上限的创建新线程。

  • Schedulers.newThread( ):
    在每执行一个任务时创建一个新的线程,不具有线程缓存机制,因为创建一个新的线程比复用一个线程更耗时耗力,虽然使用Schedulers.io( )的地方,都可以使用Schedulers.newThread( ),但是,Schedulers.newThread( )的效率没有Schedulers.io( )高。

  • Schedulers.computation():
    用于CPU 密集型计算任务,即不会被 I/O 等操作限制性能的耗时操作,例如xml,json文件的解析,Bitmap图片的压缩取样等,具有固定的线程池,大小为CPU的核数。不可以用于I/O操作,因为I/O操作的等待时间会浪费CPU。

  • Schedulers.trampoline():
    在当前线程立即执行任务,如果当前线程有任务在执行,则会将其暂停,等插入进来的任务执行完之后,再将未完成的任务接着执行。

  • Schedulers.single():
    拥有一个线程单例,所有的任务都在这一个线程中执行,当此线程中有任务执行时,其他任务将会按照先进先出的顺序依次执行。

  • Scheduler.from(@NonNull Executor executor):
    指定一个线程调度器,由此调度器来控制任务的执行策略。

  • AndroidSchedulers.mainThread():
    在Android UI线程中执行任务,为Android开发定制。

  • 注:
    在RxJava2中,废弃了RxJava1中的Schedulers.immediate( )
    在RxJava1中,Schedulers.immediate( )的作用为在当前线程立即执行任务,功能等同于RxJava2中的Schedulers.trampoline( )。
    而Schedulers.trampoline( )在RxJava1中的作用是当其它排队的任务完成后,在当前线程排队开始执行接到的任务,有点像RxJava2中的Schedulers.single(),但也不完全相同,因为Schedulers.single()不是在当前线程而是在一个线程单例中排队执行任务。

(3)subscribeOn的使用
    Observable.create(new ObservableOnSubscribe<String>() {

        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            Log.d("aaa", "Observable----threadName:"+Thread.currentThread().getName());
            e.onNext("A");

        }
    })
            .subscribeOn(Schedulers.io())
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.d("aaa", "Obsever----threadName:"+Thread.currentThread().getName());
                }
            });

代码中添加了subscribeOn(Schedulers.io())这句代码,这样就可以从默认主线程切换到IO线程。

我们看一下打印结果

图片.png

所以, 如果单纯用subscribeOn来控制线程,那么被观察者和观察者都会被切换到指定的线程。

如果添加多个, 比如

    Observable.create(new ObservableOnSubscribe<String>() {

        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            Log.d("aaa", "Observable----threadName:"+Thread.currentThread().getName());
            e.onNext("A");

        }
    })
            .subscribeOn(Schedulers.io())
            .subscribeOn(Schedulers.trampoline())
            .subscribeOn(Schedulers.newThread())
            .subscribeOn(Schedulers.computation())
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.d("aaa", "Obsever----threadName:"+Thread.currentThread().getName());
                }
            });

那么只有第一次调用subscribeOn有效果。

(4)observeOn的使用
    Observable.create(new ObservableOnSubscribe<String>() {

        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            Log.d("aaa", "Observable----threadName:"+Thread.currentThread().getName());
            e.onNext("A");

        }
    })
            .observeOn(Schedulers.newThread())
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.d("aaa", "Obsever----threadName:"+Thread.currentThread().getName());
                }
            });

打印效果

图片.png

我们发现被观察者在主线程运行,观察者在子线程运行。

结论:结合(3)总结的结论是,subscribeOn可以控制被观察者和观察者的线程,observeOn仅可以控制观察者的线程。

(5)subscribeOn和observeOn结合使用
    Observable.create(new ObservableOnSubscribe<String>() {

        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            Log.d("aaa", "Observable----threadName:"+Thread.currentThread().getName());
            e.onNext("A");

        }
    })
            .subscribeOn(AndroidSchedulers.mainThread())
            .observeOn(Schedulers.computation())
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.d("aaa", "Obsever----threadName:"+Thread.currentThread().getName());
                }
            });

打印效果如下:

图片.png

这样观察者就从主线程切换到子线程了。

我们再来举一个稍微复杂的例子。

   Observable.create(new ObservableOnSubscribe<String>() {

        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            Log.d("aaa", "Observable----threadName:"+Thread.currentThread().getName());
            e.onNext("A");

        }
    })
            .subscribeOn(AndroidSchedulers.mainThread())
            .map(new Function<String, String>() {

                @Override
                public String apply(String s) throws Exception {
                    Log.d("aaa", "map1----threadName:"+Thread.currentThread().getName());
                    return s;
                }
            })
            .observeOn(Schedulers.computation())
            .map(new Function<String, String>() {

                @Override
                public String apply(String s) throws Exception {
                    Log.d("aaa", "map2----threadName:"+Thread.currentThread().getName());
                    return s;
                }
            })
            .observeOn(Schedulers.newThread())
            .map(new Function<String, String>() {

                @Override
                public String apply(String s) throws Exception {
                    Log.d("aaa", "map3----threadName:"+Thread.currentThread().getName());
                    return s;
                }
            })
            .observeOn(Schedulers.single())
            .map(new Function<String, String>() {

                @Override
                public String apply(String s) throws Exception {
                    Log.d("aaa", "map4----threadName:"+Thread.currentThread().getName());
                    return s;
                }
            })
            .observeOn(Schedulers.io())
            .map(new Function<String, String>() {

                @Override
                public String apply(String s) throws Exception {
                    Log.d("aaa", "map1----threadName:"+Thread.currentThread().getName());
                    return s;
                }
            })
            .observeOn(Schedulers.computation())
            .map(new Function<String, String>() {

                @Override
                public String apply(String s) throws Exception {
                    Log.d("aaa", "map2----threadName:"+Thread.currentThread().getName());
                    return s;
                }
            })
            .observeOn(Schedulers.newThread())
            .map(new Function<String, String>() {

                @Override
                public String apply(String s) throws Exception {
                    Log.d("aaa", "map3----threadName:"+Thread.currentThread().getName());
                    return s;
                }
            })
            .observeOn(Schedulers.single())
            .map(new Function<String, String>() {

                @Override
                public String apply(String s) throws Exception {
                    Log.d("aaa", "map4----threadName:"+Thread.currentThread().getName());
                    return s;
                }
            })
            .observeOn(Schedulers.io())
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.d("aaa", "Obsever----threadName:"+Thread.currentThread().getName());
                }
            });

执行效果如下:

图片.png

我们发现

  • 多次调用Schedulers.single(),都是在同一个线程执行。
  • 多次调用Schedulers.computation()、Schedulers.newThread()、Schedulers.io()都会重新新建线程。

Schedulers.from()和AndroidSchedulers.mainThread()就不介绍了。

相关文章

网友评论

    本文标题:RxJava<第十三篇>:线程控制(切换/调度)

    本文链接:https://www.haomeiwen.com/subject/bbocvqtx.html