美文网首页
Android进阶 Rxjava

Android进阶 Rxjava

作者: 月止风溟 | 来源:发表于2018-08-11 16:12 被阅读0次

Android进阶 Rxjava

1.Rxjava的使用

在壳工程的build.gradle里面加上

dependencies {
    ...
    implementation 'io.reactivex.rxjava2:rxjava:2.1.17'

    //因为是Android工程,加上下面这个
    implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
}

然后在MainActivityonCreate()里写上

    Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                emitter.onNext("one");
                emitter.onComplete();
            }
        },BackpressureStrategy.BUFFER)
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override public void accept(String s) {
                        Log.e("dragon",s);
                    }
                });

我们一步步去看它怎么运行起来的,先看Create

    //Flowable.class
    @CheckReturnValue
    @BackpressureSupport(BackpressureKind.SPECIAL)
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
        ObjectHelper.requireNonNull(source, "source is null");
        ObjectHelper.requireNonNull(mode, "mode is null");
        return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
    }

重点看到这一句 RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
它将Flowable和module封装成了FlowableCreate,再将这个对象直接返回了。
onCreate()结束后,得到了一个FlowableCreate对象。

再看subscribeOn。

    //Flowable.class
    @CheckReturnValue
    @BackpressureSupport(BackpressureKind.PASS_THROUGH)
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return subscribeOn(scheduler, !(this instanceof FlowableCreate));
    }

    @CheckReturnValue
    @BackpressureSupport(BackpressureKind.PASS_THROUGH)
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    @Experimental
    public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler, boolean requestOn) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new FlowableSubscribeOn<T>(this, scheduler, requestOn));
    }

将FlowableCreate对象转换成了FlowableSubscribeOn对象。

再看observeOn。

    //Flowable.class
    @CheckReturnValue
    @BackpressureSupport(BackpressureKind.FULL)
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Flowable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }


    @CheckReturnValue
    @BackpressureSupport(BackpressureKind.FULL)
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Flowable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new FlowableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

将FlowableSubscribeOn对象转换成了FlowableObserveOn对象。

然后来到subscribe,感觉好戏要开始了。

    //Flowable.class
    @CheckReturnValue
    @BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Disposable subscribe(Consumer<? super T> onNext) {
        return subscribe(onNext, Functions.ON_ERROR_MISSING,
                Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE);
    }


     @CheckReturnValue
    @BackpressureSupport(BackpressureKind.SPECIAL)
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete, Consumer<? super Subscription> onSubscribe) {
        ObjectHelper.requireNonNull(onNext, "onNext is null");
        ObjectHelper.requireNonNull(onError, "onError is null");
        ObjectHelper.requireNonNull(onComplete, "onComplete is null");
        ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

        LambdaSubscriber<T> ls = new LambdaSubscriber<T>(onNext, onError, onComplete, onSubscribe);

        subscribe(ls);

        return ls;
    }

