Rxjava2 线程切换(3)

作者: PuHJ | 来源:发表于2019-04-01 19:55 被阅读0次

    一、简介

    前面说了Rxjava的用法和操作符原理,Rxjava的基于事件的响应式编程面向事件本身采用流式操作,让事件处理层次变得非常的清晰。但是如果没有线程变换,只是在单一的线程中操作,那就是花拳绣腿上不了台面。

    好在Rxjava2提供了四种线程切换模式,而且Rxjava2让线程切换变得异常的简单,只需要一行代码就可以实现。往往简单的这一行代码,内部的实现确实精妙的。所以这一章就来说说Rxjava2的线程变换原理

    先来总结下Rxjava的线程切换用法:

                    .observeOn(Schedulers.io())
                    .subscribeOn(AndroidSchedulers.mainThread())
    
    • subscribeOn是指定之前的事件流处理的线程
    • observeOn 是指定之后的事件流处理的线程

    并且提供了四种类型线程模式:

    • AndroidSchedulers.mainThread() : 主线程
    • Schedulers.io() : io操作的线程, 通常用于网络,读写文件等io密集型的操作
    • Schedulers.computation() : CPU计算密集型的操作
    • Schedulers.newThread() : 新建一个线程

    observeOn和subscribeOn方法都是Observable中的方法,他们也是在链式调用之内,所以这两个方法同其他的变换操作符一致,必定是接受旧的Observable,包装成一个新的Observable并返回。

    此外android中的子线程是采用Runnable接口,主线程是使用Handler(Looper.getMainLooper()) handler机制,所以我们可以肯定Rxjava的线程变换无非是对子线程和主线程Handler的二次封装。

    二、线程调度模块

    线程调度

    上图是以采用Handler调度的方式为例,并以此图来分析该模型。

    ①、Scheduler

    先看下Scheduler类,此类是数据调度的抽象类,该对象可获得一个Worker对象,真实的数据调度是在Worker中的。

    精简代码

    public abstract class Scheduler {
    
        // 每一个Scheduler 对应一个Worker用来实现真实的线程切换
        public abstract Worker createWorker();
    
        // 立即的执行Runnable
        public Disposable scheduleDirect(Runnable run) {
            return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
        }
    
        // 推迟delay事件调度数据
        public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
            // 第一步 得到子类的Worker
            final Worker w = createWorker();
            
            final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
            // 使用worker真实的执行Runnable
            w.schedule(new Runnable() {
                @Override
                public void run() {
                    try {
                        decoratedRun.run();
                    } finally {
                        w.dispose();
                    }
                }
            }, delay, unit);
    
            return w;
        }
    }
    

    Scheduler中主要完成以下几件事:

    • 创建一个真实的线程切换的执行者Worker
    • 外界通过Scheduler对象执行scheduleDirect方法,来开始调度处理。
      从中可以看到Scheduler只是一个抽象调度,代理机制来调度。从代码中可以看到Scheduler很简单,是出于上层的抽象类,scheduleDirect方法中,就是创建一个Worker执行者,来执行传入的Runnable。并且scheduleDirect方法若不满足,还是可以在子类中根据自己的需求重写。
    ②、Worker执行者

    精简代码:

    public abstract static class Worker implements Disposable {
    
            public Disposable schedule(Runnable run) {
                return schedule(run, 0L, TimeUnit.NANOSECONDS);
            }
    
            public abstract Disposable schedule(Runnable run, long delay, TimeUnit unit);
    }
    

    从代码中可以看到,这两个方法和Scheduler中的方法很像,除了方法名不一样,其他的都相似。也就是Scheduler和Worker就是一个静态的代理。

    问题:为什么执行会返回Disposable了?
    因为在处理事件流过程中可能会取消事件流,返回的Disposable可以控制停止线程调度。

    ③、ScheduledRunnable被执行的对象

    ScheduledRunnable是线程切换的被执行单元,这里对Runnable进行了二次封装,增加了取消的功能。

       private static final class ScheduledRunnable implements Runnable, Disposable {
            private final Handler handler;
            private final Runnable delegate;
    
            private volatile boolean disposed;
    
            ScheduledRunnable(Handler handler, Runnable delegate) {
                this.handler = handler;
                this.delegate = delegate;
            }
    
            @Override
            public void run() {
                try {
                    delegate.run();
                } catch (Throwable t) {
                    IllegalStateException ie =
                        new IllegalStateException("Fatal Exception thrown on Scheduler.", t);
                    RxJavaPlugins.onError(ie);
                    Thread thread = Thread.currentThread();
                    thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
                }
            }
    
            @Override
            public void dispose() {
                disposed = true;
                handler.removeCallbacks(this);
            }
    
            @Override
            public boolean isDisposed() {
                return disposed;
            }
        }
    

    这里就不需要过多的解释,在run方法中直接执行传入的Runnable对象,因为调度是在Handler中,用Handler发送个Runnable对象,所以取消也是用Handler去remove。也就是说ScheduledRunnable中传入了Runnable和Handler两个对象。

    ④、HandlerWorker具体的Runnable执行者

    Worker是个抽象类,具体的实现还得由子类完成。这里看下具体的HandlerWorker。

    private static final class HandlerWorker extends Worker {
            private final Handler handler;
    
            private volatile boolean disposed;
    
            HandlerWorker(Handler handler) {
                this.handler = handler;
            }
    
            @Override
            public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
                if (run == null) throw new NullPointerException("run == null");
                if (unit == null) throw new NullPointerException("unit == null");
    
                if (disposed) {
                    return Disposables.disposed();
                }
    
                run = RxJavaPlugins.onSchedule(run);
    
                ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
    
                Message message = Message.obtain(handler, scheduled);
                message.obj = this; // Used as token for batch disposal of this worker's runnables.
    
                handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
    
                // Re-check disposed state for removing in case we were racing a call to dispose().
                if (disposed) {
                    handler.removeCallbacks(scheduled);
                    return Disposables.disposed();
                }
    
                return scheduled;
            }
    
            @Override
            public void dispose() {
                disposed = true;
                handler.removeCallbacksAndMessages(this /* token */);
            }
    
            @Override
            public boolean isDisposed() {
                return disposed;
            }
        }
    

    因为Handler是执行者,所以构造方法先把Handler传入进来。然后看下核心方法schedule。

                // 1、传入的Runnable分装成ScheduledRunnable
                ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
                // 2、创建一个可以执行Runnable的Message
                Message message = Message.obtain(handler, scheduled);
                message.obj = this; // Used as token for batch disposal of this worker's runnables.
               // 3、使用Handler推迟时间执行该scheduled
                handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
    
                // Re-check disposed state for removing in case we were racing a call to dispose().
                // 4、如果取消了就用Handler移除
                if (disposed) {
                    handler.removeCallbacks(scheduled);
                    return Disposables.disposed();
                }
    
    ⑤、HandlerScheduler具体的调度者
    final class HandlerScheduler extends Scheduler {
        private final Handler handler;
    
        HandlerScheduler(Handler handler) {
            this.handler = handler;
        }
    
        @Override
        public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");
    
            run = RxJavaPlugins.onSchedule(run);
            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
            handler.postDelayed(scheduled, Math.max(0L, unit.toMillis(delay)));
            return scheduled;
        }
    
        @Override
        public Worker createWorker() {
            return new HandlerWorker(handler);
        }
    

    HandlerScheduler中复写了scheduleDirect中方法,所以他不会去代理Worker执行Runnable。

    scheduleDirect中的逻辑很少,就是用Handler处理一个定时任务。然后就结束了。

    使用:

    直接调用HandlerScheduler对象的scheduleDirect(Runnable run)方法,最终执行的就是HandlerScheduler中的scheduleDirect(Runnable run, long delay, TimeUnit unit)方法。本质就是使用Handler执行Runnable。

    从线程切换源码可以看出本质上还是对Handler和Runnable的封装。并没从本质上更改。下面介绍下线程切换的具体使用。

    三、subscribeOn

    subscribeOn的意思就是指定在此之前的事件流执行的线程。从代码中可以看到,subscribeOn就是对原来的Observable封装下,转变成ObservableSubscribeOn并返回。

    @SchedulerSupport(SchedulerSupport.CUSTOM)
        public final Observable<T> subscribeOn(Scheduler scheduler) {
            ObjectHelper.requireNonNull(scheduler, "scheduler is null");
            return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
        }
    

    直接看ObservableSubscribeOn源码:

    public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
        final Scheduler scheduler;
    
        public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
            super(source);
            this.scheduler = scheduler;
        }
    
        @Override
        public void subscribeActual(final Observer<? super T> s) {
            final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    
            s.onSubscribe(parent);
    
            parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
                @Override
                public void run() {
                    source.subscribe(parent);
                }
            }));
        }
    

    ObservableSubscribeOn中将scheduler传入进来,作为触发线程切换的对象。重点代码再subscribeActual方法中。

    • 1、先将原来的Observer包装成新的SubscribeOnObserver
    • 2、调用老的Observer得onSubscribe回调方法
    • 3、parent设置Disposable,保证Disposable唯一性。方法中执行的是线程的调度,也就是将 source.subscribe(parent);代码切换到了指定的线程中。即指定了事件流的最开始的触发位置的线程。

    #######注意:subscribeOn方法只有第一个设置的才生效,原因很简单它的执行顺序是后面设置的先执行,会导致最开始的会将原来设置的覆盖了。

    四、observeOn

    observeOn的意思就是指定在此之后的事件流执行的线程。从代码中可以看到,observeOn就是对原来的Observable封装下,转变成ObservableSubscribeOn并返回。

      @SchedulerSupport(SchedulerSupport.CUSTOM)
        public final Observable<T> observeOn(Scheduler scheduler) {
            return observeOn(scheduler, false, bufferSize());
        }
    
        @SchedulerSupport(SchedulerSupport.CUSTOM)
        public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
            ObjectHelper.requireNonNull(scheduler, "scheduler is null");
            ObjectHelper.verifyPositive(bufferSize, "bufferSize");
            return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
        }
    

    核心的代码还是在ObservableObserveOn中。猜想下observeOn中是决定后面事件流的执行线程,所以肯定不是切换subscribe方法,而是在Observer中的onNext方法中切换。

    首先看下ObservableObserveOn#subscribeActual方法:

        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            // 1、如果是上游和下游是同一个线程,那么就不管,该怎么办就怎么办
            if (scheduler instanceof TrampolineScheduler) {
                source.subscribe(observer);
            } else {
                // 2、得到切换线程的执行者
                Scheduler.Worker w = scheduler.createWorker();
                // 3、封装老的observer,并将worker对象传入
                source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
            }
        }
    

    subscribeActual中先要判断上游和下游的线程是不是在同一个线程中,就分为了两种情况,如果在不同的线程中,那就需要二次加工了。接下来在看看ObserveOnObserver中的处理。

    ObserveOnObserver中的代码有点长,略加删除了些。

        static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
        implements Observer<T>, Runnable {
    
            private static final long serialVersionUID = 6576896619930983584L;
            final Observer<? super T> actual;
            final Scheduler.Worker worker;
            final boolean delayError;
            final int bufferSize;
    
            SimpleQueue<T> queue;
    
            Disposable s;
    
            Throwable error;
            volatile boolean done;
    
            volatile boolean cancelled;
    
            int sourceMode;
    
            boolean outputFused;
    
            ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
                this.actual = actual;
                this.worker = worker;
                this.delayError = delayError;
                this.bufferSize = bufferSize;
            }
    
            @Override
            public void onSubscribe(Disposable s) {
                if (DisposableHelper.validate(this.s, s)) {
                    this.s = s;
                    if (s instanceof QueueDisposable) {
                        @SuppressWarnings("unchecked")
                        QueueDisposable<T> qd = (QueueDisposable<T>) s;
    
                        int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
    
                        if (m == QueueDisposable.SYNC) {
                            sourceMode = m;
                            queue = qd;
                            done = true;
                            actual.onSubscribe(this);
                            schedule();
                            return;
                        }
                        if (m == QueueDisposable.ASYNC) {
                            sourceMode = m;
                            queue = qd;
                            actual.onSubscribe(this);
                            return;
                        }
                    }
    
                    queue = new SpscLinkedArrayQueue<T>(bufferSize);
    
                    actual.onSubscribe(this);
                }
            }
    
            @Override
            public void onNext(T t) {
                if (done) {
                    return;
                }
    
                if (sourceMode != QueueDisposable.ASYNC) {
                    queue.offer(t);
                }
                schedule();
            }
    
            @Override
            public void onError(Throwable t) {
                if (done) {
                    RxJavaPlugins.onError(t);
                    return;
                }
                error = t;
                done = true;
                schedule();
            }
    
            @Override
            public void onComplete() {
                if (done) {
                    return;
                }
                done = true;
                schedule();
            }
    
            @Override
            public void dispose() {
                if (!cancelled) {
                    cancelled = true;
                    s.dispose();
                    worker.dispose();
                    if (getAndIncrement() == 0) {
                        queue.clear();
                    }
                }
            }
    
            @Override
            public boolean isDisposed() {
                return cancelled;
            }
    
            void schedule() {
                if (getAndIncrement() == 0) {
                    worker.schedule(this);
                }
            }
    
            void drainNormal() {
                int missed = 1;
    
                final SimpleQueue<T> q = queue;
                final Observer<? super T> a = actual;
    
                for (;;) {
                    if (checkTerminated(done, q.isEmpty(), a)) {
                        return;
                    }
    
                    for (;;) {
                        boolean d = done;
                        T v;
    
                        try {
                            v = q.poll();
                        } catch (Throwable ex) {
                            Exceptions.throwIfFatal(ex);
                            s.dispose();
                            q.clear();
                            a.onError(ex);
                            return;
                        }
                        boolean empty = v == null;
    
                        if (checkTerminated(d, empty, a)) {
                            return;
                        }
    
                        if (empty) {
                            break;
                        }
    
                        a.onNext(v);
                    }
    
                    missed = addAndGet(-missed);
                    if (missed == 0) {
                        break;
                    }
                }
            }
    
        
            @Override
            public void run() {
                if (outputFused) {
                    drainFused();
                } else {
                    drainNormal();
                }
            }
    
     
            @Override
            public T poll() throws Exception {
                return queue.poll();
            }
    
            @Override
            public void clear() {
                queue.clear();
            }
    
            @Override
            public boolean isEmpty() {
                return queue.isEmpty();
            }
        }
    
    

    因为onNext之后才处理下游的事件,所以先看下onNext方法。

                if (sourceMode != QueueDisposable.ASYNC) {
                    queue.offer(t);
                }
                schedule();
    

    1、onNext中主要就干了两件事,一件事将Event事件插入到Queue队列中,第二件就是执行schedule();,也就是取出事件,并消费。

            void schedule() {
                if (getAndIncrement() == 0) {
                    worker.schedule(this);
                }
            }
    

    2、schedule()中处理的就更加简单了, worker.schedule(this);直接用worker切换线程,因为该类是继承自Runnable,所以最终会执行到自己的run方法。

            @Override
            public void run() {
                if (outputFused) {
                    drainFused();
                } else {
                    drainNormal();
                }
            }
    

    3、run方法是针对不同的情况分别处理了。直接看drainNormal();

    4、drainNormal方法主要就做了两件事,一件是从队列中取出一个事件,如果没有事件了就停止消费。另一个就是将获得的事件用onNext执行,因为执行的地方是在另一个线程,那么后面的事件处理也就切换了线程。

           void drainNormal() {
                int missed = 1;
    
                final SimpleQueue<T> q = queue;
                final Observer<? super T> a = actual;
    
                for (;;) {
                    if (checkTerminated(done, q.isEmpty(), a)) {
                        return;
                    }
    
                    for (;;) {
                        boolean d = done;
                        T v;
    
                        try {
                            v = q.poll();
                        } catch (Throwable ex) {
                            Exceptions.throwIfFatal(ex);
                            s.dispose();
                            q.clear();
                            a.onError(ex);
                            return;
                        }
                        boolean empty = v == null;
    
                        if (checkTerminated(d, empty, a)) {
                            return;
                        }
    
                        if (empty) {
                            break;
                        }
    
                        a.onNext(v);
                    }
    
                    missed = addAndGet(-missed);
                    if (missed == 0) {
                        break;
                    }
                }
            }
    

    相关文章

      网友评论

        本文标题:Rxjava2 线程切换(3)

        本文链接:https://www.haomeiwen.com/subject/ksjubqtx.html