rxjava2基本元素源码分析

作者: 草蜢的逆袭 | 来源:发表于2019-03-28 11:32 被阅读1次

    无背压

    代码示例

    // Minimal non-backpressured pipeline: the source emits one item and then completes,
    // and every Observer callback logs what it receives.
    Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            LogUtils.loge("Observable onSubscribe subscribe...");
            // Only emit while the downstream has not disposed the emitter.
            if (!emitter.isDisposed()) {
                emitter.onNext("test1");
                emitter.onComplete();
            }
        }
    }).subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            LogUtils.loge("Observer onSubscribe ...");
        }

        @Override
        public void onNext(String s) {
            LogUtils.loge("Observer onNext str = " + s);
        }

        @Override
        public void onError(Throwable e) {
            // Do not swallow failures silently: surface them like the other callbacks.
            LogUtils.loge("Observer onError e = " + e);
        }

        @Override
        public void onComplete() {
            LogUtils.loge("Observer onComplete ...");
        }
    });
    

    基本元素

    Observable

    1. 观察得到的-被观察者,不支持背压
    2. 通过Observable创建一个可观察的序列(create)
    3. 通过subscribe去注册一个观察者

    Observer

    1. 用于接收数据----观察者
    2. 作为Observable的subscribe的方法的参数

    Disposable

    1. 和Rxjava1的Subscription的作用相当
    2. 用于取消订阅和获取当前的订阅状态

    ObservableOnSubscribe

    1. 当订阅时会触发此接口调用
    2. 在Observable内部,实际作用是向观察者发射数据

    Emitter

    1. 一个发送数据的接口,和Observer的方法类似
    2. 本质是对Observer和Subscriber的包装

    流程分解

    io.reactivex.Observable#create

    /**
     * Wraps the given subscribe callback in an {@code ObservableCreate} and passes
     * the result through the assembly hook.
     *
     * @param source callback handed to the new {@code ObservableCreate}
     * @return whatever {@code RxJavaPlugins.onAssembly} returns for the new instance
     */
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        Observable<T> created = new ObservableCreate<T>(source);
        return RxJavaPlugins.onAssembly(created);
    }
    

    io.reactivex.plugins.RxJavaPlugins#onAssembly(io.reactivex.Observable<T>)

    /**
     * Applies the {@code onObservableAssembly} hook to a freshly assembled Observable.
     *
     * @param source the Observable being assembled
     * @return {@code source} itself when no hook is set, otherwise the hook's result
     */
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> hook = onObservableAssembly;
        // In this walkthrough no assembly hook is installed, so the hook is null
        // and the Observable we passed in is returned unchanged.
        if (hook == null) {
            return source;
        }
        return apply(hook, source);
    }
    

    io.reactivex.Observable#subscribe(io.reactivex.Observer<? super T>)

    /**
     * Subscribes the given Observer: runs it through the subscribe hook and then
     * delegates to {@code subscribeActual}.
     *
     * @param observer the Observer to subscribe; must not be null
     * @throws NullPointerException if {@code observer} is null, if the subscription
     *         path itself throws one, or as a wrapper (with the original failure as
     *         its cause) around any other non-fatal Throwable raised while subscribing
     */
    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);
            // For Observable.create this dispatches to ObservableCreate#subscribeActual.
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            // Propagate as-is instead of silently dropping it.
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // The observer cannot be signalled safely here (onSubscribe may or may not
            // have happened), so report to the global handler and rethrow wrapped.
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }
    

    io.reactivex.plugins.RxJavaPlugins#onSubscribe(io.reactivex.Observable<T>, io.reactivex.Observer<? super T>)

    /**
     * Applies the {@code onObservableSubscribe} hook to an Observer about to be subscribed.
     *
     * @param source   the Observable being subscribed to
     * @param observer the Observer supplied by the caller
     * @return {@code observer} itself when no hook is set, otherwise the hook's result
     */
    public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
        BiFunction<? super Observable, ? super Observer, ? extends Observer> hook = onObservableSubscribe;
        // In this walkthrough no subscribe hook is installed, so the hook is null
        // and the Observer we passed in is returned unchanged.
        if (hook == null) {
            return observer;
        }
        return apply(hook, source, observer);
    }
    

    io.reactivex.internal.operators.observable.ObservableCreate

    /**
     * Wires one Observer to the user-supplied source: wraps the Observer in a
     * {@code CreateEmitter}, hands that emitter to {@code observer.onSubscribe}
     * and then invokes the source's {@code subscribe} with the same emitter.
     *
     * @param observer the downstream Observer
     */
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // The emitter wraps the Observer and is passed to onSubscribe below.
        CreateEmitter<T> createEmitter = new CreateEmitter<T>(observer);

        // onSubscribe must run before any emission, so it precedes source.subscribe.
        observer.onSubscribe(createEmitter);

        try {
            // Run the user's ObservableOnSubscribe callback.
            source.subscribe(createEmitter);
        } catch (Throwable failure) {
            // Fatal errors are rethrown by the helper; everything else goes through the emitter.
            Exceptions.throwIfFatal(failure);
            createEmitter.onError(failure);
        }
    }
    
    /**
     * Reports the disposed state by delegating to {@code emitter}.
     *
     * NOTE(review): {@code emitter} is not declared anywhere in the visible excerpt,
     * so the enclosing type of this method cannot be determined from here — confirm
     * against the original source.
     */
    @Override
    public boolean isDisposed() {
        return emitter.isDisposed();
    }
    
    /**
     * Emitter handed to the user's subscribe callback. It forwards signals to the
     * wrapped Observer and keeps its own disposed state in the inherited
     * {@code AtomicReference<Disposable>}.
     *
     * NOTE(review): this is an excerpt; other {@code ObservableEmitter} methods
     * (e.g. the {@code onError} used by {@code subscribeActual}) are not shown here.
     */
    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

        private static final long serialVersionUID = -3434801548987643227L;

        /** Downstream Observer that receives the forwarded signals. */
        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        /** Forwards the item downstream unless this emitter is already disposed. */
        @Override
        public void onNext(T t) {
            if (isDisposed()) {
                return;
            }
            observer.onNext(t);
        }

        /** Signals completion downstream once, then disposes this emitter. */
        @Override
        public void onComplete() {
            if (isDisposed()) {
                return;
            }
            try {
                observer.onComplete();
            } finally {
                // Dispose even if the downstream callback throws.
                dispose();
            }
        }

        /** Marks this emitter as disposed via {@code DisposableHelper}. */
        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        /** Checks whether the held reference is the disposed marker. */
        @Override
        public boolean isDisposed() {
            Disposable held = get();
            return DisposableHelper.isDisposed(held);
        }
    }
    

    io.reactivex.internal.disposables.DisposableHelper 单例类

    public enum DisposableHelper implements Disposable {
    
        DISPOSED
        ;
    
        public static boolean isDisposed(Disposable d) {
            // 判断需要dispose的对象是否是已经dispose的
            return d == DISPOSED;
        }
    
        public static boolean dispose(AtomicReference<Disposable> field) {
            / 取到当前的Disposable的对象
            Disposable current = field.get();
            // 得到已经disposed的对象
            Disposable d = DISPOSED;
            if (current != d) {
                // 将AtomicReference中Disposable标识为disposed状态
                current = field.getAndSet(d);
                if (current != d) {
                    if (current != null) {
                        current.dispose();
                    }
                    return true;
                }
            }
            return false;
        }
    
        @Override
        public boolean isDisposed() {
            return true;
        }
    }
    

    有背压

    代码示例

        // Minimal backpressured pipeline using the DROP strategy: the source emits one
        // item and completes; the Subscriber must request before anything is delivered.
        Flowable.create((FlowableOnSubscribe<String>) emitter -> {
            LogUtils.loge("FlowableOnSubscribe subscribe");
            // Only emit while the downstream has not cancelled.
            if (!emitter.isCancelled()) {
                emitter.onNext("test11");
                emitter.onComplete();
            }
        }, BackpressureStrategy.DROP).subscribe(new Subscriber<String>() {
            @Override
            public void onSubscribe(Subscription s) {
                // Demand must be requested explicitly; without it no item is delivered.
                s.request(Integer.MAX_VALUE);
                LogUtils.loge("Subscriber onSubscribe");
            }

            @Override
            public void onNext(String s) {
                LogUtils.loge("Subscriber onNext s = " + s);
            }

            @Override
            public void onError(Throwable t) {
                // Do not swallow failures silently: surface them like the other callbacks.
                LogUtils.loge("Subscriber onError t = " + t);
            }

            @Override
            public void onComplete() {
                LogUtils.loge("Subscriber onComplete");
            }
        });
    

    基本元素

    Flowable

    1. 易流动的--------被观察者,支持背压
    2. 通过Flowable创建一个可观察的序列(create方法)
    3. 通过subscribe去注册一个观察者

    Subscriber

    1. 一个单独接口,和Observer方法类似
    2. 作为Flowable的subscribe方法的一个参数

    Subscription

    1. 订阅,和Rxjava1有所不同
    2. 支持背压,有用于背压的request方法

    FlowableOnSubscribe

    1. 当订阅时会触发此接口调用
    2. 在Flowable内部,实际作用是向观察者发射数据

    OnSubscribe

    Emitter

    1. 一个发射数据的接口,和Observer的方法类似
    2. 本质是对Observer和Subscriber的包装

    流程分解

    io.reactivex.Flowable#create

    /**
     * Wraps the given subscribe callback and backpressure mode in a
     * {@code FlowableCreate} and passes the result through the assembly hook.
     *
     * @param source callback handed to the new {@code FlowableCreate}
     * @param mode   backpressure strategy handed to the new {@code FlowableCreate}
     * @return whatever {@code RxJavaPlugins.onAssembly} returns for the new instance
     */
    public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
        // The object being assembled here is a FlowableCreate.
        Flowable<T> created = new FlowableCreate<T>(source, mode);
        return RxJavaPlugins.onAssembly(created);
    }
    

    io.reactivex.Flowable#subscribe(org.reactivestreams.Subscriber<? super T>)

    /**
     * Subscribes a plain Reactive Streams Subscriber by wrapping it in a
     * {@code StrictSubscriber} and delegating to the overload that accepts it.
     *
     * @param s the Subscriber supplied by the caller
     */
    public final void subscribe(Subscriber<? super T> s) {
        StrictSubscriber<T> strict = new StrictSubscriber<T>(s);
        subscribe(strict);
    }
    

    io.reactivex.Flowable#subscribe(io.reactivex.FlowableSubscriber<? super T>)

    /**
     * Runs the subscriber through the subscribe hook and hands the result to
     * {@code subscribeActual}.
     *
     * @param s the FlowableSubscriber to subscribe
     */
    public final void subscribe(FlowableSubscriber<? super T> s) {
        // For Flowable.create this dispatches to FlowableCreate#subscribeActual.
        subscribeActual(RxJavaPlugins.onSubscribe(this, s));
    }
    

    io.reactivex.internal.operators.flowable.FlowableCreate

    @Override
    public void subscribeActual(Subscriber<? super T> t) {
        BaseEmitter<T> emitter;
        // 根据背压策略构建emitter
        switch (backpressure) {
            case MISSING: {
                emitter = new MissingEmitter<T>(t);
                break;
            }
            case ERROR: {
                emitter = new ErrorAsyncEmitter<T>(t);
                break;
            }
            case DROP: {
                emitter = new DropAsyncEmitter<T>(t);
                break;
            }
            case LATEST: {
                emitter = new LatestAsyncEmitter<T>(t);
                break;
            }
            default: {
                emitter = new BufferAsyncEmitter<T>(t, bufferSize());
                break;
            }
        }
        // 调用   Subscriber的onSubscribe
        t.onSubscribe(emitter);
        try {
            // 调用subscribe方法
            source.subscribe(emitter);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            emitter.onError(ex);
        }
    }
    

    io.reactivex.internal.operators.flowable.FlowableCreate#DropAsyncEmitter

    /**
     * Emitter created by {@code subscribeActual} for the DROP backpressure strategy.
     * All emission logic shown in this file lives in the
     * {@code NoOverflowBaseAsyncEmitter} superclass.
     *
     * NOTE(review): the {@code onOverflow()} override required by the abstract
     * superclass is not shown in this excerpt.
     */
    static final class DropAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {
    
        private static final long serialVersionUID = 8360058422307496563L;
    
        /** Passes the downstream Subscriber up to the base emitter. */
        DropAsyncEmitter(Subscriber<? super T> downstream) {
            super(downstream);
        }
    }
    

    io.reactivex.internal.operators.flowable.FlowableCreate#NoOverflowBaseAsyncEmitter

    /**
     * Base emitter that only delivers an item when the requested counter is non-zero
     * and otherwise hands the situation to {@link #onOverflow()}.
     */
    abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {

        private static final long serialVersionUID = 4127754106204442833L;

        NoOverflowBaseAsyncEmitter(Subscriber<? super T> downstream) {
            super(downstream);
        }

        /**
         * Delivers {@code t} downstream if not cancelled and the counter is non-zero.
         *
         * @param t the item to emit
         */
        @Override
        public final void onNext(T t) {
            if (isCancelled()) {
                return;
            }

            // The counter (this AtomicLong) is only raised by request(n); while it is
            // still 0 the item is not delivered and the overflow hook runs instead.
            if (get() == 0) {
                onOverflow();
                return;
            }

            downstream.onNext(t);
            // Account for the one item just delivered.
            BackpressureHelper.produced(this, 1);
        }

        /** Called when an item arrives while the requested counter is 0. */
        abstract void onOverflow();
    }
    

    io.reactivex.internal.operators.flowable.FlowableCreate#BaseEmitter

    /**
     * Common base of the Flowable emitters. It acts as the Subscription given to
     * the downstream Subscriber and, being an {@code AtomicLong}, stores the
     * requested counter in itself.
     *
     * NOTE(review): this is an excerpt; {@code onUnsubscribed()} and the remaining
     * {@code FlowableEmitter} methods are not shown here.
     */
    abstract static class BaseEmitter<T>
    extends AtomicLong
    implements FlowableEmitter<T>, Subscription {
        private static final long serialVersionUID = 7326289992464377023L;

        /** Downstream Subscriber that receives the signals. */
        final Subscriber<? super T> downstream;

        /** Disposable container whose state backs {@link #isCancelled()}. */
        final SequentialDisposable serial;

        BaseEmitter(Subscriber<? super T> downstream) {
            this.serial = new SequentialDisposable();
            this.downstream = downstream;
        }

        /** Disposes the serial container, then runs the unsubscribe hook. */
        @Override
        public final void cancel() {
            serial.dispose();
            onUnsubscribed();
        }

        /** Reports whether the serial container has been disposed. */
        @Override
        public final boolean isCancelled() {
            return serial.isDisposed();
        }

        /**
         * Adds {@code n} to the requested counter when the amount passes validation.
         *
         * @param n the number of items requested
         */
        @Override
        public final void request(long n) {
            if (!SubscriptionHelper.validate(n)) {
                return;
            }
            BackpressureHelper.add(this, n);
        }
    }
    

    io.reactivex.internal.subscriptions.SubscriptionHelper#validate(long)

    /**
     * Validates a request amount.
     *
     * @param n the requested amount
     * @return true when {@code n} is positive; otherwise an
     *         {@code IllegalArgumentException} is reported through
     *         {@code RxJavaPlugins.onError} and false is returned
     */
    public static boolean validate(long n) {
        if (n > 0) {
            return true;
        }
        // A non-positive amount is rejected, so a caller such as BaseEmitter#request
        // never adds it to the requested counter.
        RxJavaPlugins.onError(new IllegalArgumentException("n > 0 required but it was " + n));
        return false;
    }
    

    io.reactivex.internal.util.BackpressureHelper#add

    /**
     * Adds {@code n} to the requested counter using a compare-and-set retry loop.
     *
     * @param requested the counter to update
     * @param n         the amount to add
     * @return the counter value observed before the update, or
     *         {@code Long.MAX_VALUE} if the counter already held that value
     */
    public static long add(AtomicLong requested, long n) {
        for (;;) {
            long r = requested.get();
            // A counter already at Long.MAX_VALUE is left untouched.
            if (r == Long.MAX_VALUE) {
                return Long.MAX_VALUE;
            }
            // presumably a saturating addition that caps at Long.MAX_VALUE — addCap is not shown here
            long u = addCap(r, n);
            // The counter is set to the sum computed above (not unconditionally to the
            // maximum); if another thread changed it in the meantime, retry.
            if (requested.compareAndSet(r, u)) {
                return r;
            }
        }
    }
    

    io.reactivex.internal.util.BackpressureHelper#produced

    /**
     * Subtracts {@code n} produced items from the requested counter using a
     * compare-and-set retry loop.
     *
     * @param requested the counter to update
     * @param n         the number of items that were produced
     * @return the new counter value, or {@code Long.MAX_VALUE} if the counter held
     *         that value (in which case it is not decremented)
     */
    public static long produced(AtomicLong requested, long n) {
        for (;;) {
            long current = requested.get();
            // A counter at Long.MAX_VALUE is never decremented.
            if (current == Long.MAX_VALUE) {
                return Long.MAX_VALUE;
            }
            long update = current - n;
            // Producing more than was requested is reported and the counter clamped to 0.
            if (update < 0L) {
                RxJavaPlugins.onError(new IllegalStateException("More produced than requested: " + update));
                update = 0L;
            }
            // Retry if another thread changed the counter since it was read.
            if (requested.compareAndSet(current, update)) {
                return update;
            }
        }
    }
    

    源码阅读总结

    不调用request,onNext不被执行的原因分析

    Subscriber->onSubscribe

    s.request(Integer.MAX_VALUE);

    FlowableCreate.BaseEmitter#request ->

    io.reactivex.internal.util.BackpressureHelper#add

    将设置的值更新到BaseEmitter中,BaseEmitter继承自AtomicLong


    FlowableOnSubscribe-> subscribe
    emitter.onNext(a);

    io.reactivex.internal.operators.flowable.FlowableCreate.NoOverflowBaseAsyncEmitter#onNext

    获取AtomicLong的值,不为0的时候,才会调用Subscriber的onNext方法

    总之,Flowable是采用强制拉取的方式(下游必须主动调用request)来实现背压策略的

    相关文章

      网友评论

        本文标题:rxjava2基本元素源码分析

        本文链接:https://www.haomeiwen.com/subject/trxqbqtx.html