美文网首页
RxJava详解之线程调度原理(六)

RxJava详解之线程调度原理(六)

作者: CharonChui | 来源:发表于2018-08-01 10:45 被阅读0次

    RxJava详解之线程调度原理(六)

    Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("Hello");
            subscriber.onCompleted();
            Log.i("@@@", "call" + Thread.currentThread().getName());
        }
    }).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<String>() {
                @Override
                public void onCompleted() {
    
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onNext(String s) {
                    Log.i("@@@", "onNext : " + Thread.currentThread().getName());
                }
            });
    

    执行结果:

    07-26 17:16:49.284 7266-7309/? I/@@@: callRxIoScheduler-2
    07-26 17:16:49.368 7266-7266/? I/@@@: onNext : main
    

    subscribeOn()是指定被观察者事件源的执行线程。
    observeOn()是指定观察者的处理时间的线程。

    subscribeOn源码分析:

    public final Observable<T> subscribeOn(Scheduler scheduler) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        }
        return create(new OperatorSubscribeOn<T>(this, scheduler));
    }
    
    /**
     * A {@code Scheduler} is an object that schedules units of work. You can find common implementations of this
     * class in {@link Schedulers}.
     * 线程调度器
     */
    public abstract class Scheduler {
        ...
    }
    

    它内部也是通过create()创建一个Observable但是唯一不同的是OnSubscribe传递的是OperatorSubscribeOn对象:

    /**
     * Subscribes Observers on the specified {@code Scheduler}.
     */
    public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
    
        final Scheduler scheduler;
        final Observable<T> source;
    
        public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
            this.scheduler = scheduler;
            this.source = source;
        }
    
        @Override
        public void call(final Subscriber<? super T> subscriber) {
            final Worker inner = scheduler.createWorker();
            subscriber.add(inner);
    
            inner.schedule(new Action0() {
                @Override
                public void call() {
                    final Thread t = Thread.currentThread();
    
                    Subscriber<T> s = new Subscriber<T>(subscriber) {
                        @Override
                        public void onNext(T t) {
                            subscriber.onNext(t);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            try {
                                subscriber.onError(e);
                            } finally {
                                inner.unsubscribe();
                            }
                        }
    
                        @Override
                        public void onCompleted() {
                            try {
                                subscriber.onCompleted();
                            } finally {
                                inner.unsubscribe();
                            }
                        }
    
                        @Override
                        public void setProducer(final Producer p) {
                            subscriber.setProducer(new Producer() {
                                @Override
                                public void request(final long n) {
                                    if (t == Thread.currentThread()) {
                                        p.request(n);
                                    } else {
                                        inner.schedule(new Action0() {
                                            @Override
                                            public void call() {
                                                p.request(n);
                                            }
                                        });
                                    }
                                }
                            });
                        }
                    };
    
                    source.unsafeSubscribe(s);
                }
            });
        }
    }
    

    它内部原理其实和map是一样一样的。我们就从call()方法说起,首先看一下SubscriberWorker类:

    /**
     * A {@code Scheduler} is an object that schedules units of work. You can find common implementations of this
     * class in {@link Schedulers}.
     */
    public abstract class Scheduler {
        /**
         * Retrieves or creates a new {@link Scheduler.Worker} that represents serial execution of actions.
         * <p>
         * When work is completed it should be unsubscribed using {@link Scheduler.Worker#unsubscribe()}.
         * <p>
         * Work on a {@link Scheduler.Worker} is guaranteed to be sequential.
         *
         * @return a Worker representing a serial queue of actions to be executed
         */
        public abstract Worker createWorker();
    
        /**
         * Sequential Scheduler for executing actions on a single thread or event loop.
         * <p>
         * Unsubscribing the {@link Worker} cancels all outstanding work and allows resources cleanup.
         */
        public abstract static class Worker implements Subscription {
    
            /**
             * Schedules an Action for execution.
             */
            public abstract Subscription schedule(Action0 action);
    
            /**
             * Schedules an Action for execution at some point in the future.
             */
            public abstract Subscription schedule(final Action0 action, final long delayTime, final TimeUnit unit);
        }
    }    
    

    而上面的schedulers是通过Schedulers.io()创建的,这里看一下它的源码:

    public final class Schedulers {
    
        private final Scheduler computationScheduler;
        private final Scheduler ioScheduler;
        private final Scheduler newThreadScheduler;
    
        private static final AtomicReference<Schedulers> INSTANCE = new AtomicReference<Schedulers>();
    
        private static Schedulers getInstance() {
            for (;;) {
                Schedulers current = INSTANCE.get();
                if (current != null) {
                    return current;
                }
                current = new Schedulers();
                if (INSTANCE.compareAndSet(null, current)) {
                    return current;
                } else {
                    current.shutdownInstance();
                }
            }
        }
    
        public static Scheduler io() {
            return RxJavaHooks.onIOScheduler(getInstance().ioScheduler);
        }
    
        private Schedulers() {
            @SuppressWarnings("deprecation")
            RxJavaSchedulersHook hook = RxJavaPlugins.getInstance().getSchedulersHook();
    
            Scheduler c = hook.getComputationScheduler();
            if (c != null) {
                computationScheduler = c;
            } else {
                computationScheduler = RxJavaSchedulersHook.createComputationScheduler();
            }
    
            // 下面几时创建ioScheduler的地方
            Scheduler io = hook.getIOScheduler();
            if (io != null) {
                ioScheduler = io;
            } else {
                ioScheduler = RxJavaSchedulersHook.createIoScheduler();
            }
    
            Scheduler nt = hook.getNewThreadScheduler();
            if (nt != null) {
                newThreadScheduler = nt;
            } else {
                newThreadScheduler = RxJavaSchedulersHook.createNewThreadScheduler();
            }
        }
        ...
    }    
    

    继续onIOScheduler的源码:

    /**
     * 和create()方法类似,这里简单的理解为直接返回参数即可
     * Hook to call when the Schedulers.io() is called.
     * @param scheduler the default io scheduler
     * @return the default of alternative scheduler
     */
    public static Scheduler onIOScheduler(Scheduler scheduler) {
        Func1<Scheduler, Scheduler> f = onIOScheduler;
        if (f != null) {
            return f.call(scheduler);
        }
        return scheduler;
    }
    

    而上面getInstance().ioScheduler的地方最终会调用到RxJavaSchedulersHook.createIoScheduler():

    @Experimental
    public static Scheduler createIoScheduler() {
        return createIoScheduler(new RxThreadFactory("RxIoScheduler-"));
    }
    
    public static Scheduler createIoScheduler(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory == null");
        }
        // 看到了吗? 这里最终的Scheduler是CachedThreadScheduler
        return new CachedThreadScheduler(threadFactory);
    }
    

    而在CachedThreadScheduler类中:

    public final class CachedThreadScheduler extends Scheduler implements SchedulerLifecycle {
    
        @Override
        public Worker createWorker() {
            return new EventLoopWorker(pool.get());
        }
        ...
    }    
    

    所以我们通过Schedulers.io()的源码可以发现这里的具体实现类是CachedThreadScheduler。而对应的Worker的实现类是EventLoopWorker:

       static final class EventLoopWorker extends Scheduler.Worker implements Action0 {
            private final CompositeSubscription innerSubscription = new CompositeSubscription();
            private final CachedWorkerPool pool;
            private final ThreadWorker threadWorker;
            final AtomicBoolean once;
    
            EventLoopWorker(CachedWorkerPool pool) {
                this.pool = pool;
                this.once = new AtomicBoolean();
                this.threadWorker = pool.get();
            }
    
            @Override
            public void unsubscribe() {
                if (once.compareAndSet(false, true)) {
                    // unsubscribe should be idempotent, so only do this once
    
                    // Release the worker _after_ the previous action (if any) has completed
                    threadWorker.schedule(this);
                }
                innerSubscription.unsubscribe();
            }
    
            @Override
            public void call() {
                pool.release(threadWorker);
            }
    
            @Override
            public boolean isUnsubscribed() {
                return innerSubscription.isUnsubscribed();
            }
    
            @Override
            public Subscription schedule(Action0 action) {
                return schedule(action, 0, null);
            }
    
            @Override
            public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
                if (innerSubscription.isUnsubscribed()) {
                    // don't schedule, we are unsubscribed
                    return Subscriptions.unsubscribed();
                }
    
                ScheduledAction s = threadWorker.scheduleActual(new Action0() {
                    @Override
                    public void call() {
                        if (isUnsubscribed()) {
                            return;
                        }
                        // 内部会调用外面传递过来的Action对象的call方法
                        action.call();
                    }
                }, delayTime, unit);
                innerSubscription.add(s);
                s.addParent(innerSubscription);
                return s;
            }
    

    这里会继续执行到threadWorker.scheduleActual:

    public class NewThreadWorker extends Scheduler.Worker implements Subscription {
        private final ScheduledExecutorService executor;
    
        public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
            Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
            ScheduledAction run = new ScheduledAction(decoratedAction);
            Future<?> f;
            if (delayTime <= 0) {
                f = executor.submit(run);
            } else {
                f = executor.schedule(run, delayTime, unit);
            }
            run.add(f);
    
            return run;
        }
    ...
    }
    

    ScheduledAction的源码:

    /**
     * A {@code Runnable} that executes an {@code Action0} and can be cancelled. The analog is the
     * {@code Subscriber} in respect of an {@code Observer}.
     */
    public final class ScheduledAction extends AtomicReference<Thread> implements Runnable, Subscription {
        /** */
        private static final long serialVersionUID = -3962399486978279857L;
        final SubscriptionList cancel;
        final Action0 action;
    
        public ScheduledAction(Action0 action) {
            this.action = action;
            this.cancel = new SubscriptionList();
        }
        ...
    }    
    

    ScheduledAction实现了Runnable接口,通过线程池executor最终实现了线程切换。上面便是subscribeOn(Schedulers.io())实现线程切换的全部过程。

    observeOn源码分析

    直接上源码:

    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, RxRingBuffer.SIZE);
    }
    
    public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
        return observeOn(scheduler, false, bufferSize);
    }
    
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        }
        return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
    }
    

    observeOn内部是通过lift来实现的,看一下lift的源码:

    public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return create(new OnSubscribeLift<T, R>(onSubscribe, operator));
    }
    

    归根到底还是使用了create(),那就和之前分析的基本都差不多了,直接看OnSubscribeLift:

    /**
     * Transforms the downstream Subscriber into a Subscriber via an operator
     * callback and calls the parent OnSubscribe.call() method with it.
     */
    public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {
    
        final OnSubscribe<T> parent;
    
        final Operator<? extends R, ? super T> operator;
    
        public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
            this.parent = parent;
            this.operator = operator;
        }
    
        @Override
        public void call(Subscriber<? super R> o) {
            try {
                Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
                try {
                    // new Subscriber created and being subscribed with so 'onStart' it
                    st.onStart();
                    parent.call(st);
                } catch (Throwable e) {
                    // localized capture of errors rather than it skipping all operators
                    // and ending up in the try/catch of the subscribe method which then
                    // prevents onErrorResumeNext and other similar approaches to error handling
                    Exceptions.throwIfFatal(e);
                    st.onError(e);
                }
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                // if the lift function failed all we can do is pass the error to the final Subscriber
                // as we don't have the operator available to us
                o.onError(e);
            }
        }
    }
    

    也实现了OnSubscribe,通过前面的分析我们知道一旦调用了subscribe()将观察者与被观察绑定后就会触发被观察者所对应的OnSubscribecall()方法,所以这里会触发OnSubscribeLift.call()。在call()中调用了OperatorObserveOn.call()并返回了一个新的观察者Subscriber st,接着调用了前一级Observable对应OnSubscriber.call(st)
    所以这里要看一下OperatorObserveOn类及它的call()方法:

    public final class OperatorObserveOn<T> implements Operator<T, T> {
        private final Scheduler scheduler;
        private final boolean delayError;
        private final int bufferSize;
    
        public OperatorObserveOn(Scheduler scheduler, boolean delayError) {
            this(scheduler, delayError, RxRingBuffer.SIZE);
        }
    
        public OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize) {
            this.scheduler = scheduler;
            this.delayError = delayError;
            this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
        }
    
        @Override
        public Subscriber<? super T> call(Subscriber<? super T> child) {
            if (scheduler instanceof ImmediateScheduler) {
                // avoid overhead, execute directly
                return child;
            } else if (scheduler instanceof TrampolineScheduler) {
                // avoid overhead, execute directly
                return child;
            } else {
                ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
                parent.init();
                return parent;
            }
        }
    

    这里会走到创建ObserveOnSubscriber然后调用其init()方法:

    static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
        final Subscriber<? super T> child;
        final Scheduler.Worker recursiveScheduler;
        final boolean delayError;
        final Queue<Object> queue;
        /** The emission threshold that should trigger a replenishing request. */
        final int limit;
    
        // the status of the current stream
        volatile boolean finished;
    
        final AtomicLong requested = new AtomicLong();
    
        final AtomicLong counter = new AtomicLong();
    
        /**
         * The single exception if not null, should be written before setting finished (release) and read after
         * reading finished (acquire).
         */
        Throwable error;
    
        /** Remembers how many elements have been emitted before the requests run out. */
        long emitted;
    
        // do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should
        // not prevent anything downstream from consuming, which will happen if the Subscription is chained
        public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
            this.child = child;
            // 通过参数传递过来的Schedule创建对应的worker
            this.recursiveScheduler = scheduler.createWorker();
            this.delayError = delayError;
            int calculatedSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
            // this formula calculates the 75% of the bufferSize, rounded up to the next integer
            this.limit = calculatedSize - (calculatedSize >> 2);
            if (UnsafeAccess.isUnsafeAvailable()) {
                queue = new SpscArrayQueue<Object>(calculatedSize);
            } else {
                queue = new SpscAtomicArrayQueue<Object>(calculatedSize);
            }
            // signal that this is an async operator capable of receiving this many
            request(calculatedSize);
        }
    
        void init() {
            // don't want this code in the constructor because `this` can escape through the
            // setProducer call
            Subscriber<? super T> localChild = child;
    
            localChild.setProducer(new Producer() {
    
                @Override
                public void request(long n) {
                    if (n > 0L) {
                        BackpressureUtils.getAndAddRequest(requested, n);
                        schedule();
                    }
                }
    
            });
            localChild.add(recursiveScheduler);
            localChild.add(this);
        }
    
        @Override
        public void onNext(final T t) {
            if (isUnsubscribed() || finished) {
                return;
            }
            if (!queue.offer(NotificationLite.next(t))) {
                onError(new MissingBackpressureException());
                return;
            }
            // 有该方法
            schedule();
        }
    
        @Override
        public void onCompleted() {
            if (isUnsubscribed() || finished) {
                return;
            }
            finished = true;
            // 有该方法
            schedule();
        }
    
        @Override
        public void onError(final Throwable e) {
            if (isUnsubscribed() || finished) {
                RxJavaHooks.onError(e);
                return;
            }
            error = e;
            finished = true;
            // 有该方法
            schedule();
        }
    
        protected void schedule() {
            if (counter.getAndIncrement() == 0) {
                // 调用recursiveScheduler,而recursiveScheduler是通过参数传递过来的
                recursiveScheduler.schedule(this);
            }
        }
    
        // only execute this from schedule()
        // 最终会执行到该方法
        @Override
        public void call() {
            long missed = 1L;
            long currentEmission = emitted;
    
            // these are accessed in a tight loop around atomics so
            // loading them into local variables avoids the mandatory re-reading
            // of the constant fields
            final Queue<Object> q = this.queue;
            final Subscriber<? super T> localChild = this.child;
    
            // requested and counter are not included to avoid JIT issues with register spilling
            // and their access is is amortized because they are part of the outer loop which runs
            // less frequently (usually after each bufferSize elements)
    
            for (;;) {
                long requestAmount = requested.get();
    
                while (requestAmount != currentEmission) {
                    boolean done = finished;
                    Object v = q.poll();
                    boolean empty = v == null;
    
                    if (checkTerminated(done, empty, localChild, q)) {
                        return;
                    }
    
                    if (empty) {
                        break;
                    }
                    // 该方法会调用onNext方法
                    localChild.onNext(NotificationLite.<T>getValue(v));
    
                    currentEmission++;
                    if (currentEmission == limit) {
                        requestAmount = BackpressureUtils.produced(requested, currentEmission);
                        request(currentEmission);
                        currentEmission = 0L;
                    }
                }
    
                if (requestAmount == currentEmission) {
                    if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
                        return;
                    }
                }
    
                emitted = currentEmission;
                missed = counter.addAndGet(-missed);
                if (missed == 0L) {
                    break;
                }
            }
        }
    }
    

    我们看到上面的onNextonComplete()onError方法里面都调用了schedule()方法,所以这个方法就是具体的切换线程的部分:

    protected void schedule() {
        if (counter.getAndIncrement() == 0) {
            // 调用recursiveScheduler,而recursiveScheduler是通过参数传递过来的,该方法将所有的事件都切换到了recursiveScheduler对应的线程
            // 这里将this传递进去了,所以最后会执行到当前的call()方法
            recursiveScheduler.schedule(this);
        }
    }
    

    里面会调用recursiveScheduler,而该recursiveScheduler是通过构造函数初始化的:

    public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
        this.child = child;
        // 通过参数传递过来的Schedule创建对应的worker
        this.recursiveScheduler = scheduler.createWorker();
    }
    

    而在此处我们的示例代码中用到的SchedulerAndroidSchedulers.mainThread():

    public final class AndroidSchedulers {
        private static final AtomicReference<AndroidSchedulers> INSTANCE = new AtomicReference<>();
    
        private final Scheduler mainThreadScheduler;
    
        private static AndroidSchedulers getInstance() {
            for (;;) {
                AndroidSchedulers current = INSTANCE.get();
                if (current != null) {
                    return current;
                }
                current = new AndroidSchedulers();
                if (INSTANCE.compareAndSet(null, current)) {
                    return current;
                }
            }
        }
    
        private AndroidSchedulers() {
            RxAndroidSchedulersHook hook = RxAndroidPlugins.getInstance().getSchedulersHook();
    
            Scheduler main = hook.getMainThreadScheduler();
            if (main != null) {
                mainThreadScheduler = main;
            } else {
                // 具体的实现类是LooperScheduler
                mainThreadScheduler = new LooperScheduler(Looper.getMainLooper());
            }
        }
    
        /** A {@link Scheduler} which executes actions on the Android UI thread. */
        public static Scheduler mainThread() {
            return getInstance().mainThreadScheduler;
        }
        ...
    }    
    

    我们看到这里的具体的Scheduler的实现类是LooperScheduler:

    class LooperScheduler extends Scheduler {
        private final Handler handler;
    
        LooperScheduler(Looper looper) {
            handler = new Handler(looper);
        }
    
        LooperScheduler(Handler handler) {
            this.handler = handler;
        }
    
        @Override
        public Worker createWorker() {
            return new HandlerWorker(handler);
        }
    
        static class HandlerWorker extends Worker {
            private final Handler handler;
            private final RxAndroidSchedulersHook hook;
            private volatile boolean unsubscribed;
    
            HandlerWorker(Handler handler) {
                this.handler = handler;
                this.hook = RxAndroidPlugins.getInstance().getSchedulersHook();
            }
    
            @Override
            public void unsubscribe() {
                unsubscribed = true;
                handler.removeCallbacksAndMessages(this /* token */);
            }
    
            @Override
            public boolean isUnsubscribed() {
                return unsubscribed;
            }
    
            @Override
            public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
                if (unsubscribed) {
                    return Subscriptions.unsubscribed();
                }
    
                action = hook.onSchedule(action);
    
                ScheduledAction scheduledAction = new ScheduledAction(action, handler);
    
                Message message = Message.obtain(handler, scheduledAction);
                message.obj = this; // Used as token for unsubscription operation.
    
                handler.sendMessageDelayed(message, unit.toMillis(delayTime));
    
                if (unsubscribed) {
                    handler.removeCallbacks(scheduledAction);
                    return Subscriptions.unsubscribed();
                }
    
                return scheduledAction;
            }
    
            @Override
            public Subscription schedule(final Action0 action) {
                return schedule(action, 0, TimeUnit.MILLISECONDS);
            }
        }
    
        static final class ScheduledAction implements Runnable, Subscription {
            private final Action0 action;
            private final Handler handler;
            private volatile boolean unsubscribed;
    
            ScheduledAction(Action0 action, Handler handler) {
                this.action = action;
                this.handler = handler;
            }
    
            @Override public void run() {
                try {
                    action.call();
                } catch (Throwable e) {
                    // nothing to do but print a System error as this is fatal and there is nowhere else to throw this
                    IllegalStateException ie;
                    if (e instanceof OnErrorNotImplementedException) {
                        ie = new IllegalStateException("Exception thrown on Scheduler.Worker thread. Add `onError` handling.", e);
                    } else {
                        ie = new IllegalStateException("Fatal Exception thrown on Scheduler.Worker thread.", e);
                    }
                    RxJavaPlugins.getInstance().getErrorHandler().handleError(ie);
                    Thread thread = Thread.currentThread();
                    thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
                }
            }
    
            @Override public void unsubscribe() {
                unsubscribed = true;
                handler.removeCallbacks(this);
            }
    
            @Override public boolean isUnsubscribed() {
                return unsubscribed;
            }
        }
    }
    

    他里面对应的WorkerHandlerWorker,他里面也是通过ScheduledAction来实现的,他实现了Runnable接口。通过主线程的MainLooper创建一个Handler然后去执行ScheduledAction中的run()方法。然后在run()方法中调用了ObserveOnSubscriber.call(),这便是它实现线程切换的原理。

    更多内容请看下一篇文章RxJava详解(七)


    更多精彩文章请见:Github AndroidNote,欢迎Star

    相关文章

      网友评论

          本文标题:RxJava详解之线程调度原理(六)

          本文链接:https://www.haomeiwen.com/subject/nvahvftx.html