美文网首页
subscribe订阅 Observer的几种创建方式

subscribe订阅 Observer的几种创建方式

作者: 蜗牛是不是牛 | 来源:发表于2022-11-16 14:34 被阅读0次

1、subscribe流程

subscribe有两类重载方法
注意subscribe(Observer o)方法没有返回值,因为Observer的方法onSubscribe(Disposable d)里面会返回一个
注意subscribe(Consumer c)方法有一个Disposable返回值,subscribe传入Consumer对象有多个重载方法,最终会转换成Observer的一个实现类LambdaObserver

// 表示观察者不对被观察者发送的事件作出任何响应(但被观察者还是可以继续发送事件)    
public final Disposable subscribe() {}
    
// 表示观察者只对被观察者发送的Next事件作出响应
public final Disposable subscribe(Consumer<? super T> onNext) {}
    
// 表示观察者只对被观察者发送的Next事件 & Error事件作出响应
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} 
    
// 表示观察者只对被观察者发送的Next事件、Error事件 & Complete事件作出响应
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
    
// 表示观察者对被观察者发送的任何事件都作出响应
public final void subscribe(Observer<? super T> observer) {}

Consumer转换成LambdaObserver对象,这是一个Observer的实现类

    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete, Consumer<? super Disposable> onSubscribe) {
        ObjectHelper.requireNonNull(onNext, "onNext is null");
        ObjectHelper.requireNonNull(onError, "onError is null");
        ObjectHelper.requireNonNull(onComplete, "onComplete is null");
        ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

        LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

        subscribe(ls);

        return ls;
    }

LambdaObserver

public final class LambdaObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

    private static final long serialVersionUID = -7251123623727029452L;
    final Consumer<? super T> onNext;
    final Consumer<? super Throwable> onError;
    final Action onComplete;
    final Consumer<? super Disposable> onSubscribe;

    public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete,
            Consumer<? super Disposable> onSubscribe) {
        super();
        this.onNext = onNext;
        this.onError = onError;
        this.onComplete = onComplete;
        this.onSubscribe = onSubscribe;
    }
}

2、Consumer

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onError(new RuntimeException("error"));
                emitter.onComplete();
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Throwable {
                Log.d(TAG, "accept: onNext " + integer);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Throwable {
                Log.d(TAG, "accept: onError "+throwable.toString());

            }
        }, new Action() {
            @Override
            public void run() throws Throwable {
                Log.d(TAG, "run: onComplete");
            }
        });

3、Disposable作用

● 从上游和下游的关系来讲,onComplete和onError是上游来通知下游不再接收消息的,dispose是下游主动告诉上游自己不再接收消息的。
● 可采用 Disposable.dispose() 切断观察者与被观察者之间的连接,即观察者无法继续接收被观察者的事件,但被观察者还是可以继续发送事件

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).subscribe(new Observer<Integer>() {

            private Disposable mDisposable;

            @Override
            public void onSubscribe(Disposable d) {
                mDisposable = d;
                Log.d(TAG, "开始采用subscribe连接");
            }

            // 默认最先调用复写的 onSubscribe()
            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "对Next事件" + value + "作出响应");
                if (value == 2){
                    mDisposable.dispose();
                    Log.d(TAG, "onNext: 切断连接" );
                }
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件作出响应");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件作出响应");
            }
        });

运行结果

D/MainActivity: 开始采用subscribe连接
D/MainActivity: 对Next事件1作出响应
D/MainActivity: 对Next事件2作出响应
D/MainActivity: onNext: 切断连接

4、onComplete和onError

● 从上游和下游的关系来讲,onComplete和onError是上游来通知下游不再接收消息的,dispose是下游主动告诉上游自己不再接收消息的。
● 例子

        // onComplete和onError是上游来通知下游不再接收消息的
        // 发送onError事件后之后onComplete就不会响应了
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onError(new RuntimeException("error"));
                emitter.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "开始采用subscribe连接");
            }
            // 默认最先调用复写的 onSubscribe()
            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "对Next事件"+ value +"作出响应"  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件作出响应");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件作出响应");
            }
        });

运行结果

D/MainActivity: 开始采用subscribe连接
D/MainActivity: 对Next事件1作出响应
D/MainActivity: 对Next事件2作出响应
D/MainActivity: 对Next事件3作出响应
D/MainActivity: 对Error事件作出响应

来自:https://www.yuque.com/xiaomaolv-pb4aw/rtx9u3/ggb8hs

相关文章

网友评论

      本文标题:subscribe订阅 Observer的几种创建方式

      本文链接:https://www.haomeiwen.com/subject/kcbyxdtx.html