Rxjava2

作者: 郑捡书 | 来源:发表于2019-07-31 19:32 被阅读0次

Season_zl给初学者的RxJava2.0教程

ObservableEmitter<T> emitter

1.发射器发出onComplete()或者onError()后,接收器将不再接收时间。
2.游可以不发送onComplete或onError。
3.最为关键的是onComplete和onError必须唯一并且互斥, 即不能发多个onComplete, 也不能发多个onError, 也不能先发一个onComplete, 然后再发一个onError, 反之亦然。

注: 关于onComplete和onError唯一并且互斥这一点, 是需要自行在代码中进行控制, 如果你的代码逻辑中违背了这个规则, **并不一定会导致程序崩溃. ** 比如发送多个onComplete是可以正常运行的, 依然是收到第一个onComplete就不再接收了, 但若是发送多个onError, 则收到第二个onError事件会导致程序会崩溃.

Disposable d

当调用它的dispose()方法时, 它就会将两根管道切断, 从而导致下游收不到事件,但上游的还会继续发送剩余事件。
在Activity中将这个Disposable保存起来, 当Activity退出时切断它即可。多个Disposable则使用CompositeDisposable管理,CompositeDisposable.add()CompositeDisposable.clear()

总结

ObservableEmitter<T> emitteronComplete()onError(),以及Disposable ddispose()都只会让下游接收不到事件,但上游假如还存在事件则会继续发送,以上的方法都可以视为阶段器,

subscribeOn()observeOn()
  • subscribeOn()指定的是上游发送事件的线程,observeOn()指定的是下游接收事件的线程.
  • 多次指定上游的线程只有第一次指定的有效, 也就是说多次调用subscribeOn() 只有第一次的有效, 其余的会被忽略.
  • 多次指定下游的线程是可以的, 也就是说每调用一次observeOn() , 下游的线程就会切换一次.
    Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作
    Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作
    Schedulers.newThread() 代表一个常规的新线程
    AndroidSchedulers.mainThread()  代表Android的主线程
Map操作符

对原数据进行变化操作(其实就是一个方法,接收原数据操作然后返回结果数据)

FlatMap操作符(玩的熟才用,否则容易晕)

将第一次发送的数据和flatMap发送的数据进行组合再此发送。比如第一次发送ABC,第二次发送123,那么可能(因为不保证顺序)会出现A1A2A3 B1B2B3 C1C2C3 。保证顺序的话用concatMap

Zip操作符

对多个发送源的数据进行合并,每个源数据的对应角标的元素进行合并,以最短发送源的为准,较长发送源的剩余元素被舍弃。同一线程一定有会有一个发送源先全部发送完毕。

Flowable(默认缓存为128个事件,响应式拉取)

背压策略:BackpressureStrategy(水缸)。一般的使用场景都是发送量大且异步(因为这两个都可以会引起内存溢出)

  • ERROR,上游积压超过128事件则会直接报异常
  • BUFFER, 无限制缓存大小,但是会存在OOM风险
  • DROP, 丢弃超过128个事件的剩余事件(默认缓存为128,你发了129,那么第129不会进入水缸)。 Drop就是直接把存不下的事件丢弃
  • LATEST, Latest就是只保留最新的事件,当水缸(缓存128)已经存满了128个事件,那么这时候还有事件进入的话则前面的事件会被覆盖掉。
背压源码解析
Flowable
//  创建上游的方法
   public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
        // 检查是否为null的工具类,不必深究
        ObjectHelper.requireNonNull(source, "source is null");
        ObjectHelper.requireNonNull(mode, "mode is null");
        //  RxJavaPlugins.onAssembly()。因为是链式模式,所以返回本身,这个方法就是一个包裹转换的功能,不必深究
        // FlowableCreate,这个类才是重点
        return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
    }
