美文网首页
RxJava 2.0(二)线程调度Scheduler和操作符

RxJava 2.0(二)线程调度Scheduler和操作符

作者: Cris_Ma | 来源:发表于2017-05-30 23:42 被阅读0次

RxJava中的线程

默认的情况下,Observable 和 Observer是处在同一线程的,发送事件在哪个线程,处理事件同样也在该线程。
在Activity的onCreate方法中运行以下代码:

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                Log.d("RxJava", "Observable Thread:  " + Thread.currentThread().getName());
                e.onNext(1);
            }
        })
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        Log.d("RxJava", "Observer Thread: " + Thread.currentThread().getName());
                    }
                });

可以得到以下结果:

D/RxJava: Observable Thread: main
D/RxJava: Observer Thread: main

RxJava中,使用Scheduler来进行线程控制,从而实现了关键的异步操作。

Scheduler

Scheduler可以称之为线程调度器,它指定了发送和处理事件所在的线程。常用的API有以下几个:

  • Schedulers.newThread():启用新线程并在新线程运行
  • Schedulers.io():进行I/O 操作所使用的Scheduler,它的内部实现是一个无上限的线程池,可以重用空闲线程,比newThread更有效率,通常用于读写文件,数据库,网络操作等。
  • Schedulers.computation():CPU密集计算所用的Scheduler,它内部是一个线程数等于CPU核心数的线程池。
  • AndroidSchedulers.mainThread(): Android中的主线程(UI线程)。

介绍完了常用API之后,通过下面的例子来看一下是怎样使用的:

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                Log.d("RxJava", "Observable Thread:  " + Thread.currentThread().getName());
                e.onNext(1);
            }
        })
                .subscribeOn(Schedulers.newThread())//指定observable线程
                .observeOn(AndroidSchedulers.mainThread())//指定Observer线程
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        Log.d("RxJava", "Observer Thread: " + Thread.currentThread().getName());
                    }
                });

还是上面的例子,加了两行代码,subscribeOn和observeOn。subscribeOn用来指定发送事件的线程,即事件产生的线程,observeOn指定接收并处理事件的线程,即事件消费线程。运行结果如下:

D/RxJava: Observable Thread: RxNewThreadScheduler-1
D/RxJava: Observer Thread: main

subscribeOn和observeOn都可以多次设置,但是subscribeOn只有第一次设置的值会生效,而observeOn不一样,观察者会按照observeOn的指定顺序依次切换到最后一个线程。

操作符

操作符的作用是在事件发送的过程中完成一些特定的操作,比如对事件的包装,添加额外的动作等等。常用操作符主要有以下几种:

  • map();
    map的作用是将observable的数据进行加工,转换成一个新的数据之后再进行发送。看一个具体的例子:
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
            }
        })
                .map(new Function<Integer, String>() {
                    @Override
                    public String apply(@NonNull Integer integer) throws Exception {
                        return "This is Data No. " + integer ;
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String str) throws Exception {
                        Log.d("RxJava", "Received data: " + str);
                    }
                });

输出结果如下:

D/RxJava: Received data: This is Data No. 1
D/RxJava: Received data: This is Data No. 2
D/RxJava: Received data: This is Data No. 3

  • FlatMap()

    FlatMap与map类似,但是功能更强大了。map只是对Observable发送的数据进行处理,返回的是处理后的数据,而FlatMap在数据处理之后返回的是一个Observable对象,所以,FlatMap实际上是对原来的一系列事件进行加工然后分拆,将每一个数据包含在一个新的Observable对象中发送给下游的观察者。这样做有什么好处? 举一个简单的例子,如果每一个事件都是耗时操作,那么采用FlatMap,将事件分发给不同的Observable,然后加入Schedulers.io(),这样效率瞬间提高了。示例如下:

 Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
//原始事件,打印线程
                Log.d("RxJava", "Original Observable Thread:  " + Thread.currentThread().getName());
                e.onNext(10);
                e.onNext(20);
                e.onNext(30);
            }
        })
                .flatMap(new Function<Integer, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(@NonNull final Integer integer) throws Exception {
                        return Observable.create(new ObservableOnSubscribe<String>(){

                            @Override
                            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
//打印FlatMap转换后,发送事件的线程
                                Log.d("RxJava", "Observable Thread:  " + Thread.currentThread().getName());
                                Thread.sleep(1000);
                                e.onNext("This is Data No." + integer);
                            }
//指定flatMap转换后发送事件所处的线程
                        }).subscribeOn(Schedulers.io());
                    }
                })
//指定原始事件发送线程
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String str) throws Exception {
//打印观察者所处的线程
                        Log.d("RxJava", "Observer Thread: " + Thread.currentThread().getName());
                        Log.d("RxJava", "Received data: " + str);
                    }
                });

运行结果如下:

Original Observable Thread: RxCachedThreadScheduler-1
D/RxJava: Observable Thread: RxCachedThreadScheduler-1
D/RxJava: Observable Thread: RxCachedThreadScheduler-2
D/RxJava: Observable Thread: RxCachedThreadScheduler-3
Observer Thread: main
D/RxJava: Received data: This is Data No.10
D/RxJava: Observer Thread: main
D/RxJava: Received data: This is Data No.20
D/RxJava: Observer Thread: main
D/RxJava: Received data: This is Data No.30

从结果可以看出来,最初的Observable包含3个事件,运行在同一个子线程中,如果是耗时操作,采用同步的方式会浪费大量事件,经过FlatMap转换之后,将每个事件转换为一个新的Observable对象,并指定线程,效率一下提高了3倍!

  • concatMap
    这个操作符与FlatMap作用一样,只是,FlatMap转换的事件在发送时并不保证顺序,而concatMap仍然会按原来的顺序发送。

  • filter()
    filter用来对发送的数据进行过滤

.filter(new Predicate<Integer>() {
                    @Override
                    public boolean test(@NonNull Integer integer) throws Exception {
                        return false;
                    }
                })

返回值决定了下游观察者是否能够收到数据,true表示能收到,false表示不能接收到。

  • take()
    传入一个long数值,表示取前多少个数据。如果传入值大于数据量,会全部发送。另外,它还接收时间参数,表示在多长时间内发送的数据会被接收。

  • doOnNext()

    doOnNext()允许我们在每次输出一个元素之前做一些额外的事情,比如缓存,调试,等等。

.observeOn(AndroidSchedulers.mainThread())
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        Log.d("RxJava", "onnext2 Observer Thread: " + Thread.currentThread().getName());
                    }
                })

相关文章

网友评论

      本文标题:RxJava 2.0(二)线程调度Scheduler和操作符

      本文链接:https://www.haomeiwen.com/subject/zburfxtx.html