Rxjava2

作者: 郑捡书 | 来源:发表于2019-07-31 19:32 被阅读0次

    Season_zl给初学者的RxJava2.0教程

    ObservableEmitter<T> emitter

    1.发射器发出onComplete()或者onError()后,接收器将不再接收时间。
    2.游可以不发送onComplete或onError。
    3.最为关键的是onComplete和onError必须唯一并且互斥, 即不能发多个onComplete, 也不能发多个onError, 也不能先发一个onComplete, 然后再发一个onError, 反之亦然。

    注: 关于onComplete和onError唯一并且互斥这一点, 是需要自行在代码中进行控制, 如果你的代码逻辑中违背了这个规则, **并不一定会导致程序崩溃. ** 比如发送多个onComplete是可以正常运行的, 依然是收到第一个onComplete就不再接收了, 但若是发送多个onError, 则收到第二个onError事件会导致程序会崩溃.

    Disposable d

    当调用它的dispose()方法时, 它就会将两根管道切断, 从而导致下游收不到事件,但上游的还会继续发送剩余事件。
    在Activity中将这个Disposable保存起来, 当Activity退出时切断它即可。多个Disposable则使用CompositeDisposable管理,CompositeDisposable.add()CompositeDisposable.clear()

    总结

    ObservableEmitter<T> emitteronComplete()onError(),以及Disposable ddispose()都只会让下游接收不到事件,但上游假如还存在事件则会继续发送,以上的方法都可以视为阶段器,

    subscribeOn()observeOn()
    • subscribeOn()指定的是上游发送事件的线程,observeOn()指定的是下游接收事件的线程.
    • 多次指定上游的线程只有第一次指定的有效, 也就是说多次调用subscribeOn() 只有第一次的有效, 其余的会被忽略.
    • 多次指定下游的线程是可以的, 也就是说每调用一次observeOn() , 下游的线程就会切换一次.
        Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作
        Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作
        Schedulers.newThread() 代表一个常规的新线程
        AndroidSchedulers.mainThread()  代表Android的主线程
    
    Map操作符

    对原数据进行变化操作(其实就是一个方法,接收原数据操作然后返回结果数据)

    FlatMap操作符(玩的熟才用,否则容易晕)

    将第一次发送的数据和flatMap发送的数据进行组合再此发送。比如第一次发送ABC,第二次发送123,那么可能(因为不保证顺序)会出现A1A2A3 B1B2B3 C1C2C3 。保证顺序的话用concatMap

    Zip操作符

    对多个发送源的数据进行合并,每个源数据的对应角标的元素进行合并,以最短发送源的为准,较长发送源的剩余元素被舍弃。同一线程一定有会有一个发送源先全部发送完毕。

    Flowable(默认缓存为128个事件,响应式拉取)

    背压策略:BackpressureStrategy(水缸)。一般的使用场景都是发送量大且异步(因为这两个都可以会引起内存溢出)

    • ERROR,上游积压超过128事件则会直接报异常
    • BUFFER, 无限制缓存大小,但是会存在OOM风险
    • DROP, 丢弃超过128个事件的剩余事件(默认缓存为128,你发了129,那么第129不会进入水缸)。 Drop就是直接把存不下的事件丢弃
    • LATEST, Latest就是只保留最新的事件,当水缸(缓存128)已经存满了128个事件,那么这时候还有事件进入的话则前面的事件会被覆盖掉。
    背压源码解析
    Flowable
    //  创建上游的方法
       public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
            // 检查是否为null的工具类,不必深究
            ObjectHelper.requireNonNull(source, "source is null");
            ObjectHelper.requireNonNull(mode, "mode is null");
            //  RxJavaPlugins.onAssembly()。因为是链式模式,所以返回本身,这个方法就是一个包裹转换的功能,不必深究
            // FlowableCreate,这个类才是重点
            return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
        }
    
    // 订阅下游的方法
    public final void subscribe(Subscriber<? super T> s) {
            // 一般我都是直接new一个Subscriber,所以走else块。
            if (s instanceof FlowableSubscriber) {
                subscribe((FlowableSubscriber<? super T>)s);
            } else {
                ObjectHelper.requireNonNull(s, "s is null");
                // 包裹一层
                subscribe(new StrictSubscriber<T>(s));
            }
    
    
       public final void subscribe(FlowableSubscriber<? super T> s) {
            ObjectHelper.requireNonNull(s, "s is null");
            try {
                Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s);
    
                ObjectHelper.requireNonNull(z, "The RxJavaPlugins.onSubscribe hook returned a null FlowableSubscriber. Please check the handler provided to RxJavaPlugins.setOnFlowableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
                
                subscribeActual(z);  // !!!!!!!!!!真实发起订阅(其他代码可不看,就看这个句)
            } catch (NullPointerException e) { // NOPMD
                throw e;
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                // can't call onError because no way to know if a Subscription has been set or not
                // can't call onSubscribe because the call might have set a Subscription already
                RxJavaPlugins.onError(e);
    
                NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
                npe.initCause(e);
                throw npe;
            }
        }
    
    StrictSubscriber 下游类
    public class StrictSubscriber<T>
    extends AtomicInteger
    implements FlowableSubscriber<T>, Subscription {
    
        private static final long serialVersionUID = -4945028590049415624L;
    
        final Subscriber<? super T> downstream;
    
        final AtomicThrowable error;
    
        final AtomicLong requested;
    
        final AtomicReference<Subscription> upstream;
    
        final AtomicBoolean once;
    
        volatile boolean done;
    
        public StrictSubscriber(Subscriber<? super T> downstream) {
            this.downstream = downstream;
            this.error = new AtomicThrowable();
            this.requested = new AtomicLong();
            this.upstream = new AtomicReference<Subscription>();
            this.once = new AtomicBoolean();
        }
    
        @Override
        public void request(long n) {
            if (n <= 0) {
                cancel();
                onError(new IllegalArgumentException("§3.9 violated: positive request amount required but it was " + n));
            } else {
                SubscriptionHelper.deferredRequest(upstream, requested, n);
            }
        }
    
        @Override
        public void cancel() {
            if (!done) {
                SubscriptionHelper.cancel(upstream);
            }
        }
    
        @Override
        public void onSubscribe(Subscription s) {
            if (once.compareAndSet(false, true)) {
    
                downstream.onSubscribe(this);
    
                SubscriptionHelper.deferredSetOnce(this.upstream, requested, s);
            } else {
                s.cancel();
                cancel();
                onError(new IllegalStateException("§2.12 violated: onSubscribe must be called at most once"));
            }
        }
    
        @Override
        public void onNext(T t) {
            HalfSerializer.onNext(downstream, t, this, error);
        }
    
        @Override
        public void onError(Throwable t) {
            done = true;
            HalfSerializer.onError(downstream, t, this, error);
        }
    
        @Override
        public void onComplete() {
            done = true;
            HalfSerializer.onComplete(downstream, this, error);
        }
    }
    
    
    FlowableCreate(继承Flowable)
      // 构造方法
       public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) {
            //  持有把上游对象
            this.source = source;
            // 持有背压模式对象
            this.backpressure = backpressure;
        }
    
      // 实际订阅,Flowable的subscribe()内部会调用这个方法。
      // 当你使用订阅下游的时候,会把下游对象传到这个方法。
    @Override
        public void subscribeActual(Subscriber<? super T> t) {
            BaseEmitter<T> emitter;
            
            // 工厂模式,根据背压模式实例化对应的发射器,且会把下游对象通过发射器的构造方法让发射器内部持有(所以我们在发射器才会知道下游所需的处理能力)。
          // 背压的核心就是这些工厂类,执行的条件不同产生的效果就不同
            switch (backpressure) {
            case MISSING: {
                emitter = new MissingEmitter<T>(t);
                break;
            }
            case ERROR: {
                emitter = new ErrorAsyncEmitter<T>(t);
                break;
            }
            case DROP: {
                emitter = new DropAsyncEmitter<T>(t);
                break;
            }
            case LATEST: {
                emitter = new LatestAsyncEmitter<T>(t);
                break;
            }
            default: {
                emitter = new BufferAsyncEmitter<T>(t, bufferSize());
                break;
            }
            }
            
            // 调用下游的onSubscribe,并且把发射器对象传递过去让下游对象持有。(双向传递,下游和发射器互相持有对方的对象)
            t.onSubscribe(emitter);
            try {
                // 上游持有了发射器对象
                // 使用上游对象执行该对象的subscribe,其实就是走发射事件的逻辑
                source.subscribe(emitter);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                emitter.onError(ex);
            }
        }
    
    BaseEmitter背压发射器基类
    abstract static class BaseEmitter<T>
        extends AtomicLong
        implements FlowableEmitter<T>, Subscription {
            private static final long serialVersionUID = 7326289992464377023L;
    
            final Subscriber<? super T> downstream;
    
            final SequentialDisposable serial;
    
            BaseEmitter(Subscriber<? super T> downstream) {
                // 下游对象
                this.downstream = downstream;
                // 切断对象
                this.serial = new SequentialDisposable();
            }
    
            @Override
            public void onComplete() {
                complete();
            }
    
            protected void complete() {
                // 如果已经切断了就跳过,所以下游不会收到onComplete()事件
                if (isCancelled()) {
                    return;
                }
                try {
                    // 回调下游的onComplete()事件
                    downstream.onComplete();
                } finally {
                    // 切断
                    serial.dispose();
                }
            }
    
            @Override
            public final void onError(Throwable e) {
                if (!tryOnError(e)) {
                    // 已经切断,如果接着发送onError内部会抛异常
                    RxJavaPlugins.onError(e);
                }
            }
    
            @Override
            public boolean tryOnError(Throwable e) {
                return error(e);
            }
    
            protected boolean error(Throwable e) {
                // 判断开发者传递的异常是否为null
                if (e == null) {
                    e = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
                }
             
                if (isCancelled()) {
                    return false;
                }
                try {
                    // 回调下游的方法
                    downstream.onError(e);
                } finally {
                   // 切断
                    serial.dispose();
                }
                return true;
            }
    
            @Override
            public final void cancel() {
                // 切断
                serial.dispose();
                onUnsubscribed();
            }
            
           // 注销订阅,空实现
            void onUnsubscribed() {
                // default is no-op
            }
    
            @Override
            public final boolean isCancelled() {
                return serial.isDisposed();
            }
    
            @Override
            public final void request(long n) {
                if (SubscriptionHelper.validate(n)) {
                    // 将下游请求的事件数存放
                    BackpressureHelper.add(this, n);
                    onRequested();
                }
            }
    
            void onRequested() {
                // default is no-op
            }
    
            @Override
            public final void setDisposable(Disposable d) {
                serial.update(d);
            }
    
            @Override
            public final void setCancellable(Cancellable c) {
                setDisposable(new CancellableDisposable(c));
            }
    
            @Override
            public final long requested() {
                return get();
            }
    
            @Override
            public final FlowableEmitter<T> serialize() {
                return new SerializedEmitter<T>(this);
            }
    
            @Override
            public String toString() {
                return String.format("%s{%s}", getClass().getSimpleName(), super.toString());
            }
        }
    
    ErrorAsyncEmitter背压发射器(继承了NoOverflowBaseAsyncEmitter)
    
     static final class ErrorAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {
    
            private static final long serialVersionUID = 338953216916120960L;
    
            ErrorAsyncEmitter(Subscriber<? super T> downstream) {
                super(downstream);
            }
    
            @Override
            void onOverflow() {
                // 回调下游的onError(),直接抛出异常
                onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
            }
        }
    
    
    
       abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {
    
            private static final long serialVersionUID = 4127754106204442833L;
    
            NoOverflowBaseAsyncEmitter(Subscriber<? super T> downstream) {
                super(downstream);
            }
    
            @Override
            public final void onNext(T t) {
                if (isCancelled()) {
                    return;
                }
    
                if (t == null) {
                    onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                    return;
                }
                
                if (get() != 0) {  // 下游所需事件不为0,就是下游还有处理的事件
                    downstream.onNext(t);
                    BackpressureHelper.produced(this, 1);
                } else {
                    // 调用子类重写的方法
                    onOverflow();
                }
            }
            
            // 子类重写
            abstract void onOverflow();
        }
    
        // BackpressureHelper的方法
        public static long produced(AtomicLong requested, long n) {
            for (;;) {
                long current = requested.get();
                if (current == Long.MAX_VALUE) {
                    return Long.MAX_VALUE;
                }
                // 下游所需事件 - 1
                long update = current - n;
                if (update < 0L) {
                    RxJavaPlugins.onError(new IllegalStateException("More produced than requested: " + update));
                    update = 0L;
                }
                // 重置所需事件数
                if (requested.compareAndSet(current, update)) {
                    return update;
                }
            }
        }
    
    上游,下游,发射器关系丑图

    相关文章

      网友评论

          本文标题:Rxjava2

          本文链接:https://www.haomeiwen.com/subject/aijiuqtx.html