需要注意的分析点
1.装饰模式 对 上一步observable.create生成的对象 进行包装
2.NewThreadScheduler
3.RxThreadFactory--ThreadFactory
4.CachedWorkerPool---Runnable
5.createWorker----NewThreadWorker.
6.DisposeTask--Runnable
- w.schedule(task, delay, unit)在线程中执行的scheduleActual(action, delayTime, unit, null);
一、RxJava的线程调度
在RxJava中,要指定上游事件触发的线需要通过subscribeOn方法传入schedulers.
Observable observable = Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
e.onNext(1);
e.onComplete();
}
}).subscribeOn(Schedulers.newThread());
二、subscribleOn包装事件
1.onAssembly是一个hock方法,如果在subscribeOn前没有使用其他操作符转换,那么就会返回一个new ObservableSubscribeOn<T>(this, scheduler)对象。
2.ObservableSubscribeOn<T>(this, scheduler)。this表示的是Observable.create对象创建的ObservableCreate对象
而scheduler就是我们的Schedulers.newThread()对象
3.ObservableCreate和ObservableSubscribeOn对象都继承了Observable对象,这是典型的装饰模式。目的是对刚创建的Observable进行包装进行包装。
所以subscribleOn返回的是一个ObservableSubscribeOn对象
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
三、ObservableSubscribeOn下发事件
RxJava的事件流下发是在发生订阅事件后(subscribe)方法,而真正执行下发的是subscribeActual方法(查看父类的subscribe方法)
1.在subscribeActual方法中,首先对下游的observer进行了包装
2.调用了下游observer的onSubscribe方法,所以这个方法是在主线程中调用的
3. SubscribeOnObserver是对下游observer进行包装
4. SubscribeTask是一个Runnable,负责触发上游observable对下游observer元素的订阅(事件业务触发及传递)。
5.那么scheduler.scheduleDirect(Runnable)方法一定是负责开启线程的类,通过上面的代码知道scheduler是NewThreadScheduler
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//上游的onSubscribe方法仍然在主线程中
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
/**
* 创建一个线程,负责执行subscribe方法,subscribe方法内部通常
* 调用了onNext,onComplete等方法,这样上游的所有方法都会在一个新的线程中执行
*/
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
四、Schedule的使用
1.NewThreadScheduler首先创建了一个RxThreadFactory的线程工厂
2.将ThreadFactory交给了Worker,Worker是实际执行线程的地方
3.NewThreadWorker内部通过ScheduledExecutorService来管理线程,ScheduledExecutorService是一个可以将周期性任务通过线程池来执行的类,所以scheduleDirect传入0毫秒,表示立即执行
4.schedule调用worker的方法来执行线程
//表示立即执行
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
父类的方法
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
//调用子类的方法创建worker
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//对runnable进行包装
DisposeTask task = new DisposeTask(decoratedRun, w);
//执行线程
w.schedule(task, delay, unit);
return task;
}
final ThreadFactory threadFactory;
//创建线程时,给线程追加的前缀
private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler";
private static final RxThreadFactory THREAD_FACTORY;
/** The name of the system property for setting the thread priority for this Scheduler. */
private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority";
static {
int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
Integer.getInteger(KEY_NEWTHREAD_PRIORITY, Thread.NORM_PRIORITY)));
THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
}
public NewThreadScheduler() {
this(THREAD_FACTORY);
}
public NewThreadScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}
@NonNull
@Override
public Worker createWorker() {
return new NewThreadWorker(threadFactory);
}
RxThreadFactory:负责给创建的线程追加前缀,以及通过AtomicLong来管理创建的线程个数
/**
* A ThreadFactory that counts how many threads have been created and given a prefix,
* sets the created Thread's name to {@code prefix-count}.
*/
public final class RxThreadFactory extends AtomicLong implements ThreadFactory {
.....
@Override
public Thread newThread(Runnable r) {
StringBuilder nameBuilder = new StringBuilder(prefix).append('-').append(incrementAndGet());
String name = nameBuilder.toString();
Thread t = nonBlocking ? new RxCustomThread(r, name) : new Thread(r, name);
t.setPriority(priority);
t.setDaemon(true);
return t;
}
....
}
NewThreadWorker内部有一个ScheduledExecutorService来管理线程任务,可以延时,立即或是周期性执行
1.内部有ScheduledExecutorService执行周期性任务
2.scheduleActual是真正执行线程的方法,这过程中也对runnable进行了一定的封装
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
....
//
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
// 执行任务
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
....
}
总结
subscribeOn方法会对上游创建的Obserable对象进行一次包装,当完成对下游事件的订阅时,会触发它的subscribeActual方法,而这个发放内部会启动一个线程去触发obserable的subscribe方法。这样上游的所有事件都发生在指定的线程中了
网友评论