美文网首页
RxJava入门

RxJava入门

作者: 莫库施勒 | 来源:发表于2019-02-21 18:26 被阅读0次

首先是 RxJava 地址
接着,我们仿照《拆 RxJava》一文的思路,来拆解一下 RxJava 2.0。

我们同样使用这个例子

// Example chain used throughout the article: emits a single string, maps it to its
// length, and prints that length together with the name of the thread on which the
// subscriber lambda is invoked.
Observable.just("Hello world")
        .map(String::length)
        .subscribeOn(Schedulers.computation())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(len -> {
            // Printing the current thread name makes the effect of the two scheduler operators visible.
            System.out.println("got " + len + " @ " 
                    + Thread.currentThread().getName());
        });

just

    /**
     * Builds an Observable around a single item.
     *
     * <p>The item is null-checked via {@code ObjectHelper.requireNonNull}, wrapped in a new
     * {@code ObservableJust}, and the result is passed through {@code RxJavaPlugins.onAssembly}
     * before being returned.
     *
     * @param item the item to wrap; must not be null
     * @return whatever {@code RxJavaPlugins.onAssembly} returns for the new {@code ObservableJust}
     */
    @CheckReturnValue
    @NonNull
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> just(T item) {
        ObjectHelper.requireNonNull(item, "item is null");
        return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
    }
/**
 * Abridged excerpt of the Observable created by {@code just}.
 * The {@code value} field and the constructor are not shown in this excerpt.
 */
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
    /**
     * Wraps the observer and the stored value in a {@code ScalarDisposable}, hands that
     * disposable to the observer through {@code onSubscribe}, and then runs it
     * synchronously on the calling thread.
     */
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value);
        observer.onSubscribe(sd);
        // NOTE(review): presumably run() delivers the value and completes; ScalarDisposable is not shown here.
        sd.run();
    }
}

subscribeOn

    /**
     * Wraps this Observable in an {@code ObservableSubscribeOn} bound to the given scheduler.
     *
     * <p>The scheduler is null-checked first; the new operator is passed through
     * {@code RxJavaPlugins.onAssembly} before being returned.
     *
     * @param scheduler the scheduler handed to {@code ObservableSubscribeOn}; must not be null
     * @return whatever {@code RxJavaPlugins.onAssembly} returns for the new operator
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

/**
 * Abridged excerpt of the operator behind {@code subscribeOn}.
 * The constructor, the {@code source} field and most of {@code SubscribeOnObserver}
 * are not shown in this excerpt.
 */
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    /**
     * Creates a {@code SubscribeOnObserver} around the downstream observer, passes it to the
     * downstream via {@code onSubscribe}, then asks the scheduler to run a {@code SubscribeTask}
     * and stores the disposable returned by {@code scheduleDirect} in the parent.
     */
    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
        // onSubscribe is called here, on the caller's thread, before the task is scheduled.
        observer.onSubscribe(parent);
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

    /**
     * Runnable that performs the upstream subscription. It is a non-static inner class,
     * so it can reach the enclosing operator's {@code source}.
     */
    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            // The subscription to the upstream happens wherever the scheduler executes this task.
            source.subscribe(parent);
        }
    }

    /**
     * Observer placed between upstream and downstream; also acts as the Disposable given to
     * the downstream. Only {@code onNext} is shown; the {@code downstream} field is declared
     * outside this excerpt.
     */
    static final class SubscribeOnObserver<T> 
                 extends AtomicReference<Disposable> 
                 implements Observer<T>, Disposable {
            @Override
            public void onNext(T t) {
                // Forwards the item unchanged, on whatever thread the upstream called onNext.
                downstream.onNext(t);
            }
    }
}

map

    /**
     * Wraps this Observable in an {@code ObservableMap} that uses the given mapper.
     *
     * <p>The mapper is null-checked first; the new operator is passed through
     * {@code RxJavaPlugins.onAssembly} before being returned.
     *
     * @param mapper the function handed to {@code ObservableMap}; must not be null
     * @return whatever {@code RxJavaPlugins.onAssembly} returns for the new operator
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }
/**
 * Abridged excerpt of the operator behind {@code map}.
 * The constructor and the {@code function} and {@code source} fields are not shown in this excerpt.
 */
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    /**
     * Subscribes to the upstream with a {@code MapObserver} that wraps the downstream
     * observer {@code t} and the mapping function. No scheduler is involved, so this runs
     * on the calling thread.
     */
    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }

    /** Observer that applies the mapper to each upstream item before passing it downstream. */
    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;
        /**
         * Applies the mapper to {@code t}. A null result or any Throwable thrown while mapping
         * is routed to {@code fail(ex)} and the item is not forwarded; otherwise the mapped
         * value is delivered to the downstream.
         */
        @Override
        public void onNext(T t) {
            // Elided in the article; the declaration of `v` lives in the omitted lines.
            ...
            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            downstream.onNext(v);
        }
    }
}

