美文网首页Android 开发
Rxjava2 Completable第一节create

Rxjava2 Completable第一节create

作者: CODERLIHAO | 来源:发表于2019-07-10 09:15 被阅读0次

简言

Completable只会关心onComplete和onError事件,要么成功要么失败

例子

 CompletableOnSubscribe subscribe = new CompletableOnSubscribe() {
            @Override
            public void subscribe(CompletableEmitter emitter) throws Exception {
                emitter.onComplete();
            }
        };

        Completable.create(subscribe).subscribe(new CompletableObserver() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println(d.isDisposed());
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println(e.getMessage());
            }
        });
    }

一个非常简单的demo,最终执行的是onComplete()方法

image.png

Completable抽象类实现的 接口,从源码中看出,Completable只关心onComplete()和onError()方法

public interface CompletableObserver {
    void onSubscribe(@NonNull Disposable d);
    void onComplete();
    void onError(@NonNull Throwable e);

源码分析

由于Completable是抽象类,没办法直接new ,但是这个类提供了大量的静态方法用于构造实例,我们先看一个create()方法

  public static Completable create(CompletableOnSubscribe source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new CompletableCreate(source));
    }

源码中直接构造CompletableCreate类

public final class CompletableCreate extends Completable {

    final CompletableOnSubscribe source;

    public CompletableCreate(CompletableOnSubscribe source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(CompletableObserver observer) {
        Emitter parent = new Emitter(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    ...
}

先看CompletableOnSubscribe,出现再CompletableCreate 构造方法里的source就是我们在开始demo里的subscribe对象,定义成source有数据源头的意思,数据的发射都是从source对象中发射出去,实际上是用CompletableEmitter 实例发射,CompletableEmitter 实例调用onComplete()方法,这样就可以在CompletableObserver中收到数据,我们看看数据流是怎么调用的。

public interface CompletableOnSubscribe {
    void subscribe(@NonNull CompletableEmitter emitter) throws Exception;
}

CompletableOnSubscribe 接口中只有一个方法,emitter对象用于数据的发射控制
要想整个数据流可以用,就要先订阅,类似我们之前的观察者模式,
订阅完成后也不一定就可以收到数据,需要用emitter对象发射数据。

Completable 类里面有个重要的方法

public final void subscribe(CompletableObserver observer) {
           ...
            subscribeActual(observer);
          ...
}

重要的是调用subscribeActual()方法,注意方法是protected ,只有子类才能调用,这样就把observer传递进来了。

protected abstract void subscribeActual(CompletableObserver observer);

好,我们回到CompletableCreate 源码,订阅完成后就会调用subscribeActual()方法

 Emitter parent = new Emitter(observer);
 observer.onSubscribe(parent);

方法里new了一个Emitter实例,我们传递进来的observer,调用了onSubscribe()方法


image.png

注意source.subscribe(parent);在observer.onSubscribe(parent);代码之后
这样我们可以在observer.onSubscribe(parent)方法调用后拿到emitter对象
控制数据流向是否可以直接停止。
当代码调用source.subscribe(parent);方法时,我们才能在下图中发射数据


image.png

接下来看看Emitter类,我们只看onComplete()方法

static final class Emitter
    extends AtomicReference<Disposable>
    implements CompletableEmitter, Disposable {

        private static final long serialVersionUID = -2467358622224974244L;

        final CompletableObserver downstream;

        Emitter(CompletableObserver downstream) {
            this.downstream = downstream;
        }

        @Override
        public void onComplete() {
            if (get() != DisposableHelper.DISPOSED) {
                Disposable d = getAndSet(DisposableHelper.DISPOSED);
                if (d != DisposableHelper.DISPOSED) {
                    try {
                        downstream.onComplete();
                    } finally {
                        if (d != null) {
                            d.dispose();
                        }
                    }
                }
            }
        }
  ...
    }

代码中将CompletableObserver实例传递进来,downstream对象就是


image.png

当我们在这里调用onComplete()方法时,就是直接到Emitter实例的onComplete()方法


image.png

Emitter是也是AtomicReference的子类,保准多线程调用的安全

    public void onComplete() {
            if (get() != DisposableHelper.DISPOSED) {
                Disposable d = getAndSet(DisposableHelper.DISPOSED);
                if (d != DisposableHelper.DISPOSED) {
                    try {
                        downstream.onComplete();
                    } finally {
                        if (d != null) {
                            d.dispose();
                        }
                    }
                }
            }
        }

最终调用 downstream.onComplete();,然后整个数据dispose了。dispose之后将接收不到数据,但是任然可以发射数据。

相关文章

网友评论

    本文标题:Rxjava2 Completable第一节create

    本文链接:https://www.haomeiwen.com/subject/qshqkctx.html