// 订阅下游的方法
public final void subscribe(Subscriber<? super T> s) {
        // 一般我都是直接new一个Subscriber,所以走else块。
        if (s instanceof FlowableSubscriber) {
            subscribe((FlowableSubscriber<? super T>)s);
        } else {
            ObjectHelper.requireNonNull(s, "s is null");
            // 包裹一层
            subscribe(new StrictSubscriber<T>(s));
        }


   public final void subscribe(FlowableSubscriber<? super T> s) {
        ObjectHelper.requireNonNull(s, "s is null");
        try {
            Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s);

            ObjectHelper.requireNonNull(z, "The RxJavaPlugins.onSubscribe hook returned a null FlowableSubscriber. Please check the handler provided to RxJavaPlugins.setOnFlowableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
            
            subscribeActual(z);  // !!!!!!!!!!真实发起订阅(其他代码可不看,就看这个句)
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Subscription has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }
StrictSubscriber 下游类
public class StrictSubscriber<T>
extends AtomicInteger
implements FlowableSubscriber<T>, Subscription {

    private static final long serialVersionUID = -4945028590049415624L;

    final Subscriber<? super T> downstream;

    final AtomicThrowable error;

    final AtomicLong requested;

    final AtomicReference<Subscription> upstream;

    final AtomicBoolean once;

    volatile boolean done;

    public StrictSubscriber(Subscriber<? super T> downstream) {
        this.downstream = downstream;
        this.error = new AtomicThrowable();
        this.requested = new AtomicLong();
        this.upstream = new AtomicReference<Subscription>();
        this.once = new AtomicBoolean();
    }

    @Override
    public void request(long n) {
        if (n <= 0) {
            cancel();
            onError(new IllegalArgumentException("§3.9 violated: positive request amount required but it was " + n));
        } else {
            SubscriptionHelper.deferredRequest(upstream, requested, n);
        }
    }

    @Override
    public void cancel() {
        if (!done) {
            SubscriptionHelper.cancel(upstream);
        }
    }

    @Override
    public void onSubscribe(Subscription s) {
        if (once.compareAndSet(false, true)) {

            downstream.onSubscribe(this);

            SubscriptionHelper.deferredSetOnce(this.upstream, requested, s);
        } else {
            s.cancel();
            cancel();
            onError(new IllegalStateException("§2.12 violated: onSubscribe must be called at most once"));
        }
    }

    @Override
    public void onNext(T t) {
        HalfSerializer.onNext(downstream, t, this, error);
    }

    @Override
    public void onError(Throwable t) {
        done = true;
        HalfSerializer.onError(downstream, t, this, error);
    }

    @Override
    public void onComplete() {
        done = true;
        HalfSerializer.onComplete(downstream, this, error);
    }
}

FlowableCreate(继承Flowable)
  // 构造方法
   public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) {
        //  持有把上游对象
        this.source = source;
        // 持有背压模式对象
        this.backpressure = backpressure;
    }

  // 实际订阅,Flowable的subscribe()内部会调用这个方法。
  // 当你使用订阅下游的时候,会把下游对象传到这个方法。
@Override
    public void subscribeActual(Subscriber<? super T> t) {
        BaseEmitter<T> emitter;
        
        // 工厂模式,根据背压模式实例化对应的发射器,且会把下游对象通过发射器的构造方法让发射器内部持有(所以我们在发射器才会知道下游所需的处理能力)。
      // 背压的核心就是这些工厂类,执行的条件不同产生的效果就不同
        switch (backpressure) {
        case MISSING: {
            emitter = new MissingEmitter<T>(t);
            break;
        }
        case ERROR: {
            emitter = new ErrorAsyncEmitter<T>(t);
            break;
        }
        case DROP: {
            emitter = new DropAsyncEmitter<T>(t);
            break;
        }
        case LATEST: {
            emitter = new LatestAsyncEmitter<T>(t);
            break;
        }
        default: {
            emitter = new BufferAsyncEmitter<T>(t, bufferSize());
            break;
        }
        }
        
        // 调用下游的onSubscribe,并且把发射器对象传递过去让下游对象持有。(双向传递,下游和发射器互相持有对方的对象)
        t.onSubscribe(emitter);
        try {
            // 上游持有了发射器对象
            // 使用上游对象执行该对象的subscribe,其实就是走发射事件的逻辑
            source.subscribe(emitter);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            emitter.onError(ex);
        }
    }
BaseEmitter背压发射器基类
abstract static class BaseEmitter<T>
    extends AtomicLong
    implements FlowableEmitter<T>, Subscription {
        private static final long serialVersionUID = 7326289992464377023L;

        final Subscriber<? super T> downstream;

        final SequentialDisposable serial;

        BaseEmitter(Subscriber<? super T> downstream) {
            // 下游对象
            this.downstream = downstream;
            // 切断对象
            this.serial = new SequentialDisposable();
        }

        @Override
        public void onComplete() {
            complete();
        }

        protected void complete() {
            // 如果已经切断了就跳过,所以下游不会收到onComplete()事件
            if (isCancelled()) {
                return;
            }
            try {
                // 回调下游的onComplete()事件
                downstream.onComplete();
            } finally {
                // 切断
                serial.dispose();
            }
        }

        @Override
        public final void onError(Throwable e) {
            if (!tryOnError(e)) {
                // 已经切断,如果接着发送onError内部会抛异常
                RxJavaPlugins.onError(e);
            }
        }

        @Override
        public boolean tryOnError(Throwable e) {
            return error(e);
        }

        protected boolean error(Throwable e) {
            // 判断开发者传递的异常是否为null
            if (e == null) {
                e = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
         
            if (isCancelled()) {
                return false;
            }
            try {
                // 回调下游的方法
                downstream.onError(e);
            } finally {
               // 切断
                serial.dispose();
            }
            return true;
        }

        @Override
        public final void cancel() {
            // 切断
            serial.dispose();
            onUnsubscribed();
        }
        
       // 注销订阅,空实现
        void onUnsubscribed() {
            // default is no-op
        }

        @Override
        public final boolean isCancelled() {
            return serial.isDisposed();
        }

        @Override
        public final void request(long n) {
            if (SubscriptionHelper.validate(n)) {
                // 将下游请求的事件数存放
                BackpressureHelper.add(this, n);
                onRequested();
            }
        }

        void onRequested() {
            // default is no-op
        }

        @Override
        public final void setDisposable(Disposable d) {
            serial.update(d);
        }

        @Override
        public final void setCancellable(Cancellable c) {
            setDisposable(new CancellableDisposable(c));
        }

        @Override
        public final long requested() {
            return get();
        }

        @Override
        public final FlowableEmitter<T> serialize() {
            return new SerializedEmitter<T>(this);
        }

        @Override
        public String toString() {
            return String.format("%s{%s}", getClass().getSimpleName(), super.toString());
        }
    }
ErrorAsyncEmitter背压发射器(继承了NoOverflowBaseAsyncEmitter)

 static final class ErrorAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {

        private static final long serialVersionUID = 338953216916120960L;

        ErrorAsyncEmitter(Subscriber<? super T> downstream) {
            super(downstream);
        }

        @Override
        void onOverflow() {
            // 回调下游的onError(),直接抛出异常
            onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
        }
    }



   abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {

        private static final long serialVersionUID = 4127754106204442833L;

        NoOverflowBaseAsyncEmitter(Subscriber<? super T> downstream) {
            super(downstream);
        }

        @Override
        public final void onNext(T t) {
            if (isCancelled()) {
                return;
            }

            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            
            if (get() != 0) {  // 下游所需事件不为0,就是下游还有处理的事件
                downstream.onNext(t);
                BackpressureHelper.produced(this, 1);
            } else {
                // 调用子类重写的方法
                onOverflow();
            }
        }
        
        // 子类重写
        abstract void onOverflow();
    }

    // BackpressureHelper的方法
    public static long produced(AtomicLong requested, long n) {
        for (;;) {
            long current = requested.get();
            if (current == Long.MAX_VALUE) {
                return Long.MAX_VALUE;
            }
            // 下游所需事件 - 1
            long update = current - n;
            if (update < 0L) {
                RxJavaPlugins.onError(new IllegalStateException("More produced than requested: " + update));
                update = 0L;
            }
            // 重置所需事件数
            if (requested.compareAndSet(current, update)) {
                return update;
            }
        }
    }
上游,下游,发射器关系丑图

相关文章

网友评论

      本文标题:Rxjava2

      本文链接:https://www.haomeiwen.com/subject/aijiuqtx.html