美文网首页
rxjava笔记

rxjava笔记

作者: 61etj | 来源:发表于2018-10-28 00:35 被阅读0次

导包

compile 'io.reactivex.rxjava2:rxjava:2.0.1'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'

被观察者

Observable<Integer> integerObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {

                //ObservableEmitter发射器

                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onNext(4);
                e.onError(new RuntimeException("test"));
                e.onComplete();
            }
        });

观察者

Observer<Integer> integerObserver = new Observer<Integer>() {

            private Disposable disposable;

            @Override
            public void onSubscribe(Disposable d) {

                //Disposable 一次性用品  管道连接的开关
                disposable = d;
                System.out.println("onSubscribe");
            }

            @Override
            public void onNext(Integer value) {
                System.out.println("onNext" + value);
                if (value % 3 == 0) {
                    disposable.dispose();//将开关关闭
                    System.out.println("disposable . dispose");
                }
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError");
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        };
    
    
integerObservable.subscribe(integerObserver);//订阅

观察者简写

    //onNext简写
    Consumer consumer = new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            System.out.println("next--"+integer);
        }
    };

    //onError简写
    Consumer<Throwable> throwableConsumer = new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {

            System.out.println("异常");
        }
    };

    //onComplete简写
    Action action = new Action() {
        @Override
        public void run() throws Exception {

            System.out.println("complete");
        }
    };

    integerObservable.subscribe(consumer, throwableConsumer, action);

切换线程

.subscribeOn(Schedulers.newThread()) //被观察者线程             被观察者线程只有第一次有效,其余无效                                
.observeOn(AndroidSchedulers.mainThread())  //观察者线程        观察者可多次切换线程

所有线程选项
Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作
Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作
Schedulers.newThread() 代表一个常规的新线程
AndroidSchedulers.mainThread() 代表Android的主线程

在onNext之前执行 是属于Observar的线程

.doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "After observeOn(mainThread), current thread is: " + Thread.currentThread().getName());
                    }
                })

map操作符,

位于被观察者与观察者中间,将被观察者转换为另外一种类型
.map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return "This is result " + integer;
            }
        })

FlatMap

位于被观察者与观察者中间
将被观察者挨个拆分成Observables,并发送每一个Observable

.flatMap(new Function<Integer, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(Integer integer) throws Exception {
            final List<String> list = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                list.add("I am value " + integer);
            }
            return Observable.fromIterable(list).delay(10,TimeUnit.MILLISECONDS);//fromIterable将list转换为Observables  delay为延时
        }
    }

问题

FlatMap调用子Observable是无序的

解决方案

使用concatMap 顺序执行
.concatMap(new Function<Integer, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(Integer integer) throws Exception {
            final List<String> list = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                list.add("I am value " + integer);
            }
            return Observable.fromIterable(list).delay(10,TimeUnit.MILLISECONDS);
        }
    })

zip操作符

参数1,2都是observable 参数3为 压缩算法
发送的事件数量和两个observable中最短事件数量的哪一个有关
如果observable和observer在同一个线程 执行顺序会  observable1先发送,observable2在发送,当observable2每发送一次都会进行zip
如果observable和observer在不同线程,则observer会同时发送
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {                  
    @Override                                                                                         
    public String apply(Integer integer, String s) throws Exception {                                 
        return integer + s;                                                                           
    }                                                                                                 
})

问题

Observable发送事件过快Observer处理事件过慢,Observable发送的事件去哪里了?

解决方案

使用一个容器将Observable发送的事件储存起来,等待Observer挨个读取

问题

储存Observable发送事件的容器容量有限,如何解决Observable发送过快,Observer处理过慢的问题

解决方案

使用Backpressure背压,及
    1.加入延时是Observable发送慢点(从速度上进行治理, 减缓事件发送进水缸的速度)
        Observable发送事件时延时一段时间
        
    2.容器不保存全部,定时保存,或某些特定的条件才保存(从数量上进行治理, 减少发送进水缸里的事件)
        .sample(2, TimeUnit.SECONDS)  //sample取样 两秒获取一次事件放入容器
    
        .filter(new Predicate<Integer>() {//过滤器返回true才会发送到容器
                            @Override
                            public boolean test(Integer integer) throws Exception {
                                return integer % 10 == 0;
                            }
                        })

Flowable对上一个问题的解决方案做了一个封装 自带了一个128大小的容器

参数2为容器满了的时候的解决方案
取值
    BackpressureStrategy.DROP  超过满的都删除
    BackpressureStrategy.LATEST   一直保存最新的,将老的删除
    BackpressureStrategy.BUFFER  无限存放,不删除
        
Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "emit 1");
                emitter.onNext(1);
                Log.d(TAG, "emit 2");
                emitter.onNext(2);
                Log.d(TAG, "emit 3");
                emitter.onNext(3);
                Log.d(TAG, "emit complete");
                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR); //增加了一个参数

Subscriber      
Subscriber<Integer> downstream = new Subscriber<Integer>() {

        @Override
        public void onSubscribe(Subscription s) {
            //Subscription使用Subscription.cancel()  关闭管道连接  Subscription.request(long)来请求容器获取多少个事件
            Log.d(TAG, "onSubscribe");
            s.request(Long.MAX_VALUE);  //注意这句代码
        }

        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "onNext: " + integer);

        }

        @Override
        public void onError(Throwable t) {
             Log.w(TAG, "onError: ", t);
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    };

问题

同步线程中 下游不调用Subscription.request(long)会抛出MissingBackpressureException,而异步线程不会

解决方案

同步线程中,下游不调用Subscription,则上游会认为下游无处理数据能力,因为在同一线程中,下游不处理上游不能干等,所以抛出异常提醒用户

相关文章

网友评论

      本文标题:rxjava笔记

      本文链接:https://www.haomeiwen.com/subject/djtutqtx.html