美文网首页
RxJava基本使用

RxJava基本使用

作者: 小城哇哇 | 来源:发表于2022-11-16 14:29 被阅读0次

1、RxJava概述

RxJava 是一个基于事件流、实现异步操作的库
Rxjava原理基于一种扩展的观察者模式,有4个角色:
被观察者(Observable):产生事件
观察者(Observer):接收事件,并给出响应动作
订阅(Subscribe):连接被观察者 & 观察者
事件(Event):被观察者 & 观察者沟通的载体
添加依赖:

    implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
    implementation 'io.reactivex.rxjava2:rxjava:2.0.7'

2、RxJava基本使用

2-1、订阅

        // 1、创建被观察者Observable
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {

            /**
             * 被观察者Observable的subscribe中会使用ObservableEmitter发送事件,观察者响应对应的事件
             */
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {

                Log.d(TAG, "subscribe: ");
            }
        });


        // 2、创建观察者Observer
        Observer<Integer> observer = new Observer<Integer>() {

            /**
             * 观察者接收事件前,默认最先调用复写 onSubscribe()
             */
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: "+d.isDisposed());
            }

            /**
             * 当被观察者生产Next事件 & 观察者接收到时,会调用该复写方法 进行响应
             */
            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "对Next事件作出响应" + value);
            }

            /**
             * 当被观察者生产Error事件& 观察者接收到时,会调用该复写方法 进行响应
             */
            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件作出响应");
            }

            /**
             * 当被观察者生产Complete事件& 观察者接收到时,会调用该复写方法 进行响应
             */
            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件作出响应");
            }
        };

        // 3、当 Observable 被订阅后,观察者的Observer的OnSubscribe方法会自动被调用,被观察者Observable的subscribe方法会被调用
        observable.subscribe(observer);

上述代码的效果基本如下:

D/MainActivity: onSubscribe: false
D/MainActivity: subscribe: 

2-2、发送事件

        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {

                Log.d(TAG, "subscribe: ");

                // ObservableEmitter 事件发射器,向观察者发送事件
                Log.d(TAG, "subscribe: 1 start");
                emitter.onNext(1);
                Log.d(TAG, "subscribe: 1 end");

                Log.d(TAG, "subscribe: 2 start");
                emitter.onNext(2);
                Log.d(TAG, "subscribe: 2 end");

                Log.d(TAG, "subscribe: 3 start");
                emitter.onNext(3);
                Log.d(TAG, "subscribe: 3 end");

                Log.d(TAG, "subscribe: onComplete start");
                emitter.onComplete();
                Log.d(TAG, "subscribe: onComplete end");
            }
        });

        Observer<Integer> observer = new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: "+d.isDisposed());
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "对Next事件作出响应" + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件作出响应");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件作出响应");
            }
        };

        observable.subscribe(observer);

代码执行结果

D/MainActivity: onSubscribe: false
D/MainActivity: subscribe: 
D/MainActivity: subscribe: 1 start
D/MainActivity: 对Next事件作出响应1
D/MainActivity: subscribe: 1 end
D/MainActivity: subscribe: 2 start
D/MainActivity: 对Next事件作出响应2
D/MainActivity: subscribe: 2 end
D/MainActivity: subscribe: 3 start
D/MainActivity: 对Next事件作出响应3
D/MainActivity: subscribe: 3 end
D/MainActivity: subscribe: onComplete start
D/MainActivity: 对Complete事件作出响应
D/MainActivity: subscribe: onComplete end

2-3、链式调用

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "开始采用subscribe连接");
            }
            // 默认最先调用复写的 onSubscribe()
            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "对Next事件"+ value +"作出响应"  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件作出响应");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件作出响应");
            }
        });

https://www.yuque.com/xiaomaolv-pb4aw/rtx9u3/ggb8hs

相关文章

网友评论

      本文标题:RxJava基本使用

      本文链接:https://www.haomeiwen.com/subject/rtbyxdtx.html