美文网首页Rxjava
Rxjava基础知识

Rxjava基础知识

作者: 覆水无言 | 来源:发表于2019-12-18 10:51 被阅读0次

    目录

    1. Rx简介
    2. Rxjava基础知识
    3. Rxjava创建操作符
    4. Rxjava的线程操作
    5. Rxjava变换过滤操作符
    6. Rxjava合并、链接操作符
    7. Rxjava条件和布尔操作符

    Rxjava基础知识

    Observable:

    一:Rxjava的Observable使用通常需要三步。
    <font color=blue>1:创建Observable(被观察者):</font>
    Observable是Rxjava中的被观察者,Rxjava使用时需要创建一个被观察者,
    他决定什么时候触发事件, 也就是被观察者发布信息.
    <font color=blue>2:创建Observer(观察者):</font>
    Observer是观察者,用于接受Observable发布的数据,它可以在不同的线程执行任务,
    这种模式极大简化并发操作,
    <font color=blue>3:使用subscribe()进行订阅,</font>
    这个操作类似传统观察者的注入操作,将Observable与Observer链接起来,

    上篇博客的hello world

        Observable.just("hello world").subscribe(new Consumer<String> (){
            @Override
            public void accept(@NonNull String s) throws Exception {
                System.out.println(s);
            }
        })
    

    <font color=red>代码注解:</font>
    1:just:rxjava中Observable的创建操作符,创建操作符有多种create、from等,后续会详细介绍。
    2:Consumer:消费者,用于接受单个值,
    3:subscribe:订阅,他有多种重载方法,

    subscribe(onNext)
    subscribe(onNext,onError)
    subscribe(onNext,onError,onComplete)
    subscribe(onNext,onError,onComplete,onSubscribe); //onSubscribe:订阅事件,
    
    
    eg:
    Observable.just("hello world")
        .subscribe(new Consumer<String>(){  //订阅onNext事件
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        },new Consumer<Throwable>(){   //订阅错误事件
             @Override
             public void accept(Throwable throwable) throws Exception {
                 System.out.println(throwable);
             }
        }, new Action(){
            @Override
            public void run() throws Exception {   //订阅完成事件
                System.out.println(" onComplete");
            }
        });
        
    注:Action与Consumer是一个意思都是事件的观察者,只是action无参数,Consumer:单一参数类型。
    

    执行顺序:

    1. onSubscribe() 订阅事件,用于取消订阅,表示观察者已将开始观察被观察者。
    2. onNext()
    3. onComplete()
    4. onError(); 在程序运行异常时发布该事件。

    <font color=red>注:</font>

    Rxjava中被观察者、观察者、subscribe三者缺一不可,只有使用了subscribe(),observable才会发布数据。
    Rxjava中5中包含的观察者模式图:


    观察模式图

    observable:能发射0或N个数据,并以成功或错误事件终止。
    Flowable:能发射0或N个数据,并以成功或错误事件终止。支持背压,可以控制发布数据的速度。
    Single:只发送单个数据或错误事件。
    Completable:不发送数据,只处理onComplete,onError事件,
    Maybe:发射0或1个数据,要么成功,要么失败。

    do操作符

    Rxjava包含许多操作符,do操作符可以给Observable的生命周期的各个阶段加上一系列的回调监听,Rxjava中有许多doXXX操作符。
    根据生命周期顺序,doXXX操作符的用途。

    操作符 用途
    doOnSubscribe 监听订阅事件,一旦观察者订阅了Observable,它就会被调用
    doOnLifecycle 可以在观察者订阅后,设置是否取消订阅,参数为Disposable.isDisposed()
    doOnNext Observable每次调用OnNext事件发送数据之前调用一次,只包含onNext事件,它的参数为发射的数据
    doOnEach Observable每发射一次数据就调用一次,包含onNext, onError, onCompleted
    doAfterNext 与doOnNext属性一致,只是doOnNext发生在onNext事件之前,它发生在之后
    doOnComplete Observable正常终止时调用该事件。
    doFinally Observable终止之后调用,无论是正常终止还是异常终止,并且它执行在doAfterTerminate之前
    doAfterTerminate 向Observable注入一个action,当Observable调用onComplete或onError时触发
    Observable.just("hello")
        .doOnNext(new Consumer<String>(){
            @Override
            public void accept(@NonNull String s) throws Exception{
                System.out.println("doOnNext : " + s);
            }
        })
        .doOnComplete(new Action(){
            @Override
            public void run() throws Exception{
                System.out.println("OnComplete");
            }
        })
        .doFinally(new Action(){
            @Override
            public void run() throws Exception{
                System.out.println(" doFinally");
            }
        })
        .doOnLifecycle(new Consumer<Disposable>(){
            @Override
            public void accept(@NonNull Disposable disposable) throws Exception{
                System.out.println( "doOnLifecycle : + " disposable.isDisposed());
            }
        });
    

    Hot Observable与Cold Obsevable

    1:区别

    \color{#ea4235}{hot Observable:}
    无论有没有观察者,事件都会发生,而且与订阅这的关系是一对多,多个Observer可以订阅它。常用与某些事件不确定什么时候发生或不确定发射的元素数量,常用与UI交互,网络环境变化,服务器推送消息的到达等。
    \color{#ea4235}{cold Observable:}
    只有观察者订阅了,才开始发射数据。并且与订阅者关系是一对一。Observable的普通创建操作符just、create, range,fromXXX等都是生成的cold Observable。
    \color{#ea4235}{注意:}当一个cold Observable被两个Observer同时订阅时,这两个是完全相互独立的程序,相当于两个Observable与两个Observer。

    2:Cold Observable装换成Hot Observable

    1. 使用publish生成ConnectableObservable.
      publish可以让Cold Observable转换成Hot Observable,将原来的Observable装换成ConnectableObservable.
    2. 使用Connect进行执行。
      生成ConnectableObservable后调用Connect(),ConnectableObaservable才能真正开始执行,不管是否有订阅。
    //代码使用了lambda表达式,这个自己学习。
    Consumer<Long> observer1 = aLong -> System.out.println("observer1 : " + aLong);
    Consumer<Long> observer2 = aLong -> System.out.println("observer1 : " + aLong);
    Consumer<Long> observer3 = aLong -> System.out.println("observer1 : " + aLong);
    ConnectableObservable<Long> observable = Observable.create((ObservableOnSubscribe<Long>)
         emitter -> Observable.interval(10, TimeUnit.MILLISECONDS, Schedulers.computation())
         .take(Integer.MAX_VALUE).subscribe(emitter::onNext)).observeOn(Schedulers.newThread())
         .publish(); //这里将cold 转换为了hot
    observable.connect();  //这里启动了observable, 之后已经开始发送数据,无论有没有订阅
    observable.subscribe(observer1);
    observable.subscribe(observer2);
    observable.subscribe(observer3);
    
    

    注意:
    Hot Observable是多个订阅者共享同一事件。Connectableobservable是线程安全的。

    3:Hot Observable转Cold Observable

    ConnectableObservable的refCount操作符。refCount操作符可以把一个可连接的Observable链接和断开的过程自动化,它操作一个可链接的Observable返回一个普通的Observable,当观察者(订阅者)订阅这个Observable时,RefCount链接到下层的可连接Observable。refconut跟踪多个观察者,直到最后一个观察者取消订阅,才断开与下层可连接的OBservable的链接。
    \color{#ea4235}{注意:}

    1:如果所有观察者都取消订阅,则Observable的数据流停止,重新订阅,重新开始数据流。
    2:部分观察者取消后,如果又有观察者订阅,则数据不会重新开发送,会按原来的书序发送。
    
    Consumer<Long> observer1 = aLong -> System.out.println("observer1 : " + aLong);
    Consumer<Long> observer2 = aLong -> System.out.println("observer1 : " + aLong);
    Consumer<Long> observer3 = aLong -> System.out.println("observer1 : " + aLong);
    
    ConnectableObservable<Long> observable = Observable.create((ObservableOnSubscribe<Long>)
            emitter -> Observable.interval(10, TimeUnit.MILLISECONDS, Schedulers.computation())
            .take(Integer.MAX_VALUE).subscribe(emitter::onNext)).observeOn(Schedulers.newThread()).publish();
            observable.connect();
            
    Observable obs = observable.refCount();    //使用refcount生成一个需要的observable.
    Disposable disposable = obs.subscribe(observer1);
    Disposable disposable1 = obs.subscribe(observer2);
    obs.subscribe(observer3);
    try {
        Thread.sleep(20L);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    //取消部分订阅。
    disposable.dispose(); //取消observer1
    disposable1.dispose(); //取消Observer2
    //重新开始订阅
    disposable = obs.subscribe(observer1);
    disposable1 = obs.subscribe(observer2);
    

    Flowable

    Rxjava2中Observable不再支持背压,而是Flowable来支持背压,Flowable是Rxjava2中新增的被观察者。Flowable可以看做
    Flowable新的实现,它支持背压。同时实现Reactive streams的Publisher接口。

    使用场景

    Observable一般处理不超过1000套数据,几乎不会出现内存溢出。
    Flowable较好的使用场景

    1:处理某种产生超过10k的元素
    2:文件的读取和分析
    3:读取数据库记录,也是一个阻塞和基于拉取模式
    4:IO流
    5:创建响应式非阻塞接口。

    相关文章

      网友评论

        本文标题:Rxjava基础知识

        本文链接:https://www.haomeiwen.com/subject/qfwynctx.html