美文网首页
RxJava源码分析(三) ----- observeOn

RxJava源码分析(三) ----- observeOn

作者: Simon_z | 来源:发表于2018-02-11 17:38 被阅读11次

    observeOn

        Observable.create((ObservableOnSubscribe<Integer>) e -> {
            System.out.println("observable : " + Thread.currentThread());
            e.onNext(1);
        })
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.single())
                .subscribe(integer -> {
                    System.out.println(integer);
                    System.out.println("observer:  " + Thread.currentThread());
                });
    

    通过observeOn可以指定接收事件,所在的线程

    源码分析目的:

    1. 生产和消费事件,是如何在不同线程运行的
    2. 事件是如何从生产者 传递给消费者的

    1. Observable.observeoOn

    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
    
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }
    
    1. 获取bufferSize(); 即保存事件的buffer的大小, 默认通过Flowable.bufferSize()获取, 默认值是128; 可以通过系统属性rx2.buffer-size修改大小;
    2. 创建ObservableObserveOn;
    3. 对ObservableObserveOn的hook, 默认是没做任何hook处理

    2. ObservableObserveOn

    ObservableObserveOn也是一个Observable, 内部也持有一个Observer, 仍然是一个装饰模式;

    2.1 ObservableObserveOn.subscribeActual

    Observable的sunscribe方法, 最后执行的都是subscribeActual方法, 直接看ObservableObserveOn.subscribeActual的代码;

        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            // 1. TrampolineScheduler 是在当前线程执行的, 直接执行subscribe方法
            if (scheduler instanceof TrampolineScheduler) {
                source.subscribe(observer);
            } else {
                // 1. 根据Schduler创建Worker
                Scheduler.Worker w = scheduler.createWorker();
                // 2. 创建 ObserveOnObserver, 执行被装饰Observable的subscribe
                source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
            }
        }
    
    1. 由schduler创建对应的Worker
    2. 创建ObserveOnObserver
    3. 执行当前Observable持有的Observable的subscribe方法; (此代码中, 执行的是ObservableSubscribeOn.subscribe)
    4. source.subscribe的逻辑, 同第一篇一样, 最后是在当前线程, 直接调用的ObserveOnObserver.onNext方法

    3. ObserveOnObserver

    ObservableObserveOn也是Observer的子类, 同时包含一个Observer, 仍然是装饰模式;

    相关文章

      网友评论

          本文标题:RxJava源码分析(三) ----- observeOn

          本文链接:https://www.haomeiwen.com/subject/qpcltftx.html