从上面可以看到,Consumer被转换成LambdaSubscriber对象,我们看看subscribe(ls)会做什么动作。`

    //Flowable.class
    @BackpressureSupport(BackpressureKind.SPECIAL)
    @SchedulerSupport(SchedulerSupport.NONE)
    @Beta
    public final void subscribe(FlowableSubscriber<? super T> s) {
        ObjectHelper.requireNonNull(s, "s is null");
        try {
            Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s);

            ObjectHelper.requireNonNull(z, "Plugin returned null Subscriber");

            subscribeActual(z);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Subscription has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

不着急,先看try方法块里面的方法

    //Flowable.class
    @SuppressWarnings({ "rawtypes", "unchecked" })
    @NonNull
    public static <T> Subscriber<? super T> onSubscribe(@NonNull Flowable<T> source, @NonNull Subscriber<? super T> subscriber) {
        BiFunction<? super Flowable, ? super Subscriber, ? extends Subscriber> f = onFlowableSubscribe;
        if (f != null) {
            return apply(f, source, subscriber);
        }
        return subscriber;
    }

奇怪了,在这个示例中,这个方法什么都没做。
那么关键的方案应该就是subscribeActual(z)了。我们现在已经是FlowableObserveOn了,我们去这个里面看看它做些什么。

    //FlowableObserveOn.class
    @Override
    public void subscribeActual(Subscriber<? super T> s) {
        Worker worker = scheduler.createWorker();

        if (s instanceof ConditionalSubscriber) {
            source.subscribe(new ObserveOnConditionalSubscriber<T>(
                    (ConditionalSubscriber<? super T>) s, worker, delayError, prefetch));
        } else {
            source.subscribe(new ObserveOnSubscriber<T>(s, worker, delayError, prefetch));
        }
    }

我们还是一行行地看,这个scheduler是什么?在将FlowableSubscribeOn对象转换成FlowableObserveOn对象的时候,传入了AndroidSchedulers.mainThread()。我们可以看看这个最常用的AndroidSchedulers.mainThread()
(一)

public final class AndroidSchedulers {

    private static final class MainHolder {

        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }

    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });

    /** A {@link Scheduler} which executes actions on the Android main thread. */
    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }

    /** A {@link Scheduler} which executes actions on {@code looper}. */
    public static Scheduler from(Looper looper) {
        if (looper == null) throw new NullPointerException("looper == null");
        return new HandlerScheduler(new Handler(looper));
    }

    private AndroidSchedulers() {
        throw new AssertionError("No instances.");
    }
}  

mainThread()会返回的是MainHolder.DEFAULT

那我们去看看这个MainHolder.DEFAULT的createWorker()方法。

    //HandlerScheduler.class
    @Override
    public Worker createWorker() {
        return new HandlerWorker(handler);
    }

从HandlerWorker来看,它是继承了Worker的一个类,重载了@Override schedule(..)把传入的Runnable用Handler发射了出去。
(二)
接下来,程序跑到了
source.subscribe(new ObserveOnSubscriber<T>(s, worker, delayError, prefetch))。

  • s,我们的LambdaSubscriber对象
  • worker,HandlerScheduler
  • delayError,false
  • prefetch,128

我们知道,source是我们刚传进来的FlowableSubscribeOn对象,那我们去看看它接下来的subscribeActual(..)方法。
这个时候,执行了FlowableSubscribeOn.subscribeActual((上面得到的)ObserveOnSubscriber)。

先看方法内容

    //FlowableSubscribeOn.class
    @Override
    public void subscribeActual(final Subscriber<? super T> s) {
        Scheduler.Worker w = scheduler.createWorker();
        final SubscribeOnSubscriber<T> sos = new SubscribeOnSubscriber<T>(s, w, source, nonScheduledRequests);
        s.onSubscribe(sos);

        w.schedule(sos);
    }

这个Worker是我们之前传入的Schedulers.newThread()得到的,那我们看看这个是什么。

    /**
     *Schedulers.class 
     */
    @NonNull
    public static Scheduler newThread() {
        //老规矩,返回的就是NEW_THREAD
        return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
    }

    @NonNull
    static final Scheduler NEW_THREAD;

    static {
        ...

        NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
    }

    static final class NewThreadTask implements Callable<Scheduler> {
        @Override
        public Scheduler call() throws Exception {
            return NewThreadHolder.DEFAULT;
        }
    }

    static final class NewThreadHolder {
        static final Scheduler DEFAULT = new NewThreadScheduler();
    }

    @NonNull
    public static Scheduler initNewThreadScheduler(@NonNull Callable<Scheduler> defaultScheduler) {
        ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");
        Function<? super Callable<Scheduler>, ? extends Scheduler> f = onInitNewThreadHandler;
        if (f == null) {
            return callRequireNonNull(defaultScheduler);
        }
        return applyRequireNonNull(f, defaultScheduler);
    }

    @NonNull
    static Scheduler callRequireNonNull(@NonNull Callable<Scheduler> s) {
        try {
            return ObjectHelper.requireNonNull(s.call(), "Scheduler Callable result can't be null");
        } catch (Throwable ex) {
            throw ExceptionHelper.wrapOrThrow(ex);
        }
    }

从上面的跟踪可以看出,NEW_THREAD就是NewThreadHolder.DEFAULT。
仔细看下面这个NewThreadScheduler的内容。

public final class NewThreadScheduler extends Scheduler {

    final ThreadFactory threadFactory;

    private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler";
    private static final RxThreadFactory THREAD_FACTORY;

    /** The name of the system property for setting the thread priority for this Scheduler. */
    private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority";

    static {
        int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
                Integer.getInteger(KEY_NEWTHREAD_PRIORITY, Thread.NORM_PRIORITY)));

        THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
    }

    public NewThreadScheduler() {
        this(THREAD_FACTORY);
    }

    public NewThreadScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
    }

    @NonNull
    @Override
    public Worker createWorker() {
        return new NewThreadWorker(threadFactory);
    }
}

NewTreadWorker传入的是一个创建线程为新守护线程,优先级为normal的线程工厂。
拿到NewThreadWorker对象后,看之后这行
(一)final SubscribeOnSubscriber<T> sos = new SubscribeOnSubscriber<T>(s, w, source, nonScheduledRequests);

  • s,上面得到的ObserveOnSubscriber
  • w,NewThreadWorker
  • source,之前生成的得到了一个FlowableCreate对象对象
  • nonScheduledRequests,false

接下来 s.onSubscribe(sos)
也就是上面得到的ObserveOnSubscriber.onSubscribe(SubscribeOnSubscriber)

  1. 跑到了LambdaSubscriber.onSubscribe(ObserveOnSubscriber),作用是
    • Atomically sets the subscription on the field if it is still null.
    • <p>If the field is not null and doesn't contain the {@link #CANCELLED}
    • instance, the {@link #reportSubscriptionSet()} is called.
    • 没太懂。
  2. s.request(prefetch);也就是SubscribeOnSubscriber.request(128).
    这里也不太懂。

(二)w.schedule(sos);也就是NewThreadWorker.schedule(SubscribeOnSubscriber)。
SubscribeOnSubscriber被ScheduledExecutorService给提交运行。

        @Override
        public void run() {
            lazySet(Thread.currentThread());
            //之前生成的得到了一个FlowableCreate对象对象
            Publisher<T> src = source;
            source = null;
            src.subscribe(this);
        }

看到src.subscribe(this),也就是FlowableCreate.subscribe(SubscribeOnSubscriber)

@Override
    public void subscribeActual(Subscriber<? super T> t) {
        BaseEmitter<T> emitter;

        switch (backpressure) {
        ...
        default: {
            //将SubscribeOnSubscriber作为actual传给了BufferAsyncEmitter
            //bufferSize() 128
            emitter = new BufferAsyncEmitter<T>(t, bufferSize());
            break;
        }
        }

        //SubscribeOnSubscriber.onSubscribe(BufferAsyncEmitter)
        t.onSubscribe(emitter);
        try {
            source.subscribe(emitter);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            emitter.onError(ex);
        }
    }

(一)先看 t.onSubscribe(emitter);

   //SubscribeOnSubscriber.class
        @Override
        public void onSubscribe(Subscription s) {
            if (SubscriptionHelper.setOnce(this.s, s)) {
                long r = requested.getAndSet(0L);
                if (r != 0L) {
                    requestUpstream(r, s);
                }
            }
        }

        void requestUpstream(final long n, final Subscription s) {
            if (nonScheduledRequests || Thread.currentThread() == get()) {
                s.request(n);
            } else {
                worker.schedule(new Request(s, n));
            }
        }

   //SubscribeOnSubscriber.class
        @Override
        public void onSubscribe(Subscription s) {
            if (SubscriptionHelper.setOnce(this.s, s)) {
                long r = requested.getAndSet(0L);
                if (r != 0L) {
                    requestUpstream(r, s);
                }
            }
        }

        void requestUpstream(final long n, final Subscription s) {
            if (nonScheduledRequests || Thread.currentThread() == get()) {
                s.request(n);
            } else {
                worker.schedule(new Request(s, n));
            }
        }

        //BufferAsyncEmitter.class
        @Override
        public final void request(long n) {
            if (SubscriptionHelper.validate(n)) {
                BackpressureHelper.add(this, n);
                onRequested();
            }
        }

        @Override
        void onRequested() {
            drain();
        }

        //感觉什么都没做
        void drain() {
            if (wip.getAndIncrement() != 0) {
                return;
            }

            int missed = 1;
            final Subscriber<? super T> a = actual;
            final SpscLinkedArrayQueue<T> q = queue;

            for (;;) {
                long r = get();
                long e = 0L;

                while (e != r) {
                    if (isCancelled()) {
                        q.clear();
                        return;
                    }

                    boolean d = done;

                    T o = q.poll();

                    boolean empty = o == null;

                    if (d && empty) {
                        Throwable ex = error;
                        if (ex != null) {
                            error(ex);
                        } else {
                            complete();
                        }
                        return;
                    }

                    if (empty) {
                        break;
                    }

                    a.onNext(o);

                    e++;
                }

                if (e == r) {
                    if (isCancelled()) {
                        q.clear();
                        return;
                    }

                    boolean d = done;

                    boolean empty = q.isEmpty();

                    if (d && empty) {
                        Throwable ex = error;
                        if (ex != null) {
                            error(ex);
                        } else {
                            complete();
                        }
                        return;
                    }
                }

                if (e != 0) {
                    BackpressureHelper.produced(this, e);
                }

                missed = wip.addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }

跑了一圈,感觉什么都没做啊
(二) 再看source.subscribe(emitter);
这个source很容易理解,我们一开始传进来的发射器。
刺激的事情来了。我们在发射器里发射的数据,被放进了队列。

        @Override
        public void onNext(T t) {
            ...
            queue.offer(t);
            drain();
        }

        //感觉什么都没做
        void drain() {
            ...

            for (;;) {
                ...

                while (e != r) {
                    ...

                    a.onNext(o);

                    ...
                }

                ...
            }
        }

好,我们发现调用了SubscribeOnSubscriber.onNext("one");

        //SubscribeOnSubscriber.class
        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }

跑到了

   //ObserveOnSubscriber.class
        @Override
        public final void onNext(T t) {
            ...
            trySchedule();
        }

        final void trySchedule() {
            ...
            worker.schedule(this);
        }

得了。这个worker是什么?HandlerScheduler!所以它把本身作为Runable抛给了主Looper。

   //ObserveOnSubscriber.class
        @Override
        public final void run() {
            ...
                runAsync();
            ...
        }

        @Override
        void runAsync() {
            int missed = 1;

            //我们的LambdaSubscriber对象
            final Subscriber<? super T> a = actual;
            ...

                    a.onNext(v);
            ...
        }

LambdaSubscriber的actual对象就是我们传入的Consumer。

相关文章

网友评论

      本文标题:Android进阶 Rxjava

      本文链接:https://www.haomeiwen.com/subject/evudbftx.html