美文网首页Android高级技术Android开发Android开发
Android 架构师之路21 响应式编程RxJava 线程变换

Android 架构师之路21 响应式编程RxJava 线程变换

作者: 香沙小熊 | 来源:发表于2018-06-19 13:28 被阅读160次

    Android 架构师之路 目录

    前言

    Scheduler体现了响应式编程思想:通过Scheduler实现了变化,并能向下传播。(变化传播)

    线程变换功能模块:
    1. 让代码可以在不同的线程执行
    2. subscribeOn-订阅时的线程
    3. observeOn- 接收时的线程
    4. Scheduler - 实际做线程变换
    1.RxJava1 线程变换
    1. Scheduler调度者
    2. Operator操作者符接口
    3. lift核心操作符
    实例
       Observable.
                            create(new Observable.OnSubscribe<String>() {
                                @Override
                                public void call(Subscriber<? super String> subscriber) {
                                    if (!subscriber.isUnsubscribed()) {
                                        Log.d(TAG, "currentThread:" + Thread.currentThread());
                                        subscriber.onNext("test");
                                        subscriber.onCompleted();
                                    }
                                }
                            }).
                            subscribeOn(Schedulers.newThread()).
                            observeOn(AndroidSchedulers.mainThread()).
                            subscribe(new Observer<String>() {
                                @Override
                                public void onCompleted() {
    
                                }
    
                                @Override
                                public void onError(Throwable e) {
    
                                }
    
                                @Override
                                public void onNext(String s) {
                                    Log.d(TAG, "onNext:" + s + "currentThread:" + Thread.currentThread());
                                }
                            });
    
    运行
    06-12 17:00:13.846 6227-6495/com.haocai.rxjavademo D/kpioneer: currentThread:Thread[RxNewThreadScheduler-1,5,main]
    06-12 17:00:13.856 6227-6227/com.haocai.rxjavademo D/kpioneer: onNext:testcurrentThread:Thread[main,5,main]
    
    2.RxJava2 线程变换
    1. Scheduler调度者
    2. AbstractObservableWithUpStream抽象类
                       /*---------无背压---------*/
                    Observable.
                            create(new ObservableOnSubscribe<String>() {
                                @Override
                                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                                    if (!emitter.isDisposed()) {
                                        Log.d(TAG, "Observable currentThread:" + Thread.currentThread());
                                        emitter.onNext("test");
                                        emitter.onComplete();
                                    }
                                }
                            }).
                            subscribeOn(Schedulers.newThread()).
                            observeOn(AndroidSchedulers.mainThread()).
                            subscribe(new Observer<String>() {
                                @Override
                                public void onSubscribe(Disposable d) {
    
                                }
    
                                @Override
                                public void onNext(String o) {
                                    Log.d(TAG, "Observable onNext:" + o);
                                    Log.d(TAG, "Observable currentThread:" + Thread.currentThread());
    
                                }
    
                                @Override
                                public void onError(Throwable e) {
    
                                }
    
                                @Override
                                public void onComplete() {
    
                                }
                            });
                      /*---------有背压---------*/
                    Flowable.create(new FlowableOnSubscribe<String>() {
                        @Override
                        public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                            if (!emitter.isCancelled()) {
                                Log.d(TAG, "Flowable currentThread:" + Thread.currentThread());
                                emitter.onNext("test");
                                emitter.onComplete();
                            }
                        }
                    }, BackpressureStrategy.DROP).
                            subscribeOn(Schedulers.newThread()).
                            observeOn(AndroidSchedulers.mainThread()).
                            subscribe(new Subscriber<String>() {
                                @Override
                                public void onSubscribe(Subscription s) {
                                    s.request(Long.MAX_VALUE);
                                }
    
                                @Override
                                public void onNext(String s) {
                                    Log.d(TAG, "Flowable onNext:" + s);
                                    Log.d(TAG, "Flowable currentThread:" + Thread.currentThread());
                                }
    
                                @Override
                                public void onError(Throwable t) {
    
                                }
    
                                @Override
                                public void onComplete() {
    
                                }
                            });
    
    
    06-13 13:37:13.009 3063-3949/com.haocai.rxjavademo D/kpioneer: Observable currentThread:Thread[RxNewThreadScheduler-1,5,main]
    06-13 13:37:13.019 3063-3063/com.haocai.rxjavademo D/kpioneer: Observable onNext:test
    06-13 13:37:13.019 3063-3063/com.haocai.rxjavademo D/kpioneer: Observable currentThread:Thread[main,5,main]
    06-13 13:37:13.019 3063-3950/com.haocai.rxjavademo D/kpioneer: Flowable currentThread:Thread[RxNewThreadScheduler-2,5,main]
    06-13 13:37:13.029 3063-3063/com.haocai.rxjavademo D/kpioneer: Flowable onNext:test
    06-13 13:37:13.029 3063-3063/com.haocai.rxjavademo D/kpioneer: Flowable currentThread:Thread[main,5,main]
    
    3.RxJava1 Scheduler调度者源码分析
    1. Scheduler:抽象类
    2. Worker:真正做线程调度的类
    3. Action0: 在线程中执行的操作
    4. schedule: 实际做线程调度的方法,入参为Action0
    public abstract class Scheduler {
    
        public abstract Worker createWorker();
    
        /**
         * Sequential Scheduler for executing actions on a single thread or event loop.
         * <p>
         * Unsubscribing the {@link Worker} cancels all outstanding work and allows resources cleanup.
         */
        public abstract static class Worker implements Subscription {
    
            /**
             * Schedules an Action for execution.
             *
             * @param action
             *            Action to schedule
             * @return a subscription to be able to prevent or cancel the execution of the action
             */
            public abstract Subscription schedule(Action0 action);
    
    
            public abstract Subscription schedule(final Action0 action, final long delayTime, final TimeUnit unit);
    
    
            public Subscription schedulePeriodically(final Action0 action, long initialDelay, long period, TimeUnit unit) {
                return SchedulePeriodicHelper.schedulePeriodically(this, action,
                        initialDelay, period, unit, null);
            }
    
    
            public long now() {
                return System.currentTimeMillis();
            }
        }
    
    
        public long now() {
            return System.currentTimeMillis();
        }
    
    
        @SuppressWarnings("unchecked")
        public <S extends Scheduler & Subscription> S when(Func1<Observable<Observable<Completable>>, Completable> combine) {
            return (S) new SchedulerWhen(combine, this);
        }
    }
    
    线程调度过程:
    1. 传入不同Scheduler来使用不同的线程
        public final Observable<T> subscribeOn(Scheduler scheduler) {
            return subscribeOn(scheduler, !(this.onSubscribe instanceof OnSubscribeCreate));
        }
    
        public final Observable<T> observeOn(Scheduler scheduler) {
            return observeOn(scheduler, RxRingBuffer.SIZE);
        }
    
    1. 用Scheduler创建Worker来使用真正的线程池

    NewThreadWorker 中创建线程池

    public class NewThreadWorker extends Scheduler.Worker implements Subscription {
        private final ScheduledExecutorService executor;
        volatile boolean isUnsubscribed;
        /** The purge frequency in milliseconds. */
        private static final String FREQUENCY_KEY = "rx.scheduler.jdk6.purge-frequency-millis";
        /** Force the use of purge (true/false). */
        private static final String PURGE_FORCE_KEY = "rx.scheduler.jdk6.purge-force";
        private static final String PURGE_THREAD_PREFIX = "RxSchedulerPurge-";
        private static final boolean SHOULD_TRY_ENABLE_CANCEL_POLICY;
        /** The purge frequency in milliseconds. */
        public static final int PURGE_FREQUENCY;
        private static final ConcurrentHashMap<ScheduledThreadPoolExecutor, ScheduledThreadPoolExecutor> EXECUTORS;
        private static final AtomicReference<ScheduledExecutorService> PURGE;
        /**
         * Improves performance of {@link #tryEnableCancelPolicy(ScheduledExecutorService)}.
         * Also, it works even for inheritance: {@link Method} of base class can be invoked on the instance of child class.
         */
        private static volatile Object cachedSetRemoveOnCancelPolicyMethod;
    
        /**
         * Possible value of {@link #cachedSetRemoveOnCancelPolicyMethod} which means that cancel policy is not supported.
         */
        private static final Object SET_REMOVE_ON_CANCEL_POLICY_METHOD_NOT_SUPPORTED = new Object();
    
        static {
            EXECUTORS = new ConcurrentHashMap<ScheduledThreadPoolExecutor, ScheduledThreadPoolExecutor>();
            PURGE = new AtomicReference<ScheduledExecutorService>();
            PURGE_FREQUENCY = Integer.getInteger(FREQUENCY_KEY, 1000);
    
            // Forces the use of purge even if setRemoveOnCancelPolicy is available
            final boolean purgeForce = Boolean.getBoolean(PURGE_FORCE_KEY);
    
            final int androidApiVersion = PlatformDependent.getAndroidApiVersion();
    
            // According to http://developer.android.com/reference/java/util/concurrent/ScheduledThreadPoolExecutor.html#setRemoveOnCancelPolicy(boolean)
            // setRemoveOnCancelPolicy available since Android API 21
            SHOULD_TRY_ENABLE_CANCEL_POLICY = !purgeForce
                    && (androidApiVersion == ANDROID_API_VERSION_IS_NOT_ANDROID || androidApiVersion >= 21);
        }
        /**
         * Registers the given executor service and starts the purge thread if not already started.
         * <p>{@code public} visibility reason: called from other package(s) within RxJava
         * @param service a scheduled thread pool executor instance
         */
        public static void registerExecutor(ScheduledThreadPoolExecutor service) {
            do {
                ScheduledExecutorService exec = PURGE.get();
                if (exec != null) {
                    break;
                }
                exec = Executors.newScheduledThreadPool(1, new RxThreadFactory(PURGE_THREAD_PREFIX));
                if (PURGE.compareAndSet(null, exec)) {
                    exec.scheduleAtFixedRate(new Runnable() {
                        @Override
                        public void run() {
                            purgeExecutors();
                        }
                    }, PURGE_FREQUENCY, PURGE_FREQUENCY, TimeUnit.MILLISECONDS);
    
                    break;
                } else {
                    exec.shutdownNow();
                }
            } while (true);
    
            EXECUTORS.putIfAbsent(service, service);
        }
        /**
         * Deregisters the executor service.
         * <p>{@code public} visibility reason: called from other package(s) within RxJava
         * @param service a scheduled thread pool executor instance
         */
        public static void deregisterExecutor(ScheduledExecutorService service) {
            EXECUTORS.remove(service);
        }
    
        /** Purges each registered executor and eagerly evicts shutdown executors. */
        static void purgeExecutors() {
            try {
                // This prevents map.keySet to compile to a Java 8+ KeySetView return type
                // and cause NoSuchMethodError on Java 6-7 runtimes.
                Map<ScheduledThreadPoolExecutor, ScheduledThreadPoolExecutor> map = EXECUTORS;
                Iterator<ScheduledThreadPoolExecutor> it = map.keySet().iterator();
                while (it.hasNext()) {
                    ScheduledThreadPoolExecutor exec = it.next();
                    if (!exec.isShutdown()) {
                        exec.purge();
                    } else {
                        it.remove();
                    }
                }
            } catch (Throwable t) {
                Exceptions.throwIfFatal(t);
                RxJavaHooks.onError(t);
            }
        }
    
        /**
         * Tries to enable the Java 7+ setRemoveOnCancelPolicy.
         * <p>{@code public} visibility reason: called from other package(s) within RxJava.
         * If the method returns false, the {@link #registerExecutor(ScheduledThreadPoolExecutor)} may
         * be called to enable the backup option of purging the executors.
         * @param executor the executor to call setRemoveOnCancelPolicy if available.
         * @return true if the policy was successfully enabled
         */
        public static boolean tryEnableCancelPolicy(ScheduledExecutorService executor) {
            if (SHOULD_TRY_ENABLE_CANCEL_POLICY) { // NOPMD
                final boolean isInstanceOfScheduledThreadPoolExecutor = executor instanceof ScheduledThreadPoolExecutor;
    
                Method methodToCall;
    
                if (isInstanceOfScheduledThreadPoolExecutor) {
                    final Object localSetRemoveOnCancelPolicyMethod = cachedSetRemoveOnCancelPolicyMethod;
    
                    if (localSetRemoveOnCancelPolicyMethod == SET_REMOVE_ON_CANCEL_POLICY_METHOD_NOT_SUPPORTED) {
                        return false;
                    }
    
                    if (localSetRemoveOnCancelPolicyMethod == null) {
                        Method method = findSetRemoveOnCancelPolicyMethod(executor);
    
                        cachedSetRemoveOnCancelPolicyMethod = method != null
                                ? method
                                : SET_REMOVE_ON_CANCEL_POLICY_METHOD_NOT_SUPPORTED;
    
                        methodToCall = method;
                    } else {
                        methodToCall = (Method) localSetRemoveOnCancelPolicyMethod;
                    }
                } else {
                    methodToCall = findSetRemoveOnCancelPolicyMethod(executor);
                }
    
                if (methodToCall != null) {
                    try {
                        methodToCall.invoke(executor, true);
                        return true;
                    } catch (InvocationTargetException e) {
                        RxJavaHooks.onError(e);
                    } catch (IllegalAccessException e) {
                        RxJavaHooks.onError(e);
                    } catch (IllegalArgumentException e) {
                        RxJavaHooks.onError(e);
                    }
                }
            }
    
            return false;
        }
    
        /**
         * Tries to find {@code "setRemoveOnCancelPolicy(boolean)"} method in the class of passed executor.
         *
         * @param executor whose class will be used to search for required method.
         * @return {@code "setRemoveOnCancelPolicy(boolean)"} {@link Method}
         * or {@code null} if required {@link Method} was not found.
         */
        static Method findSetRemoveOnCancelPolicyMethod(ScheduledExecutorService executor) {
            // The reason for the loop is to avoid NoSuchMethodException being thrown on JDK 6
            // which is more costly than looping through ~70 methods.
            for (final Method method : executor.getClass().getMethods()) {
                if (method.getName().equals("setRemoveOnCancelPolicy")) {
                    final Class<?>[] parameterTypes = method.getParameterTypes();
    
                    if (parameterTypes.length == 1 && parameterTypes[0] == Boolean.TYPE) {
                        return method;
                    }
                }
            }
    
            return null;
        }
    
        /* package */
        public NewThreadWorker(ThreadFactory threadFactory) {
            ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
            // Java 7+: cancelled future tasks can be removed from the executor thus avoiding memory leak
            boolean cancelSupported = tryEnableCancelPolicy(exec);
            if (!cancelSupported && exec instanceof ScheduledThreadPoolExecutor) {
                registerExecutor((ScheduledThreadPoolExecutor)exec);
            }
            executor = exec;
        }
    
        @Override
        public Subscription schedule(final Action0 action) {
            return schedule(action, 0, null);
        }
    
        @Override
        public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
            if (isUnsubscribed) {
                return Subscriptions.unsubscribed();
            }
            return scheduleActual(action, delayTime, unit);
        }
    
        /**
         * Schedules the given action by wrapping it into a ScheduledAction on the
         * underlying ExecutorService, returning the ScheduledAction.
         * @param action the action to wrap and schedule
         * @param delayTime the delay in execution
         * @param unit the time unit of the delay
         * @return the wrapper ScheduledAction
         */
        public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
            Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
            ScheduledAction run = new ScheduledAction(decoratedAction);
            Future<?> f;
            if (delayTime <= 0) {
                f = executor.submit(run);
            } else {
                f = executor.schedule(run, delayTime, unit);
            }
            run.add(f);
    
            return run;
        }
        public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, CompositeSubscription parent) {
            Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
            ScheduledAction run = new ScheduledAction(decoratedAction, parent);
            parent.add(run);
    
            Future<?> f;
            if (delayTime <= 0) {
                f = executor.submit(run);
            } else {
                f = executor.schedule(run, delayTime, unit);
            }
            run.add(f);
    
            return run;
        }
    
        public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, SubscriptionList parent) {
            Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
            ScheduledAction run = new ScheduledAction(decoratedAction, parent);
            parent.add(run);
    
            Future<?> f;
            if (delayTime <= 0) {
                f = executor.submit(run);
            } else {
                f = executor.schedule(run, delayTime, unit);
            }
            run.add(f);
    
            return run;
        }
    
        @Override
        public void unsubscribe() {
            isUnsubscribed = true;
            executor.shutdownNow();
            deregisterExecutor(executor);
        }
    
        @Override
        public boolean isUnsubscribed() {
            return isUnsubscribed;
        }
    }
    
    1. 传入具体操作Action0

    2. 通过scheduler方法来实现调度

    ScheduledAction 中 action.call(); 执行具体操作

    public final class ScheduledAction extends AtomicReference<Thread> implements Runnable, Subscription {
        /** */
        private static final long serialVersionUID = -3962399486978279857L;
        final SubscriptionList cancel;
        final Action0 action;
    
        public ScheduledAction(Action0 action) {
            this.action = action;
            this.cancel = new SubscriptionList();
        }
        public ScheduledAction(Action0 action, CompositeSubscription parent) {
            this.action = action;
            this.cancel = new SubscriptionList(new Remover(this, parent));
        }
        public ScheduledAction(Action0 action, SubscriptionList parent) {
            this.action = action;
            this.cancel = new SubscriptionList(new Remover2(this, parent));
        }
    
        @Override
        public void run() {
            try {
                lazySet(Thread.currentThread());
                action.call();
            } catch (OnErrorNotImplementedException e) {
                signalError(new IllegalStateException("Exception thrown on Scheduler.Worker thread. Add `onError` handling.", e));
            } catch (Throwable e) {
                signalError(new IllegalStateException("Fatal Exception thrown on Scheduler.Worker thread.", e));
            } finally {
                unsubscribe();
            }
        }
    
        void signalError(Throwable ie) {
            RxJavaHooks.onError(ie);
            Thread thread = Thread.currentThread();
            thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
        }
    
        @Override
        public boolean isUnsubscribed() {
            return cancel.isUnsubscribed();
        }
    
        @Override
        public void unsubscribe() {
            if (!cancel.isUnsubscribed()) {
                cancel.unsubscribe();
            }
        }
    
        /**
         * Adds a general Subscription to this {@code ScheduledAction} that will be unsubscribed
         * if the underlying {@code action} completes or the this scheduled action is cancelled.
         *
         * @param s the Subscription to add
         */
        public void add(Subscription s) {
            cancel.add(s);
        }
    
        /**
         * Adds the given Future to the unsubscription composite in order to support
         * cancelling the underlying task in the executor framework.
         * @param f the future to add
         */
        public void add(final Future<?> f) {
            cancel.add(new FutureCompleter(f));
        }
    
        /**
         * Adds a parent {@link CompositeSubscription} to this {@code ScheduledAction} so when the action is
         * cancelled or terminates, it can remove itself from this parent.
         *
         * @param parent
         *            the parent {@code CompositeSubscription} to add
         */
        public void addParent(CompositeSubscription parent) {
            cancel.add(new Remover(this, parent));
        }
    
        /**
         * Adds a parent {@link CompositeSubscription} to this {@code ScheduledAction} so when the action is
         * cancelled or terminates, it can remove itself from this parent.
         *
         * @param parent
         *            the parent {@code CompositeSubscription} to add
         */
        public void addParent(SubscriptionList parent) {
            cancel.add(new Remover2(this, parent));
        }
    
        /**
         * Cancels the captured future if the caller of the call method
         * is not the same as the runner of the outer ScheduledAction to
         * prevent unnecessary self-interrupting if the unsubscription
         * happens from the same thread.
         */
        final class FutureCompleter implements Subscription {
            private final Future<?> f;
    
            FutureCompleter(Future<?> f) {
                this.f = f;
            }
    
            @Override
            public void unsubscribe() {
                if (ScheduledAction.this.get() != Thread.currentThread()) {
                    f.cancel(true);
                } else {
                    f.cancel(false);
                }
            }
            @Override
            public boolean isUnsubscribed() {
                return f.isCancelled();
            }
        }
    
        /** Remove a child subscription from a composite when unsubscribing. */
        static final class Remover extends AtomicBoolean implements Subscription {
            /** */
            private static final long serialVersionUID = 247232374289553518L;
            final ScheduledAction s;
            final CompositeSubscription parent;
    
            public Remover(ScheduledAction s, CompositeSubscription parent) {
                this.s = s;
                this.parent = parent;
            }
    
            @Override
            public boolean isUnsubscribed() {
                return s.isUnsubscribed();
            }
    
            @Override
            public void unsubscribe() {
                if (compareAndSet(false, true)) {
                    parent.remove(s);
                }
            }
    
        }
        /** Remove a child subscription from a composite when unsubscribing. */
        static final class Remover2 extends AtomicBoolean implements Subscription {
            /** */
            private static final long serialVersionUID = 247232374289553518L;
            final ScheduledAction s;
            final SubscriptionList parent;
    
            public Remover2(ScheduledAction s, SubscriptionList parent) {
                this.s = s;
                this.parent = parent;
            }
    
            @Override
            public boolean isUnsubscribed() {
                return s.isUnsubscribed();
            }
    
            @Override
            public void unsubscribe() {
                if (compareAndSet(false, true)) {
                    parent.remove(s);
                }
            }
    
        }
    }
    
    rxJava1: rxandroid中的Scheduler

    通过Handler和Looper来实现执行在主线程

    /** Android-specific Schedulers. */
    public final class AndroidSchedulers {
        private static final AtomicReference<AndroidSchedulers> INSTANCE = new AtomicReference<>();
    
        private final Scheduler mainThreadScheduler;
    
        private static AndroidSchedulers getInstance() {
            for (;;) {
                AndroidSchedulers current = INSTANCE.get();
                if (current != null) {
                    return current;
                }
                current = new AndroidSchedulers();
                if (INSTANCE.compareAndSet(null, current)) {
                    return current;
                }
            }
        }
    
        private AndroidSchedulers() {
            RxAndroidSchedulersHook hook = RxAndroidPlugins.getInstance().getSchedulersHook();
    
            Scheduler main = hook.getMainThreadScheduler();
            if (main != null) {
                mainThreadScheduler = main;
            } else {
                mainThreadScheduler = new LooperScheduler(Looper.getMainLooper());
            }
        }
    
        /** A {@link Scheduler} which executes actions on the Android UI thread. */
        public static Scheduler mainThread() {
            return getInstance().mainThreadScheduler;
        }
    
        /** A {@link Scheduler} which executes actions on {@code looper}. */
        public static Scheduler from(Looper looper) {
            if (looper == null) throw new NullPointerException("looper == null");
            return new LooperScheduler(looper);
        }
    
        /**
         * Resets the current {@link AndroidSchedulers} instance.
         * This will re-init the cached schedulers on the next usage,
         * which can be useful in testing.
         */
        @Experimental
        public static void reset() {
            INSTANCE.set(null);
        }
    }
    
    class LooperScheduler extends Scheduler {
        private final Handler handler;
    
        LooperScheduler(Looper looper) {
            handler = new Handler(looper);
        }
    
        LooperScheduler(Handler handler) {
            this.handler = handler;
        }
    
        @Override
        public Worker createWorker() {
            return new HandlerWorker(handler);
        }
    
        static class HandlerWorker extends Worker {
            private final Handler handler;
            private final RxAndroidSchedulersHook hook;
            private volatile boolean unsubscribed;
    
            HandlerWorker(Handler handler) {
                this.handler = handler;
                this.hook = RxAndroidPlugins.getInstance().getSchedulersHook();
            }
    
            @Override
            public void unsubscribe() {
                unsubscribed = true;
                handler.removeCallbacksAndMessages(this /* token */);
            }
    
            @Override
            public boolean isUnsubscribed() {
                return unsubscribed;
            }
    
            @Override
            public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
                if (unsubscribed) {
                    return Subscriptions.unsubscribed();
                }
    
                action = hook.onSchedule(action);
    
                ScheduledAction scheduledAction = new ScheduledAction(action, handler);
    
                Message message = Message.obtain(handler, scheduledAction);
                message.obj = this; // Used as token for unsubscription operation.
    
                handler.sendMessageDelayed(message, unit.toMillis(delayTime));
    
                if (unsubscribed) {
                    handler.removeCallbacks(scheduledAction);
                    return Subscriptions.unsubscribed();
                }
    
                return scheduledAction;
            }
    
            @Override
            public Subscription schedule(final Action0 action) {
                return schedule(action, 0, TimeUnit.MILLISECONDS);
            }
        }
    
        static final class ScheduledAction implements Runnable, Subscription {
            private final Action0 action;
            private final Handler handler;
            private volatile boolean unsubscribed;
    
            ScheduledAction(Action0 action, Handler handler) {
                this.action = action;
                this.handler = handler;
            }
    
            @Override public void run() {
                try {
                    action.call();
                } catch (Throwable e) {
                    // nothing to do but print a System error as this is fatal and there is nowhere else to throw this
                    IllegalStateException ie;
                    if (e instanceof OnErrorNotImplementedException) {
                        ie = new IllegalStateException("Exception thrown on Scheduler.Worker thread. Add `onError` handling.", e);
                    } else {
                        ie = new IllegalStateException("Fatal Exception thrown on Scheduler.Worker thread.", e);
                    }
                    RxJavaPlugins.getInstance().getErrorHandler().handleError(ie);
                    Thread thread = Thread.currentThread();
                    thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
                }
            }
    
            @Override public void unsubscribe() {
                unsubscribed = true;
                handler.removeCallbacks(this);
            }
    
            @Override public boolean isUnsubscribed() {
                return unsubscribed;
            }
        }
    }
    
    

    4.RxJava2 Scheduler调度者源码分析

    public abstract class Scheduler {
      
        static final long CLOCK_DRIFT_TOLERANCE_NANOSECONDS;
        static {
            CLOCK_DRIFT_TOLERANCE_NANOSECONDS = TimeUnit.MINUTES.toNanos(
                    Long.getLong("rx2.scheduler.drift-tolerance", 15));
        }
       
        public static long clockDriftTolerance() {
            return CLOCK_DRIFT_TOLERANCE_NANOSECONDS;
        }
    
        
        @NonNull
        public abstract io.reactivex.Scheduler.Worker createWorker();
        
        public long now(@NonNull TimeUnit unit) {
            return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
        
        public void start() {
    
        }
    
        public void shutdown() {
    
        }
        
        @NonNull
        public Disposable scheduleDirect(@NonNull Runnable run) {
            return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
        }
    
     
        @NonNull
        public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
            final io.reactivex.Scheduler.Worker w = createWorker();
    
            final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
            io.reactivex.Scheduler.DisposeTask task = new io.reactivex.Scheduler.DisposeTask(decoratedRun, w);
    
            w.schedule(task, delay, unit);
    
            return task;
        }
    
        @NonNull
        public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, @NonNull TimeUnit unit) {
            final io.reactivex.Scheduler.Worker w = createWorker();
    
            final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
            io.reactivex.Scheduler.PeriodicDirectTask periodicTask = new io.reactivex.Scheduler.PeriodicDirectTask(decoratedRun, w);
    
            Disposable d = w.schedulePeriodically(periodicTask, initialDelay, period, unit);
            if (d == EmptyDisposable.INSTANCE) {
                return d;
            }
    
            return periodicTask;
        }
    
      
        @SuppressWarnings("unchecked")
        @NonNull
        public <S extends io.reactivex.Scheduler & Disposable> S when(@NonNull Function<Flowable<Flowable<Completable>>, Completable> combine) {
            return (S) new SchedulerWhen(combine, this);
        }
    
       
        public abstract static class Worker implements Disposable {
            /**
             * Schedules a Runnable for execution without any time delay.
             *
             * <p>The default implementation delegates to {@link #schedule(Runnable, long, TimeUnit)}.
             *
             * @param run
             *            Runnable to schedule
             * @return a Disposable to be able to unsubscribe the action (cancel it if not executed)
             */
            @NonNull
            public Disposable schedule(@NonNull Runnable run) {
                return schedule(run, 0L, TimeUnit.NANOSECONDS);
            }
    
            /**
             * Schedules an Runnable for execution at some point in the future specified by a time delay
             * relative to the current time.
             * <p>
             * Note to implementors: non-positive {@code delayTime} should be regarded as non-delayed schedule, i.e.,
             * as if the {@link #schedule(Runnable)} was called.
             *
             * @param run
             *            the Runnable to schedule
             * @param delay
             *            time to "wait" before executing the action; non-positive values indicate an non-delayed
             *            schedule
             * @param unit
             *            the time unit of {@code delayTime}
             * @return a Disposable to be able to unsubscribe the action (cancel it if not executed)
             */
            @NonNull
            public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
    
     
            @NonNull
            public Disposable schedulePeriodically(@NonNull Runnable run, final long initialDelay, final long period, @NonNull final TimeUnit unit) {
                final SequentialDisposable first = new SequentialDisposable();
    
                final SequentialDisposable sd = new SequentialDisposable(first);
    
                final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
                final long periodInNanoseconds = unit.toNanos(period);
                final long firstNowNanoseconds = now(TimeUnit.NANOSECONDS);
                final long firstStartInNanoseconds = firstNowNanoseconds + unit.toNanos(initialDelay);
    
                Disposable d = schedule(new io.reactivex.Scheduler.Worker.PeriodicTask(firstStartInNanoseconds, decoratedRun, firstNowNanoseconds, sd,
                        periodInNanoseconds), initialDelay, unit);
    
                if (d == EmptyDisposable.INSTANCE) {
                    return d;
                }
                first.replace(d);
    
                return sd;
            }
    
            /**
             * Returns the 'current time' of the Worker in the specified time unit.
             * @param unit the time unit
             * @return the 'current time'
             * @since 2.0
             */
            public long now(@NonNull TimeUnit unit) {
                return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
            }
    
            /**
             * Holds state and logic to calculate when the next delayed invocation
             * of this task has to happen (accounting for clock drifts).
             */
            final class PeriodicTask implements Runnable, SchedulerRunnableIntrospection {
                @NonNull
                final Runnable decoratedRun;
                @NonNull
                final SequentialDisposable sd;
                final long periodInNanoseconds;
                long count;
                long lastNowNanoseconds;
                long startInNanoseconds;
    
                PeriodicTask(long firstStartInNanoseconds, @NonNull Runnable decoratedRun,
                             long firstNowNanoseconds, @NonNull SequentialDisposable sd, long periodInNanoseconds) {
                    this.decoratedRun = decoratedRun;
                    this.sd = sd;
                    this.periodInNanoseconds = periodInNanoseconds;
                    lastNowNanoseconds = firstNowNanoseconds;
                    startInNanoseconds = firstStartInNanoseconds;
                }
    
                @Override
                public void run() {
                    decoratedRun.run();
    
                    if (!sd.isDisposed()) {
    
                        long nextTick;
    
                        long nowNanoseconds = now(TimeUnit.NANOSECONDS);
                        // If the clock moved in a direction quite a bit, rebase the repetition period
                        if (nowNanoseconds + CLOCK_DRIFT_TOLERANCE_NANOSECONDS < lastNowNanoseconds
                                || nowNanoseconds >= lastNowNanoseconds + periodInNanoseconds + CLOCK_DRIFT_TOLERANCE_NANOSECONDS) {
                            nextTick = nowNanoseconds + periodInNanoseconds;
                            /*
                             * Shift the start point back by the drift as if the whole thing
                             * started count periods ago.
                             */
                            startInNanoseconds = nextTick - (periodInNanoseconds * (++count));
                        } else {
                            nextTick = startInNanoseconds + (++count * periodInNanoseconds);
                        }
                        lastNowNanoseconds = nowNanoseconds;
    
                        long delay = nextTick - nowNanoseconds;
                        sd.replace(schedule(this, delay, TimeUnit.NANOSECONDS));
                    }
                }
    
                @Override
                public Runnable getWrappedRunnable() {
                    return this.decoratedRun;
                }
            }
        }
    
        static final class PeriodicDirectTask
                implements Disposable, Runnable, SchedulerRunnableIntrospection {
    
            @NonNull
            final Runnable run;
    
            @NonNull
            final io.reactivex.Scheduler.Worker worker;
    
            volatile boolean disposed;
    
            PeriodicDirectTask(@NonNull Runnable run, @NonNull io.reactivex.Scheduler.Worker worker) {
                this.run = run;
                this.worker = worker;
            }
    
            @Override
            public void run() {
                if (!disposed) {
                    try {
                        run.run();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        worker.dispose();
                        throw ExceptionHelper.wrapOrThrow(ex);
                    }
                }
            }
    
            @Override
            public void dispose() {
                disposed = true;
                worker.dispose();
            }
    
            @Override
            public boolean isDisposed() {
                return disposed;
            }
    
            @Override
            public Runnable getWrappedRunnable() {
                return run;
            }
        }
    
        static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {
    
            @NonNull
            final Runnable decoratedRun;
    
            @NonNull
            final io.reactivex.Scheduler.Worker w;
    
            @Nullable
            Thread runner;
    
            DisposeTask(@NonNull Runnable decoratedRun, @NonNull io.reactivex.Scheduler.Worker w) {
                this.decoratedRun = decoratedRun;
                this.w = w;
            }
    
            @Override
            public void run() {
                runner = Thread.currentThread();
                try {
                    decoratedRun.run();
                } finally {
                    dispose();
                    runner = null;
                }
            }
    
            @Override
            public void dispose() {
                if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
                    ((NewThreadWorker)w).shutdown();
                } else {
                    w.dispose();
                }
            }
    
            @Override
            public boolean isDisposed() {
                return w.isDisposed();
            }
    
            @Override
            public Runnable getWrappedRunnable() {
                return this.decoratedRun;
            }
        }
    }
    
    1. 传入不同的Scheduler来使用不同的线程
    2. 用Scheduler创建Worker来使用真正的线程池
    public class NewThreadWorker extends Scheduler.Worker implements Disposable {
        private final ScheduledExecutorService executor;
    
        volatile boolean disposed;
    
        public NewThreadWorker(ThreadFactory threadFactory) {
            executor = SchedulerPoolFactory.create(threadFactory);
        }
    
        @NonNull
        @Override
        public Disposable schedule(@NonNull final Runnable run) {
            return schedule(run, 0, null);
        }
    
        @NonNull
        @Override
        public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (disposed) {
                return EmptyDisposable.INSTANCE;
            }
            return scheduleActual(action, delayTime, unit, null);
        }
    
        /**
         * Schedules the given runnable on the underlying executor directly and
         * returns its future wrapped into a Disposable.
         * @param run the Runnable to execute in a delayed fashion
         * @param delayTime the delay amount
         * @param unit the delay time unit
         * @return the ScheduledRunnable instance
         */
        public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) {
            ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run));
            try {
                Future<?> f;
                if (delayTime <= 0L) {
                    f = executor.submit(task);
                } else {
                    f = executor.schedule(task, delayTime, unit);
                }
                task.setFuture(f);
                return task;
            } catch (RejectedExecutionException ex) {
                RxJavaPlugins.onError(ex);
                return EmptyDisposable.INSTANCE;
            }
        }
    
        /**
         * Schedules the given runnable periodically on the underlying executor directly
         * and returns its future wrapped into a Disposable.
         * @param run the Runnable to execute in a periodic fashion
         * @param initialDelay the initial delay amount
         * @param period the repeat period amount
         * @param unit the time unit for both the initialDelay and period
         * @return the ScheduledRunnable instance
         */
        public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) {
            final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
            if (period <= 0L) {
    
                InstantPeriodicTask periodicWrapper = new InstantPeriodicTask(decoratedRun, executor);
                try {
                    Future<?> f;
                    if (initialDelay <= 0L) {
                        f = executor.submit(periodicWrapper);
                    } else {
                        f = executor.schedule(periodicWrapper, initialDelay, unit);
                    }
                    periodicWrapper.setFirst(f);
                } catch (RejectedExecutionException ex) {
                    RxJavaPlugins.onError(ex);
                    return EmptyDisposable.INSTANCE;
                }
    
                return periodicWrapper;
            }
            ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(decoratedRun);
            try {
                Future<?> f = executor.scheduleAtFixedRate(task, initialDelay, period, unit);
                task.setFuture(f);
                return task;
            } catch (RejectedExecutionException ex) {
                RxJavaPlugins.onError(ex);
                return EmptyDisposable.INSTANCE;
            }
        }
    
    
        /**
         * Wraps the given runnable into a ScheduledRunnable and schedules it
         * on the underlying ScheduledExecutorService.
         * <p>If the schedule has been rejected, the ScheduledRunnable.wasScheduled will return
         * false.
         * @param run the runnable instance
         * @param delayTime the time to delay the execution
         * @param unit the time unit
         * @param parent the optional tracker parent to add the created ScheduledRunnable instance to before it gets scheduled
         * @return the ScheduledRunnable instance
         */
        @NonNull
        public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
            Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
            ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    
            if (parent != null) {
                if (!parent.add(sr)) {
                    return sr;
                }
            }
    
            Future<?> f;
            try {
                if (delayTime <= 0) {
                    f = executor.submit((Callable<Object>)sr);
                } else {
                    f = executor.schedule((Callable<Object>)sr, delayTime, unit);
                }
                sr.setFuture(f);
            } catch (RejectedExecutionException ex) {
                if (parent != null) {
                    parent.remove(sr);
                }
                RxJavaPlugins.onError(ex);
            }
    
            return sr;
        }
    
        @Override
        public void dispose() {
            if (!disposed) {
                disposed = true;
                executor.shutdownNow();
            }
        }
    
        /**
         * Shuts down the underlying executor in a non-interrupting fashion.
         */
        public void shutdown() {
            if (!disposed) {
                disposed = true;
                executor.shutdown();
            }
        }
    
        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }
    
    1. 传入具体操作Runnable
    2. 通过schedule方法来实现调度
    RxJava2: rxandroid中的Scheduler

    与RxJava1类似通过Handler和Looper来实现执行在主线程

    public final class AndroidSchedulers {
    
        private static final class MainHolder {
    
            static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
        }
    
        private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
                new Callable<Scheduler>() {
                    @Override public Scheduler call() throws Exception {
                        return MainHolder.DEFAULT;
                    }
                });
    
        /** A {@link Scheduler} which executes actions on the Android main thread. */
        public static Scheduler mainThread() {
            return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
        }
    
        /** A {@link Scheduler} which executes actions on {@code looper}. */
        public static Scheduler from(Looper looper) {
            if (looper == null) throw new NullPointerException("looper == null");
            return new HandlerScheduler(new Handler(looper));
        }
    
        private AndroidSchedulers() {
            throw new AssertionError("No instances.");
        }
    }
    
    
    final class HandlerScheduler extends Scheduler {
        private final Handler handler;
    
        HandlerScheduler(Handler handler) {
            this.handler = handler;
        }
    
        @Override
        public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");
    
            run = RxJavaPlugins.onSchedule(run);
            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
            handler.postDelayed(scheduled, unit.toMillis(delay));
            return scheduled;
        }
    
        @Override
        public Worker createWorker() {
            return new HandlerWorker(handler);
        }
    
        private static final class HandlerWorker extends Worker {
            private final Handler handler;
    
            private volatile boolean disposed;
    
            HandlerWorker(Handler handler) {
                this.handler = handler;
            }
    
            @Override
            public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
                if (run == null) throw new NullPointerException("run == null");
                if (unit == null) throw new NullPointerException("unit == null");
    
                if (disposed) {
                    return Disposables.disposed();
                }
    
                run = RxJavaPlugins.onSchedule(run);
    
                ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
    
                Message message = Message.obtain(handler, scheduled);
                message.obj = this; // Used as token for batch disposal of this worker's runnables.
    
                handler.sendMessageDelayed(message, unit.toMillis(delay));
    
                // Re-check disposed state for removing in case we were racing a call to dispose().
                if (disposed) {
                    handler.removeCallbacks(scheduled);
                    return Disposables.disposed();
                }
    
                return scheduled;
            }
    
            @Override
            public void dispose() {
                disposed = true;
                handler.removeCallbacksAndMessages(this /* token */);
            }
    
            @Override
            public boolean isDisposed() {
                return disposed;
            }
        }
    
        private static final class ScheduledRunnable implements Runnable, Disposable {
            private final Handler handler;
            private final Runnable delegate;
    
            private volatile boolean disposed;
    
            ScheduledRunnable(Handler handler, Runnable delegate) {
                this.handler = handler;
                this.delegate = delegate;
            }
    
            @Override
            public void run() {
                try {
                    delegate.run();
                } catch (Throwable t) {
                    RxJavaPlugins.onError(t);
                }
            }
    
            @Override
            public void dispose() {
                disposed = true;
                handler.removeCallbacks(this);
            }
    
            @Override
            public boolean isDisposed() {
                return disposed;
            }
        }
    }
    
    

    5.RxJava1 Scheduler线程变换仿写

    主要对象:
    Switcher线程切换者
    1. 用于线程切换
    2. 有一个createWorker方法
    /**
     * Created by Xionghu on 2018/6/14.
     * Desc: 用于线程切换
     */
    
    public abstract  class Switcher {
        public abstract Worker createWorker();
    
        public static abstract class Worker implements Calling{
            public abstract Calling switches(Action0 action0);
        }
    }
    
    Worker
    1. 真正执行线程变换的类
    2. 通过switches方法执行变换
    NewThreadSwitcher
    1. 切换到新线程的Switcher
    2. 实现createWorker方法
    /**
     * Created by Xionghu on 2018/6/14.
     * Desc: 新线程的switcher
     */
    
    public class NewThreadSwitcher extends Switcher {
        @Override
        public Worker createWorker() {
            return new NewThreadWorker();
        }
    }
    
    NewThreadWorker
    1. 有一个只有一个线程的线程池
    2. 实现切换线程的switches方法
    3. 将真正的操作用Runnable包裹丢入线程池执行
    /**
     * Created by Xionghu on 2018/6/14.
     * Desc:新线程的工作类
     */
    
    public class NewThreadWorker extends Switcher.Worker {
    
        //newScheduledThreadPool :创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。
        private final ExecutorService mExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(@NonNull Runnable runnable) {
                return new Thread(runnable, " NewThreadWorker");
            }
        });
        private volatile boolean mIsUnCalled;
    
        @Override
        public void unCall() {
            mIsUnCalled = true;
        }
    
        @Override
        public boolean isUnCalled() {
            return mIsUnCalled;
        }
    
        @Override
        public Calling switches(Action0 action0) {
            SwitcherAction switcherAction = new SwitcherAction(action0);
            mExecutor.submit(switcherAction);
            return switcherAction;
        }
    
        private static class SwitcherAction implements Runnable, Calling {
            private final Action0 action0;
            private volatile boolean mIsUnCalled;
    
            public SwitcherAction(Action0 action0) {
                this.action0 = action0;
            }
    
            @Override
            public void unCall() {
                mIsUnCalled = true;
            }
    
            @Override
            public boolean isUnCalled() {
                return mIsUnCalled;
            }
    
            @Override
            public void run() {
                action0.call();
            }
        }
    }
    
    LooperSwitcher
     Android中切换到某个线程的Looper中
    /**
     * Created by Xionghu on 2018/6/14.
     * Desc: 用于Android中Looper的Switcher
     */
    
    public class LooperSwitcher extends Switcher {
    
        private Handler mHandler;
    
        public LooperSwitcher(Looper looper) {
            mHandler = new Handler(looper);
        }
    
        @Override
        public Worker createWorker() {
            return new HandlerWorker(mHandler);
        }
    }
    
    HandlerWorker

    将具体操作发送到指定的Looper中执行

    import android.os.Handler;
    import android.os.Message;
    
    /**
     * Created by Xionghu on 2018/6/14.
     * Desc: 用于Android 的Worker
     */
    
    public class HandlerWorker extends Switcher.Worker {
        private final Handler mHandler;
    
        private volatile boolean mIsUnCalled;
    
        public HandlerWorker(Handler mHandler) {
            this.mHandler = mHandler;
        }
    
        @Override
        public void unCall() {
            mIsUnCalled = true;
            mHandler.removeCallbacksAndMessages(this);
        }
    
        @Override
        public boolean isUnCalled() {
            return mIsUnCalled;
        }
    
        @Override
        public Calling switches(Action0 action0) {
            SwitcherAction switcherAction = new SwitcherAction(action0, mHandler);
            Message message = Message.obtain(mHandler, switcherAction);
            message.obj = this;
            mHandler.sendMessage(message);
            return switcherAction;
        }
    
        private static class SwitcherAction implements Runnable, Calling {
            private final Action0 action0;
            private final Handler handler;
            private volatile boolean mIsUnCalled;
    
            public SwitcherAction(Action0 action0, Handler handler) {
                this.action0 = action0;
                this.handler = handler;
            }
    
            @Override
            public void unCall() {
                mIsUnCalled = true;
                handler.removeCallbacks(this);
            }
    
            @Override
            public boolean isUnCalled() {
                return mIsUnCalled;
            }
    
            @Override
            public void run() {
                action0.call();
            }
        }
    }
    
    

    6.RxJava2 Scheduler线程变换仿写

    Switcher线程切换者
    1. 用于线程切换
    2. 有一个createWorker方法
    3. 本身就包含一个switches方法(与RxJava1有区别)
    /**
     * Created by Xionghu on 2018/6/14.
     * Desc: 用于线程切换的抽象类
     */
    
    public abstract class Switcher {
    
        public abstract Worker createWorker();
    
        public Release switches(final Runnable runnable) {
            Worker worker = createWorker();
            worker.switches(new Runnable() {
                @Override
                public void run() {
                    runnable.run();
                }
            });
            return worker;
        }
    
        public static abstract class Worker implements Release {
            public abstract Release switches(Runnable runnable);
        }
    }
    
    
    Worker
    1. 真正执行线程变换的类
    2. 通过switches方法执行变换
    NewThreadSwitcher
    1. 切换到新线程的Switcher
    2. 实现createWorker方法
    /**
     * Created by Xionghu on 2018/6/14.
     * Desc: 新线程的switcher
     */
    
    public class NewThreadSwitcher extends Switcher {
        @Override
        public Worker createWorker() {
            return new NewThreadWorker();
        }
    }
    
    NewThreadWorker
    1. 有一个只有一个线程的线程池
    2. 实现切换线程的switches方法
    3. 将真正的操作用Runnable包裹丢入线程池执行
    import android.support.annotation.NonNull;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadFactory;
    
    /**
     * Created by Xionghu on 2018/6/14.
     * Desc:新线程的工作类
     */
    
    public class NewThreadWorker extends Switcher.Worker {
        private final ExecutorService mExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(@NonNull Runnable runnable) {
                return new Thread(runnable, "NewThreadWorker");
            }
        });
        private volatile boolean mIsReleased;
    
        @Override
        public boolean isReleased() {
            return mIsReleased;
        }
    
        @Override
        public void release() {
            mIsReleased = true;
        }
    
        @Override
        public Release switches(Runnable runnable) {
            SwitcherAction switcherAction = new SwitcherAction(runnable);
            mExecutor.submit((Callable<Object>) switcherAction);
            return switcherAction;
        }
    
        private static class SwitcherAction implements Runnable, Callable<Object>, Release {
    
            private final Runnable mRunnable;
    
            private volatile boolean mIsReleased;
    
            public SwitcherAction(Runnable mRunnable) {
                this.mRunnable = mRunnable;
            }
    
            @Override
            public boolean isReleased() {
                return mIsReleased;
            }
    
            @Override
            public void release() {
                mIsReleased = true;
            }
    
            @Override
            public void run() {
                mRunnable.run();
            }
    
            @Override
            public Object call() throws Exception {
                run();
                return null;
            }
        }
    }
    
    LooperSwitcher

    Android 中切换到某个线程的Looper中

    HandlerWorker

    将具体操作发送到指定的Looper中执行

    7.RxJava1 subscribeOn 原理分析

    1. 通过OnSubscribe来做原理
    2. 利用Scheduler将发出动作放到线程中执行
        public final Observable<T> subscribeOn(Scheduler scheduler, boolean requestOn) {
            if (this instanceof ScalarSynchronousObservable) {
                return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
            }
            return unsafeCreate(new OperatorSubscribeOn<T>(this, scheduler, requestOn));
        }
    
    public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
    
        final Scheduler scheduler;
        final Observable<T> source;
        final boolean requestOn;
    
        public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler, boolean requestOn) {
            this.scheduler = scheduler;
            this.source = source;
            this.requestOn = requestOn;
        }
    
        @Override
        public void call(final Subscriber<? super T> subscriber) {
            final Worker inner = scheduler.createWorker();
    
            SubscribeOnSubscriber<T> parent = new SubscribeOnSubscriber<T>(subscriber, requestOn, inner, source);
            subscriber.add(parent);
            subscriber.add(inner);
    
            inner.schedule(parent);
        }
    
        static final class SubscribeOnSubscriber<T> extends Subscriber<T> implements Action0 {
    
            final Subscriber<? super T> actual;
    
            final boolean requestOn;
    
            final Worker worker;
    
            Observable<T> source;
    
            Thread t;
    
            SubscribeOnSubscriber(Subscriber<? super T> actual, boolean requestOn, Worker worker, Observable<T> source) {
                this.actual = actual;
                this.requestOn = requestOn;
                this.worker = worker;
                this.source = source;
            }
    
            @Override
            public void onNext(T t) {
                actual.onNext(t);
            }
    
            @Override
            public void onError(Throwable e) {
                try {
                    actual.onError(e);
                } finally {
                    worker.unsubscribe();
                }
            }
    
            @Override
            public void onCompleted() {
                try {
                    actual.onCompleted();
                } finally {
                    worker.unsubscribe();
                }
            }
    
            @Override
            public void call() {
                Observable<T> src = source;
                source = null;
                t = Thread.currentThread();
                src.unsafeSubscribe(this);
            }
    
            @Override
            public void setProducer(final Producer p) {
                actual.setProducer(new Producer() {
                    @Override
                    public void request(final long n) {
                        if (t == Thread.currentThread() || !requestOn) {
                            p.request(n);
                        } else {
                            worker.schedule(new Action0() {
                                @Override
                                public void call() {
                                    p.request(n);
                                }
                            });
                        }
                    }
                });
            }
        }
    

    利用了代理机制

    8.RxJava2 subscribeOn 原理分析

    8.1.RxJava2(无背压) subscribeOn
        @CheckReturnValue
        @SchedulerSupport(SchedulerSupport.CUSTOM)
        public final Observable<T> subscribeOn(Scheduler scheduler) {
            ObjectHelper.requireNonNull(scheduler, "scheduler is null");
            return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
        }
    
    public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
        final Scheduler scheduler;
    
        public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
            super(source);
            this.scheduler = scheduler;
        }
    
        @Override
        public void subscribeActual(final Observer<? super T> s) {
            final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    
            s.onSubscribe(parent);
    
            parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
        }
    
        static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
    
            private static final long serialVersionUID = 8094547886072529208L;
            final Observer<? super T> actual;
    
            final AtomicReference<Disposable> s;
    
            SubscribeOnObserver(Observer<? super T> actual) {
                this.actual = actual;
                this.s = new AtomicReference<Disposable>();
            }
    
            @Override
            public void onSubscribe(Disposable s) {
                DisposableHelper.setOnce(this.s, s);
            }
    
            @Override
            public void onNext(T t) {
                actual.onNext(t);
            }
    
            @Override
            public void onError(Throwable t) {
                actual.onError(t);
            }
    
            @Override
            public void onComplete() {
                actual.onComplete();
            }
    
            @Override
            public void dispose() {
                DisposableHelper.dispose(s);
                DisposableHelper.dispose(this);
            }
    
            @Override
            public boolean isDisposed() {
                return DisposableHelper.isDisposed(get());
            }
    
            void setDisposable(Disposable d) {
                DisposableHelper.setOnce(this, d);
            }
        }
    
        final class SubscribeTask implements Runnable {
            private final SubscribeOnObserver<T> parent;
    
            SubscribeTask(SubscribeOnObserver<T> parent) {
                this.parent = parent;
            }
    
            @Override
            public void run() {
                source.subscribe(parent);
            }
        }
    }
    
    1. 继承AbstractObservableWithUpstream
    2. 实现subscribeActual方法
    3. 利用Scheduler将发送动作放到线程中执行
    8.2.RxJava2(有背压) subscribeOn
      @CheckReturnValue
        @BackpressureSupport(BackpressureKind.PASS_THROUGH)
        @SchedulerSupport(SchedulerSupport.CUSTOM)
        @Experimental
        public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler, boolean requestOn) {
            ObjectHelper.requireNonNull(scheduler, "scheduler is null");
            return RxJavaPlugins.onAssembly(new FlowableSubscribeOn<T>(this, scheduler, requestOn));
        }
    
    
    public final class FlowableSubscribeOn<T> extends AbstractFlowableWithUpstream<T , T> {
    
        final Scheduler scheduler;
    
        final boolean nonScheduledRequests;
    
        public FlowableSubscribeOn(Flowable<T> source, Scheduler scheduler, boolean nonScheduledRequests) {
            super(source);
            this.scheduler = scheduler;
            this.nonScheduledRequests = nonScheduledRequests;
        }
    
        @Override
        public void subscribeActual(final Subscriber<? super T> s) {
            Scheduler.Worker w = scheduler.createWorker();
            final SubscribeOnSubscriber<T> sos = new SubscribeOnSubscriber<T>(s, w, source, nonScheduledRequests);
            s.onSubscribe(sos);
    
            w.schedule(sos);
        }
    
        static final class SubscribeOnSubscriber<T> extends AtomicReference<Thread>
        implements FlowableSubscriber<T>, Subscription, Runnable {
    
            private static final long serialVersionUID = 8094547886072529208L;
    
            final Subscriber<? super T> actual;
    
            final Scheduler.Worker worker;
    
            final AtomicReference<Subscription> s;
    
            final AtomicLong requested;
    
            final boolean nonScheduledRequests;
    
            Publisher<T> source;
    
            SubscribeOnSubscriber(Subscriber<? super T> actual, Scheduler.Worker worker, Publisher<T> source, boolean requestOn) {
                this.actual = actual;
                this.worker = worker;
                this.source = source;
                this.s = new AtomicReference<Subscription>();
                this.requested = new AtomicLong();
                this.nonScheduledRequests = !requestOn;
            }
    
            @Override
            public void run() {
                lazySet(Thread.currentThread());
                Publisher<T> src = source;
                source = null;
                src.subscribe(this);
            }
    
            @Override
            public void onSubscribe(Subscription s) {
                if (SubscriptionHelper.setOnce(this.s, s)) {
                    long r = requested.getAndSet(0L);
                    if (r != 0L) {
                        requestUpstream(r, s);
                    }
                }
            }
    
            @Override
            public void onNext(T t) {
                actual.onNext(t);
            }
    
            @Override
            public void onError(Throwable t) {
                actual.onError(t);
                worker.dispose();
            }
    
            @Override
            public void onComplete() {
                actual.onComplete();
                worker.dispose();
            }
    
            @Override
            public void request(final long n) {
                if (SubscriptionHelper.validate(n)) {
                    Subscription s = this.s.get();
                    if (s != null) {
                        requestUpstream(n, s);
                    } else {
                        BackpressureHelper.add(requested, n);
                        s = this.s.get();
                        if (s != null) {
                            long r = requested.getAndSet(0L);
                            if (r != 0L) {
                                requestUpstream(r, s);
                            }
                        }
                    }
                }
            }
    
            void requestUpstream(final long n, final Subscription s) {
                if (nonScheduledRequests || Thread.currentThread() == get()) {
                    s.request(n);
                } else {
                    worker.schedule(new Request(s, n));
                }
            }
    
            @Override
            public void cancel() {
                SubscriptionHelper.cancel(s);
                worker.dispose();
            }
    
            static final class Request implements Runnable {
                private final Subscription s;
                private final long n;
    
                Request(Subscription s, long n) {
                    this.s = s;
                    this.n = n;
                }
    
                @Override
                public void run() {
                    s.request(n);
                }
            }
        }
    }
    
    
    1. 继承AbstractFlowableWithUpstream
    2. 实现subscribeActual方法
    3. 利用Scheduler将发出动作放到线程中执行

    9. RxJava1 subscribeOn仿写

        public final Caller<T> callOn(Switcher switcher) {
            return create(new OperatorCallOn<>(switcher, this));
        }
    
    /**
     * Created by Xionghu on 2018/6/14.
     * Desc: 用于callOn的OnCall
     */
    
    public class OperatorCallOn<T> implements Caller.OnCall<T> {
    
        private final Switcher switcher;
        private final Caller<T> tCaller;
    
        public OperatorCallOn(Switcher switcher, Caller<T> tCaller) {
            this.switcher = switcher;
            this.tCaller = tCaller;
        }
    
        @Override
        public void call(final Receiver<T> tReceiver) {
            Switcher.Worker worker = switcher.createWorker();
            worker.switches(new Action0() {
                @Override
                public void call() {
                    Receiver<T> tReceiver1 = new Receiver<T>() {
                        @Override
                        public void onCompleted() {
                            tReceiver.onCompleted();
                        }
    
                        @Override
                        public void onError(Throwable t) {
                            tReceiver.onError(t);
                        }
    
                        @Override
                        public void onReceive(T t) {
                            tReceiver.onReceive(t);
                        }
                    };
                    tCaller.call(tReceiver1);
                }
            });
        }
    }
    
    
    用于callOn的OnCall
    1. 持有原Caller和Switcher
    2. 创建新的Receiver包裹旧的丢入线程中
    运行
    import android.os.Bundle;
    import android.support.v7.app.AppCompatActivity;
    import android.util.Log;
    
    import com.haocai.mylibrary.rxJava1.Caller;
    import com.haocai.mylibrary.rxJava1.NewThreadSwitcher;
    import com.haocai.mylibrary.rxJava1.Receiver;
    import com.haocai.rxjavademo.R;
    
    import butterknife.ButterKnife;
    import butterknife.OnClick;
    
    /**
     * Created by Xionghu on 2018/6/11.
     * Desc: .RxJava1  subscribeOn仿写
     */
    
    public class Lesson3_2Activity extends AppCompatActivity {
        public static final String TAG = "kpioneer";
    
        @Override
        protected void onCreate(final Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_custom_test);
            ButterKnife.bind(this);
    
        }
    
        @OnClick(R.id.testDo)
        public void onViewClicked() {
            Caller.
                    create(new Caller.OnCall<String>() {
                        @Override
                        public void call(Receiver<String> stringReceiver) {
                            if (!stringReceiver.isUnCalled()) {
                                stringReceiver.onReceive("test");
                                stringReceiver.onCompleted();
                            }
                        }
                    }).
                    callOn(new NewThreadSwitcher()).
                    call(new Receiver<String>() {
                        @Override
                        public void onCompleted() {
    
                        }
    
                        @Override
                        public void onError(Throwable t) {
    
                        }
    
                        @Override
                        public void onReceive(String o) {
                            Log.d(TAG, "onReceive:" + o);
                            Log.d(TAG, "currentThread:" + Thread.currentThread());
                        }
                    });
        }
    }
    
    06-15 14:19:02.219 17153-17366/com.haocai.rxjavademo D/kpioneer: onReceive:test
    06-15 14:19:02.219 17153-17366/com.haocai.rxjavademo D/kpioneer: currentThread:Thread[ NewThreadWorker,5,main]
    

    10. RxJava2 subscribeOn仿写

    10.1RxJava2(无背压)
       public Caller<T> callOn(Switcher switcher) {
            return new CallerCallOn<>(this, switcher);
        }
    
    /**
     * Created by Xionghu on 2018/6/15.
     * Desc: 用于callOn
     */
    
    public class CallerCallOn<T> extends CallerWithUpstream<T, T> {
        private Switcher mSwitcher;
    
        public CallerCallOn(Caller<T> source, Switcher mSwitcher) {
            super(source);
            this.mSwitcher = mSwitcher;
        }
    
        @Override
        protected void callActual(Callee<T> callee) {
            final CallOnCallee<T> tCallOnCallee = new CallOnCallee<>(callee);
            callee.onCall(tCallOnCallee);
            mSwitcher.switches(new Runnable() {
                @Override
                public void run() {
                    source.call(tCallOnCallee);
                }
            });
        }
    
        private static final class CallOnCallee<T> implements Callee<T>, Release {
    
            private final Callee<T> callee;
    
            public CallOnCallee(Callee<T> callee) {
                this.callee = callee;
            }
    
            @Override
            public void onCall(Release release) {
    
            }
    
            @Override
            public void onReceive(T t) {
                callee.onReceive(t);
            }
    
            @Override
            public void onCompleted() {
                callee.onCompleted();
            }
    
            @Override
            public void onError(Throwable t) {
                callee.onError(t);
            }
    
            @Override
            public boolean isReleased() {
                return false;
            }
    
            @Override
            public void release() {
    
            }
        }
    }
    
    
    CallerCallOn
    1. 继承自CallerWithUpstream
    2. 持有原Caller和Switcher
    3. 创建新的Callee包裹旧的丢入线程中
    10.2RxJava2(有背压)
        public Telephoner<T> callOn(Switcher switcher) {
            return new TelephonerCallOn<>(this, switcher);
        }
    
    import com.haocai.mylibrary.rxJava2.Switcher;
    import java.util.concurrent.atomic.AtomicLong;
    
    /**
     * Created by Xionghu on 2018/6/15.
     * Desc:用于callOn
     */
    
    public class TelephonerCallOn<T> extends TelephonerWithUpstream<T, T> {
    
        private final Switcher mSwitcher;
    
        public TelephonerCallOn(Telephoner<T> source, Switcher switcher) {
            super(source);
            mSwitcher = switcher;
        }
    
        @Override
        protected void callActual(Receiver<T> receiver) {
            final CallOnReceiver<T> tCallOnReceiver = new CallOnReceiver<>(receiver);
            receiver.onCall(tCallOnReceiver);
            mSwitcher.switches(new Runnable() {
                @Override
                public void run() {
                    source.call(tCallOnReceiver);
                }
            });
    
        }
    
        private static final class CallOnReceiver<T> extends AtomicLong implements Receiver<T>, Drop {
    
            private final Receiver<T> mReceiver;
    
            public CallOnReceiver(Receiver<T> receiver) {
                mReceiver = receiver;
            }
    
            @Override
            public void request(long n) {
                BackpressureHelper.add(this, n);
            }
    
            @Override
            public void drop() {
    
            }
    
            @Override
            public void onCall(Drop d) {
                mReceiver.onCall(d);
            }
    
            @Override
            public void onReceive(T t) {
                if (get() != 0) {
                    mReceiver.onReceive(t);
                    BackpressureHelper.produced(this, 1);
                }
            }
    
            @Override
            public void onError(Throwable t) {
                mReceiver.onError(t);
            }
    
            @Override
            public void onCompleted() {
                mReceiver.onCompleted();
            }
        }
    }
    
    TelephonerCallOn
    1. 继承自TelephonerWithUpstream
    2. 持有原Telephoner和Switcher
    3. 创建新的Receiver包裹旧的丢入线程中
    10.3 运行
    /**
     * Created by Xionghu on 2018/6/11.
     * Desc: .RxJava2  subscribeOn仿写
     */
    
    public class Lesson3_3Activity extends AppCompatActivity {
        public static final String TAG = "kpioneer";
    
        @Override
        protected void onCreate(final Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_custom_test);
            ButterKnife.bind(this);
    
        }
    
        @OnClick(R.id.testDo)
        public void onViewClicked() {
             /*---------无背压---------*/
            Caller.
                    create(new CallerOnCall<String>() {
                        @Override
                        public void call(CallerEmitter<String> callerEmitter) {
                            callerEmitter.onReceive("test");
                            callerEmitter.onCompleted();
                        }
                    }).
                    callOn(new NewThreadSwitcher()).
                    call(new Callee<String>() {
                        @Override
                        public void onCall(Release release) {
    
                        }
    
                        @Override
                        public void onReceive(String string) {
                            Log.d(TAG, "无背压:onReceive:" + string);
                            Log.d(TAG, "无背压:currentThread:" + Thread.currentThread());
                        }
    
                        @Override
                        public void onCompleted() {
    
                        }
    
                        @Override
                        public void onError(Throwable t) {
    
                        }
                    });
    
            /*---------有背压---------*/
            Telephoner.
                    create(new TelephonerOnCall<String>() {
                        @Override
                        public void call(TelephonerEmitter<String> telephonerEmitter) {
                            telephonerEmitter.onReceive("test");
                            telephonerEmitter.onCompleted();
                        }
                    }).
                    callOn(new NewThreadSwitcher()).
                    call(new Receiver<String>() {
                        @Override
                        public void onCall(Drop d) {
                            d.request(Long.MAX_VALUE);
                        }
    
                        @Override
                        public void onReceive(String s) {
                            Log.d(TAG, "有背压:onReceive:" + s);
                            Log.d(TAG, "有背压:currentThread:" + Thread.currentThread());
                        }
    
                        @Override
                        public void onError(Throwable t) {
    
                        }
    
                        @Override
                        public void onCompleted() {
    
                        }
                    });
        }
    }
    
    
    06-15 16:13:27.002 813-1150/com.haocai.rxjavademo D/kpioneer: 无背压:onReceive:test
    06-15 16:13:27.003 813-1150/com.haocai.rxjavademo D/kpioneer: 无背压:currentThread:Thread[NewThreadWorker,5,main]
    06-15 16:13:27.011 813-1151/com.haocai.rxjavademo D/kpioneer: 有背压:onReceive:test
    06-15 16:13:27.011 813-1151/com.haocai.rxjavademo D/kpioneer: 有背压:currentThread:Thread[NewThreadWorker,5,main]
    

    11 RxJava1 observeOn原理分析

        public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
            if (this instanceof ScalarSynchronousObservable) {
                return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
            }
            return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
        }
    
    
    public final class OperatorObserveOn<T> implements Operator<T, T> {
    
        private final Scheduler scheduler;
        private final boolean delayError;
        private final int bufferSize;
    
        /**
         * @param scheduler the scheduler to use
         * @param delayError delay errors until all normal events are emitted in the other thread?
         */
        public OperatorObserveOn(Scheduler scheduler, boolean delayError) {
            this(scheduler, delayError, RxRingBuffer.SIZE);
        }
    
        /**
         * @param scheduler the scheduler to use
         * @param delayError delay errors until all normal events are emitted in the other thread?
         * @param bufferSize for the buffer feeding the Scheduler workers, defaults to {@code RxRingBuffer.MAX} if <= 0
         */
        public OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize) {
            this.scheduler = scheduler;
            this.delayError = delayError;
            this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
        }
    
        @Override
        public Subscriber<? super T> call(Subscriber<? super T> child) {
            if (scheduler instanceof ImmediateScheduler) {
                // avoid overhead, execute directly
                return child;
            } else if (scheduler instanceof TrampolineScheduler) {
                // avoid overhead, execute directly
                return child;
            } else {
                ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
                parent.init();
                return parent;
            }
        }
    
        public static <T> Operator<T, T> rebatch(final int n) {
            return new Operator<T, T>() {
                @Override
                public Subscriber<? super T> call(Subscriber<? super T> child) {
                    ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(Schedulers.immediate(), child, false, n);
                    parent.init();
                    return parent;
                }
            };
        }
    
        /** Observe through individual queue per observer. */
        static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
            final Subscriber<? super T> child;
            final Scheduler.Worker recursiveScheduler;
            final boolean delayError;
            final Queue<Object> queue;
            /** The emission threshold that should trigger a replenishing request. */
            final int limit;
    
            // the status of the current stream
            volatile boolean finished;
    
            final AtomicLong requested = new AtomicLong();
    
            final AtomicLong counter = new AtomicLong();
    
            /**
             * The single exception if not null, should be written before setting finished (release) and read after
             * reading finished (acquire).
             */
            Throwable error;
    
            /** Remembers how many elements have been emitted before the requests run out. */
            long emitted;
    
            // do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should
            // not prevent anything downstream from consuming, which will happen if the Subscription is chained
            public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
                this.child = child;
                this.recursiveScheduler = scheduler.createWorker();
                this.delayError = delayError;
                int calculatedSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
                // this formula calculates the 75% of the bufferSize, rounded up to the next integer
                this.limit = calculatedSize - (calculatedSize >> 2);
                if (UnsafeAccess.isUnsafeAvailable()) {
                    queue = new SpscArrayQueue<Object>(calculatedSize);
                } else {
                    queue = new SpscAtomicArrayQueue<Object>(calculatedSize);
                }
                // signal that this is an async operator capable of receiving this many
                request(calculatedSize);
            }
    
            void init() {
                // don't want this code in the constructor because `this` can escape through the
                // setProducer call
                Subscriber<? super T> localChild = child;
    
                localChild.setProducer(new Producer() {
    
                    @Override
                    public void request(long n) {
                        if (n > 0L) {
                            BackpressureUtils.getAndAddRequest(requested, n);
                            schedule();
                        }
                    }
    
                });
                localChild.add(recursiveScheduler);
                localChild.add(this);
            }
    
            @Override
            public void onNext(final T t) {
                if (isUnsubscribed() || finished) {
                    return;
                }
                if (!queue.offer(NotificationLite.next(t))) {
                    onError(new MissingBackpressureException());
                    return;
                }
                schedule();
            }
    
            @Override
            public void onCompleted() {
                if (isUnsubscribed() || finished) {
                    return;
                }
                finished = true;
                schedule();
            }
    
            @Override
            public void onError(final Throwable e) {
                if (isUnsubscribed() || finished) {
                    RxJavaHooks.onError(e);
                    return;
                }
                error = e;
                finished = true;
                schedule();
            }
    
            protected void schedule() {
                if (counter.getAndIncrement() == 0) {
                    recursiveScheduler.schedule(this);
                }
            }
    
            // only execute this from schedule()
            @Override
            public void call() {
                long missed = 1L;
                long currentEmission = emitted;
    
                // these are accessed in a tight loop around atomics so
                // loading them into local variables avoids the mandatory re-reading
                // of the constant fields
                final Queue<Object> q = this.queue;
                final Subscriber<? super T> localChild = this.child;
    
                // requested and counter are not included to avoid JIT issues with register spilling
                // and their access is is amortized because they are part of the outer loop which runs
                // less frequently (usually after each bufferSize elements)
    
                for (;;) {
                    long requestAmount = requested.get();
    
                    while (requestAmount != currentEmission) {
                        boolean done = finished;
                        Object v = q.poll();
                        boolean empty = v == null;
    
                        if (checkTerminated(done, empty, localChild, q)) {
                            return;
                        }
    
                        if (empty) {
                            break;
                        }
    
                        localChild.onNext(NotificationLite.<T>getValue(v));
    
                        currentEmission++;
                        if (currentEmission == limit) {
                            requestAmount = BackpressureUtils.produced(requested, currentEmission);
                            request(currentEmission);
                            currentEmission = 0L;
                        }
                    }
    
                    if (requestAmount == currentEmission) {
                        if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
                            return;
                        }
                    }
    
                    emitted = currentEmission;
                    missed = counter.addAndGet(-missed);
                    if (missed == 0L) {
                        break;
                    }
                }
            }
    
            boolean checkTerminated(boolean done, boolean isEmpty, Subscriber<? super T> a, Queue<Object> q) {
                if (a.isUnsubscribed()) {
                    q.clear();
                    return true;
                }
    
                if (done) {
                    if (delayError) {
                        if (isEmpty) {
                            Throwable e = error;
                            try {
                                if (e != null) {
                                    a.onError(e);
                                } else {
                                    a.onCompleted();
                                }
                            } finally {
                                recursiveScheduler.unsubscribe();
                            }
                        }
                    } else {
                        Throwable e = error;
                        if (e != null) {
                            q.clear();
                            try {
                                a.onError(e);
                            } finally {
                                recursiveScheduler.unsubscribe();
                            }
                            return true;
                        } else
                        if (isEmpty) {
                            try {
                                a.onCompleted();
                            } finally {
                                recursiveScheduler.unsubscribe();
                            }
                            return true;
                        }
                    }
    
                }
    
                return false;
            }
        }
    }
    
    用于observeOn的Operator
    1. 是observeOn的Operator
    2. 通过lift去变换这个Operator
    3. 在Operator中返回一个用于observeOn的Subscriber
    用于observeOn的Subscriber

    在调用到onNext等方法时丢到线程中去执行

    12 RxJava2 observeOn原理分析

    12.1 RxJava2(无背压) observeOn原理分析
        public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
            ObjectHelper.requireNonNull(scheduler, "scheduler is null");
            ObjectHelper.verifyPositive(bufferSize, "bufferSize");
            return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
        }
    
    public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
        final Scheduler scheduler;
        final boolean delayError;
        final int bufferSize;
        public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
            super(source);
            this.scheduler = scheduler;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
    
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            if (scheduler instanceof TrampolineScheduler) {
                source.subscribe(observer);
            } else {
                Scheduler.Worker w = scheduler.createWorker();
    
                source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
            }
        }
    
        static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
        implements Observer<T>, Runnable {
    
            private static final long serialVersionUID = 6576896619930983584L;
            final Observer<? super T> actual;
            final Scheduler.Worker worker;
            final boolean delayError;
            final int bufferSize;
    
            SimpleQueue<T> queue;
    
            Disposable s;
    
            Throwable error;
            volatile boolean done;
    
            volatile boolean cancelled;
    
            int sourceMode;
    
            boolean outputFused;
    
            ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
                this.actual = actual;
                this.worker = worker;
                this.delayError = delayError;
                this.bufferSize = bufferSize;
            }
    
            @Override
            public void onSubscribe(Disposable s) {
                if (DisposableHelper.validate(this.s, s)) {
                    this.s = s;
                    if (s instanceof QueueDisposable) {
                        @SuppressWarnings("unchecked")
                        QueueDisposable<T> qd = (QueueDisposable<T>) s;
    
                        int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
    
                        if (m == QueueDisposable.SYNC) {
                            sourceMode = m;
                            queue = qd;
                            done = true;
                            actual.onSubscribe(this);
                            schedule();
                            return;
                        }
                        if (m == QueueDisposable.ASYNC) {
                            sourceMode = m;
                            queue = qd;
                            actual.onSubscribe(this);
                            return;
                        }
                    }
    
                    queue = new SpscLinkedArrayQueue<T>(bufferSize);
    
                    actual.onSubscribe(this);
                }
            }
    
            @Override
            public void onNext(T t) {
                if (done) {
                    return;
                }
    
                if (sourceMode != QueueDisposable.ASYNC) {
                    queue.offer(t);
                }
                schedule();
            }
    
            @Override
            public void onError(Throwable t) {
                if (done) {
                    RxJavaPlugins.onError(t);
                    return;
                }
                error = t;
                done = true;
                schedule();
            }
    
            @Override
            public void onComplete() {
                if (done) {
                    return;
                }
                done = true;
                schedule();
            }
    
            @Override
            public void dispose() {
                if (!cancelled) {
                    cancelled = true;
                    s.dispose();
                    worker.dispose();
                    if (getAndIncrement() == 0) {
                        queue.clear();
                    }
                }
            }
    
            @Override
            public boolean isDisposed() {
                return cancelled;
            }
    
            void schedule() {
                if (getAndIncrement() == 0) {
                    worker.schedule(this);
                }
            }
    
            void drainNormal() {
                int missed = 1;
    
                final SimpleQueue<T> q = queue;
                final Observer<? super T> a = actual;
    
                for (;;) {
                    if (checkTerminated(done, q.isEmpty(), a)) {
                        return;
                    }
    
                    for (;;) {
                        boolean d = done;
                        T v;
    
                        try {
                            v = q.poll();
                        } catch (Throwable ex) {
                            Exceptions.throwIfFatal(ex);
                            s.dispose();
                            q.clear();
                            a.onError(ex);
                            worker.dispose();
                            return;
                        }
                        boolean empty = v == null;
    
                        if (checkTerminated(d, empty, a)) {
                            return;
                        }
    
                        if (empty) {
                            break;
                        }
    
                        a.onNext(v);
                    }
    
                    missed = addAndGet(-missed);
                    if (missed == 0) {
                        break;
                    }
                }
            }
    
            void drainFused() {
                int missed = 1;
    
                for (;;) {
                    if (cancelled) {
                        return;
                    }
    
                    boolean d = done;
                    Throwable ex = error;
    
                    if (!delayError && d && ex != null) {
                        actual.onError(error);
                        worker.dispose();
                        return;
                    }
    
                    actual.onNext(null);
    
                    if (d) {
                        ex = error;
                        if (ex != null) {
                            actual.onError(ex);
                        } else {
                            actual.onComplete();
                        }
                        worker.dispose();
                        return;
                    }
    
                    missed = addAndGet(-missed);
                    if (missed == 0) {
                        break;
                    }
                }
            }
    
            @Override
            public void run() {
                if (outputFused) {
                    drainFused();
                } else {
                    drainNormal();
                }
            }
    
            boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
                if (cancelled) {
                    queue.clear();
                    return true;
                }
                if (d) {
                    Throwable e = error;
                    if (delayError) {
                        if (empty) {
                            if (e != null) {
                                a.onError(e);
                            } else {
                                a.onComplete();
                            }
                            worker.dispose();
                            return true;
                        }
                    } else {
                        if (e != null) {
                            queue.clear();
                            a.onError(e);
                            worker.dispose();
                            return true;
                        } else
                        if (empty) {
                            a.onComplete();
                            worker.dispose();
                            return true;
                        }
                    }
                }
                return false;
            }
    
            @Override
            public int requestFusion(int mode) {
                if ((mode & ASYNC) != 0) {
                    outputFused = true;
                    return ASYNC;
                }
                return NONE;
            }
    
            @Nullable
            @Override
            public T poll() throws Exception {
                return queue.poll();
            }
    
            @Override
            public void clear() {
                queue.clear();
            }
    
            @Override
            public boolean isEmpty() {
                return queue.isEmpty();
            }
        }
    }
    
    
    1. 继承自AbstractObservableWithUpstream
    2. 利用subscribeActual方法
    3. 创建一个新的Observer包裹旧的
    4. 在调用到onNext等方法时丢到线程中去执行
    12.2 RxJava2(有背压) observeOn原理分析
        @CheckReturnValue
        @BackpressureSupport(BackpressureKind.FULL)
        @SchedulerSupport(SchedulerSupport.CUSTOM)
        public final Flowable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
            ObjectHelper.requireNonNull(scheduler, "scheduler is null");
            ObjectHelper.verifyPositive(bufferSize, "bufferSize");
            return RxJavaPlugins.onAssembly(new FlowableObserveOn<T>(this, scheduler, delayError, bufferSize));
        }
    
    public final class FlowableObserveOn<T> extends AbstractFlowableWithUpstream<T, T> {
    final Scheduler scheduler;
    
        final boolean delayError;
    
        final int prefetch;
    
        public FlowableObserveOn(
                Flowable<T> source,
                Scheduler scheduler,
                boolean delayError,
                int prefetch) {
            super(source);
            this.scheduler = scheduler;
            this.delayError = delayError;
            this.prefetch = prefetch;
        }
    
        @Override
        public void subscribeActual(Subscriber<? super T> s) {
            Worker worker = scheduler.createWorker();
    
            if (s instanceof ConditionalSubscriber) {
                source.subscribe(new ObserveOnConditionalSubscriber<T>(
                        (ConditionalSubscriber<? super T>) s, worker, delayError, prefetch));
            } else {
                source.subscribe(new ObserveOnSubscriber<T>(s, worker, delayError, prefetch));
            }
        }
    
        abstract static class BaseObserveOnSubscriber<T>
        extends BasicIntQueueSubscription<T>
        implements FlowableSubscriber<T>, Runnable {
            private static final long serialVersionUID = -8241002408341274697L;
    
            final Worker worker;
    
            final boolean delayError;
    
            final int prefetch;
    
            final int limit;
    
            final AtomicLong requested;
    
            Subscription s;
    
            SimpleQueue<T> queue;
    
            volatile boolean cancelled;
    
            volatile boolean done;
    
            Throwable error;
    
            int sourceMode;
    
            long produced;
    
            boolean outputFused;
    
            BaseObserveOnSubscriber(
                    Worker worker,
                    boolean delayError,
                    int prefetch) {
                this.worker = worker;
                this.delayError = delayError;
                this.prefetch = prefetch;
                this.requested = new AtomicLong();
                this.limit = prefetch - (prefetch >> 2);
            }
    
            @Override
            public final void onNext(T t) {
                if (done) {
                    return;
                }
                if (sourceMode == ASYNC) {
                    trySchedule();
                    return;
                }
                if (!queue.offer(t)) {
                    s.cancel();
    
                    error = new MissingBackpressureException("Queue is full?!");
                    done = true;
                }
                trySchedule();
            }
    
            @Override
            public final void onError(Throwable t) {
                if (done) {
                    RxJavaPlugins.onError(t);
                    return;
                }
                error = t;
                done = true;
                trySchedule();
            }
    
            @Override
            public final void onComplete() {
                if (!done) {
                    done = true;
                    trySchedule();
                }
            }
    
            @Override
            public final void request(long n) {
                if (SubscriptionHelper.validate(n)) {
                    BackpressureHelper.add(requested, n);
                    trySchedule();
                }
            }
    
            @Override
            public final void cancel() {
                if (cancelled) {
                    return;
                }
    
                cancelled = true;
                s.cancel();
                worker.dispose();
    
                if (getAndIncrement() == 0) {
                    queue.clear();
                }
            }
    
            final void trySchedule() {
                if (getAndIncrement() != 0) {
                    return;
                }
                worker.schedule(this);
            }
    
            @Override
            public final void run() {
                if (outputFused) {
                    runBackfused();
                } else if (sourceMode == SYNC) {
                    runSync();
                } else {
                    runAsync();
                }
            }
    
            abstract void runBackfused();
    
            abstract void runSync();
    
            abstract void runAsync();
    
            final boolean checkTerminated(boolean d, boolean empty, Subscriber<?> a) {
                if (cancelled) {
                    clear();
                    return true;
                }
                if (d) {
                    if (delayError) {
                        if (empty) {
                            Throwable e = error;
                            if (e != null) {
                                a.onError(e);
                            } else {
                                a.onComplete();
                            }
                            worker.dispose();
                            return true;
                        }
                    } else {
                        Throwable e = error;
                        if (e != null) {
                            clear();
                            a.onError(e);
                            worker.dispose();
                            return true;
                        } else
                        if (empty) {
                            a.onComplete();
                            worker.dispose();
                            return true;
                        }
                    }
                }
    
                return false;
            }
    
            @Override
            public final int requestFusion(int requestedMode) {
                if ((requestedMode & ASYNC) != 0) {
                    outputFused = true;
                    return ASYNC;
                }
                return NONE;
            }
    
            @Override
            public final void clear() {
                queue.clear();
            }
    
            @Override
            public final boolean isEmpty() {
                return queue.isEmpty();
            }
        }
    
        static final class ObserveOnSubscriber<T> extends BaseObserveOnSubscriber<T>
        implements FlowableSubscriber<T> {
    
            private static final long serialVersionUID = -4547113800637756442L;
    
            final Subscriber<? super T> actual;
    
            ObserveOnSubscriber(
                    Subscriber<? super T> actual,
                    Worker worker,
                    boolean delayError,
                    int prefetch) {
                super(worker, delayError, prefetch);
                this.actual = actual;
            }
    
            @Override
            public void onSubscribe(Subscription s) {
                if (SubscriptionHelper.validate(this.s, s)) {
                    this.s = s;
    
                    if (s instanceof QueueSubscription) {
                        @SuppressWarnings("unchecked")
                        QueueSubscription<T> f = (QueueSubscription<T>) s;
    
                        int m = f.requestFusion(ANY | BOUNDARY);
    
                        if (m == SYNC) {
                            sourceMode = SYNC;
                            queue = f;
                            done = true;
    
                            actual.onSubscribe(this);
                            return;
                        } else
                        if (m == ASYNC) {
                            sourceMode = ASYNC;
                            queue = f;
    
                            actual.onSubscribe(this);
    
                            s.request(prefetch);
    
                            return;
                        }
                    }
    
                    queue = new SpscArrayQueue<T>(prefetch);
    
                    actual.onSubscribe(this);
    
                    s.request(prefetch);
                }
            }
    
            @Override
            void runSync() {
                int missed = 1;
    
                final Subscriber<? super T> a = actual;
                final SimpleQueue<T> q = queue;
    
                long e = produced;
    
                for (;;) {
    
                    long r = requested.get();
    
                    while (e != r) {
                        T v;
    
                        try {
                            v = q.poll();
                        } catch (Throwable ex) {
                            Exceptions.throwIfFatal(ex);
                            s.cancel();
                            a.onError(ex);
                            worker.dispose();
                            return;
                        }
    
                        if (cancelled) {
                            return;
                        }
                        if (v == null) {
                            a.onComplete();
                            worker.dispose();
                            return;
                        }
    
                        a.onNext(v);
    
                        e++;
                    }
    
                    if (cancelled) {
                        return;
                    }
    
                    if (q.isEmpty()) {
                        a.onComplete();
                        worker.dispose();
                        return;
                    }
    
                    int w = get();
                    if (missed == w) {
                        produced = e;
                        missed = addAndGet(-missed);
                        if (missed == 0) {
                            break;
                        }
                    } else {
                        missed = w;
                    }
                }
            }
    
            @Override
            void runAsync() {
                int missed = 1;
    
                final Subscriber<? super T> a = actual;
                final SimpleQueue<T> q = queue;
    
                long e = produced;
    
                for (;;) {
    
                    long r = requested.get();
    
                    while (e != r) {
                        boolean d = done;
                        T v;
    
                        try {
                            v = q.poll();
                        } catch (Throwable ex) {
                            Exceptions.throwIfFatal(ex);
    
                            s.cancel();
                            q.clear();
    
                            a.onError(ex);
                            worker.dispose();
                            return;
                        }
    
                        boolean empty = v == null;
    
                        if (checkTerminated(d, empty, a)) {
                            return;
                        }
    
                        if (empty) {
                            break;
                        }
    
                        a.onNext(v);
    
                        e++;
                        if (e == limit) {
                            if (r != Long.MAX_VALUE) {
                                r = requested.addAndGet(-e);
                            }
                            s.request(e);
                            e = 0L;
                        }
                    }
    
                    if (e == r && checkTerminated(done, q.isEmpty(), a)) {
                        return;
                    }
    
                    int w = get();
                    if (missed == w) {
                        produced = e;
                        missed = addAndGet(-missed);
                        if (missed == 0) {
                            break;
                        }
                    } else {
                        missed = w;
                    }
                }
            }
    
            @Override
            void runBackfused() {
                int missed = 1;
    
                for (;;) {
    
                    if (cancelled) {
                        return;
                    }
    
                    boolean d = done;
    
                    actual.onNext(null);
    
                    if (d) {
                        Throwable e = error;
                        if (e != null) {
                            actual.onError(e);
                        } else {
                            actual.onComplete();
                        }
                        worker.dispose();
                        return;
                    }
    
                    missed = addAndGet(-missed);
                    if (missed == 0) {
                        break;
                    }
                }
            }
    
            @Nullable
            @Override
            public T poll() throws Exception {
                T v = queue.poll();
                if (v != null && sourceMode != SYNC) {
                    long p = produced + 1;
                    if (p == limit) {
                        produced = 0;
                        s.request(p);
                    } else {
                        produced = p;
                    }
                }
                return v;
            }
    
        }
    
        static final class ObserveOnConditionalSubscriber<T>
        extends BaseObserveOnSubscriber<T> {
    
            private static final long serialVersionUID = 644624475404284533L;
    
            final ConditionalSubscriber<? super T> actual;
    
            long consumed;
    
            ObserveOnConditionalSubscriber(
                    ConditionalSubscriber<? super T> actual,
                    Worker worker,
                    boolean delayError,
                    int prefetch) {
                super(worker, delayError, prefetch);
                this.actual = actual;
            }
    
            @Override
            public void onSubscribe(Subscription s) {
                if (SubscriptionHelper.validate(this.s, s)) {
                    this.s = s;
    
                    if (s instanceof QueueSubscription) {
                        @SuppressWarnings("unchecked")
                        QueueSubscription<T> f = (QueueSubscription<T>) s;
    
                        int m = f.requestFusion(ANY | BOUNDARY);
    
                        if (m == SYNC) {
                            sourceMode = SYNC;
                            queue = f;
                            done = true;
    
                            actual.onSubscribe(this);
                            return;
                        } else
                        if (m == ASYNC) {
                            sourceMode = ASYNC;
                            queue = f;
    
                            actual.onSubscribe(this);
    
                            s.request(prefetch);
    
                            return;
                        }
                    }
    
                    queue = new SpscArrayQueue<T>(prefetch);
    
                    actual.onSubscribe(this);
    
                    s.request(prefetch);
                }
            }
    
            @Override
            void runSync() {
                int missed = 1;
    
                final ConditionalSubscriber<? super T> a = actual;
                final SimpleQueue<T> q = queue;
    
                long e = produced;
    
                for (;;) {
    
                    long r = requested.get();
    
                    while (e != r) {
                        T v;
                        try {
                            v = q.poll();
                        } catch (Throwable ex) {
                            Exceptions.throwIfFatal(ex);
                            s.cancel();
                            a.onError(ex);
                            worker.dispose();
                            return;
                        }
    
                        if (cancelled) {
                            return;
                        }
                        if (v == null) {
                            a.onComplete();
                            worker.dispose();
                            return;
                        }
    
                        if (a.tryOnNext(v)) {
                            e++;
                        }
                    }
    
                    if (cancelled) {
                        return;
                    }
    
                    if (q.isEmpty()) {
                        a.onComplete();
                        worker.dispose();
                        return;
                    }
    
                    int w = get();
                    if (missed == w) {
                        produced = e;
                        missed = addAndGet(-missed);
                        if (missed == 0) {
                            break;
                        }
                    } else {
                        missed = w;
                    }
                }
            }
    
            @Override
            void runAsync() {
                int missed = 1;
    
                final ConditionalSubscriber<? super T> a = actual;
                final SimpleQueue<T> q = queue;
    
                long emitted = produced;
                long polled = consumed;
    
                for (;;) {
    
                    long r = requested.get();
    
                    while (emitted != r) {
                        boolean d = done;
                        T v;
                        try {
                            v = q.poll();
                        } catch (Throwable ex) {
                            Exceptions.throwIfFatal(ex);
    
                            s.cancel();
                            q.clear();
    
                            a.onError(ex);
                            worker.dispose();
                            return;
                        }
                        boolean empty = v == null;
    
                        if (checkTerminated(d, empty, a)) {
                            return;
                        }
    
                        if (empty) {
                            break;
                        }
    
                        if (a.tryOnNext(v)) {
                            emitted++;
                        }
    
                        polled++;
    
                        if (polled == limit) {
                            s.request(polled);
                            polled = 0L;
                        }
                    }
    
                    if (emitted == r && checkTerminated(done, q.isEmpty(), a)) {
                        return;
                    }
    
                    int w = get();
                    if (missed == w) {
                        produced = emitted;
                        consumed = polled;
                        missed = addAndGet(-missed);
                        if (missed == 0) {
                            break;
                        }
                    } else {
                        missed = w;
                    }
                }
    
            }
    
            @Override
            void runBackfused() {
                int missed = 1;
    
                for (;;) {
    
                    if (cancelled) {
                        return;
                    }
    
                    boolean d = done;
    
                    actual.onNext(null);
    
                    if (d) {
                        Throwable e = error;
                        if (e != null) {
                            actual.onError(e);
                        } else {
                            actual.onComplete();
                        }
                        worker.dispose();
                        return;
                    }
    
                    missed = addAndGet(-missed);
                    if (missed == 0) {
                        break;
                    }
                }
            }
    
            @Nullable
            @Override
            public T poll() throws Exception {
                T v = queue.poll();
                if (v != null && sourceMode != SYNC) {
                    long p = consumed + 1;
                    if (p == limit) {
                        consumed = 0;
                        s.request(p);
                    } else {
                        consumed = p;
                    }
                }
                return v;
            }
        }
    }
    
    1. 继承自AbstractFlowableWithUpstream
    2. 利用subscribeActual方法
    3. 创建一个新的Subscriber包裹旧的.
    4. 在调用到onNext等方法时丢到线程中去执行

    13 RxJava1 observeOn仿写

    /**
     * Created by Xionghu on 2018/6/14.
     * Desc: 用于CallbackOn
     */
    
    public class OperatorCallbackOn<T> implements Caller.Operator<T, T> {
    
        private final Switcher switcher;
    
        public OperatorCallbackOn(Switcher switcher) {
            this.switcher = switcher;
        }
    
        @Override
        public Receiver<T> call(final Receiver<T> tReceiver) {
    
            return new CallbackOnReceiver<>(tReceiver, switcher);
        }
    
        private static final class CallbackOnReceiver<T> extends Receiver<T> implements Action0 {
    
            private final Receiver<T> tReceiver;
            private final Switcher.Worker worker;
            private final Queue<T> tQueue = new LinkedList<>();
    
            public CallbackOnReceiver(Receiver<T> tReceiver, Switcher switcher) {
                this.tReceiver = tReceiver;
                this.worker = switcher.createWorker();
            }
    
            @Override
            public void call() {
                T t = tQueue.poll(); //移除元素,如果队列为空,则返回null
                tReceiver.onReceive(t);
            }
    
            @Override
            public void onCompleted() {
            }
    
            @Override
            public void onError(Throwable t) {
            }
    
            @Override
            public void onReceive(T t) {
                tQueue.offer(t);// offer:添加一个元素并返回true        如果队列已满,则返回false
                switches();
            }
    
            private void switches() {
                worker.switches(this);
            }
        }
    }
    
    OperatorCallbackOn
    1. 持有Switcher
    2. call方法中返回用于callbackOn的Receiver
    CallbackOnReceiver
    1. 持有原Caller和Switcher
    2. 在onReceive等方法中做调度
    3. 调度后用原Receiver再调用onReceive
    实例
    /**
     * Created by Xionghu on 2018/6/11.
     * Desc: .RxJava1  observeOn
     */
    
    public class Lesson3_4Activity extends AppCompatActivity {
        public static final String TAG = "kpioneer";
    
        @Override
        protected void onCreate(final Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_custom_test);
            ButterKnife.bind(this);
    
        }
    
        @OnClick(R.id.testDo)
        public void onViewClicked() {
            Caller.
                    create(new Caller.OnCall<String>() {
                        @Override
                        public void call(Receiver<String> stringReceiver) {
                            stringReceiver.onReceive("test");
                            Log.d(TAG, "currentThread:" + Thread.currentThread());
                            stringReceiver.onCompleted();
                        }
                    }).
                    callOn(new NewThreadSwitcher()).
                    callbackOn(new LooperSwitcher(getMainLooper())).
                    call(new Receiver<String>() {
                        @Override
                        public void onCompleted() {
    
                        }
    
                        @Override
                        public void onError(Throwable t) {
    
                        }
    
                        @Override
                        public void onReceive(String s) {
                            Log.d(TAG, "onReceive:" + s);
                            Log.d(TAG, "currentThread:" + Thread.currentThread());
                        }
                    });
        }
    }
    
    06-19 10:05:36.245 11194-11346/com.haocai.rxjavademo D/kpioneer: currentThread:Thread[ NewThreadWorker,5,main]
    06-19 10:05:36.265 11194-11194/com.haocai.rxjavademo D/kpioneer: onReceive:test
    06-19 10:05:36.265 11194-11194/com.haocai.rxjavademo D/kpioneer: currentThread:Thread[main,5,main]
    

    14 RxJava2 observeOn仿写

    14.1 RxJava2(无背压) observeOn
        public Caller<T> callbackOn(Switcher switcher) {
            return new CallerCallbackOn<>(this, switcher);
        }
    
    /**
     * Created by Xionghu on 2018/6/19.
     * Desc: 用于callbackon
     */
    
    public class CallerCallbackOn<T> extends CallerWithUpstream<T, T> {
    
        private final Switcher mSwitcher;
    
        public CallerCallbackOn(Caller<T> source, Switcher mSwitcher) {
            super(source);
            this.mSwitcher = mSwitcher;
        }
    
        @Override
        protected void callActual(Callee<T> callee) {
            source.call(new CallbackOnCallee<T>(callee, mSwitcher));
        }
    
        private static final class CallbackOnCallee<T> implements Callee<T>, Runnable {
            private final Callee<T> mCallee;
            private final Switcher.Worker worker;
            private final Queue<T> tQueue = new LinkedList<>();
    
            public CallbackOnCallee(Callee<T> mCallee, Switcher switcher) {
                this.mCallee = mCallee;
                this.worker = switcher.createWorker();
            }
    
            @Override
            public void onCall(Release release) {
    
            }
    
            @Override
            public void onReceive(T t) {
                tQueue.offer(t);
                switches();
            }
    
    
            @Override
            public void onCompleted() {
    
            }
    
            @Override
            public void onError(Throwable t) {
    
            }
    
            @Override
            public void run() {
                T t = tQueue.poll();
                mCallee.onReceive(t);
            }
    
            private void switches() {
                worker.switches(this);
            }
        }
    }
    
    
    CallerCallbackOn
    1. 持有Switcher
    2. 实现callActual方法
    3. 用原有Caller去call一个用于线程切换的Callee
    CallbackOnCallee
    1. 持有原Callee和Switcher
    2. 在onReceive等方法中做调度
    3. 调度后用原Callee在调用onReceive
    14.2 RxJava2(有背压) observeOn
        public Telephoner<T> callbackOn(Switcher switcher){
         return new TelephonerCallbackOn<>(this,switcher);
        }
    
    /**
     * Created by Xionghu on 2018/6/19.
     * Desc:用于callbackon
     */
    
    public class TelephonerCallbackOn<T> extends TelephonerWithUpstream<T, T> {
    
        private final Switcher mSwitcher;
    
        public TelephonerCallbackOn(Telephoner<T> source, Switcher mSwitcher) {
            super(source);
            this.mSwitcher = mSwitcher;
        }
    
        @Override
        protected void callActual(Receiver<T> receiver) {
            source.call(new CallbackOnReceiver<>(receiver, mSwitcher));
        }
    
        private static final class CallbackOnReceiver<T> implements Receiver<T>, Runnable {
            private final Receiver<T> tReceiver;
            private final Switcher.Worker worker;
            private final Queue<T> tQueue = new LinkedList<>();
    
            public CallbackOnReceiver(Receiver<T> tReceiver, Switcher switcher) {
                this.tReceiver = tReceiver;
                this.worker = switcher.createWorker();
            }
    
            @Override
            public void onCall(Drop d) {
                tReceiver.onCall(d);
            }
    
            @Override
            public void onReceive(T t) {
                tQueue.offer(t);
                switches();
            }
    
    
            @Override
            public void onError(Throwable t) {
    
            }
    
            @Override
            public void onCompleted() {
    
            }
    
            @Override
            public void run() {
                T t = tQueue.poll();
                tReceiver.onReceive(t);
    
            }
    
            private void switches() {
                worker.switches(this);
            }
        }
    }
    
    TelephonerCallbackOn
    1. 持有Switcher
    2. 实现callActual方法
    3. 用原Telephoner去call一个用于线程切换的Receiver
    CallbackOnReceiver
    1. 持有原Telephoner和Switcher
    2. 在onReceive等方法中做调度
    3. 调度后用原Receiver再调用onReceive
    实例
    
    /**
     * Created by Xionghu on 2018/6/11.
     * Desc: .RxJava2  observeOn仿写
     */
    
    public class Lesson3_5Activity extends AppCompatActivity {
        public static final String TAG = "kpioneer";
    
        @Override
        protected void onCreate(final Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_custom_test);
            ButterKnife.bind(this);
    
        }
    
        @OnClick(R.id.testDo)
        public void onViewClicked() {
               /*---------无背压---------*/
            Caller.
                    create(new CallerOnCall<String>() {
                        @Override
                        public void call(CallerEmitter<String> callerEmitter) {
                            callerEmitter.onReceive("test");
                            Log.d(TAG, "无背压 currentThread:" + Thread.currentThread());
                            callerEmitter.onCompleted();
                        }
                    }).
                    callOn(new NewThreadSwitcher()).
                    callbackOn(new LooperSwitcher(getMainLooper())).
                    call(new Callee<String>() {
                        @Override
                        public void onCall(Release release) {
    
                        }
    
                        @Override
                        public void onReceive(String s) {
                            Log.d(TAG, "无背压 onReceive:" + s);
                            Log.d(TAG, "无背压 currentThread:" + Thread.currentThread());
                        }
    
                        @Override
                        public void onCompleted() {
    
                        }
    
                        @Override
                        public void onError(Throwable t) {
    
                        }
                    });
    
                       /*---------有背压---------*/
            Telephoner.
                    create(new TelephonerOnCall<String>() {
                        @Override
                        public void call(TelephonerEmitter<String> telephonerEmitter) {
                            telephonerEmitter.onReceive("test");
                            Log.d(TAG, "有背压 currentThread:" + Thread.currentThread());
                        }
                    }).
                    callOn(new NewThreadSwitcher()).
                    callbackOn(new LooperSwitcher(getMainLooper())).
                    call(new Receiver<String>() {
                        @Override
                        public void onCall(Drop d) {
                            d.request(Long.MAX_VALUE);
                        }
    
                        @Override
                        public void onReceive(String s) {
                            Log.d(TAG, "有背压 onReceive:" + s);
                            Log.d(TAG, "有背压 currentThread:" + Thread.currentThread());
                        }
    
                        @Override
                        public void onError(Throwable t) {
    
                        }
    
                        @Override
                        public void onCompleted() {
    
                        }
                    });
        }
    }
    
    
    06-19 13:19:29.185 25626-25771/com.haocai.rxjavademo D/kpioneer: 无背压 currentThread:Thread[NewThreadWorker,5,main]
    06-19 13:19:29.195 25626-25626/com.haocai.rxjavademo D/kpioneer: 无背压 onReceive:test
    06-19 13:19:29.195 25626-25626/com.haocai.rxjavademo D/kpioneer: 无背压 currentThread:Thread[main,5,main]
    06-19 13:19:29.205 25626-25772/com.haocai.rxjavademo D/kpioneer: 有背压 currentThread:Thread[NewThreadWorker,5,main]
    06-19 13:19:29.205 25626-25626/com.haocai.rxjavademo D/kpioneer: 有背压 onReceive:test
    06-19 13:19:29.205 25626-25626/com.haocai.rxjavademo D/kpioneer: 有背压 currentThread:Thread[main,5,main]
    

    源码下载

    https://github.com/kpioneer123/RxJavaLearning

    相关文章

      网友评论

        本文标题:Android 架构师之路21 响应式编程RxJava 线程变换

        本文链接:https://www.haomeiwen.com/subject/dmfreftx.html