美文网首页
RxJava-disposable

RxJava-disposable

作者: 河马过河 | 来源:发表于2018-08-31 14:25 被阅读28次

    一、disposable用法

        public Flowable<Integer> getRxJavaFlowableRealExample() {
            return Flowable.create(new FlowableOnSubscribe<Integer>() {
                @Override
                public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                    for (int i = 0; ; i++) {
                        if (emitter.isCancelled()) {
                            break;
                        }
                        while (emitter.requested() == 0) {
                        }
                        LogUtils.debug(TAG, "getRxJavaFlowableRealExample---:" + Thread.currentThread().getName() + "-request-emit:" + emitter.requested());
                        LogUtils.debug(TAG, "getRxJavaFlowableRealExample---:" + Thread.currentThread().getName() + "--:" + i);
                        emitter.onNext(i);
                    }
    
                }
            }, BackpressureStrategy.ERROR);
        }
        public void rxJavaFlowableConsumeExample() {
            Disposable disposable = model.getRxJavaFlowableRealExample()
                    .subscribeOn(Schedulers.io())
                    .unsubscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            LogUtils.error(TAG, "rxJavaFlowableConsumeExample--:" + Thread.currentThread().getName() + "-Consumer-:" + integer);
                        }
                    });
            compositeDisposable.add(disposable);
        }
    

    1、在activity的生命周期结束前清除compositeDisposable即可

     public void onFinishActivity() {
            compositeDisposable.dispose();
        }
    

    二、注意事项

    1、 compositeDisposable.dispose(); 此处只是把观察者和被观察者之前的通道切断,观察者不能接收数据,而被观察者则会继续发送数据到结束为止。
    2、如上例所示,如果在无限for循环中不添加

      if (emitter.isCancelled()) {
           break;
       }
    

    这样的语句,则被观察者会无限发送数据。

    日志

    备注:(关闭acitivity情况下,等于已经切断通道,可以requested会保持某个值,一直打印数据)

    08-31 14:14:15.203 29916-30066/com.example.zhang D/MainModel: getRxJavaFlowableRealExample---:RxCachedThreadScheduler-2-request-emit:45
        getRxJavaFlowableRealExample---:RxCachedThreadScheduler-2--:87145
        getRxJavaFlowableRealExample---:RxCachedThreadScheduler-2-request-emit:45
        getRxJavaFlowableRealExample---:RxCachedThreadScheduler-2--:87146
        getRxJavaFlowableRealExample---:RxCachedThreadScheduler-2-request-emit:45
        getRxJavaFlowableRealExample---:RxCachedThreadScheduler-2--:87147
        getRxJavaFlowableRealExample---:RxCachedThreadScheduler-2-request-emit:45
        getRxJavaFlowableRealExample---:RxCachedThreadScheduler-2--:87148
        getRxJavaFlowableRealExample---:RxCachedThreadScheduler-2-request-emit:45
    
    

    三、总结

    1、被观察者如果是无限发送数据,或者有线程阻塞的情况下,要手动控制
    类似

     if (emitter.isCancelled()) {
           break;
       }
    
    河马过河微信公众号.jpg

    相关文章

      网友评论

          本文标题:RxJava-disposable

          本文链接:https://www.haomeiwen.com/subject/tvmswftx.html