美文网首页
Android框架学习之RxJava入门

Android框架学习之RxJava入门

作者: sssssss_ | 来源:发表于2022-05-24 14:49 被阅读0次

    一、简单使用

    创建被观察者 (Observable )& 生产事件
    创建观察者 (Observer )并 定义响应事件的行为
    通过订阅(Subscribe)连接观察者和被观察者

    1.1 拆分式实现

    // 1. 创建被观察者(Observable )对象
    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
            emitter.onComplete();
        }
    });
    // 2. 创建观察者(Observer )对象
    Observer<Integer> observer = new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "开始采用subscribe连接");
        }
        @Override
        public void onNext(Integer value) {
            Log.d(TAG, "对Next事件作出响应" + value);
        }
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "对Error事件作出响应");
        }
        @Override
        public void onComplete() {
            Log.d(TAG, "对Complete事件作出响应");
        }
    };
    // 3、进行订阅(subscribe)
    observable.subscribe(observer);
    

    1.2 链式调用实现

    Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
        emitter.onComplete();
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {
            Log.d(TAG, "开始采用subscribe连接");
        }
        @Override
        public void onNext(@NonNull Integer integer) {
            Log.d(TAG, "对Next事件" + integer + "作出响应");
        }
        @Override
        public void onError(@NonNull Throwable e) {
            Log.d(TAG, "对Error事件作出响应");
        }
        @Override
        public void onComplete() {
            Log.d(TAG, "对Complete事件作出响应");
        }
    });
    

    二、其他说明

    2.1 重载被观察者 Observable的subscribe()方法

    /**
     * subscribe 方法重载,可以直接在参数中传入 onNext,onError和 onComplete
     * @param onNext
     * @param onError
     * @param onComplete
     * @return
     */
    public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError,
            @NonNull Action onComplete) {
        Objects.requireNonNull(onNext, "onNext is null");
        Objects.requireNonNull(onError, "onError is null");
        Objects.requireNonNull(onComplete, "onComplete is null");
        LambdaObserver<T> ls = new LambdaObserver<>(onNext, onError, onComplete, Functions.emptyConsumer());
        subscribe(ls);
        return ls;
    }
    

    2.2 断开 观察者 与 被观察者 之间的连接

    2.2.1 主动断开 Disposable.dispose()

    // 全局对象
    private Disposable mDisposable;
    
    Observer<Integer> observer = new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            mDisposable = d; // 赋值给全局对象
        }
        ...
    }
    // 主动断开连接
    mDisposable.dispose();
    

    2.2.2 条件断开 takeWhile()

    /**
     * 启动计时器
     */
    private void startTimer() {
        mStartTime = 0;
        mCurrentCount = 0;
        EventBus.getDefault().post(new StakeKingTimerEvent(mCurrentCount, 0));
        mIsTimerRunning = true;
        Observable.interval(0, 1000, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread())
                .takeWhile(period -> mIsTimerRunning) // 根据条件决定当前定时器是否停止
                .subscribe(period -> {
                    ...
                });
    }
    /**
     * 停止计时器
     */
    private void stopTimer() {
        mIsTimerRunning = false;
    }
    

    三、被观察者(Observable)的源码学习

    // 创建一个被观察者
    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
            emitter.onComplete();
        }
    });
    
    // 创建方法create()
    public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
        // 将传入的 ObservableOnSubscribe 转成 ObservableCreate
        return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
    }
    
    // ObservableCreate 的源码
    public final class ObservableCreate<T> extends Observable<T> {
        final ObservableOnSubscribe<T> source;
        public ObservableCreate(ObservableOnSubscribe<T> source) {
            this.source = source; // 存储传入的ObservableOnSubscribe
        }
        // 重写subscribeActual() 方法,在订阅后会执行该方法
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            // 创建 CreateEmitter 存储事件
            CreateEmitter<T> parent = new CreateEmitter<>(observer);
            // 调用观察者重写的onSubscribe()方法
            observer.onSubscribe(parent);
            try {
                // 调用 被观察者重写的 subscribe()方法
                source.subscribe(parent);// 方法内部会调用onNext、onError、onComplete
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
        
        // CreateEmitter 的源码
        static final class CreateEmitter<T> extends AtomicReference<Disposable>
        implements ObservableEmitter<T>, Disposable {
            final Observer<? super T> observer;
            CreateEmitter(Observer<? super T> observer) {
                this.observer = observer;
            }
            // 若无断开连接,则调用观察者的同名方法 onNext()
            @Override
            public void onNext(T t) {
                if (t == null) {
                    onError(ExceptionHelper.createNullPointerException("onNext called with a null value."));
                    return;
                }
                if (!isDisposed()) {
                    observer.onNext(t);
                }
            }
            // 最终都会自动调用dispose()
            @Override
            public void onError(Throwable t) {
                if (!tryOnError(t)) {
                    RxJavaPlugins.onError(t);
                }
            }
            // 最终都会自动调用dispose()
            @Override
            public void onComplete() {
                if (!isDisposed()) {
                    try {
                        observer.onComplete();
                    } finally {
                        dispose();
                    }
                }
            }
        ...
        }
    

    小结:

    1. create方法传入参数: ObservableOnSubscribe
    2. 构建返回对象 ObservableCreate
    3. 在ObservableCreate中,重写subscribeActual方法;
    4. 在方法中,创建CreateEmitter进行存储事件;
    5. 调用观察者重写的onSubscribe()方法;
    6. 调用被观察者重写的subscribe()方法;
    7. 在被观察者重写的subscribe()里调用onNext、onError和onComplete方法
    8. 等待被执行;

    四、观察者(Observer)的源码学习

    // 这是一个接口,含4个方法
    public interface Observer<@NonNull T> {
        void onSubscribe(@NonNull Disposable d);
        void onNext(@NonNull T t);
        void onError(@NonNull Throwable e);
        void onComplete();
    }
    // 被观察者订阅观察者的代码
    observable.subscribe(observer);
    
    // subscribe方法的源码
    public final void subscribe(@NonNull Observer<? super T> observer) {
        observer = RxJavaPlugins.onSubscribe(this, observer);
        subscribeActual(observer); // 在收到订阅后,调用subscribeActual方法
    }
    
    // subscribeActual方法的源码,这是个抽象方法,具体的已经在被观察中实现;
    protected abstract void subscribeActual(@NonNull Observer<? super T> observer);
    
    

    小结:

    1. 执行被观察者的订阅时,会调用被观察者的 ObservableCreate 对象的 subscribeActual 方法;
    2. subscribeActual 方法中会执行事件,会在观察者中对应的方法中体现;
    3. 当执行到 onError 或 onComplete 时,会断开订阅。

    参考文章

    https://blog.csdn.net/AthonyDavis/article/details/119219746?spm=1001.2014.3001.5502
    https://www.jianshu.com/p/d52ef3ad7460

    相关文章

      网友评论

          本文标题:Android框架学习之RxJava入门

          本文链接:https://www.haomeiwen.com/subject/tnadqqtx.html