美文网首页Android开发
Rxjava2的subscribeOn是如何实现线程切换的

Rxjava2的subscribeOn是如何实现线程切换的

作者: 你的益达233 | 来源:发表于2021-11-20 17:14 被阅读0次
背景:

在实际开发中

  1. .subscribeOn(Schedulers.newThread())
  2. .subscribeOn(Schedulers.io())
    这样的代码随处可见,因为默认不设置就会在主线程执行。
要弄明白两个问题
  1. 子线程是如何创建的,怎么切换过去的
  2. .subscribeOn为什么写多次只有第一次起作用
前提:
implementation 'io.reactivex.rxjava2:rxjava:2.2.21'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
源码分析
Observable.create(new ObservableOnSubscribe<Integer>() {

        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onComplete();
        }
    })
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {
            Log.d(TAG, "开始采用subscribe连接");
        }

        @Override
        public void onNext(@NonNull Integer integer) {
            Log.d(TAG, "对Next事件" + integer + "作出响应");
        }

        @Override
        public void onError(@NonNull Throwable e) {

        }

        @Override
        public void onComplete() {
            Log.d(TAG, "对Complete事件作出响应");
        }
    });

我们看下subscribeOn方法:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}  

关键点1:这里有两个参数一个是this,一个是scheduler。
这个this是什么呢,就是Observable,就是被观察者,对于上面例子来说就是ObservableCreate对象。
scheduler是Schedulers.newThread()

点击ObservableSubscribeOn类看看

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;

public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
    super(source);
    this.scheduler = scheduler;
}

@Override
public void subscribeActual(final Observer<? super T> observer) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

    observer.onSubscribe(parent);

    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

关键点2:看到没有它实现了subscribeActual方法,可以看下它的父类,它也是Observable的子类。即到时.subscribe订阅后,执行的是这里的方法。不是ObservableCreate里面的subscribeActual方法了。
关键点3:可能你会疑问上面调了ObservableSubscribeOn这个子类的方法,是不是就没执行ObservableCreate的方法了。这个你放心,上面是不是传了source下来嘛?它就是ObservableCreate

分析最关键代码:
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));  

它就是使得被观察者发送的事件发生在子线程的代码
点击SubscribeTask看下:

final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }

    @Override
    public void run() {
        source.subscribe(parent);
    }
}  

它实现了Runnable接口,在run()方法中调source.subscribe(parent)方法。
关键点4:从这就可以看出只要这个Runnable在某个子线程执行,那么所有事件只能在这个子线程中执行了
再往前看下scheduleDirect方法:

@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}  

@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();

    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    DisposeTask task = new DisposeTask(decoratedRun, w);

    w.schedule(task, delay, unit);

    return task;
}

请盯着Runnable这个参数,因为它决定着被观察的事件在那发生。可以看出它最终是被Worker类执行。可惜它是个抽象类,到这就没法继续往下走了。
关键点5:被观察者事件在run方法中被Worker类执行,接下来找到Worker具体实现类即可

下面让目光回到最开始的

.subscribeOn(Schedulers.newThread())  

看下Schedulers.newThread()是什么东西

@NonNull
public static Scheduler newThread() {
    return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}

点击NEW_THREAD,类中搜下它会发现

NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());  

点击NewThreadTask

 static final class NewThreadTask implements Callable<Scheduler> {
    @Override
    public Scheduler call() throws Exception {
        return NewThreadHolder.DEFAULT;
    }
}

点击NewThreadHolder.DEFAULT

static final class NewThreadHolder {
    static final Scheduler DEFAULT = new NewThreadScheduler();
}  

再点击NewThreadScheduler,点了这么是不是累了,很快就到真相地方了(一般人还不这样手把手教学的呢)

public final class NewThreadScheduler extends Scheduler {

    final ThreadFactory threadFactory;

    private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler";
    private static final RxThreadFactory THREAD_FACTORY;

    /** The name of the system property for setting the thread priority for this Scheduler. */
    private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority";

    static {
        int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
                Integer.getInteger(KEY_NEWTHREAD_PRIORITY, Thread.NORM_PRIORITY)));

        THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
    }

    public NewThreadScheduler() {
        this(THREAD_FACTORY);
    }

    public NewThreadScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
    }

    @NonNull
    @Override
    public Worker createWorker() {
        return new NewThreadWorker(threadFactory);
    }
}

看到没,它实现了createWorker方法。
触发时机就是之前代码的

@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();  

点击NewThreadWorker类:

public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;

volatile boolean disposed;

public NewThreadWorker(ThreadFactory threadFactory) {
    executor = SchedulerPoolFactory.create(threadFactory);
}

@NonNull
@Override
public Disposable schedule(@NonNull final Runnable run) {
    return schedule(run, 0, null);
}

@NonNull
@Override
public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
    if (disposed) {
        return EmptyDisposable.INSTANCE;
    }
    return scheduleActual(action, delayTime, unit, null);
}


public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) {
    ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run));
    try {
        Future<?> f;
        if (delayTime <= 0L) {
            f = executor.submit(task);
        } else {
            f = executor.schedule(task, delayTime, unit);
        }
        task.setFuture(f);
        return task;
    } catch (RejectedExecutionException ex) {
        RxJavaPlugins.onError(ex);
        return EmptyDisposable.INSTANCE;
    }
}


public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) {
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    if (period <= 0L) {

        InstantPeriodicTask periodicWrapper = new InstantPeriodicTask(decoratedRun, executor);
        try {
            Future<?> f;
            if (initialDelay <= 0L) {
                f = executor.submit(periodicWrapper);
            } else {
                f = executor.schedule(periodicWrapper, initialDelay, unit);
            }
            periodicWrapper.setFirst(f);
        } catch (RejectedExecutionException ex) {
            RxJavaPlugins.onError(ex);
            return EmptyDisposable.INSTANCE;
        }

        return periodicWrapper;
    }
    ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(decoratedRun);
    try {
        Future<?> f = executor.scheduleAtFixedRate(task, initialDelay, period, unit);
        task.setFuture(f);
        return task;
    } catch (RejectedExecutionException ex) {
        RxJavaPlugins.onError(ex);
        return EmptyDisposable.INSTANCE;
    }
}

关键点6:构造方法中通过线程池生成一个新线程(不是主线程都是子线程),回答了问题1前半部分
其中代码:

 executor.submit((Callable<Object>)sr);

之前叫你盯着的Runnable是不是在这执行了。回答了问题1的后半部分

到此源码分析完

至于问题2,就不需过多回答了吧。比如我碰到了第一个subscribeOn,我的被观察者事件已经在这线程中执行发送了,后面你再写一个还有什么意思呢。

相关文章

网友评论

    本文标题:Rxjava2的subscribeOn是如何实现线程切换的

    本文链接:https://www.haomeiwen.com/subject/nshutrtx.html