美文网首页Android 开发
Rxjava2 Completable第一节create

Rxjava2 Completable第一节create

作者: CODERLIHAO | 来源:发表于2019-07-10 09:15 被阅读0次

    简言

    Completable只会关心onComplete和onError事件,要么成功要么失败

    例子

     CompletableOnSubscribe subscribe = new CompletableOnSubscribe() {
                @Override
                public void subscribe(CompletableEmitter emitter) throws Exception {
                    emitter.onComplete();
                }
            };
    
            Completable.create(subscribe).subscribe(new CompletableObserver() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println(d.isDisposed());
                }
    
                @Override
                public void onComplete() {
                    System.out.println("onComplete");
                }
    
                @Override
                public void onError(Throwable e) {
                    System.out.println(e.getMessage());
                }
            });
        }
    

    一个非常简单的demo,最终执行的是onComplete()方法

    image.png

    Completable抽象类实现的 接口,从源码中看出,Completable只关心onComplete()和onError()方法

    public interface CompletableObserver {
        void onSubscribe(@NonNull Disposable d);
        void onComplete();
        void onError(@NonNull Throwable e);
    

    源码分析

    由于Completable是抽象类,没办法直接new ,但是这个类提供了大量的静态方法用于构造实例,我们先看一个create()方法

      public static Completable create(CompletableOnSubscribe source) {
            ObjectHelper.requireNonNull(source, "source is null");
            return RxJavaPlugins.onAssembly(new CompletableCreate(source));
        }
    

    源码中直接构造CompletableCreate类

    public final class CompletableCreate extends Completable {
    
        final CompletableOnSubscribe source;
    
        public CompletableCreate(CompletableOnSubscribe source) {
            this.source = source;
        }
    
        @Override
        protected void subscribeActual(CompletableObserver observer) {
            Emitter parent = new Emitter(observer);
            observer.onSubscribe(parent);
    
            try {
                source.subscribe(parent);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
        ...
    }
    

    先看CompletableOnSubscribe,出现再CompletableCreate 构造方法里的source就是我们在开始demo里的subscribe对象,定义成source有数据源头的意思,数据的发射都是从source对象中发射出去,实际上是用CompletableEmitter 实例发射,CompletableEmitter 实例调用onComplete()方法,这样就可以在CompletableObserver中收到数据,我们看看数据流是怎么调用的。

    public interface CompletableOnSubscribe {
        void subscribe(@NonNull CompletableEmitter emitter) throws Exception;
    }
    

    CompletableOnSubscribe 接口中只有一个方法,emitter对象用于数据的发射控制
    要想整个数据流可以用,就要先订阅,类似我们之前的观察者模式,
    订阅完成后也不一定就可以收到数据,需要用emitter对象发射数据。

    Completable 类里面有个重要的方法

    public final void subscribe(CompletableObserver observer) {
               ...
                subscribeActual(observer);
              ...
    }
    

    重要的是调用subscribeActual()方法,注意方法是protected ,只有子类才能调用,这样就把observer传递进来了。

    protected abstract void subscribeActual(CompletableObserver observer);
    

    好,我们回到CompletableCreate 源码,订阅完成后就会调用subscribeActual()方法

     Emitter parent = new Emitter(observer);
     observer.onSubscribe(parent);
    

    方法里new了一个Emitter实例,我们传递进来的observer,调用了onSubscribe()方法


    image.png

    注意source.subscribe(parent);在observer.onSubscribe(parent);代码之后
    这样我们可以在observer.onSubscribe(parent)方法调用后拿到emitter对象
    控制数据流向是否可以直接停止。
    当代码调用source.subscribe(parent);方法时,我们才能在下图中发射数据


    image.png

    接下来看看Emitter类,我们只看onComplete()方法

    static final class Emitter
        extends AtomicReference<Disposable>
        implements CompletableEmitter, Disposable {
    
            private static final long serialVersionUID = -2467358622224974244L;
    
            final CompletableObserver downstream;
    
            Emitter(CompletableObserver downstream) {
                this.downstream = downstream;
            }
    
            @Override
            public void onComplete() {
                if (get() != DisposableHelper.DISPOSED) {
                    Disposable d = getAndSet(DisposableHelper.DISPOSED);
                    if (d != DisposableHelper.DISPOSED) {
                        try {
                            downstream.onComplete();
                        } finally {
                            if (d != null) {
                                d.dispose();
                            }
                        }
                    }
                }
            }
      ...
        }
    

    代码中将CompletableObserver实例传递进来,downstream对象就是


    image.png

    当我们在这里调用onComplete()方法时,就是直接到Emitter实例的onComplete()方法


    image.png

    Emitter是也是AtomicReference的子类,保准多线程调用的安全

        public void onComplete() {
                if (get() != DisposableHelper.DISPOSED) {
                    Disposable d = getAndSet(DisposableHelper.DISPOSED);
                    if (d != DisposableHelper.DISPOSED) {
                        try {
                            downstream.onComplete();
                        } finally {
                            if (d != null) {
                                d.dispose();
                            }
                        }
                    }
                }
            }
    

    最终调用 downstream.onComplete();,然后整个数据dispose了。dispose之后将接收不到数据,但是任然可以发射数据。

    相关文章

      网友评论

        本文标题:Rxjava2 Completable第一节create

        本文链接:https://www.haomeiwen.com/subject/qshqkctx.html