RxJava2.0 - 文章二

作者: 世道无情 | 来源:发表于2018-04-29 20:15 被阅读5次

前言

上篇文章讲解了 RxJava2.0的最基本使用,在本节中主要看下RxJava的线程控制。

1. 概述


RxJava原理图.png

2. 下边演示上游和下游在同一个线程


正常情况,上游和下游在同一个线程,也就是说上游在那个线程发送事件,下游就在哪个线程接收事件,下边通过示例代码演示:
    /**
     * 在主线程中创建一个上游 Observable 发送事件,则上游就在主线程中发送事件
     * 在主线程中创建一个下游 Observer 接收事件,则下游就在主线程中接收事件
     */
    public static void demo1(){
        // 创建一个上游:Observable
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.e("TAG" , "Observable thread is : " + Thread.currentThread().getName()) ;
                Log.e("TAG" , "emit 1") ;

                emitter.onNext(1);
            }
        }) ;

        // 创建一个下游:Observer
        Consumer<Integer> consumer = new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e("TAG" , "Observer thread is : " + Thread.currentThread().getName()) ;
                Log.e("TAG" , "next : " + integer) ;
            }
        } ;

        // 建立连接
        observable.subscribe(consumer);
    }

运行结果如下:

cn.novate.rxjava2 E/TAG: Observable thread is : main
cn.novate.rxjava2 E/TAG: emit 1
cn.novate.rxjava2 E/TAG: Observer thread is : main
cn.novate.rxjava2 E/TAG: next : 1

以上验证了,上游、下游在同一个线程工作;

但是这样肯定是不能满足我们的需求,我们更多的是需要在子线程中做耗时操作,然后切回到主线程中进行UI更新,如下图所示


thread.png

上图黄色表示子线程,深蓝色表示主线程
要达到在子线程中做耗时操作,然后切回主线程进行UI更新,只需要让上游在子线程中发送事件,然后把下游切回到主线程中接收事件就ok,示例代码如下:

/**
     * 让上游在子线程中发送事件,然后把下游切回到主线程接收事件
     */
    public static void demo2(){
        // 创建一个上游:Observable
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.e("TAG" , "Observable thread is " + Thread.currentThread().getName()) ;
                Log.e("TAG" , "emit 1") ;
                emitter.onNext(1);
            }
        }) ;

        // 创建一个下游:Observer
        Consumer<Integer> consumer = new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e("TAG" , "Observer thread is " + Thread.currentThread().getName()) ;
                Log.e("TAG" , "next :" + integer) ;
            }
        } ;

        // 建立连接
        observable.subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(consumer);
    }

运行结果如下

cn.novate.rxjava2 E/TAG: Observable thread is RxNewThreadScheduler-1
cn.novate.rxjava2 E/TAG: emit 1
cn.novate.rxjava2 E/TAG: Observer thread is main
cn.novate.rxjava2 E/TAG: next :1

subscribeOn()指的是上游发送事件的线程,observeOn()指的是下游接收事件的线程;

注意:

1>:多次调用subscribeOn()方法,只有第一次有效,其余的被忽略;
2>:多次调用observeOn()方法,都是可以的,也就是说每调用一次observeOn(),下游线程就会切换一次

 observable.subscribeOn(Schedulers.newThread()) 
.subscribeOn(Schedulers.io()) 
.observeOn(AndroidSchedulers.mainThread()) 
.observeOn(Schedulers.io()) 
.subscribe(consumer);

在RxJava中,已经内置很多线程供我们选择,比如:
1>:Schedulers.newThread():常规的线程;
2>:Schedulers.io():io操作的线程,用于联网请求、读写文件等操作;
3>:Schedulers.computation():CPU计算密集型操作,用于大量计算操作;
4>:AndroidSchedulers.mainThread():主线程;
在RxJava内部使用线程池维护这些线程,效率比较高;

3. 实践


在我们开发过程中,一般都是把耗时操作在子线程中做,比如读写文件、读写数据库、联网请求等操作,下边做一个登录示例来演示,如何把线程切换到子线程中进行耗时操作,然后又是如何把线程切换到主线程中进行更新UI:

    /**
     * 具体示例:
     *     通过演示登录成功与失败的功能来演示:
     *          如何把线程切换到子线程中让其执行耗时操作,然后再次切换到主线程中更新UI
     */
    public static void demo3(final Context context){
        Api api = RetrofitProvider.get().create(Api.class) ;
        api.login(new LoginRequest())
                .subscribeOn(Schedulers.io())     // 切换到io线程(子线程)中进行联网请求
                .observeOn(AndroidSchedulers.mainThread())  // 在耗时操作进行完之后切换到主线程中处理请求结果,来更新UI
                .subscribe(new Observer<LoginResponse>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(LoginResponse value) {

                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e("TAG" , "登录失败") ;
                    }

                    @Override
                    public void onComplete() {
                        Log.e("TAG" , "登录成功") ;
                    }
                });
    }

注意:上篇文章我们讲到了Disposable,说如果调用 Disposable.dispose()方法,会切断水管,让下游接收不到事件。既然接收不到事件,那么就不能更新UI,因此可以在这个Activity中保存 Disposable,在Activity退出时,调用Disposable.dispose()方法来切断即可;

如果有多个 Disposable,RxJava内置了一个 CompositeDisposable容器,每次得到一个 Disposable,就调用 CompositeDisposable.add()方法把它添加到容器中,在Activity退出时候,直接调用 CompositeDisposable.clear()方法,直接切断所有水管即可;

以上就是教程二的全部内容。

相关文章

  • RXjave总结

    文章 给初学者的RxJava2.0教程(一)给初学者的RxJava2.0教程(二)

  • Rxjava系列(六) RxJava2.0操作符详解

    Rxjava2.0概述 通过前面的文章介绍,读者对RxJava2.0应该有了初步的认识。RxJava2.0相对1....

  • RxJava2.0 - 文章二

    前言 上篇文章讲解了 RxJava2.0的最基本使用,在本节中主要看下RxJava的线程控制。 1. 概述 ...

  • RxJava

    教程 给初学者的RxJava2.0教程(一) 给初学者的RxJava2.0教程(二) 给初学者的RxJava2.0...

  • rx - 收藏集 - 掘金

    给初学者的 RxJava2.0 教程 (二) - Android - 掘金作者博客 http://www.jian...

  • RxJava2.0 - 文章一

    前言 自己在学习RxJava2.0时,参考了大神的博客,然后在这里做一个笔记为了方便自己以后复习和查看,同时也给需...

  • RxJava2.0 - 文章八

    前言 上一节中,我们一次性发送128个事件没有任何问题,但是一旦超过128个立马抛出MissingBackpres...

  • RxJava2.0 - 文章七

    前言 上一节我们学习了使用Observable解决上、下游发射事件速度不平衡的问题,之所以学习 Observabl...

  • RxJava2.0源码初探

    RxJava2.0源码初探 RxJava2.0的源码相对于1.0发生了很大的变化, 命名方式也发生了很大变化, 下...

  • Rxjava2.0 发生订阅关系 的源码解析

    由于要做一场关于rxjava2.0 的内部分享,本人便怀着期待的心情去了解了下rxjava2.0,关于rxjava...

网友评论

    本文标题:RxJava2.0 - 文章二

    本文链接:https://www.haomeiwen.com/subject/dqyplftx.html