美文网首页Android收藏集
RxJava+Retrofit+OkHttp 网络框架封装(二)

RxJava+Retrofit+OkHttp 网络框架封装(二)

作者: 涛涛123759 | 来源:发表于2019-03-12 17:04 被阅读346次

    RxJava的简介
    RxJava是一个响应式编程框架,实现异步操作,如事件响应、网络操作等。R表示Reactive(响应式),x 表示任何,则Rx表示将响应式的编程思想应用于任何语言,即Java,因此RxJava就是响应式编程的Java版本。同时,在RxJava的基础上,还扩展为RxAndroid引入了Android系统的线程概念。

    1、 RxJava2.0 VS RxJava1.0

    RxJava1.0和RxJava2.0的核心思想都是观察者模式,只不过RxJava2.0在RxJava1.0的基础对一些方法进行了优化,方便于开发者更好地理解其编程思想,同时又增加了一部分新的方法解决1.0存在的问题,例如背压等。

    RxJava2中除了保留了RxJava1中Observable --> Observer观察者模式以外,还新增了一种观察者模式Flowable --> Subscriber。目的为何相信大家已经知道了,新观察者模式可完美支持背压策略。
    Rxjava2.0,出现了两种观察者模式:

    • Observable(被观察者)/Observer(观察者)
    • Flowable(被观察者)/Subscriber(观察者)
      RxJava2.X中, Observeable用于订阅Observer ,是不支持背压的,而 Flowable用于订阅Subscriber ,是支持背压(Backpressure)的。

    2、什么是观察者模式

    响应式编程的核心思想是观察者模式(设计模式)。对于观察者模式,其需求本质是:对象A(观察者)关注对象B(被观察者)的变化,当对象B发生瞬间变化时,对象A就会作出反应(响应)。
    观察者模式可以分为两种:

    • 主动模式:观察者实时监控被观察者的状态。根据被观察者的状态变化,观察者做出反馈。
    • 被动模式: 观察者在被观察者中订阅或注册,当被观察者发生状态变化时,通知观察者,观察者做出反馈。
      主动模式以观察者为主导,被动模式以被观察者为主导
    • 例如:
      生活中的警察和小偷就是观察者与被观察者的关系;在比如Android系统中的监听器(Listener)与控件的的关系,也是观察者与被观察者的关系。这种观察者模式属于主动模式
      RxJava采用观察者模式中的被观察者模式,通过订阅的方式将观察者与被观察者绑定,有观察者监听被观察者的状态,当被观察者的状态发生变化时,通知观察者做出适当的反应。

    3、添加依赖

    首先在Gradle添加RxJava2与RxAndroid2的依赖啦

    dependencies {
        ......
        compile 'io.reactivex.rxjava2:rxjava:2.0.1'
        compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
    }
    

    4、用RxJava实现一个观察者模式

    • 实现观察者模式分为三部分:
      1、初始化 Observable
      2、初始化 Observer
      3、建立订阅关系
    Observable.create(new ObservableOnSubscribe<Integer>() { // 第一步:初始化Observable
                @Override
                public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                    Log.e(TAG, "Observable emit 1" + "\n");
                    e.onNext(1);
                    Log.e(TAG, "Observable emit 2" + "\n");
                    e.onNext(2);
                    Log.e(TAG, "Observable emit 3" + "\n");
                    e.onNext(3);
                    e.onComplete();
                    Log.e(TAG, "Observable emit 4" + "\n" );
                    e.onNext(4);
                }
            }).subscribe(new Observer<Integer>() { // 第三步:订阅
    
                // 第二步:初始化Observer
                private int i;
                private Disposable mDisposable;
    
                @Override
                public void onSubscribe(@NonNull Disposable d) {      
                    mDisposable = d;
                }
    
                @Override
                public void onNext(@NonNull Integer integer) {
                    i++;
                    if (i == 2) {
                        // 在RxJava 2.x 中,新增的Disposable可以做到切断的操作,让Observer观察者不再接收上游事件
                        mDisposable.dispose();
                    }
                }
    
                @Override
                public void onError(@NonNull Throwable e) {
                    Log.e(TAG, "onError : value : " + e.getMessage() + "\n" );
                }
    
                @Override
                public void onComplete() {
                    Log.e(TAG, "onComplete" + "\n" );
                }
            });
    

    因有事暂时先写到这里,未完待续……

    相关文章

      网友评论

        本文标题:RxJava+Retrofit+OkHttp 网络框架封装(二)

        本文链接:https://www.haomeiwen.com/subject/cswbpqtx.html