RXJAVA

作者: leap_ | 来源:发表于2019-06-10 15:32 被阅读0次

简介:

rxjava是一个异步框架,功能和handler类似,特点是链式调用,逻辑简单。

本文内容:

  • 观察者模式
  • rxjava异步使用
  • 操作符介绍
  • rxjava背压
  • rxjava源码分析
  • rxjava+retrofit

观察者模式

java中的观察者模式,主要有三个关键词需要记住,被观察者(Observable),订阅(subscribe),观察者(Observer)。
核心思想:被观察者和观察者通过订阅产生一种关系,当被观察者发生一些改变,通知观察者,观察者对应做出相应的回应。
举例:小说是被观察者,读者是观察者,小说和读者之前通过subscribe产生订阅关系,小说更新了,通知读者去买新小说。

rx异步使用

  1. 创建被观察者(Observable)
Observable<String> story = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                for (int i=1;i<4;i++){
                    Log.d("TAG","我是小说,我更新了第"+i+"季");
                    emitter.onNext(i+"");
                }
            }
        });

调用Observable的create(),传入ObservableOnSubscribe对象,重写ObservableOnSubscribe对象的subscribe(),在subscribe()中,有一个ObservableEmitter对象,这是一个发射器,调用发射器的onNext(),把被观察者(Observable)的事件发送出去。

  1. 创建观察者(Observer)
Observer<String> reader = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d("TAG","我是读者,我和小说订阅了");
            }

            @Override
            public void onNext(String value) {
                Log.d("TAG","我是读者,我拿到了小说的新版本:"+value+"版本");
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {
                Log.d("TAG","我是读者,小说的新版本被我拿完了");
            }
        };

创建Observer,直接new一个Observer重写他的四个方法,

  • onSubscribe():当Observer和Observable订阅的时候调用
  • onNext():对Observable中的emitter.onNext()发射出来的事件进行处理,
  • onError():不用多说,坏了
  • onComplete():Observable发送来的事件全部处理完成,结束调用,
    注意onError()和onComplete()是互斥的,只会调用一个
  1. 订阅
 story.observeOn(Schedulers.io());
 story.subscribeOn(AndroidSchedulers.mainThread());
 story.subscribe(reader);

异步实现:调用Observable的observeOn和subscribeOn设置Observable发射事件和Observer处理事件的线程,

  • story.observeOn(Schedulers.io());发射事件线程是io线程
  • story.subscribeOn(AndroidSchedulers.mainThread());处理事件线程是main
    ps:这里不要纠结为什么是读者被小说订阅了,rx里的规矩就是被观察者subscribe观察者,所以,,,,我也不知道为什么。

运行结果:



上面的三个步骤可以合起来用链式调用的方法写:

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                for (int i=1;i<4;i++){
                    Log.d("TAG","我是小说,我更新了第"+i+"季");
                    emitter.onNext(i+"");
                }
            }
        })
                .observeOn(Schedulers.io())
                .subscribeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d("TAG","我是读者,我和小说订阅了");
            }

            @Override
            public void onNext(String value) {
                Log.d("TAG","我是读者,我拿到了小说的新版本:"+value+"版本");
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {
                Log.d("TAG","我是读者,小说的新版本被我拿完了");
            }
        });

效果和上面一模一样,这种写法逻辑上更加清晰一点。

rxjava操作符使用

  • 创建操作符
  • 变换操作符
  • 合并操作符
  • 功能操作符
创建操作符

作用:创建Observable,发送事件

  • just:
      Observable.just("1","2","3","4");
  • fromArray:
      Integer[] numbers = {1,2,3,4};
      Observable.fromArray(numbers);
  • fromIterable
        ArrayList<String> arrayList = new ArrayList<>();
        arrayList.add("1");
        arrayList.add("2");
        arrayList.add("3");
        arrayList.add("4");
        Observable.fromIterable(arrayList);
  • never:不发送任何事件
  • empty:只发送Complete事件,即emitter.complete()
  • error():发送一个异常,传入error()中
延时创建:定时操作&周期性操作
  • defer:直到有观察者(Observer )订阅时,才动态创建被观察者对象(Observable) & 发送事件
  • timer: 延迟指定时间后,发送1个数值0,默认是在新线程中执行
 Observable.timer(2, TimeUnit.SECONDS) 

本质 = 延迟指定时间后,调用一次 onNext(0)

  • interval:
        // 参数1 = 第1次延迟时间;
        // 参数2 = 间隔时间数字;
        // 参数3 = 时间单位;
Observable.interval(3,1,TimeUnit.SECONDS)
变换操作符

待扩展(* ̄︶ ̄)

合并操作符

待扩展(* ̄︶ ̄)

功能操作符

待扩展(* ̄︶ ̄)

背压

问题:在异步情况中,被观察者发送事件的速率和观察者接收事件的速率不一样,会导致缓冲区溢&oom

对策:背压策略(back pressure strategy)————控制事件流速

原理:
  • 反馈控制:被观察者根据观察者接收事件的能力发送事件
  • 响应式拉取:根据观察者自身情况接收事件
  • 缓冲区:对超出缓冲区的事件进行丢弃,覆盖,报错
具体使用:Flowable

