Android RxJava Thread Switching (2)

Author: Bfmall | Published: 2023-04-09 15:18

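This article walks through how RxJava2 switches threads. The hardest part of reading the RxJava2 source is the sheer number of wrapper classes, so we follow only the main call path.
Start with a typical RxJava2 thread-switching example: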

getObservable()
.subscribeOn(Schedulers.io()) // run the subscription (upstream) on an IO thread
.observeOn(AndroidSchedulers.mainThread(), true) // deliver events on the main thread; true is delayError
.subscribe(getObserver());

1. Analyzing subscribeOn

We start with subscribeOn. Its argument is Schedulers.io(), whose source is:

//Schedulers.java
public final class Schedulers {

    @NonNull
    static final Scheduler IO;


    static {
        IO = RxJavaPlugins.initIoScheduler(new IOTask());
    }

    @NonNull
    public static Scheduler io() {
        return RxJavaPlugins.onIoScheduler(IO);
    }

    static final class IOTask implements Callable<Scheduler> {
        @Override
        public Scheduler call() throws Exception {
            return IoHolder.DEFAULT;
        }
    }

    static final class IoHolder {
        // Schedulers.io() ultimately returns this IoScheduler instance
        static final Scheduler DEFAULT = new IoScheduler();
    }

}

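The RxJava2 source is full of calls into RxJavaPlugins. For our purposes RxJavaPlugins is just a wrapper layer of hook functions: you can treat RxJavaPlugins.onIoScheduler(IO) as simply returning IO, and the main flow is unaffected.
What is IO? Tracing the source shows that IO is an IoScheduler. We stop here for now and return to subscribeOn itself.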
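To make the "hook" idea concrete, here is a minimal sketch of the pattern in plain Java. HookSketch, its onIoHandler field, and the use of String in place of Scheduler are all hypothetical stand-ins for illustration; this is not the RxJavaPlugins source, only the same shape: return the default unchanged unless someone has installed a handler.

```java
import java.util.function.Function;

// Hypothetical stand-in for the hook pattern; not the real RxJavaPlugins class.
final class HookSketch {
    // Optional handler; null means "no hook installed".
    static volatile Function<String, String> onIoHandler;

    // Same shape as RxJavaPlugins.onIoScheduler(IO): pass the default through
    // unless a handler has been installed, in which case the handler decides.
    static String onIoScheduler(String defaultScheduler) {
        Function<String, String> f = onIoHandler;
        if (f == null) {
            return defaultScheduler;
        }
        return f.apply(defaultScheduler);
    }

    public static void main(String[] args) {
        // No hook installed: the default comes back untouched.
        System.out.println(onIoScheduler("IoScheduler"));
        // With a hook installed, the caller receives whatever the hook returns.
        onIoHandler = s -> "Test" + s;
        System.out.println(onIoScheduler("IoScheduler"));
        onIoHandler = null;
    }
}
```

With no handler installed the call is a pure pass-through, which is why the hook calls can be ignored when following the main flow.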

/**
 * Observable.java
 * Here scheduler == IoScheduler.
 *
 * ObservableSubscribeOn receives the upstream source (this) and the IoScheduler.
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    // RxJavaPlugins.onAssembly is a hook; what gets returned is the ObservableSubscribeOn object
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

Now step into the ObservableSubscribeOn source:

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        // source is the upstream Observable (the original one in this example)
        super(source);
        // scheduler is the IoScheduler that was passed in
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

        observer.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

    ......

}

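So ObservableSubscribeOn merely wraps the original Observable together with the IO scheduler.
A note on the execution flow of the opening example: when the chain finally calls subscribe(getObserver()), control reaches the subscribeActual method above. The call path is: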

//Observable.java
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

        // dispatches to subscribeActual
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}

subscribeActual is an abstract method:

protected abstract void subscribeActual(Observer<? super T> observer);

The call actually lands in subscribeActual of ObservableSubscribeOn, the Observable subclass that subscribeOn created:

//ObservableSubscribeOn.java
@Override
public void subscribeActual(final Observer<? super T> observer) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

    observer.onSubscribe(parent);
   
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

Let's see what subscribeActual does.
The key line is:

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

First look at new SubscribeTask(parent):

final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            // the act of subscribing to the upstream Observable is wrapped in this Runnable
            source.subscribe(parent);
        }
    }

Next look at scheduler.scheduleDirect(Runnable run). The call goes to IoScheduler's scheduleDirect(), which is defined in its parent class Scheduler. We can guess that scheduleDirect takes the SubscribeTask above and runs it on another thread:

//Scheduler.java
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    // call createWorker()
    final Worker w = createWorker();
    // decoratedRun is run after passing through the RxJavaPlugins.onSchedule hook
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    DisposeTask task = new DisposeTask(decoratedRun, w);
    // finally, call the Worker's schedule() method
    w.schedule(task, delay, unit);

    return task;
}

// abstract method: the subclass implementation (IoScheduler here) is what actually runs
public abstract Worker createWorker();

First look at IoScheduler's createWorker() method:

//IoScheduler.java
@NonNull
@Override
public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}

static final class EventLoopWorker extends Scheduler.Worker {
    private final CompositeDisposable tasks;
    private final CachedWorkerPool pool;
    private final ThreadWorker threadWorker;

    final AtomicBoolean once = new AtomicBoolean();

    EventLoopWorker(CachedWorkerPool pool) {
        this.pool = pool;
        this.tasks = new CompositeDisposable();
        // threadWorker is analyzed later
        this.threadWorker = pool.get();
    }

    @Override
    public void dispose() {
        if (once.compareAndSet(false, true)) {
            tasks.dispose();

            // releasing the pool should be the last action
            pool.release(threadWorker);
        }
    }

    @Override
    public boolean isDisposed() {
        return once.get();
    }
    
    // this is the schedule() that gets called
    @NonNull
    @Override
    public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
        if (tasks.isDisposed()) {
            // don't schedule, we are unsubscribed
            return EmptyDisposable.INSTANCE;
        }
        // delegates to ThreadWorker's scheduleActual()
        return threadWorker.scheduleActual(action, delayTime, unit, tasks);
    }
}

static final class ThreadWorker extends NewThreadWorker {
    private long expirationTime;

    ThreadWorker(ThreadFactory threadFactory) {
        super(threadFactory);
        this.expirationTime = 0L;
    }

    public long getExpirationTime() {
        return expirationTime;
    }

    public void setExpirationTime(long expirationTime) {
        this.expirationTime = expirationTime;
    }
}

Now look at scheduleActual() in NewThreadWorker:

//NewThreadWorker.java
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;

    volatile boolean disposed;

    public NewThreadWorker(ThreadFactory threadFactory) {
        // the pool factory class creates the thread pool
        executor = SchedulerPoolFactory.create(threadFactory);
    }

    @NonNull
    @Override
    public Disposable schedule(@NonNull final Runnable run) {
        return schedule(run, 0, null);
    }

    @NonNull
    @Override
    public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
        if (disposed) {
            return EmptyDisposable.INSTANCE;
        }
        return scheduleActual(action, delayTime, unit, null);
    }

    @NonNull
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        // pass run through the RxJavaPlugins.onSchedule hook
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        // wrap decoratedRun once more, this time in a ScheduledRunnable
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }

        Future<?> f;
        try {
            // submitting the ScheduledRunnable to the thread pool is what completes the thread switch
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }

        return sr;
    }
}
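The core of scheduleActual can be reproduced with nothing but java.util.concurrent. The following is a simplified sketch, not the RxJava implementation: ScheduleActualSketch and CancellableTask are hypothetical names, and the sketch ignores the race between dispose() and setFuture() that the real ScheduledRunnable has to handle. It shows only the two ideas from the listing above: submit or schedule depending on the delay, and keep the returned Future so the task can be cancelled.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch; these are not RxJava classes.
final class ScheduleActualSketch {

    // Plays the role of ScheduledRunnable: wraps the task and remembers its Future.
    static final class CancellableTask implements Callable<Object> {
        final Runnable actual;
        volatile Future<?> future;

        CancellableTask(Runnable actual) {
            this.actual = actual;
        }

        @Override
        public Object call() {
            actual.run();
            return null;
        }

        // Cancels the pending task if it has already been handed to the executor.
        void dispose() {
            Future<?> f = future;
            if (f != null) {
                f.cancel(false);
            }
        }
    }

    // Same branching as the listing above: submit when there is no delay, schedule otherwise.
    static CancellableTask scheduleActual(ScheduledExecutorService executor,
                                          Runnable run, long delay, TimeUnit unit) {
        CancellableTask task = new CancellableTask(run);
        Future<?> f;
        if (delay <= 0) {
            f = executor.submit(task);
        } else {
            f = executor.schedule(task, delay, unit);
        }
        task.future = f;
        return task;
    }

    // Runs one immediate task to completion and cancels one delayed task.
    static boolean demo() {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        try {
            AtomicBoolean ran = new AtomicBoolean();
            CancellableTask immediate =
                    scheduleActual(executor, () -> ran.set(true), 0, TimeUnit.SECONDS);
            immediate.future.get(5, TimeUnit.SECONDS);

            CancellableTask delayed = scheduleActual(executor, () -> { }, 1, TimeUnit.HOURS);
            delayed.dispose();
            return ran.get() && delayed.future.isCancelled();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("immediate ran and delayed was cancelled: " + demo());
    }
}
```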

The line executor = SchedulerPoolFactory.create(threadFactory); above creates the thread pool as follows:

//SchedulerPoolFactory.java
public static ScheduledExecutorService create(ThreadFactory factory) {
        final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
        tryPutIntoPool(PURGE_ENABLED, exec);
        return exec;
    }

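As the walkthrough shows, the Runnable is wrapped several times and finally handed to a ScheduledExecutorService for execution.
To summarize: a thread switch simply wraps the work to be done in a Runnable and hands that Runnable to an ExecutorService. The source looks complicated only because the Runnable is passed down layer by layer to the object that actually runs it.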
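That summary fits in a few lines of plain Java. The sketch below is an illustration under stated assumptions, not RxJava code: SubscribeOnSketch, its Source interface, and the thread name "sketch-io" are all hypothetical, and Consumer stands in for Observer. What it keeps from the real thing is the essential move: source.subscribe(...) is wrapped in a Runnable and handed to an executor, so the producer runs on the executor's thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical miniature of the subscribeOn idea; not RxJava code.
final class SubscribeOnSketch {

    // Minimal source: subscribe() runs the producer on whichever thread calls it.
    interface Source<T> {
        void subscribe(Consumer<T> observer);
    }

    // Counterpart of SubscribeTask: the subscribe call itself becomes a Runnable
    // that is handed to the executor instead of being run by the caller.
    static <T> Source<T> subscribeOn(Source<T> source, ExecutorService executor) {
        return observer -> executor.execute(() -> source.subscribe(observer));
    }

    // Subscribes through subscribeOn and reports the thread the producer ran on.
    static String producerThreadName() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-io"));
        try {
            CompletableFuture<String> seen = new CompletableFuture<>();
            // The producer emits the name of the thread it is running on.
            Source<String> source = observer -> observer.accept(Thread.currentThread().getName());
            subscribeOn(source, io).subscribe(seen::complete);
            return seen.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("producer ran on: " + producerThreadName());
    }
}
```

Because it is the subscription that moves, everything upstream of subscribeOn runs on the executor's thread, which matches the role of SubscribeTask in the walkthrough.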

Side note

The Runnable above is run by a ThreadWorker. Where does the ThreadWorker come from? In the EventLoopWorker constructor shown earlier, threadWorker is obtained from the CachedWorkerPool:

this.threadWorker = pool.get();

The main code of CachedWorkerPool is:

static final class CachedWorkerPool implements Runnable {
    private final long keepAliveTime;
    private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
    final CompositeDisposable allWorkers;
    private final ScheduledExecutorService evictorService;
    private final Future<?> evictorTask;
    private final ThreadFactory threadFactory;

    CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
        this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
        this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
        this.allWorkers = new CompositeDisposable();
        this.threadFactory = threadFactory;

        ScheduledExecutorService evictor = null;
        Future<?> task = null;
        if (unit != null) {
            evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
            task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
        }
        evictorService = evictor;
        evictorTask = task;
    }

    @Override
    public void run() {
        evictExpiredWorkers();
    }

    // the key method is get()
    ThreadWorker get() {
        if (allWorkers.isDisposed()) {
            return SHUTDOWN_THREAD_WORKER;
        }
        // if the queue is not empty, take a ThreadWorker from it
        while (!expiringWorkerQueue.isEmpty()) {
            ThreadWorker threadWorker = expiringWorkerQueue.poll();
            if (threadWorker != null) {
                return threadWorker;
            }
        }
        // if the queue is empty, create a new ThreadWorker and return it
        // No cached worker found, so create a new one.
        ThreadWorker w = new ThreadWorker(threadFactory);
        allWorkers.add(w);
        return w;
    }

    // release the ThreadWorker by putting it back into the queue
    void release(ThreadWorker threadWorker) {
        // Refresh expire time before putting worker back in pool
        threadWorker.setExpirationTime(now() + keepAliveTime);

        expiringWorkerQueue.offer(threadWorker);
    }
}

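So get() first tries to take a ThreadWorker from expiringWorkerQueue and creates a new one only if the queue is empty.
When is a ThreadWorker put into expiringWorkerQueue? In the release method above.
And who calls release? The dispose method of EventLoopWorker shown earlier: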

// EventLoopWorker, an inner class in IoScheduler.java
@Override
public void dispose() {
    if (once.compareAndSet(false, true)) {
        tasks.dispose();

        // releasing the pool should be the last action
        pool.release(threadWorker);
    }
}
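The get/release cycle is a small object cache. Here is a stripped-down sketch of it, assuming nothing beyond the JDK: WorkerCacheSketch and its Worker class are hypothetical, and expiration and eviction (keepAliveTime, evictExpiredWorkers) are deliberately left out.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical reduction of the CachedWorkerPool idea; expiry and eviction are omitted.
final class WorkerCacheSketch {

    // Stand-in for ThreadWorker; the id only makes instances easy to tell apart.
    static final class Worker {
        final int id;

        Worker(int id) {
            this.id = id;
        }
    }

    private final ConcurrentLinkedQueue<Worker> idleWorkers = new ConcurrentLinkedQueue<>();
    private final AtomicInteger created = new AtomicInteger();

    // Reuse an idle worker if one is queued; otherwise create a new one.
    Worker get() {
        Worker cached = idleWorkers.poll();
        if (cached != null) {
            return cached;
        }
        return new Worker(created.incrementAndGet());
    }

    // Called when a worker's owner is disposed: the worker goes back into the queue.
    void release(Worker worker) {
        idleWorkers.offer(worker);
    }

    int createdCount() {
        return created.get();
    }

    public static void main(String[] args) {
        WorkerCacheSketch pool = new WorkerCacheSketch();
        Worker first = pool.get();
        pool.release(first);
        Worker second = pool.get();
        System.out.println("reused: " + (first == second) + ", created: " + pool.createdCount());
    }
}
```

A released worker is handed out again before any new one is created, which is why disposing subscriptions promptly lets the IO scheduler reuse threads rather than keep creating them.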

2. Analyzing observeOn

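The observeOn flow is much like subscribeOn: the work is wrapped in a Runnable and handed to a Scheduler, which then finds something to run it. In our example the Scheduler uses a Handler, as analyzed below. Look at AndroidSchedulers.mainThread(): MAIN_THREAD is really just the DEFAULT Scheduler after wrapping: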

public final class AndroidSchedulers {

    private static final class MainHolder {
        // the scheduler is a HandlerScheduler built around a main-thread Handler
        static final Scheduler DEFAULT
            = new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
    }

    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });

    /** A {@link Scheduler} which executes actions on the Android main thread. */
    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }
}

HandlerScheduler is given a Handler bound to the main Looper, so we can already guess that RxJava2 (through RxAndroid) uses this Handler to switch to the main thread. Here is the HandlerScheduler source:

final class HandlerScheduler extends Scheduler {
    private final Handler handler;
    private final boolean async;

    HandlerScheduler(Handler handler, boolean async) {
        this.handler = handler;
        this.async = async;
    }

    @Override
    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        if (run == null) throw new NullPointerException("run == null");
        if (unit == null) throw new NullPointerException("unit == null");

        run = RxJavaPlugins.onSchedule(run);
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
        handler.postDelayed(scheduled, unit.toMillis(delay));
        return scheduled;
    }

    @Override
    public Worker createWorker() {
        return new HandlerWorker(handler, async);
    }

    private static final class HandlerWorker extends Worker {
        private final Handler handler;
        private final boolean async;

        private volatile boolean disposed;

        HandlerWorker(Handler handler, boolean async) {
            this.handler = handler;
            this.async = async;
        }

        @Override
        @SuppressLint("NewApi") // Async will only be true when the API is available to call.
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");

            if (disposed) {
                return Disposables.disposed();
            }

            run = RxJavaPlugins.onSchedule(run);

            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

            // the core step: send a message through the handler to complete the thread switch
            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.

            if (async) {
                message.setAsynchronous(true);
            }

            handler.sendMessageDelayed(message, unit.toMillis(delay));

            // Re-check disposed state for removing in case we were racing a call to dispose().
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }

            return scheduled;
        }

        @Override
        public void dispose() {
            disposed = true;
            handler.removeCallbacksAndMessages(this /* token */);
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }

    private static final class ScheduledRunnable implements Runnable, Disposable {
        private final Handler handler;
        private final Runnable delegate;

        private volatile boolean disposed; // Tracked solely for isDisposed().

        ScheduledRunnable(Handler handler, Runnable delegate) {
            this.handler = handler;
            this.delegate = delegate;
        }

        @Override
        public void run() {
            try {
                delegate.run();
            } catch (Throwable t) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public void dispose() {
            handler.removeCallbacks(this);
            disposed = true;
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }
}

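HandlerWorker is structured much like the NewThreadWorker we reached at the end of the subscribeOn analysis: the wrapped Runnable ends up in the Worker's schedule method. Here the Runnable is run through the Handler mechanism. There is a lot of code above, but the one line that matters is: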

// obtain a Message whose callback field is scheduled
Message message = Message.obtain(handler, scheduled);

Once the handler sends the Message, the scheduled callback Runnable runs on the thread that owns the main Looper.
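The difference from subscribeOn is what gets wrapped: observeOn wraps the delivery of each event rather than the subscription. A minimal sketch in plain Java follows. ObserveOnSketch and the thread name "sketch-main" are hypothetical, an Executor stands in for the Handler-backed Scheduler, and posting one Runnable per item is a simplification of this sketch; the real operator queues items and drains them on the worker.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical miniature of the observeOn idea; not RxJava or RxAndroid code.
final class ObserveOnSketch {

    // Wraps the downstream observer so each item is delivered on the target executor.
    static <T> Consumer<T> observeOn(Consumer<T> downstream, Executor target) {
        return item -> target.execute(() -> downstream.accept(item));
    }

    // Emits one item from the calling thread and reports where the observer saw it.
    static String observerThreadName() {
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-main"));
        try {
            CompletableFuture<String> seen = new CompletableFuture<>();
            Consumer<String> downstream =
                    item -> seen.complete(Thread.currentThread().getName());
            // The emission happens here, on the caller's thread ...
            observeOn(downstream, main).accept("item");
            // ... but the observer records the thread it was actually invoked on.
            return seen.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            main.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("observer ran on: " + observerThreadName());
    }
}
```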

Side note: the Handler mechanism

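Below is a brief look at how the scheduled callback Runnable gets executed after the handler sends the message.
A Handler holds a Looper, and a Looper holds a MessageQueue. Looper.loop() keeps taking Messages from the MessageQueue. The flow, simplified from the source, is: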

public final class Looper {
    // the Looper is stored in a ThreadLocal, so each thread has at most one Looper
    public static @Nullable Looper myLooper() {
        return sThreadLocal.get();
    }

    public static void loop() {
        final Looper me = myLooper(); // get this thread's Looper
        final MessageQueue queue = me.mQueue; // get the Looper's MessageQueue
        for (;;) {
            Message msg = queue.next(); // take the next message from the queue
            msg.target.dispatchMessage(msg); // the key step: target is the Handler
        }
    }
    ....................
}

public class Handler {
    public void dispatchMessage(Message msg) {
        if (msg.callback != null) { // a message with a callback runs the callback first
            handleCallback(msg);
        } else {
            if (mCallback != null) {
                if (mCallback.handleMessage(msg)) {
                    return;
                }
            }
            handleMessage(msg);
        }
    }

    private static void handleCallback(Message message) {
        message.callback.run(); // the Runnable runs here
    }
    ......................
}

Again, the callback is the ScheduledRunnable to be executed, so the ScheduledRunnable runs on the thread of the Handler's Looper.
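The loop-and-dispatch mechanism can be imitated outside Android with a blocking queue and one dedicated thread. This is only an analogy under stated assumptions: LooperSketch, its Message class, and the thread name "sketch-main" are hypothetical, and the real MessageQueue also handles delays, ordering by time, and native wake-ups that are ignored here.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical plain-Java analogy of Looper plus Handler; not Android code.
final class LooperSketch {

    // Stand-in for android.os.Message, reduced to its callback field.
    static final class Message {
        final Runnable callback;

        Message(Runnable callback) {
            this.callback = callback;
        }
    }

    private final BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
    private final Thread thread;

    LooperSketch(String threadName) {
        thread = new Thread(this::loop, threadName);
        thread.setDaemon(true);
        thread.start();
    }

    // Counterpart of Looper.loop(): take the next message and run its callback.
    private void loop() {
        try {
            for (;;) {
                Message msg = queue.take();
                msg.callback.run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Counterpart of posting through a Handler: enqueue from any thread.
    void post(Runnable r) {
        queue.offer(new Message(r));
    }

    void quit() {
        thread.interrupt();
    }

    // Posts from the calling thread and reports which thread ran the callback.
    static String callbackThreadName() {
        LooperSketch looper = new LooperSketch("sketch-main");
        try {
            CompletableFuture<String> seen = new CompletableFuture<>();
            looper.post(() -> seen.complete(Thread.currentThread().getName()));
            return seen.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            looper.quit();
        }
    }

    public static void main(String[] args) {
        System.out.println("callback ran on: " + callbackThreadName());
    }
}
```

Whoever calls post(), the callback always runs on the single thread that drains the queue, which is the property observeOn relies on to reach the main thread.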

Reference:
https://www.jianshu.com/p/dab243ca781f
