美文网首页
Rxjava(一)之流程分析与基本操作

Rxjava(一)之流程分析与基本操作

作者: 梦星夜雨 | 来源:发表于2021-01-25 10:25 被阅读0次

    前言

    Rxjava由于其基于事件流的链式调用、逻辑简洁 & 使用简单的特点,深受各大 Android开发者的欢迎。
    Rxjava是一个基于时间流,实现异步操作的库。
    定义:Rxjava简单来说就是采用的观察者内模式来定义的,被观察者(Observable)通过订阅(Subscribe)按照一定顺序将事件发送给观察者(Observer),观察者按顺序接收事件并做出相应的响应动作。
    首先我们引入Rxjava库

    implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
    implementation 'io.reactivex.rxjava2:rxjava:2.0.7'
    

    基本使用

    Observable<Integer> integerObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> e) throws Exception {
    
                }
            });
    
            Observer<Integer> observer = new Observer<Integer>() {
    
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(Integer integer) {
    
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            };
    
            integerObservable.subscribe(observer);
    

    首先创建一个被观察者integerObservable 做为上游,接受创建一个观察者observer 做为下游接受integerObservable 传来的消息,最后调用被观察者的订阅方法subscribe()进行订阅。
    当然,Rxjava最经典的还是如下的基于事件流的链式调用结构。有着逻辑简洁,代码优雅,使用简单等优点。

    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    Log.d(TAG,"Observable onSubscribe");
                    emitter.onNext(1);
                    emitter.onNext(2);
                    Log.d(TAG,"start send onComplete");
                    emitter.onComplete();
                    Log.d(TAG,"end send onComplete");
                    emitter.onNext(3);
                    emitter.onNext(4);
                }
            }).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG,"Observer onSubscribe");
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG,"Observer onNext: "+ integer);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG,"Observer onError: " + e.getMessage());
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG,"Observer onComplete");
                }
            });
    

    在上述代码中我们加入了一些逻辑和日志,运行上述代码会得到下面结果:

    Observer onSubscribe
    Observable onSubscribe
    Observer onNext: 1
    Observer onNext: 2
    start send onComplete
    Observer onComplete
    end send onComplete
    

    通过分析上面的结果,我们不难发现,Rxjava的流程是从被观察者的subscribe()方法开始,然后到观察者的onSubscribe()方法,通过发射器(emitter)发送事件,最终在发射完onComplete()事件后结束,这里我们注意到,最后发射的两个事件,下游并没有收到,这是因为发送完onComplete()事件后,观察者的任务就完成了。
    如果我们这里在发送onError()事件,会抛出UndeliverableException异常,但是先调用onError()再调用onComplete()事件是可以的。由于篇幅关系,这里我就不贴源码了。

    这里我们介绍一个简单的观察者Consumer。

    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                    e.onNext(1);
                    e.onNext(2);
                }
            }).subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            Log.d(TAG,"accept: "+integer);
                        }
                    });
    
    accept: 1
    accept: 2
    

    我们可以看到,简易的观察者也能接收到被观察者发送的消息。

    下面我们介绍一下Disposable对象,它可以通过disposable()方法切断观察者和被观察者之间的联系,虽然被观察者还可以继续发送事件,但是观察者已经接收不到。具体示例代码和结果如下:

    Disposable disposable;
        public void r01(View view) {
            Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    Log.d(TAG,"Observable onSubscribe");
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onNext(3);
                    emitter.onNext(4);
                    emitter.onComplete();
                }
            }).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    disposable =d;
                    Log.d(TAG,"Observer onSubscribe");
                }
    
                @Override
                public void onNext(Integer integer) {
                    if (integer == 2) {
                        disposable.dispose();
                    }
                    Log.d(TAG,"Observer onNext: "+ integer);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG,"Observer onError: " + e.getMessage());
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG,"Observer onComplete");
                }
            });
        }
    
    Observer onSubscribe
    Observable onSubscribe
    Observer onNext: 1
    Observer onNext: 2
    

    本文对RxJava的进行了基本介绍,对基本用法进行了讲解。

    相关文章

      网友评论

          本文标题:Rxjava(一)之流程分析与基本操作

          本文链接:https://www.haomeiwen.com/subject/tojvaktx.html