在flowable用法中,被观察者变成了Flowable类,观察者变成了Subscriber类,其他用法和规则不变

  1. 响应式拉取(控制观察者)


Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                Log.d("TAG", "发送事件 1");
                emitter.onNext(1);
                Log.d("TAG", "发送事件 2");
                emitter.onNext(2);
                Log.d("TAG", "发送事件 3");
                emitter.onNext(3);
                Log.d("TAG", "发送完成");
                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR)
                .subscribeOn(Schedulers.io()) // 设置被观察者在io线程中进行
                .observeOn(AndroidSchedulers.mainThread()) // 设置观察者在主线程中进行
                .subscribe(new Subscriber<Integer>() {
                    // 步骤2:创建观察者 =  Subscriber & 建立订阅关系
                    @Override
                    public void onSubscribe(Subscription s) {
                        // 对比Observer传入的Disposable参数,Subscriber此处传入的参数 = Subscription
                        // 相同点:Subscription参数具备Disposable参数的作用,
                        // 即Disposable.dispose()切断连接, 同样的调用Subscription.cancel()切断连接
                        // 不同点:Subscription增加了void request(long n)
                        // 作用:决定观察者能够接收多少个事件
                        // 如设置了s.request(3),这就说明观察者能够接收3个事件(多出的事件存放在缓存区)
                        // 官方默认推荐使用Long.MAX_VALUE,即s.request(Long.MAX_VALUE);
                        Log.d("TAG", "onSubscribe");
                        s.request(3);
                        /**如果在异步的情况中request()没有参数,则认为观察者不接受事件
                         * 被观察者可以继续发送事件存到缓存区(缓存区大小=128)
                         * */
                    }
                    @Override
                    public void onNext(Integer integer) {
                        Log.d("TAG", "接收到了事件" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w("TAG", "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d("TAG", "onComplete");
                    }
                });
  1. 反馈控制(控制被观察者)



    在反馈控制中,同步和异步是不同的,先介绍反馈控制的同步实现方法

  • 反馈控制实现(同步)
Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                
                // 调用emitter.requested()获取当前观察者需要接收的事件数量
                long n = emitter.requested();

                Log.d(TAG, "观察者可接收事件" + n);

                // 根据emitter.requested()的值,即当前观察者需要接收的事件数量来发送事件
                for (int i = 0; i < n; i++) {
                    Log.d(TAG, "发送了事件" + i);
                    emitter.onNext(i);
                }
            }
        }, BackpressureStrategy.ERROR)
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");

                        // 设置观察者每次能接受10个事件
                        s.request(10);

                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "接收到了事件" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

在被观察者发送的时候,我们拿到emitter.requested()的值,这个值和观察者s.request(10)设置的值相同,为10,观察者设置的能接收多少事件,被观察者就发送多少事件

  • 反馈控制(异步)
    在异步的情况下emitter.requested()的值和观察者s.request()的值不相同,即 被观察者不能根据 观察者自身接收事件的能力 控制发送事件的速度

    被观察者FlowableEmitter.requested()的返回值由RxJava内部决定,并且只会返回128,96,0三种情况,
    RxJava内部调用request(n)(n = 128、96、0)的逻辑
    大概就是开始request(128),当缓冲区<=32的时候,request(96),
// 被观察者:一共需要发送500个事件,但真正开始发送事件的前提 = FlowableEmitter.requested()返回值 ≠ 0
// 观察者:每次接收事件数量 = 48(点击按钮)

        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {

                Log.d(TAG, "观察者可接收事件数量 = " + emitter.requested());
                    boolean flag; //设置标记位控制

                    // 被观察者一共需要发送500个事件
                    for (int i = 0; i < 500; i++) {
                        flag = false;

                        // 若requested() == 0则不发送
                        while (emitter.requested() == 0) {
                            if (!flag) {
                                Log.d(TAG, "不再发送");
                                flag = true;
                            }
                        }
                        // requested() ≠ 0 才发送
                        Log.d(TAG, "发送了事件" + i + ",观察者可接收事件数量 = " + emitter.requested());
                        emitter.onNext(i);
                }
            }
        }, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()) // 设置被观察者在io线程中进行
                .observeOn(AndroidSchedulers.mainThread()) // 设置观察者在主线程中进行
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                       // 初始状态 = 不接收事件;通过点击按钮接收事件
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "接收到了事件" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

// 点击按钮才会接收事件 = 48 / 次
btn = (Button) findViewById(R.id.btn);
        btn.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View view) {
                mSubscription.request(48);
                // 点击按钮 则 接收48个事件
            }

        });

通过分析上面的例子来理解反馈控制(异步)的原理

  • 被观察者:一共需要发送500个事件,真正开始发送事件的前提 = FlowableEmitter.requested()返回值 ≠ 0
  • 观察者:每次接收事件数量 = 48(点击按钮)
  1. Flowable发送500个事件,开始RxJava内部会设置FlowableEmitter.requested()返回128,没有点击btn,所以Subscriber默认接收的数量为0,这128个事件被存放到缓冲区,Flowable还有(500-128=372)个事件
  2. 点击btn,Subscriber对象可以接收48个事件,此时缓存区是(128-48=80),Subscriber对象处理了48个对象,Flowable还是(500-128=372)个事件
  3. 再次点击,Subscriber可以接收(48+48=96)个事件,缓冲区(128-48-48=32), 此时缓冲区满足<=32条件,RxJava内部调用request(96),Flowable(500-128-96=276),缓存区(32+96=128),
    剩下的以此类推,就完整的完成了反馈控制的异步展示
  1. 缓冲区
    在创建Flowable的时候,会传入第二个参数,BackpresureStrategy.ERROR



    直接传入参数即可

  • ERROR: 当缓冲区满了会抛出异常MissingBackpressureException
  • MISSING:当缓冲区满了会提示:QUEUE is full
  • BUFFER: 将缓存区大小设置成无限大(要注意内存,防止oom)
  • DROP:超过缓冲区的事件会被丢掉
  • LATEST: 只保留最后1个事件和第1到第128个事件,(一共129个)

rxjava源码

rxjava+retrofit

相关文章

网友评论

    本文标题:RXJAVA

    本文链接:https://www.haomeiwen.com/subject/pgdoxctx.html