美文网首页
subscribe订阅 Observer的几种创建方式

subscribe订阅 Observer的几种创建方式

作者: 蜗牛是不是牛 | 来源:发表于2022-11-16 14:34 被阅读0次

    1、subscribe流程

    subscribe有两类重载方法
    注意subscribe(Observer o)方法没有返回值,因为Observer的方法onSubscribe(Disposable d)里面会返回一个
    注意subscribe(Consumer c)方法有一个Disposable返回值,subscribe传入Consumer对象有多个重载方法,最终会转换成Observer的一个实现类LambdaObserver

    // 表示观察者不对被观察者发送的事件作出任何响应(但被观察者还是可以继续发送事件)    
    public final Disposable subscribe() {}
        
    // 表示观察者只对被观察者发送的Next事件作出响应
    public final Disposable subscribe(Consumer<? super T> onNext) {}
        
    // 表示观察者只对被观察者发送的Next事件 & Error事件作出响应
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} 
        
    // 表示观察者只对被观察者发送的Next事件、Error事件 & Complete事件作出响应
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
        
    // 表示观察者对被观察者发送的任何事件都作出响应
    public final void subscribe(Observer<? super T> observer) {}
    

    Consumer转换成LambdaObserver对象,这是一个Observer的实现类

        public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
                Action onComplete, Consumer<? super Disposable> onSubscribe) {
            ObjectHelper.requireNonNull(onNext, "onNext is null");
            ObjectHelper.requireNonNull(onError, "onError is null");
            ObjectHelper.requireNonNull(onComplete, "onComplete is null");
            ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
    
            LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
    
            subscribe(ls);
    
            return ls;
        }
    

    LambdaObserver

    public final class LambdaObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
    
        private static final long serialVersionUID = -7251123623727029452L;
        final Consumer<? super T> onNext;
        final Consumer<? super Throwable> onError;
        final Action onComplete;
        final Consumer<? super Disposable> onSubscribe;
    
        public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
                Action onComplete,
                Consumer<? super Disposable> onSubscribe) {
            super();
            this.onNext = onNext;
            this.onError = onError;
            this.onComplete = onComplete;
            this.onSubscribe = onSubscribe;
        }
    }
    

    2、Consumer

            Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onNext(3);
                    emitter.onError(new RuntimeException("error"));
                    emitter.onComplete();
                }
            }).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Throwable {
                    Log.d(TAG, "accept: onNext " + integer);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Throwable {
                    Log.d(TAG, "accept: onError "+throwable.toString());
    
                }
            }, new Action() {
                @Override
                public void run() throws Throwable {
                    Log.d(TAG, "run: onComplete");
                }
            });
    

    3、Disposable作用

    ● 从上游和下游的关系来讲,onComplete和onError是上游来通知下游不再接收消息的,dispose是下游主动告诉上游自己不再接收消息的。
    ● 可采用 Disposable.dispose() 切断观察者与被观察者之间的连接,即观察者无法继续接收被观察者的事件,但被观察者还是可以继续发送事件

            Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onNext(3);
                    emitter.onComplete();
                }
            }).subscribe(new Observer<Integer>() {
    
                private Disposable mDisposable;
    
                @Override
                public void onSubscribe(Disposable d) {
                    mDisposable = d;
                    Log.d(TAG, "开始采用subscribe连接");
                }
    
                // 默认最先调用复写的 onSubscribe()
                @Override
                public void onNext(Integer value) {
                    Log.d(TAG, "对Next事件" + value + "作出响应");
                    if (value == 2){
                        mDisposable.dispose();
                        Log.d(TAG, "onNext: 切断连接" );
                    }
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "对Error事件作出响应");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "对Complete事件作出响应");
                }
            });
    

    运行结果

    D/MainActivity: 开始采用subscribe连接
    D/MainActivity: 对Next事件1作出响应
    D/MainActivity: 对Next事件2作出响应
    D/MainActivity: onNext: 切断连接
    

    4、onComplete和onError

    ● 从上游和下游的关系来讲,onComplete和onError是上游来通知下游不再接收消息的,dispose是下游主动告诉上游自己不再接收消息的。
    ● 例子

            // onComplete和onError是上游来通知下游不再接收消息的
            // 发送onError事件后之后onComplete就不会响应了
            Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onNext(3);
                    emitter.onError(new RuntimeException("error"));
                    emitter.onComplete();
                }
            }).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "开始采用subscribe连接");
                }
                // 默认最先调用复写的 onSubscribe()
                @Override
                public void onNext(Integer value) {
                    Log.d(TAG, "对Next事件"+ value +"作出响应"  );
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "对Error事件作出响应");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "对Complete事件作出响应");
                }
            });
    

    运行结果

    D/MainActivity: 开始采用subscribe连接
    D/MainActivity: 对Next事件1作出响应
    D/MainActivity: 对Next事件2作出响应
    D/MainActivity: 对Next事件3作出响应
    D/MainActivity: 对Error事件作出响应
    

    来自:https://www.yuque.com/xiaomaolv-pb4aw/rtx9u3/ggb8hs

    相关文章

      网友评论

          本文标题:subscribe订阅 Observer的几种创建方式

          本文链接:https://www.haomeiwen.com/subject/kcbyxdtx.html