美文网首页
RxJava->如何进行线程管理

RxJava->如何进行线程管理

作者: 冉桓彬 | 来源:发表于2018-01-27 21:56 被阅读60次
    这篇笔记主要解决以下几个问题:
    1. Schedulers.newThread()做了哪些事;
    2. Disposable.dispose()如何关掉当前任务;
    3. 多线程通信如何用链式实现;
    
    /**
     * Sample pipeline analysed in this note: create -> subscribeOn(newThread) -> doOnNext
     * -> observeOn(mainThread) -> subscribe. All callback bodies are elided.
     */
    private void rxJavaStartTask() {
        Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {...}
                })
                // Wraps the ObservableCreate above in an ObservableSubscribeOn.
                .subscribeOn(Schedulers.newThread())
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {...}
                })
                // Wraps the upstream in an ObservableObserveOn backed by the main-thread scheduler.
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Integer>() {
                    // NOTE(review): presumably stores d in mDisposable for rxJavaCancelTask();
                    // the body is elided here — confirm.
                    @Override
                    public void onSubscribe(Disposable d) {...}
    
                    @Override
                    public void onNext(Integer value) {...}
    
                    @Override
                    public void onError(Throwable e) {...}
    
                    @Override
                    public void onComplete() {...}
                });
        }
    
    /**
     * Cancels the task started by rxJavaStartTask() by disposing its Disposable.
     *
     * Does nothing when mDisposable is still null, i.e. when this is called before a
     * subscription has handed out a Disposable; the unguarded call would throw a
     * NullPointerException in that case.
     */
    private void rxJavaCancelTask() {
        if (mDisposable != null) {
            mDisposable.dispose();
        }
    }
    

      这篇笔记主要围绕上面两段代码进行分析;

    Schedulers.newThread():
    /** Excerpt of Schedulers showing only the pieces behind newThread(). */
    public final class Schedulers {
        /** Returns NEW_THREAD after passing it through the RxJavaPlugins.onNewThreadScheduler hook. */
        public static Scheduler newThread() {
            return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
        }
        // Assigned exactly once in the static initializer below.
        static final Scheduler NEW_THREAD;
        static {
            // The Callable supplies NewThreadHolder.DEFAULT to the plugin init hook.
            NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new Callable<Scheduler>() {
                @Override
                public Scheduler call() throws Exception {
                    return NewThreadHolder.DEFAULT;
                }
            });
        }
        /** Holder whose DEFAULT field is the NewThreadScheduler singleton. */
        static final class NewThreadHolder {
            static final Scheduler DEFAULT = NewThreadScheduler.instance();
        }
    }
    
    /** Excerpt of NewThreadScheduler: a singleton Scheduler whose workers are NewThreadWorker instances. */
    public final class NewThreadScheduler extends Scheduler {
        /** Returns the shared INSTANCE. */
        public static NewThreadScheduler instance() {
            return INSTANCE;
        }
        private static final NewThreadScheduler INSTANCE = new NewThreadScheduler();
        /** Creates a new NewThreadWorker on every call; THREAD_FACTORY is declared outside this excerpt. */
        @Override
        public Worker createWorker() {
            return new NewThreadWorker(THREAD_FACTORY);
        }
    }
    

      1、Schedulers.newThread()返回NewThreadScheduler, NewThreadScheduler实际是一个单例类;
      2、NewThreadScheduler内部有一个createWorker()的方法, 待后面用到再做分析;

    Observable.subscribeOn() :
    /** Excerpt of Observable showing subscribeOn(...). */
    public abstract class Observable<T> implements ObservableSource<T> {
        /** Wraps this Observable and the given scheduler in a new ObservableSubscribeOn. */
        @SchedulerSupport(SchedulerSupport.CUSTOM)
        public final Observable<T> subscribeOn(Scheduler scheduler) {
            return new ObservableSubscribeOn<T>(this, scheduler);
        }
    }
    

      1、调用.subscribeOn(Schedulers.newThread())指定订阅及事件发射在子线程中执行(直到遇到observeOn切换线程), 并返回ObservableSubscribeOn对象的引用;
      2、调用ObservableSubscribeOn构造函数时传入的this实际指向Observable.create(...)返回的ObservableCreate;

    Observable.doOnNext() :
    /** Excerpt of Observable showing doOnNext(...) and the private doOnEach(...) it delegates to. */
    public abstract class Observable<T> implements ObservableSource<T> {
        /** Delegates to doOnEach with the given onNext and empty callbacks for the other three slots. */
        @SchedulerSupport(SchedulerSupport.NONE)
        public final Observable<T> doOnNext(Consumer<? super T> onNext) {
            return doOnEach(onNext, Functions.emptyConsumer(), Functions.EMPTY_ACTION, Functions.EMPTY_ACTION);
        }
        /** Wraps this Observable and the four callbacks in a new ObservableDoOnEach. */
        @SchedulerSupport(SchedulerSupport.NONE)
        private Observable<T> doOnEach(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Action onAfterTerminate) {
            return new ObservableDoOnEach<T>(this, onNext, onError, onComplete, onAfterTerminate);
        }
    }
    
    /** Excerpt of ObservableDoOnEach: stores the upstream source and the four side-effect callbacks. */
    public final class ObservableDoOnEach<T> extends AbstractObservableWithUpstream<T, T> {
        final Consumer<? super T> onNext;
        final Consumer<? super Throwable> onError;
        final Action onComplete;
        final Action onAfterTerminate;
    
        /** Passes source to the superclass constructor and keeps the callbacks in fields. */
        public ObservableDoOnEach(ObservableSource<T> source, Consumer<? super T> onNext,
                                  Consumer<? super Throwable> onError,
                                  Action onComplete,
                                  Action onAfterTerminate) {
            super(source);
            this.onNext = onNext;
            this.onError = onError;
            this.onComplete = onComplete;
            this.onAfterTerminate = onAfterTerminate;
        }
    }
    
    abstract class AbstractObservableWithUpstream<T, U> extends Observable<U>
    
    

      调用Observable.doOnNext()返回ObservableDoOnEach对象, ObservableDoOnEach对象持有上游Observable(即source)的引用, 该source即上文Observable.subscribeOn()返回的ObservableSubscribeOn;

    Observable.observeOn(AndroidSchedulers.mainThread()):
    /** Excerpt of RxAndroid's AndroidSchedulers: static factories for Handler-backed Schedulers. */
    public final class AndroidSchedulers {
    
        /** Holder for the HandlerScheduler bound to the main Looper. */
        private static final class MainHolder {
    
            static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
        }
    
        // The Callable must go through the plugin init hook (as Schedulers.NEW_THREAD does with
        // RxJavaPlugins): a Callable<Scheduler> cannot be assigned to a Scheduler field directly.
        private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
                new Callable<Scheduler>() {
                    @Override public Scheduler call() throws Exception {
                        return MainHolder.DEFAULT;
                    }
                });
    
        /** Returns the shared MAIN_THREAD scheduler. */
        public static Scheduler mainThread() {
            return MAIN_THREAD;
        }
        /** Returns a new HandlerScheduler wrapping a new Handler for the given looper. */
        public static Scheduler from(Looper looper) {
            return new HandlerScheduler(new Handler(looper));
        }
    
        /** Not instantiable. */
        private AndroidSchedulers() {
            throw new AssertionError("No instances.");
        }
    }
    

      MAIN_THREAD实际是一个常量, 也就说不管调用Observable.observeOn(AndroidSchedulers.mainThread())多少次, 都只会创建一个HandlerScheduler, 该HandlerScheduler持有一个Handler;

    Observable.observeOn(...):
    /** Excerpt of Observable showing the two observeOn(...) overloads. */
    public abstract class Observable<T> implements ObservableSource<T> {
        /** Delegates to the three-argument overload with delayError = false and bufferSize(). */
        @SchedulerSupport(SchedulerSupport.CUSTOM)
        public final Observable<T> observeOn(Scheduler scheduler) {
            return observeOn(scheduler, false, bufferSize());
        }
        /** Wraps this Observable, the scheduler and the two options in a new ObservableObserveOn. */
        @SchedulerSupport(SchedulerSupport.CUSTOM)
        public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
            return new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize);
        }
    }
    /** Excerpt of ObservableObserveOn: the constructor, with its remaining parameters elided. */
    public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
        final Scheduler scheduler;
        public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler...) {
            /**
             * In the sample chain, source actually refers to the ObservableDoOnEach.
             */
            super(source);
            this.scheduler = scheduler;
        }
    }
    
    Observable.subscribe(...) :
    /** Excerpt of Observable showing subscribe(Observer) and the abstract hook it calls. */
    public abstract class Observable<T> implements ObservableSource<T> {
        /** Forwards the observer to subscribeActual(...) (any other steps are not shown in this excerpt). */
        @SchedulerSupport(SchedulerSupport.NONE)
        @Override
        public final void subscribe(Observer<? super T> observer) {
            subscribeActual(observer);
        }
        /** Implemented by each concrete operator class. */
        protected abstract void subscribeActual(Observer<? super T> observer);
    }
    

      1、subscribeActual(...)被ObservableObserveOn_observeOn实现;
      2、传入Observer指向Observer_subscribe;

    ObservableObserveOn_observeOn.subscribeActual(...) :
    /** Excerpt of ObservableObserveOn showing subscribeActual(...). */
    public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            /**
             * 1. Because the chain used observeOn(AndroidSchedulers.mainThread()), createWorker() is the
             *    implementation in the Scheduler subclass HandlerScheduler, so the Worker is a HandlerWorker.
             * 2. source refers to the ObservableDoOnEach returned by doOnNext(...). For convenience, the
             *    rest of the note suffixes class names with the operator method name
             *    (e.g. ObservableObserveOn_observeOn).
             */
            Scheduler.Worker w = scheduler.createWorker();
            // delayError and bufferSize are fields declared outside this excerpt.
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    
    /** Excerpt of HandlerScheduler: a Scheduler that hands its Handler to every worker it creates. */
    final class HandlerScheduler extends Scheduler {
        private final Handler handler;
    
        HandlerScheduler(Handler handler) {
            this.handler = handler;
        }
    
        /** Creates a new HandlerWorker sharing this scheduler's handler. */
        @Override
        public Worker createWorker() {
            return new HandlerWorker(handler);
        }
    }
    

      1. source实际指向ObservableDoOnEach_doOnNext;
      2. 调用ObservableDoOnEach_doOnNext.subscribeActual(...)传入ObserveOnObserver_observeOn;

    ObservableDoOnEach_doOnNext.subscribeActual(...) :
    /** Excerpt of ObservableDoOnEach showing subscribeActual(...). */
    public final class ObservableDoOnEach<T> extends AbstractObservableWithUpstream<T, T> {
        /** Subscribes to the upstream source with a DoOnEachObserver wrapping t and the stored callbacks. */
        @Override
        public void subscribeActual(Observer<? super T> t) {
            /**
             * Walking back up the chain, source refers to the ObservableSubscribeOn returned by
             * .subscribeOn(Schedulers.newThread()).
             */
            source.subscribe(new DoOnEachObserver<T>(t, onNext, onError, onComplete, onAfterTerminate));
        }
    }
    

      1. source实际指向ObservableSubscribeOn_subscribeOn;
      2. 调用ObservableSubscribeOn_subscribeOn.subscribeActual(...)传入DoOnEachObserver_doOnNext;

    ObservableSubscribeOn_subscribeOn.subscribeActual(...) :
    /** Excerpt of ObservableSubscribeOn showing subscribeActual(...). */
    public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
        @Override
        public void subscribeActual(final Observer<? super T> s) {
            /**
             * 1. Create the SubscribeOnObserver_subscribeOn object.
             * 2. Pass a reference to that SubscribeOnObserver_subscribeOn into DoOnEachObserver_doOnNext.
             */
            final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
            
            // Hands parent (a Disposable) to the downstream observer before any work is scheduled.
            s.onSubscribe(parent);
    
            // The upstream subscription runs inside the Runnable given to the scheduler; the
            // Disposable returned by scheduleDirect is stored on parent.
            parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
                @Override
                public void run() {
                    source.subscribe(parent);
                }
            }));
        }
    }
    /**
     * 1. SubscribeOnObserver.actual refers to DoOnEachObserver_doOnNext.
     * 2. DoOnEachObserver_doOnNext holds a reference to SubscribeOnObserver_subscribeOn.
     *
     * Excerpt only: members not relevant to the walkthrough are omitted.
     */
    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
    
        // Downstream observer that events are forwarded to.
        final Observer<? super T> actual;
    
        // Holds the Disposable received in onSubscribe(...).
        final AtomicReference<Disposable> s;
    
        SubscribeOnObserver(Observer<? super T> actual) {
            this.actual = actual;
            this.s = new AtomicReference<Disposable>();
        }
    
        /** Stores the upstream Disposable in this.s via DisposableHelper.setOnce. */
        @Override
        public void onSubscribe(Disposable s) {
            DisposableHelper.setOnce(this.s, s);
        }
    
        /** Forwards the item to actual unchanged. */
        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }
    
        /** Forwards completion to actual. */
        @Override
        public void onComplete() {
            actual.onComplete();
        }
    
        /** Disposes both the Disposable held in s and the one held by this AtomicReference itself. */
        @Override
        public void dispose() {
            DisposableHelper.dispose(s);
            DisposableHelper.dispose(this);
        }
    
        /** Stores d in this object's own AtomicReference slot; called with the result of scheduleDirect(...). */
        void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    }
    
    DoOnEachObserver_doOnNext.onSubscribe() :
    /** Excerpt of DoOnEachObserver showing the constructor (partly elided) and onSubscribe(...). */
    static final class DoOnEachObserver<T> implements Observer<T>, Disposable {
        final Observer<? super T> actual;
        final Consumer<? super T> onNext;
        Disposable s;
        /**
         * this.actual refers to ObserveOnObserver_observeOn(...)
         */
        DoOnEachObserver(Observer<? super T> actual, Consumer<? super T> onNext...) {
            this.actual = actual;
            this.onNext = onNext;
            ...
        }
    
        /** If validation passes, stores the upstream Disposable and passes this observer downstream as the Disposable. */
        @Override
        public void onSubscribe(Disposable s) {
            if (DisposableHelper.validate(this.s, s)) {
                this.s = s;
                actual.onSubscribe(this);
            }
        }
    }
    

      调用ObserveOnObserver_observeOn中的onSubscribe方法并传入DoOnEachObserver_doOnNext(...);

    ObserveOnObserver_observeOn.onSubscribe(...) :
    /** Excerpt of ObserveOnObserver showing the constructor (partly elided) and onSubscribe(...). */
    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
        implements Observer<T>, Runnable {
        final Observer<? super T> actual;
        final Scheduler.Worker worker;
        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker...) {
            this.actual = actual;
            this.worker = worker;
            ...
        }
        /**
         * If validation passes, stores the upstream Disposable, creates the queue and passes this
         * observer downstream as the Disposable. The fields s, queue and bufferSize are declared
         * outside this excerpt.
         */
        @Override
        public void onSubscribe(Disposable s) {
            if (DisposableHelper.validate(this.s, s)) {
                this.s = s;
                // QueueDisposable branch is elided in this excerpt.
                if (s instanceof QueueDisposable) {...}
                queue = new SpscLinkedArrayQueue<T>(bufferSize);
                actual.onSubscribe(this);
            }
        }
    }
    

      actual实际指向Observer_subscribe(...) , 即下图中的onSubscribe(...)方法;

    image.png
      同时也说明了onSubscribe(...){...}在调用subscribe(...)的线程中执行(本例中即主线程);
    继续回到ObservableSubscribeOn_subscribeOn.subscribeActual(...)中:
    /** Excerpt of ObservableSubscribeOn showing subscribeActual(...), annotated step by step. */
    public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
        @Override
        public void subscribeActual(final Observer<? super T> s) {
            /**
             * 1. Create the SubscribeOnObserver_subscribeOn object.
             * 2. Pass a reference to that SubscribeOnObserver_subscribeOn into DoOnEachObserver_doOnNext.
             */
            final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
            /**
             * Recursively calls onSubscribe(...) down the observer chain to hand out the Disposable;
             * this happens on the thread that called subscribe (the main thread in this example).
             */
            s.onSubscribe(parent);
            /**
             * Key part: how the thread pool is driven, how cross-thread communication works,
             * and how the chained calls are carried out.
             */
            parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
                @Override
                public void run() {
                    source.subscribe(parent);
                }
            }));
        }
    }
    
    NewThreadScheduler.scheduleDirect(...) :
    // Body elided. NOTE(review): no scheduleDirect override is shown for NewThreadScheduler, so the
    // call presumably resolves to the base Scheduler.scheduleDirect(...) — confirm.
    public final class NewThreadScheduler extends Scheduler {
        ...
    }
    
    /** Excerpt of Scheduler showing the two scheduleDirect(...) overloads. */
    public abstract class Scheduler {
        /** Delegates to the three-argument overload with a delay of 0 nanoseconds. */
        public Disposable scheduleDirect(Runnable run) {
            return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
        }
    
        /**
         * Creates a worker, schedules the (plugin-decorated) task on it and returns the worker
         * as the Disposable for the scheduled work.
         */
        public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
            /**
             * createWorker() is implemented by the Scheduler subclass NewThreadScheduler.
             */
            final Worker w = createWorker();
            final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
            w.schedule(new Runnable() {
                @Override
                public void run() {
                    try {
                        decoratedRun.run();
                    } finally {
                        // The worker is disposed once the task has run, whether or not it threw.
                        w.dispose();
                    }
                }
            }, delay, unit);
    
            return w;
        }
    }
    
    /** Excerpt of NewThreadScheduler showing createWorker(). */
    public final class NewThreadScheduler extends Scheduler {
        /** Creates a new NewThreadWorker on every call; THREAD_FACTORY is declared outside this excerpt. */
        @Override
        public Worker createWorker() {
            return new NewThreadWorker(THREAD_FACTORY);
        }
    }
    
    NewThreadWorker.schedule(...) :
    /**
     * Excerpt of NewThreadWorker: a Worker that owns one ScheduledExecutorService obtained from
     * SchedulerPoolFactory. The disposed flag is declared outside this excerpt.
     */
    public class NewThreadWorker extends Scheduler.Worker implements Disposable {
        private final ScheduledExecutorService executor;
    
        /** Creates the backing executor through SchedulerPoolFactory.create(threadFactory). */
        public NewThreadWorker(ThreadFactory threadFactory) {
            executor = SchedulerPoolFactory.create(threadFactory);
        }
        /** Returns EmptyDisposable.INSTANCE when already disposed; otherwise delegates to scheduleActual with no parent. */
        @Override
        public Disposable schedule(final Runnable action, long delayTime, TimeUnit unit) {
            if (disposed) {
                return EmptyDisposable.INSTANCE;
            }
            return scheduleActual(action, delayTime, unit, null);
        }
    
        /**
         * Wraps the task in a ScheduledRunnable and submits it to the executor.
         *
         * @param run       the task to execute
         * @param delayTime delay before execution; values <= 0 submit the task immediately
         * @param unit      unit of delayTime
         * @param parent    optional container tracking the task; may be null (schedule(...) passes null)
         * @return the ScheduledRunnable wrapping the task
         */
        public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, DisposableContainer parent) {
            Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
            ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
            if (parent != null) {...}
            Future<?> f;
            try {
                if (delayTime <= 0) {
                    f = executor.submit((Callable<Object>)sr);
                } else {...}
                sr.setFuture(f);
            } catch (RejectedExecutionException ex) {
                // parent is null on the schedule(...) path, so it must be checked before use.
                if (parent != null) {
                    parent.remove(sr);
                }
                RxJavaPlugins.onError(ex);
            }
            return sr;
        }
    }
    
    /** Excerpt of SchedulerPoolFactory showing create(...). POOLS is declared outside this excerpt. */
    public final class SchedulerPoolFactory {
        /**
         * Creates a scheduled thread pool with a core size of 1 using the given factory and, when it
         * is a ScheduledThreadPoolExecutor, registers it in POOLS before returning it.
         */
        public static ScheduledExecutorService create(ThreadFactory factory) {
            final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
            if (exec instanceof ScheduledThreadPoolExecutor) {
                ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
                POOLS.put(e, exec);
            }
            return exec;
        }
    }
    

      1、每次订阅时, ObservableSubscribeOn都会通过单例NewThreadScheduler的scheduleDirect(...)创建一个新的NewThreadWorker实例, 而每一个Worker内部又维护了一个ScheduledExecutorService;
      2、创建的ScheduledExecutorService又会被缓存在SchedulerPoolFactory中的POOLS中, 目前有一个疑问, 缓存之后如何做到复用的? <TODO>

    ObservableSubscribeOn_subscribeOn.subscribeActual(...) :
    /** Excerpt of ObservableSubscribeOn showing subscribeActual(...), focusing on the scheduled Runnable. */
    public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
        @Override
        public void subscribeActual(final Observer<? super T> s) {
            final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    
            s.onSubscribe(parent);
    
            parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
                @Override
                public void run() {
                    /**
                     * source refers to the ObservableCreate produced by Observable.create(...)
                     */
                    source.subscribe(parent);
                }
            }));
        }
    }
    

      source指向ObservableCreate.create(...);

    ObservableCreate_create.subscribeActual(...) :
    /** Excerpt of ObservableCreate showing subscribeActual(...). */
    public final class ObservableCreate<T> extends Observable<T> {
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            // The emitter wraps the observer and is also handed to it as the Disposable.
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            observer.onSubscribe(parent);
            /**
             * source refers to the ObservableOnSubscribe that was passed in when
             * Observable.create(new ObservableOnSubscribe<T>()) was called.
             */
            source.subscribe(parent);
        }
    }
    

      1. observer指向SubscribeOnObserver_subscribeOn;
      2. CreateEmitter持有SubscribeOnObserver_subscribeOn的引用;
      3. observer通过onSubscribe(CreateEmitter)持有CreateEmitter的引用;

    CreateEmitter.onNext(...) :
    /** Excerpt of CreateEmitter showing onNext(...). The observer field is declared outside this excerpt. */
    static final class CreateEmitter<T>
        extends AtomicReference<Disposable>
        implements ObservableEmitter<T>, Disposable {
        /** Forwards the item to observer unless this emitter has been disposed. */
        @Override
        public void onNext(T t) {
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }
    }
    

      observer指向SubscribeOnObserver_subscribeOn, 继续向下分析:

    SubscribeOnObserver_subscribeOn.onNext(...) :
    /** Excerpt of SubscribeOnObserver showing only the constructor assignment of actual and onNext(...). */
    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
        final Observer<? super T> actual;
        SubscribeOnObserver(Observer<? super T> actual) {
            this.actual = actual;
        }
        /** Forwards the item to actual unchanged. */
        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }
    }
    

      actual指向DoOnEachObserver_doOnNext, 继续向下分析:

    DoOnEachObserver_doOnNext.onNext(...) :
    /** Excerpt of DoOnEachObserver showing onNext(...). The done flag is declared outside this excerpt. */
    static final class DoOnEachObserver<T> implements Observer<T>, Disposable {
        DoOnEachObserver(Observer<? super T> actual, Consumer<? super T> onNext...) {
            this.actual = actual;
            this.onNext = onNext;
            ...
        }
        /** Ignores the item when done; otherwise runs the onNext callback, then forwards the item to actual. */
        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            // In the sample chain this is the Consumer passed to doOnNext(...).
            onNext.accept(t);
            actual.onNext(t);
        }
    }
    

      actual指向ObserveOnObserver_observeOn, 继续向下分析:

    ObserveOnObserver_observeOn.onNext(...) :
    /** Excerpt of ObserveOnObserver showing onNext(...) and schedule(). */
    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
        implements Observer<T>, Runnable {
        /** Ignores the item when done; otherwise handles it (branch elided) and calls schedule(). */
        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            // NOTE(review): the elided branch presumably enqueues t — confirm against the full source.
            if (sourceMode != QueueDisposable.ASYNC) {...}
            schedule();
        }
        /** Submits this observer (a Runnable) to the worker only when the counter was 0 before the increment. */
        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }
    }
    
    HandlerWorker.schedule(...) :
    /** Excerpt of HandlerWorker showing schedule(...). The handler and disposed fields are declared outside this excerpt. */
    private static final class HandlerWorker extends Worker {
        /**
         * Posts the task to the handler as a delayed Message and returns the ScheduledRunnable
         * wrapping it, or an already-disposed Disposable if this worker is disposed.
         */
        @Override
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            if (disposed) {
                return Disposables.disposed();
            }
            run = RxJavaPlugins.onSchedule(run);
            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.
            // Negative delays are clamped to 0 ms.
            handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
            // Re-check after posting: if the worker was disposed in the meantime, remove the callback again.
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }
            return scheduled;
        }
    }
    

    相关文章

      网友评论

          本文标题:RxJava->如何进行线程管理

          本文链接:https://www.haomeiwen.com/subject/rxuoaxtx.html