美文网首页Android进阶之路Android开发Android开发
我是用来切换线程的-----subscribeOn

我是用来切换线程的-----subscribeOn

作者: javalong | 来源:发表于2018-05-23 22:20 被阅读31次
    前言

    讲到RxJava不得不提到他的线程切换,前面本来打算是按字母顺序把操作符一个个介绍,现在看来其实很多操作符的源码看过去都是差不多的,我这样一个个介绍难免显得过于重复,啰嗦,所以还是想把几个最重要的,最常用的挑出来,然后给大家详细的分析,这样应该更好。

    用法+解析

    subscribeOn其实是很通用的方法。用来切换线程。他的参数呢是Scheduler对象。我们可以直接看RxJava中为我们提供了多少种Scheduler

    前面几篇文章的写法呢,直接用语言来概括用法,我觉得不太好,还是应该用例子来说明,这样会比较好。

    图片.png

    Schedulers中定义了许多 返回不同Scheduler的方法。
    由于篇幅有限,这里我介绍一个最常用的Schedulers.io(),如果反响比较好,可以再介绍其他的,其实搞懂了一个,其他也是一样的。

    首先了解下用法,然后深入源码。

    public class MainActivity extends AppCompatActivity {
    
        @Override
        protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_main);
            findViewById(R.id.btClick).setOnClickListener(new View.OnClickListener() {
                @Override
                public void onClick(View v) {
                     Observable.create(new Observable.OnSubscribe<Integer>() {
                        @Override
                        public void call(Subscriber<? super Integer> subscriber) {
                            subscriber.onNext(1);
                            subscriber.onCompleted();
                        }
                    }).subscribeOn(Schedulers.io())
                      .subscribe(new Subscriber<Integer>() {
                            @Override
                            public void onCompleted() {
                            }
    
                            @Override
                            public void onError(Throwable e) {
                            }
    
                            @Override
                            public void onNext(Integer integer) {
                                Log.e("aaaaa", "" + Thread.currentThread().getName());
                            }
                     });
                }
            });
    
        }
    }
    

    快速点击按钮


    图片.png

    看日志,始终是复用了一个叫RxIoScheduler-2的线程。我们都知道线程的创建和销毁是很消耗资源的。所以我们尽量避免使用Schedulers.newThread()每次都去创建一个线程,而是去使用Schedulers.io()可以去复用已有的线程。

    等待2分钟以上,我们再次去快速点击按钮。

    图片.png

    发现这一次的所有的线程名称变为了RxIoScheduler-3的线程。

    初步推断RxIoScheduler-2长时间未使用,被销毁了。那么我们可以用工具去验证下,查看下当前存在的线程,查看RxIoScheduler-2是否是被销毁了。

    工具查看RxIoScheduler-2是否被销毁
    1. 进入debug模式


      图片.png
    2. 查看线程状态


      图片.png
    3. 间隔2分钟,进行2次快速点击,再查看线程状态

    图片.png 图片.png

    很显然,RxIoScheduler-2线程已经被销毁了

    深入源码

    1. Schedulers.io()

    Schedulers

    ...
     Scheduler io = hook.getIOScheduler();
            if (io != null) {
                ioScheduler = io;
            } else {
                ioScheduler = RxJavaSchedulersHook.createIoScheduler();
            }
    ...
    public static Scheduler io() {
            return RxJavaHooks.onIOScheduler(getInstance().ioScheduler);
        }
    ...
    

    hook.getIOScheduler()==null所以走下面分支。

    hook其实是提供了一个全局的钩子,一般都不会自己去实现,所以直接是使用默认的hook,返回为null

    RxJavaSchedulersHook

    ...
    public Scheduler getIOScheduler() {
            return null;
        }
    ...
      public static Scheduler createIoScheduler() {
            return createIoScheduler(new RxThreadFactory("RxIoScheduler-"));
        }
    ...
    public static Scheduler createIoScheduler(ThreadFactory threadFactory) {
            if (threadFactory == null) {
                throw new NullPointerException("threadFactory == null");
            }
            return new CachedThreadScheduler(threadFactory);
        }
    ...
    

    这里我们可以看出来Schedulers.io()返回的其实是CachedThreadScheduler


    1. RxThreadFactory

    其实在上面代码中,我们看到了一个比较熟悉的东西,就是RxIoScheduler-这么一个字符串,在前面的例子其实我们已经有所提到,就是线程的名称是RxIoScheduler-2,RxIoScheduler-3...这样的。
    RxThreadFactory

    public final class RxThreadFactory extends AtomicLong implements ThreadFactory {
    ...
    @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, prefix + incrementAndGet());
            t.setDaemon(true);
            return t;
        }
    ...
    

    这里就是真正来创建线程的地方,prefix + incrementAndGet()就是线程的name。
    RxThreadFactory继承了AtomicLong来实现线程安全,不会出现2个同名的情况。


    1. subscribeOn
      我们已经知道Schedulers.io的返回和真正创建线程的地方。
      那么重新回到subscribeOn操作符下,一步步跟入。
      Observable
    ...
     public final Observable<T> subscribeOn(Scheduler scheduler) {
            return subscribeOn(scheduler, !(this.onSubscribe instanceof OnSubscribeCreate));
        }
    ...
       public final Observable<T> subscribeOn(Scheduler scheduler, boolean requestOn) {
            if (this instanceof ScalarSynchronousObservable) {
                return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
            }
            return unsafeCreate(new OperatorSubscribeOn<T>(this, scheduler, requestOn));
        }
    ...
    

    this显然不是ScalarSynchronousObservable所以走下面分支。给Observable外面再套一层Observable。这不是关键。关键的是OperatorSubscribeOn是一个OnSubscribe
    其实前面我讲解了几篇操作符的文章,操作符主要就是对应了一个OnSubscribe,我们直接看里面的具体实现。

    今天这篇文章主要介绍线程切换,可能对RxJava整个链式函数的流程说的不是很清楚。
    可以参考RxJava操作符源码解析

    因为每个操作符的整体流程其实是不变的,最重要的是操作符对应的OnSubscribe具体做了什么,这里就不重复去讲整个流程了。


    1. OperatorSubscribeOn
      @Override
        public void call(final Subscriber<? super T> subscriber) {
            final Worker inner = scheduler.createWorker();
    
            SubscribeOnSubscriber<T> parent = new SubscribeOnSubscriber<T>(subscriber, requestOn, inner, source);
            subscriber.add(parent);
            subscriber.add(inner);
          //最重要的一句,切换线程
            inner.schedule(parent);
        }
    

    scheduler其实在前面已经介绍,就是CachedThreadScheduler
    我们来分析下上面的代码。

    1. 创建一个Worker对象
      创建一个Worker,这个worker也是一个Subscription
    2. 创建Subscriber对象,并绑定parent,inner
      绑定后,Subscriber对象unsubscribe的时候,parent,inner一起unsubscribe
    3. 使用Worker对象切换线程

    1. SubscribeOnSubscriber

    首先这是一个Subscriber,对我们自己实现的Subscriber做了一次包装,先执行这里的onNext等方法然后再执行我们自己实现的SubscriberonNext等方法。
    SubscribeOnSubscriber

    ...
     @Override
            public void onNext(T t) {
                actual.onNext(t);
            }
    
            @Override
            public void onError(Throwable e) {
                try {
                    actual.onError(e);
                } finally {
                    worker.unsubscribe();
                }
            }
    
            @Override
            public void onCompleted() {
                try {
                    actual.onCompleted();
                } finally {
                    worker.unsubscribe();
                }
            }
    ...
    

    onNext 其实不作任何处理
    onError,onCompleted都会调用worker.unsubscribe();
    这一句是关键。
    简单的理解就是执行完毕后,就会把Worker对象解除订阅。


    1. Worker.schedule
      inner.schedule是用来切换线程的,我们可以深入看看。
      CachedThreadScheduler
    ...
    @Override
        public Worker createWorker() {
            return new EventLoopWorker(pool.get());
        }
    ...
    

    所以这里的inner就是EventLoopWorker

    EventLoopWorker

    ...
     @Override
            public Subscription schedule(Action0 action) {
                return schedule(action, 0, null);
            }
    
            @Override
            public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
                if (innerSubscription.isUnsubscribed()) {
                    // don't schedule, we are unsubscribed
                    return Subscriptions.unsubscribed();
                }
    
                ScheduledAction s = threadWorker.scheduleActual(new Action0() {
                    @Override
                    public void call() {
                        if (isUnsubscribed()) {
                            return;
                        }
                        action.call();
                    }
                }, delayTime, unit);
                innerSubscription.add(s);
                s.addParent(innerSubscription);
                return s;
            }
    ...
    

    threadWorker是什么?一步步往前推
    CachedThreadScheduler

    ...
    final AtomicReference<CachedWorkerPool> pool;
    ...
     @Override
        public Worker createWorker() {
            return new EventLoopWorker(pool.get());
        }
    ...
     static final class EventLoopWorker extends Scheduler.Worker implements Action0 {
            private final CompositeSubscription innerSubscription = new CompositeSubscription();
            private final CachedWorkerPool pool;
            private final ThreadWorker threadWorker;
            final AtomicBoolean once;
    
            EventLoopWorker(CachedWorkerPool pool) {
                this.pool = pool;
                this.once = new AtomicBoolean();
                this.threadWorker = pool.get();
            }
    ...
    
    

    这里其实看的比较迷糊,因为用了2个相同名字的变量,并且有2个相同名字的方法。
    第一个pool.get()获取的是CachedWorkerPool对象
    第二个pool.get()获取的是ThreadWorker对象

    我们直接看第二个pool.get()也就是 CachedWorkerPool的`get`方法。

    ...
    ThreadWorker get() {
                if (allWorkers.isUnsubscribed()) {
                    return SHUTDOWN_THREADWORKER;
                }
                while (!expiringWorkerQueue.isEmpty()) {
                    ThreadWorker threadWorker = expiringWorkerQueue.poll();
                    if (threadWorker != null) {
                        return threadWorker;
                    }
                }
    
                // No cached worker found, so create a new one.
                ThreadWorker w = new ThreadWorker(threadFactory);
                allWorkers.add(w);
                return w;
            }
    ...
    

    这里我们就可以清楚的看出,CachedThreadScheduler是如何复用的。先从expiringWorkerQueue取,没有再去创建ThreadWorker,这里传入的参数threadFactory就是前面介绍的RxThreadFactory

    1. 如何维护expiringWorkerQueue
      既然知道了使用expiringWorkerQueue来复用线程。那么我们看看具体是如何维护的。
      带着几个问题。

      1. 如何添加ThreadWork到expiringWorkerQueue

      2.如何移除ThreadWork

      首先我们要知道,在使用的ThreadWorker显然是不会放入expiringWorkerQueue,然后给其他地方复用的。所以肯定是,当ThreadWorker执行完毕之后。

    这里其实我们可以在原来的demo基础之上稍微修改一下,救能很快的证明这一点。

     Observable.create(new Observable.OnSubscribe<Integer>() {
                        @Override
                        public void call(Subscriber<? super Integer> subscriber) {
                            subscriber.onNext(1);
                            subscriber.onCompleted();
                        }
                    }).subscribeOn(Schedulers.io())
                      .subscribe(new Subscriber<Integer>() {
                            @Override
                            public void onCompleted() {
                            }
    
                            @Override
                            public void onError(Throwable e) {
                            }
    
                            @Override
                            public void onNext(Integer integer) {
                                Log.e("aaaaa", "" + Thread.currentThread().getName());
                                try {
                                    //添加sleep
                                    Thread.sleep(1000);
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                            }
                     });
    

    快速点击多次,看输出。


    图片.png

    由于添加了Thread.sleep后,你快速点击多次,点击第二次的时候,上一个ThreadWorker未执行完成,所以不能复用,而是重新创建了。但是当点击第8次的时候,第一个 ThreadWorker其实是已经执行完毕了,所以RxIoScheduler-2线程被复用。

    在看源码之前,我们先了解下CachedThreadScheduler,CachedWorkerPool,EventLoopWorker这3者的关系。

    CachedThreadScheduler:第一次使用Schedulers类方法的时候创建,单例
    CachedWorkerPool:创建CachedThreadScheduler的时候创建,单例
    EventLoopWorker:每次调用subscribeOn操作符的时候创建,非单例

    3个类的代码都在CachedThreadScheduler.java文件中。

    下面我们直接看代码

    CachedThreadScheduler

    ...
     static final class CachedWorkerPool {
    void release(ThreadWorker threadWorker) {
    ...
                // Refresh expire time before putting worker back in pool
                threadWorker.setExpirationTime(now() + keepAliveTime);
    
                expiringWorkerQueue.offer(threadWorker);
            }
    ...
    static final class EventLoopWorker extends Scheduler.Worker implements Action0 {
    ...
    @Override
            public void call() {
                pool.release(threadWorker);
            }
    ...
    
    

    可以看出,调用EventLoopWorkercall方法,就会给ThreadWorker设置过期时间,然后加入expiringWorkerQueue队列。

    那么在哪里调用EventLoopWorkercall方法呢?

    在前面介绍SubscribeOnSubscriber的最后,提到了onError,onCompleted都会调用worker.unsubscribe();

    ...
    static final class EventLoopWorker extends Scheduler.Worker implements Action0 {
          ...
            @Override
            public void unsubscribe() {
                if (once.compareAndSet(false, true)) {
                    // unsubscribe should be idempotent, so only do this once
    
                    // Release the worker _after_ the previous action (if any) has completed
                    threadWorker.schedule(this);
                }
                innerSubscription.unsubscribe();
            }
    
            @Override
            public void call() {
                pool.release(threadWorker);
            }
    ...
    

    EventLoopWorker本身也是个Action0,
    threadWorker.schedule(this);其实就是直接异步调用Action0call方法。

    这么看来其实就是当SubscriberonCompleteonError方法调用后,线程就会放在队列中复用。

    那么第二个问题,什么时候从队列中移除这个线程?
    CachedThreadScheduler

    void release(ThreadWorker threadWorker) {
    ...
                // Refresh expire time before putting worker back in pool
                threadWorker.setExpirationTime(now() + keepAliveTime);
    
                expiringWorkerQueue.offer(threadWorker);
            }
    

    大家可以看出来当空闲的ThreadWorker添加到队列前设置了一个ExpirationTime,也就是超时时间。
    这里可以告诉大家keepAliveTime其实是60s,可以配置的。

    ...
      static final class CachedWorkerPool {
        ...
    
            CachedWorkerPool(final ThreadFactory threadFactory, long keepAliveTime, TimeUnit unit) {
               ...
                if (unit != null) {
                    //创建了一个ScheduledExecutorService
                    evictor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
                        @Override public Thread newThread(Runnable r) {
                            Thread thread = threadFactory.newThread(r);
                            thread.setName(thread.getName() + " (Evictor)");
                            return thread;
                        }
                    });
                    ...
                    //开启一个新的Thread每隔60s一次调用evictExpiredWorkers
                    task = evictor.scheduleWithFixedDelay(
                            new Runnable() {
                                @Override
                                public void run() {
                                    evictExpiredWorkers();
                                }
                            }, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS
                    );
                }
            ...
            //遍历expiringWorkerQueue,移除已经过期的ThreadWorker
             void evictExpiredWorkers() {
                if (!expiringWorkerQueue.isEmpty()) {
                    long currentTimestamp = now();
    
                    for (ThreadWorker threadWorker : expiringWorkerQueue) {
                        if (threadWorker.getExpirationTime() <= currentTimestamp) {
                            if (expiringWorkerQueue.remove(threadWorker)) {
                                allWorkers.remove(threadWorker);
                            }
                        } else {
                            // Queue is ordered with the worker that will expire first in the beginning, so when we
                            // find a non-expired worker we can stop evicting.
                            break;
                        }
                    }
                }
            }
        ...
            }
    

    已经在关键代码上添加了注释,整体来看,就是在创建CachedWorkerPool的时候开启了一个线程每隔60s去遍历移除过期的ThreadWorker

    下面我们可以再来看下刚才的图片,看过源码之后,我们可以再去理解一下为什么会这么输出。

    图片.png

    大家可以前面我让大家等待2分钟以上,然后线程变成了RxIoScheduler-3,因为RxIoScheduler-2显然已经被移除了。

    那么这里为什么会是2分钟呢,刚才不是介绍了keepAliveTime是60s么。

    1. 这里会给ThreadWorker设置为过期时间为60s
    2. 然后每60s遍历并移除过期ThreadWorker

    如果运气不好的话,当你ThreadWorker的工作刚刚完成,设置了过期时间为60s之后,这一次的evictExpiredWorkers已经执行过了,只能等待下一次。所以等待了60s左右进行下一次的evictExpiredWorkers,显然这时候ThreadWorker的过期时间还没到,还差一点。只能等待下一次evictExpiredWorkers。所以最好是2分钟。


    总结

    其实呢已经把最关键的点都过了一遍,但是大家对整个流程到底哪里切换了线程不是非常理解,我会在有空的时候更新一张图,画出来。

    Observable.create 其实是最简单的,如果使用Observable.just,Observable.from等方法的话整个流程会更加复杂,简单去理解subscribeOn或者其他操作符的话,建议大家都用Observable.create,这样就避免调用setProducerrequest等方法,这样对理解操作符来说会有很大的帮助。

    有空再更新下,其实还没有写完。

    相关文章

      网友评论

        本文标题:我是用来切换线程的-----subscribeOn

        本文链接:https://www.haomeiwen.com/subject/hfmbdftx.html