美文网首页Android RxJava 2.X 入门例子详解
Android RxJava 2.x入门例子详解(四)

Android RxJava 2.x入门例子详解(四)

作者: IM魂影 | 来源:发表于2017-09-13 20:36 被阅读12次

    线程调度器

    上游默认在主线程发送事件,下游默认也是主线程中接收事件,
    上下游默认是在同一个线程工作

    //create创建一个上游 Observable(被观察者)
    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
    
                @Override
                public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                    Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
    
                    Log.d(TAG, "Observable发出:1");
                    e.onNext(1);//向下游(观察者)发射内容1
                    Log.d(TAG, "Observable发出:2");
                    e.onNext(2);
                    Log.d(TAG, "Observable发出:3");
                    e.onNext(3);
                }
            });
    
            Consumer<Integer> consumer = new Consumer<Integer>() {
    
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, "Consumer thread is :" + Thread.currentThread().getName());
                    Log.d(TAG, "onNext收到:" + integer);
                }
            };
    
            observable.subscribe(consumer);
    

    而我们更多时候想要的是,在子线程中做耗时的操作, 然后回到主线程操作UI。通过RxJava内置的线程调度器,我们可以很轻松的做到这一点,如下面的例子:

    //create创建一个上游 Observable(被观察者)
    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
    
                @Override
                public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                    Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
    
                    Log.d(TAG, "Observable发出:1");
                    e.onNext(1);//向下游(观察者)发射内容1
                    Log.d(TAG, "Observable发出:2");
                    e.onNext(2);
                    Log.d(TAG, "Observable发出:3");
                    e.onNext(3);
                }
            });
    
            Consumer<Integer> consumer = new Consumer<Integer>() {
    
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, "Consumer thread is :" + Thread.currentThread().getName());
                    Log.d(TAG, "onNext收到:" + integer);
                }
            };
    
            //subscribeOn() 指定上游发送事件的线程, observeOn() 指定下游接收事件的线程.
            observable.subscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(consumer);
    

    subscribeOn只能调用一次,如果调用多次,只有第一次有效。
    而observeOn可以多次调用,每次调用下游都可以切换一次线程。
    如下面的例子:

    //create创建一个上游 Observable(被观察者)
            Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
    
                @Override
                public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                    Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
    
                    Log.d(TAG, "Observable发出:1");
                    e.onNext(1);//向下游(观察者)发射内容1
                    Log.d(TAG, "Observable发出:2");
                    e.onNext(2);
                    Log.d(TAG, "Observable发出:3");
                    e.onNext(3);
                }
            });
    
            Consumer<Integer> consumer = new Consumer<Integer>() {
    
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, "Consumer thread is :" + Thread.currentThread().getName());
                    Log.d(TAG, "onNext收到:" + integer);
                }
            };
    
            //subscribeOn() 指定上游发送事件的线程, observeOn() 指定下游接收事件的线程.
            observable.subscribeOn(Schedulers.newThread())
                    .subscribeOn(Schedulers.io())//多次调用subscribeOn()只有第一次的有效
                    .observeOn(AndroidSchedulers.mainThread())
                    .doOnNext(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            Log.d(TAG, "observeOn(mainThread) current thread is: " + Thread.currentThread().getName());
                        }
                    })
                    .observeOn(Schedulers.io())//每调用一次observeOn() , 下游的线程就会切换一次
                    .doOnNext(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            Log.d(TAG, "observeOn(io) current thread is : " + Thread.currentThread().getName());
                        }
                    })
                    .subscribe(consumer);
    

    RxJava内置了很多线程选项供我们选择
    1、Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作
    2、Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作
    3、Schedulers.newThread() 代表一个常规的新线程
    4、AndroidSchedulers.mainThread() 代表Android的主线程

    这些内置的Scheduler已经足够满足我们开发的需求,因此我们应该使用内置的这些选项,在RxJava内部使用的是线程池,效率也比较高。

    相关文章

      网友评论

        本文标题:Android RxJava 2.x入门例子详解(四)

        本文链接:https://www.haomeiwen.com/subject/xlblsxtx.html