美文网首页
Rxjava2.2.1(3) subscribeOn 线程切换-

Rxjava2.2.1(3) subscribeOn 线程切换-

作者: 其勇勇 | 来源:发表于2019-08-08 14:43 被阅读0次

    rxjava代码

    Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            Log.e("qwer", Thread.currentThread().getName());
            emitter.onNext("有情况");
        }
    }).subscribeOn(Schedulers.newThread()).
            subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(String s) {
                    Log.e("qwer", s);
                    Log.e("qwer", Thread.currentThread().getName());
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            });
    
    log--
    E/qwer: main
    E/qwer: 有情况
    E/qwer: main
    

    由前面两篇文章我们得知,如果不指定线程切换,那么我们在哪个线程操作,事件的发送和接收就发生在哪个线程。然后create和subscribe也不讲了(可以看前两篇文章)
    1、直接看subscribeOn

    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
    

    2、进入ObservableSubscribeOn

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }
    

    看过前两篇文章的都知道,自此,赋值准备工作结束,然后被观察者开始发送事件的时候,会调用ObservableSubscribeOn的subscribeActual方法

    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
    
        observer.onSubscribe(parent);
    
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    

    这个方法的参数observer就是我们自己new的观察者,SubscribeOnObserver持有observer的引用,这样SubscribeOnObserver在调用onNext的时候,就会再调用observer的onNext方法。这些看过前两篇的都会知道
    直接第三行代码
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    3、我们首先看最里面的参数new SubscribeTask(parent),进入SubscribeTask

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;
    
        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }
    
        @Override
        public void run() {
            source.subscribe(parent);
        }
    }
    

    其实就是一个线程,然后在run()方法里调用了
    source.subscribe(parent);
    这个source就是我们自己new的观察者(其实这个rxjava是链式调用,如果有好几层的话也可以理解为source就是上一层的Observable,比如说上面还有一个map转换,那么这个地方的source就是ObservableMap了)
    好了,那么接下来就开始看这个线程在哪里启动的了
    4、我们继续看这一行代码
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    进入scheduleDirect

    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }
    

    看注释得知,这里其实就是该线程会被无延迟执行,再次进入该重载的方法

    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();
    
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
        DisposeTask task = new DisposeTask(decoratedRun, w);
    
        w.schedule(task, delay, unit);
    
        return task;
    }
    

    在这里我们发现RxJavaPlugins.onSchedule(run)返回的还是run本身
    然后decoratedRun 又传给了DisposeTask

    DisposeTask(@NonNull Runnable decoratedRun, @NonNull Worker w) {
        this.decoratedRun = decoratedRun;
        this.w = w;
    }
    

    我们发现执行线程赋值给了decoratedRun ,而且DisposeTask实现了Runnable接口,然后在它的run方法里我们看到decoratedRun.run();
    再回到前门的代码
    w.schedule(task, delay, unit);
    w是createWorker()获取的,而createWorker();是个抽象方法,那么只能找Schedule的实现类了
    5、那么我们当前执行的这个scheduler又是谁呢?它其实就是我们.subscribeOn(Schedulers.newThread())时传进来的Schedulers.newThread(),好,继续看看Schedulers.newThread()是何许人也,进入newThread()方法

    public static Scheduler newThread() {
        return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
    }
    

    再进入onNewThreadScheduler方法

    public static Scheduler onNewThreadScheduler(@NonNull Scheduler defaultScheduler) {
        Function<? super Scheduler, ? extends Scheduler> f = onNewThreadHandler;
        if (f == null) {
            return defaultScheduler;
        }
        return apply(f, defaultScheduler);
    }
    

    这里其实就是返回本身
    6、那就再看看NEW_THREAD的本身

    static final Scheduler NEW_THREAD;
    

    我们一看是final,就接着找到了

    static {
        ......
        NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
    }
    

    其实initNewThreadScheduler方法还是返回本身,再进入NewThreadTask

    static final class NewThreadTask implements Callable<Scheduler> {
        @Override
        public Scheduler call() throws Exception {
            return NewThreadHolder.DEFAULT;
        }
    }
    

    再继续看DEFAULT

    static final class NewThreadHolder {
        static final Scheduler DEFAULT = new NewThreadScheduler();
    }
    

    好,到此为止,我们发现了步骤4最后要找的Schedule的实现类了
    我们进入NewThreadScheduler,找到createWorker方法

    public Worker createWorker() {
        return new NewThreadWorker(threadFactory);
    }
    

    还记得之前步骤4中的 w.schedule(task, delay, unit)么?不错,这里的w就是这里返回的对象,我们直接进入NewThreadWorker的schedule方法

    public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
        if (disposed) {
            return EmptyDisposable.INSTANCE;
        }
        return scheduleActual(action, delayTime, unit, null);
    }
    

    再进入scheduleActual方法

    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    
        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }
    
        Future<?> f;
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }
    
        return sr;
    }
    

    啊!!到这里可算结束啦
    在代码中,delayTime == 0
    然后执行了f = executor.submit((Callable<Object>)sr);
    其实也就是立即执行了我们的操作线程,是不是已经忘记了哪个操作线程了,就是步骤3里的那个SubscribeTask ,而SubscribeTask 的run()方法里的source.subscribe(parent),就这一句代码,就串起了整个观察者模式(其实就是连接了观察者和被观察者而已),如果不能够理解这里,可以回头看看前两篇文章

    相关文章

      网友评论

          本文标题:Rxjava2.2.1(3) subscribeOn 线程切换-

          本文链接:https://www.haomeiwen.com/subject/jvbtjctx.html