美文网首页
RxJava学习总结-进程控制

RxJava学习总结-进程控制

作者: 取了个很好听的名字 | 来源:发表于2019-07-01 16:53 被阅读0次

    前言

    上一篇文章简单介绍了RxJava的使用,本文将在前文的基础上针对一些细节进行说明。

    链式编程

    链式编程不是什么新鲜的东西,很多框架中的构造者模式都使用了链式编程,RxJava自然也不例外。代码如下:

       Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onNext(3);
                    emitter.onComplete();
                }
            }).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.e("onSubscribe", "onSubscribe");
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.e("onNext", integer+"");
                }
    
                @Override
                public void onError(Throwable e) {
                   Log.e("onError",e.getMessage());
                }
    
                @Override
                public void onComplete() {
                   Log.e("onComplete","onComplete");
                }
            });
    

    结果如下:

    06-11 14:11:14.134 21877-21877/com.zhqy.myrxjavademo E/onSubscribe: onSubscribe
    06-11 14:11:14.134 21877-21877/com.zhqy.myrxjavademo E/onNext: 1
    06-11 14:11:14.134 21877-21877/com.zhqy.myrxjavademo E/onNext: 2
    06-11 14:11:14.134 21877-21877/com.zhqy.myrxjavademo E/onNext: 3
    06-11 14:11:14.134 21877-21877/com.zhqy.myrxjavademo E/onComplete: onComplete
    

    这里需要说明的是subscribe()有多个重载方法

        public final Disposable subscribe() {}
        public final Disposable subscribe(Consumer<? super T> onNext) {}
        public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} 
        public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
        public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
        public final void subscribe(Observer<? super T> observer) {}
    

    第一个重载:上游发送的事件下游都不接收
    第二个重载:只关心上游onNext()方法发送的事件
    第三个重载:只关心上游onNext()和onError()发送的事件
    第四个重载:关心上游onNext(),onError(),onComplete()发送的事件
    第五个重载:和例子中使用的Observer类似,只是需要分别创建对象来接收各自的事件

    线程控制

    RxJava作为一个异步库上下游自然能够在进程间进行切换,但需要注意的是线程间的切换也需要遵守一些规则:
    默认情况下上下游在同一个进程中,所处进程有所在的进程决定,举例来说,如果你把上下游写在Activity中,那么上下游均处于主线程中。代码如下:

      Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    Log.e("thread", "上游处在"+Thread.currentThread().getName() );
                }
            }).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                    Log.e("thread","下游处在"+ Thread.currentThread().getName() );
                }
    
                @Override
                public void onNext(Integer integer) {
    
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            });
    

    结果如下:

    06-11 14:26:04.576 23290-23290/? E/thread: 下游处在main
    06-11 14:26:04.576 23290-23290/? E/thread: 上游处在main
    

    结果说明了上下游处在同一线程内。但很多情况下上下游处于不同的线程下,比如说网络请求时请求数据需要在子进程下,而更新UI则需要在UI线程下,这又该如何做呢,其实很简单,只需要设置如下代码

    .subscribeOn(Schedulers.newThread())                                              
    .observeOn(AndroidSchedulers.mainThread())  
    

    其中subscribeOn指定上游所在线程,observeOn指定下游所在进程,结果如下:

      Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    Log.e("thread", "上游处在"+Thread.currentThread().getName() );
                }
            }).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                    Log.e("thread","下游处在"+ Thread.currentThread().getName() );
                }
    
                @Override
                public void onNext(Integer integer) {
    
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            });
    

    注意:如果想在Android下使用RxJava需要添加如下依赖

        //RxAndroid
        implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
    

    结果如下

    06-11 14:34:12.541 23802-23802/com.zhqy.myrxjavademo E/thread: 下游处在main
    06-11 14:34:12.543 23802-23830/com.zhqy.myrxjavademo E/thread: 上游处在RxNewThreadScheduler-1
    

    从结果可以看出上下游处于不同的线程中,从而完成不同的功能。

    相关文章

      网友评论

          本文标题:RxJava学习总结-进程控制

          本文链接:https://www.haomeiwen.com/subject/rofqfctx.html