observeOn

    /**
     * Wraps this Observable in an {@code ObservableObserveOn} configured with the given
     * scheduler, error-delay flag and buffer size.
     *
     * <p>The scheduler is null-checked and {@code bufferSize} is validated with
     * {@code ObjectHelper.verifyPositive}; the new operator is passed through
     * {@code RxJavaPlugins.onAssembly} before being returned.
     *
     * <p>NOTE(review): the article's example calls {@code observeOn(Scheduler)}; that overload
     * is not shown here and presumably delegates to this one with default arguments.
     *
     * @param scheduler  the scheduler handed to {@code ObservableObserveOn}; must not be null
     * @param delayError forwarded unchanged to {@code ObservableObserveOn}
     * @param bufferSize forwarded to {@code ObservableObserveOn}; checked by {@code verifyPositive}
     * @return whatever {@code RxJavaPlugins.onAssembly} returns for the new operator
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

/**
 * Abridged excerpt of the operator behind {@code observeOn}.
 * The constructor, the {@code delayError}/{@code bufferSize}/{@code source} fields and most
 * of {@code ObserveOnObserver} are not shown in this excerpt.
 */
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    /**
     * Subscribes to the upstream on the calling thread. If the scheduler is a
     * {@code TrampolineScheduler}, the downstream observer is subscribed directly; otherwise a
     * worker is created and the downstream is wrapped in an {@code ObserveOnObserver} that
     * holds that worker.
     */
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

    /**
     * Observer that queues upstream items and delivers them to the downstream from a task
     * run by the worker. It is itself the Runnable handed to the worker.
     */
    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {
        /**
         * Ignores the item when {@code done} is set; otherwise enqueues it (unless the source
         * mode is ASYNC) and requests a drain via {@code schedule()}.
         */
        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }
        /**
         * Increments the inherited counter and submits this object to the worker only when the
         * previous count was zero, so calls made while the count is non-zero do not submit
         * another task.
         */
        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }
        /** Entry point executed by the worker; picks the drain routine based on {@code outputFused}. */
        @Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }

        /**
         * Drain loop: repeatedly polls the queue and passes each value to the downstream
         * ({@code a.onNext(v)}). The declarations of {@code q}, {@code a}, {@code v} and
         * {@code missed}, as well as the inner loop's exit conditions, are in the elided lines.
         */
        void drainNormal() {
            for (;;) {
                ...
                for (;;) {
                    ...
                    try {
                        v = q.poll();
                    } catch (Throwable ex) {
                        // A failing poll ends the sequence: rethrow if fatal, otherwise mark disposed,
                        // cancel upstream, drop queued items, signal the error and release the worker.
                        Exceptions.throwIfFatal(ex);
                        disposed = true;
                        upstream.dispose();
                        q.clear();
                        a.onError(ex);
                        worker.dispose();
                        return;
                    }
                    ...
                    a.onNext(v);
                }

                // Subtract the handled count from the counter; a zero result means no schedule()
                // calls arrived in the meantime, so the drain can stop.
                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }
    }
}

我们从代码可以看到,subscribeOn 是在 subscribeActual 中通过 scheduler 调度执行 source.subscribe,因此影响其上游;observeOn 只是在 subscribeActual 中创建了一个 worker 作为参数传递给了 Observer,真正的调度发生在 Observer 的 onNext 中(通过 worker.schedule),因此影响其下游。更直观的,我们可以看下图:

just_all.jpg
我们看到,subscribeOn 会一直影响上游的 subscribeActual 和 onNext,如果之后没有其他线程切换,会一直执行在 subscribeOn 指定的线程中。
至于map,会在相应的 Observer 的 onNext 中执行 mapper.apply(t)

Just 调用过程

Just

Just-Map 调用过程

Just-Map

Just-Map-SubscribeOn调用过程

Just-Map-SubscribeOn

Backpressure

在 RxJava 1.x 中,数据都是从 observable push 到 subscriber 的,但要是 observable 发得太快,subscriber 处理不过来,该怎么办?一种办法是,把数据保存起来,但这显然可能导致内存耗尽;另一种办法是,多余的数据来了之后就丢掉,至于丢掉和保留的策略可以按需制定;还有一种办法就是让 subscriber 向 observable 主动请求数据,subscriber 不请求,observable 就不发出数据。它俩相互协调,避免出现过多的数据,而协调的桥梁,就是 producer。

相关文章

网友评论

      本文标题:RxJava入门

      本文链接:https://www.haomeiwen.com/subject/hypryqtx.html