RxJava

作者: spilledyear | 来源:发表于2019-01-19 20:08 被阅读0次

    观察者模式和回调机制

    基本使用

    @Test
    public void test01() {
        Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> observableEmitter) throws Exception {
                log.info("可以在这里触发消费者的方法");
    
                observableEmitter.onNext("onNext方法被调用");
                observableEmitter.onComplete();
            }
        }).subscribe(new Observer<Object>() {
    
            @Override
            public void onSubscribe(Disposable disposable) {
                log.info("Observable调用subscribe方法时会触发这个onSubscribe方法");
            }
    
            @Override
            public void onNext(Object o) {
                log.info(o.toString());
            }
    
            @Override
            public void onError(Throwable throwable) {
                log.info("onError");
            }
    
            @Override
            public void onComplete() {
                log.info("onComplete方法被调用");
            }
        });
    }
    

    输出结果如下

    Observable调用subscribe方法时会触发这个onSubscribe方法
    可以在这里触发消费者的方法
    onNext方法被调用
    onComplete方法被调用
    

    源码分析

    Observable#create
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
    
    ObservableOnSubscribe

    为订阅的每个观察者调用

    public interface ObservableOnSubscribe<T> {
    
        /**
         * Called for each Observer that subscribes.
         * @param emitter the safe emitter instance, never null
         * @throws Exception on error
         */
        void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
    }
    
    ObservableCreate

    实现了Observable接口

    public final class ObservableCreate<T> extends Observable<T> {
        final ObservableOnSubscribe<T> source;
    
        public ObservableCreate(ObservableOnSubscribe<T> source) {
            this.source = source;
        }
    
        ......
    }
    
    RxJavaPlugins.onAssembly

    这里的source就是ObservableCreate,先不分析 f != null的情况, 直接返回source

    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }
    

    Observable的create方法,返回了一个ObservableCreate对象,ObservableCreate内部持有ObservableOnSubscribe实例

    上面分析的是create方法,接下分析Observable的subscribe方法
    接收一个Observer对象,这里指的是代码中的匿名内部类

    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            // 返回observer
            observer = RxJavaPlugins.onSubscribe(this, observer);
    
            ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
    
            // 调用ObservableCreate的subscribeActual方法
            subscribeActual(observer);
        } catch (NullPointerException e) {
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);
    
            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }
    
    RxJavaPlugins.onSubscribe

    不分享f != null情况,直接返回Observer

    public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
        BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;
        if (f != null) {
            return apply(f, source, observer);
        }
        return observer;
    }
    
    ObservableCreate#subscribeActual
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // 将 observer 包装成 CreateEmitter
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    
        // 调用 observer的 onSubscribe方法
        observer.onSubscribe(parent);
    
        try {
            // 调用ObservableOnSubscribe的subscribe方法,前面说过 ObservableCreate 持有一个ObservableOnSubscribe实例,就是create方法传进来的匿名类
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    
    CreateEmitter

    ObservableOnSubscribe的subscribe方法,调用CreateEmitter的相关方法,本质上是调用 Observer 的相关方法

    static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
        private static final long serialVersionUID = -3434801548987643227L;
    
        final Observer<? super T> observer;
    
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }
    
        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }
    
        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }
    
        @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }
    
        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }
    }
    

    上面是基本版的,下面来一个简便快捷版的

    @Test
    public void test02(){
        Observable.just("呵呵").subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
    
            }
    
            @Override
            public void onNext(String s) {
                log.info(s);
            }
    
            @Override
            public void onError(Throwable e) {
    
            }
    
            @Override
            public void onComplete() {
    
            }
        });
    }
    

    输出结果

    呵呵
    
    Observable#just

    本质上并没有变化,只不过ObservableCreate换成了ObservableJust

    public static <T> Observable<T> just(T item) {
        ObjectHelper.requireNonNull(item, "The item is null");
        return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
    }
    
    ObservableJust

    这个地方好像是有一些区别,之前是触发ObservableOnSubscribe的subscribe方法,然后调用CreateEmitter的相关方法,本质上是调用 Observer 的相关方法。不过这里
    并没有ObservableOnSubscribe相关概念,而是多了一个ScalarDisposable

    public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
    
        private final T value;
        public ObservableJust(final T value) {
            this.value = value;
        }
    
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value);
            // 先调用调用Observer的onSubscribe方法
            observer.onSubscribe(sd);
            sd.run();
        }
        ......
    }
    

    ScalarDisposable

    继承了AtomicInteger,run方法中调用了observer的onNext和onComplete方法

    public static final class ScalarDisposable<T> extends AtomicInteger implements QueueDisposable<T>, Runnable {
        static final int START = 0;
        static final int FUSED = 1;
        static final int ON_NEXT = 2;
        static final int ON_COMPLETE = 3;
    
        public ScalarDisposable(Observer<? super T> observer, T value) {
            this.observer = observer;
            this.value = value;
        }
    
        @Override
        public void run() {
            if (get() == START && compareAndSet(START, ON_NEXT)) {
                observer.onNext(value);
                if (get() == ON_NEXT) {
                    lazySet(ON_COMPLETE);
                    observer.onComplete();
                }
            }
        }
    }
    
    

    背压

    @Test
    public void test03() {
        Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                if (!emitter.isCancelled()) {
                    emitter.onNext("onNext 1");
                    emitter.onNext("onNext 2");
                    emitter.onNext("onNext 3");
                    emitter.onComplete();
                }
            }
        }, DROP).subscribe(new Subscriber<String>() {
            @Override
            public void onSubscribe(Subscription subscription) {
                subscription.request(2L);
                log.info("背压订阅");
            }
    
            @Override
            public void onNext(String s) {
                log.info(s);
            }
    
            @Override
            public void onError(Throwable e) {
            }
    
            @Override
            public void onComplete() {
            }
        });
    }
    

    输出结果

    背压订阅
    onNext 1
    onNext 2
    

    在subscribe方法里面调用了三次onNext方法,但是控制台只打印了两次,说明被限制了。注意onSubscribe方法中的subscription.request(2L)

    原理分析

    基本上和无背压版本的类似,不过这里的create方法传入了两个参数,一个是FlowableOnSubscribe,另一个是一个枚举类型,暂将它理解成背压策略

    Flowable#create
    public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
        ObjectHelper.requireNonNull(source, "source is null");
        ObjectHelper.requireNonNull(mode, "mode is null");
        return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
    }
    
    FlowableCreate
    public final class FlowableCreate<T> extends Flowable<T> {
    
        final FlowableOnSubscribe<T> source;
    
        final BackpressureStrategy backpressure;
    
        public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) {
            this.source = source;
            this.backpressure = backpressure;
        }
    
        @Override
        public void subscribeActual(Subscriber<? super T> t) {
            BaseEmitter<T> emitter;
    
            switch (backpressure) {
                case MISSING: {
                    emitter = new MissingEmitter<T>(t);
                    break;
                }
                case ERROR: {
                    emitter = new ErrorAsyncEmitter<T>(t);
                    break;
                }
                case DROP: {
                    emitter = new DropAsyncEmitter<T>(t);
                    break;
                }
                case LATEST: {
                    emitter = new LatestAsyncEmitter<T>(t);
                    break;
                }
                default: {
                    emitter = new BufferAsyncEmitter<T>(t, bufferSize());
                    break;
                }
            }
    
            t.onSubscribe(emitter);
            try {
                source.subscribe(emitter);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                emitter.onError(ex);
            }
        }
    }
    

    根据传进来的参数,这里会使用的DropAsyncEmitter

    DropAsyncEmitter

    继承关系如下


    t.onSubscribe(emitter)也就是匿名内部类的onSubscribe方法

    public void onSubscribe(Subscription subscription) {
       subscription.request(2L);
       log.info("背压订阅");
    }
    

    然后调用BaseEmitter的request方法,BaseEmitter实现了Subscription接口

    BaseEmitter#request
    @Override
    public final void request(long n) {
        // 校验n是否大于0,大于0返回true,小于0返回false
        if (SubscriptionHelper.validate(n)) {
            // 设置 Emitter的值 = Emitter的值 + n
            BackpressureHelper.add(this, n);
            // 空实现
            onRequested();
        }
    }
    

    接下来调用FlowableOnSubscribe的subscrib方法,FlowableOnSubscribe中的subscribe即匿名内部类中的subscribe方法会,
    先调用调用BaseEmitter中的相关方法,BaseEmitter会根据value值选择是否调用Subscriber的相关方法(onNext、onComplete、onError)

    操作符原理

    操作符的核心原理就是包一层,类似于代理,这里以map为例

    @Test
    public void test04() {
        Flowable.create(emitter -> emitter.onNext("onNext 1"), DROP)
                .map(v -> v + " MAP")
                .subscribe(System.out::println);
    }
    
    Flowable#map

    这里返回的是FlowableMap, 创建FlowableMap时以 当前Flowable实例 和 map操作符对应的逻辑函数Function 为参数。即 FlowableMap 持有上一层的Flowable实例

    public final <R> Flowable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new FlowableMap<T, R>(this, mapper));
    }
    
    FlowableMap
    public final class FlowableMap<T, U> extends AbstractFlowableWithUpstream<T, U> {
        final Function<? super T, ? extends U> mapper;
        public FlowableMap(Flowable<T> source, Function<? super T, ? extends U> mapper) {
            super(source);
            this.mapper = mapper;
        }
    
        @Override
        protected void subscribeActual(Subscriber<? super U> s) {
            if (s instanceof ConditionalSubscriber) {
                source.subscribe(new MapConditionalSubscriber<T, U>((ConditionalSubscriber<? super U>)s, mapper));
            } else {
                source.subscribe(new MapSubscriber<T, U>(s, mapper));
            }
        }
    
        static final class MapSubscriber<T, U> extends BasicFuseableSubscriber<T, U> {
            final Function<? super T, ? extends U> mapper;
    
            MapSubscriber(Subscriber<? super U> actual, Function<? super T, ? extends U> mapper) {
                super(actual);
                this.mapper = mapper;
            }
    
            @Override
            public void onNext(T t) {
                if (done) {
                    return;
                }
    
                if (sourceMode != NONE) {
                    downstream.onNext(null);
                    return;
                }
                U v;
                try {
                    v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
                } catch (Throwable ex) {
                    fail(ex);
                    return;
                }
    
                // 调用上一层的 onNext 方法, 这里指的是 匿名函数 Subscriber 的onNext方法,如果连接了多个操作符,就是指向上一个操作符的onNext方法
                downstream.onNext(v);
            }
    
            @Override
            public int requestFusion(int mode) {
                return transitiveBoundaryFusion(mode);
            }
    
            @Nullable
            @Override
            public U poll() throws Exception {
                T t = qs.poll();
                return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
            }
        }
    }
    

    总结一下

    Flowable.create  =>     传入 FlowableOnSubscribe, 返回 FlowableCreate
    .map =>       以create返回的FlowableCreate为参数,构建一个FlowableMap并返回
    .subscribe =>     以原生的Subscriber作为参数调用Flowable的subscribe方法,然后再对原生的Subscriber做一层包装作为参数,调用FlowableMap的subscribeActual,
    然后再调用FlowableCreate的subscribe方法(即lowable的subscribe方法),然后再以上一层包装的Subscriber作为参数调用FlowableCreate的subscribeActual方法,
    更加背压策略,以包装的Subscriber的作为参数创建BaseEmitter对象,调用包装的Subscriber的onSubscribe方法。以BaseEmitter为参数调用FlowableOnSubscribe的subscribe
    方法,即调用BaseEmitter的相关方法(onNext......),但这个其实本质上还是调用包装的Subscriber的相关方法(onNext......)。在包装的Subscriber内部,执行map中的相关
    逻辑修改值,然后再以新值作为参数,调用原生的Subscriber的相关方法。
    
    也就是说,有多个个操作符,就会包装多少层
    

    线程切换

    subscribeOn

    指定在哪个线程上发射数据

    @Test
    public void test05() throws InterruptedException {
        Flowable.create(emitter -> {
            log.info("发射数据的线程 => {}", Thread.currentThread().getName());
            emitter.onNext("DD");
        }, BUFFER)
    
                // 指定在哪个线程上发射数据
                .subscribeOn(Schedulers.io())
                // 指定接收数据后在哪个线程上执行
                .observeOn(Schedulers.newThread())
    
                .subscribe(new Subscriber<Object>() {
                               @Override
                               public void onSubscribe(Subscription subscription) {
                                   log.info("onSubscribe  Thread => {}", Thread.currentThread().getName());
                               }
    
                               @Override
                               public void onNext(Object s) {
                                   log.info("onNext  Data => {},  Thread => {}", s, Thread.currentThread().getName());
                               }
    
                               @Override
                               public void onError(Throwable throwable) {
                                   log.info("onError  Thread => {}", Thread.currentThread().getName());
                               }
    
                               @Override
                               public void onComplete() {
                                   log.info("onComplete  Thread => {}", Thread.currentThread().getName());
                               }
                           }
                );
    
        Thread.sleep(3000);
    }
    
    1. subscribeOn方法返回一个 FlowableSubscribeOn 对象,创建 FlowableSubscribeOn 的时候以 FlowableCreatescheduler 作为入参;

    2. FlowableSubscribeOn.subscribe 方法内部,先创建一个 Scheduler.Worker 对象,也就是实际的线程调度者,这里的Scheduler为IoScheduler,所以对应的Worker为 EventLoopWorker;然后将入参 Subscriber 封装成一个 SubscribeOnSubscriber 对象;然后在主线程上调用 Subscriber.onSubscribe 方法;最后以 SubscribeOnSubscriber 为入参,调用 Worker.schedule 方法;

    3. EventLoopWorker.schedule 方法内部调用 NewThreadWorker.scheduleActual 方法;

    4. NewThreadWorker.scheduleActual 方法内部将入参 SubscribeOnSubscriber 转换成 ScheduledRunnable ,然后以 ScheduledRunnable 为入参,调用 ScheduledExecutorService.submit 方法;
      最终 ScheduledExecutorService.submit 方法调用顺序如下

    ScheduledRunnable.call => 
    ScheduledRunnable.run =>
    SubscribeOnSubscriber.run =>
    Flowable.subscribe 这里指 FlowableCreate
    

    observeOn

    指定接收数据后在哪个线程上执行

    1. observeOn方法返回一个 FlowableObserveOn 对象,创建 FlowableObserveOn 的时候以 FlowableCreatescheduler 作为入参;

    2. FlowableObserveOn.subscribeActual 方法内部,先创建一个 Scheduler.Worker 对象,也就是实际的线程调度者,这里的Scheduler为NewThreadScheduler,所以对应的Worker为 NewThreadWorker;然后将入参 Subscriber 封装成一个 ObserveOnSubscriber 对象;

    3. Subscriber.onSubscribe 方法在主线程上执行;

    4. 在调用 Subscriber.onNext 等相关方法的时候,通过 worker 进行调度;

    相关文章

      网友评论

          本文标题:RxJava

          本文链接:https://www.haomeiwen.com/subject/buszdqtx.html