背景
文章通过接口层表象来实现一个简版且稳定的Rxjava mini。给予一个台阶,当你读完文章的末尾,希望你有一探RxJava欲望与信心。
目标
- TaskScheduler.executeMain(...); //主线程, 执行任务
- TaskScheduler.executeTask(...); //子线程, 线程池执行任务
- TaskScheduler.executeSingle(...); //子线程, 单线程执行任务
- TaskScheduler.create(...); //任务调度
项目
设计
- .func(...).func(...).func(...)...顺序流执行
- .observeOn(...)线程切换
效果图
TaskScheduler.create(new Task<List<String>>() {
@Override
public List<String> run() {
...do something in io thread
return new ArrayList<>();
}
}).subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.map(new Function<List<String>, String>() {
@Override
public String apply(@NonNull List<String> strings) throws Exception {
...do something in new thread, such as time-consuming, map conversion, etc.
return "";
}
})
.observeOn(Schedulers.io())
.map(new Function<String, Boolean>() {
@Override
public Boolean apply(@NonNull String s) throws Exception {
...do something in io thread, such as time-consuming, map conversion, etc.
return true;
}
})
...
.observeOn(Schedulers.mainThread())
.subscribe(new Observer<Boolean>() {
@Override
public void onNext(@NonNull Boolean result) {
...do something in main thread
}
@Override
public void onError(Throwable e) {
...do something in main thread
}
});
分析
- 线程
- 线程切换
- 任务调度
1. 线程
public class TaskManager {
private static TaskManager ins;
private Handler mainHandler;
private ExecutorService cachedThreadPool;
private ExecutorService singleThreadExecutor;
private TaskManager() {
mainHandler = new Handler(Looper.getMainLooper());
cachedThreadPool = Executors.newCachedThreadPool();
singleThreadExecutor = Executors.newSingleThreadExecutor();
}
static TaskManager getIns() {
if (ins == null) {
synchronized (TaskManager.class) {
if (ins == null) {
ins = new TaskManager();
}
}
}
return ins;
}
/**
* Execute sync task in main thread
*/
void executeMain(Runnable runnable) { mainHandler.post(runnable); }
/**
* Execute async task in cached thread pool
*/
void executeTask(Runnable runnable) { cachedThreadPool.execute(runnable); }
/**
* Execute async task in single thread pool
*/
void executeSingle(Runnable runnable) { singleThreadExecutor.execute(runnable); }
/**
* Execute async task in a new thread
*/
void executeNew(Runnable runnable) { new Thread(runnable).start(); }
}
线程切换的方法:抛runnable到相应线程,由线程来调度执行runnable,runnable中的方法即在相应线程中执行。
如无这样的显式切换线程,代码流(无论多少次方法递归调用)将在当前线程一直执行下去。同一线程,代码总是顺序的执行。
Log.d("Current Thread", Thread.currentThread().getId() + "--NAME--" + Thread.currentThread().getName());
通过这行代码可以打印出当前在那一个线程。主线程的getName是main。
new Thread(() -> {
// Code block 1
Log.d("Current Thread", Thread.currentThread().getId() + "--NAME--" + Thread.currentThread().getName());
...
new Handler(Looper.getMainLooper()).post(new Runnable() {
@Override
public void run() {
// Code block 2
Log.d("Current Thread", Thread.currentThread().getId() + "--NAME--" + Thread.currentThread().getName());
...
}
});
}).start();
这是一个通常的代码形式
Code block 1处在一个子线程中执行代码,通过new Handler(Looper.getMainLooper()).post(...)向主线程抛入一个runnable,runnable进入主线程消息队列,然后等主线程消息队列取出该runnable执行时,Code line 2处代码即在主线程中执行。
Code block 1与Code block 2在时间上并行执行。线程池同理。
public class TaskScheduler<T> {
public static void executeMain(Runnable runnable) { TaskManager.getIns().executeMain(runnable); }
public static void executeTask(Runnable runnable) { TaskManager.getIns().executeTask(runnable); }
public static void executeSingle(Runnable runnable) { TaskManager.getIns().executeSingle(runnable); }
...
}
通过单例简单包装,实现目标1、2、3
2. 线程切换
/**
* Switch thread
* scheduler 线程枚举,int类型: defaultThread、newThread、io、mainThread
*/
public static void switchThread(@Scheduler int scheduler, final Runnable runnable) {
if (scheduler == NEW_THREAD) {
new Thread(() -> {
if (runnable != null) {
runnable.run();
}
}).start();
return;
} else if (scheduler == IO) {
TaskScheduler.executeTask(() -> {
if (runnable != null) {
runnable.run();
}
});
return;
} else if (scheduler == MAIN_THREAD) {
if (!isMainThread()) {
TaskScheduler.executeMain(() -> {
if (runnable != null) {
runnable.run();
}
});
return;
}
}
if (runnable != null) {
runnable.run();
}
}
public static boolean isMainThread() {
return Looper.getMainLooper().getThread() == Thread.currentThread();
}
3. 任务调度
3.1 开始前的准备
我们先来定义3个接口
interface.png然后是2个对应的包装类,后面会用到
Task -> TaskEmitter
Function -> FunctionEmitter
public class Emitter {
public int scheduler;
}
public class TaskEmitter<T> extends Emitter {
public Task<T> task;
public TaskEmitter(Task<T> task, @Schedulers.Scheduler int scheduler) {
this.task = task;
this.scheduler = scheduler;
}
}
public class FunctionEmitter<T, R> extends Emitter {
public Function<? super T, ? extends R> function;
public FunctionEmitter(Function<? super T, ? extends R> function, @Schedulers.Scheduler int scheduler) {
this.function = function;
this.scheduler = scheduler;
}
}
3.2 Create
开始前,我们知道一些开源库如Glide,惯用.with(...)形式,这种方式实质:静态方法 + return new Instance(),
这里我们也用这种模式来开始create(...)。
实现分三步走
Step 1: Create
public static <T> TaskScheduler<T> create(final Task<T> task) {
TaskScheduler<T> schedulers = new TaskScheduler<T>();
schedulers.task = task;
return schedulers;
}
创建TaskScheduler实例,持有 源任务task
public TaskObserve<T> subscribeOn(@Schedulers.Scheduler int scheduler) {
this.subscribeScheduler = scheduler;
return new TaskObserve<T>(new TaskEmitter<T>(task, subscribeScheduler));
}
指定 源任务task
执行所在线程,丢弃当前TaskScheduler实例。
源任务task
、 线程枚举
注入TaskEmitter后,返回新的实例TaskObserve,后续逻辑全由TaskObserve处理
Step 2: TaskObserve中间件
public static class TaskObserve<T> {
private TaskEmitter taskEmitter;
private List<FunctionEmitter> emitters;
private int observeOnScheduler = Schedulers.defaultThread();
TaskObserve(TaskEmitter<T> taskEmitter) {
this.taskEmitter = taskEmitter;
this.emitters = new ArrayList<>();
}
...
}
TaskObserve
: 中间件
,初始和map转换时生成,包含以下成员
taskEmitter
: 源任务
emitters
: 转换队列
,map转换时递增
observeOnScheduler
: 线程枚举
,observeOn观察者所在线程,可重复调用,当然只保留最后一次指定的线程
TaskObserve(TaskObserve middle) {
this.taskEmitter = middle.taskEmitter;
this.observeOnScheduler = middle.observeOnScheduler;
this.emitters = middle.emitters;
}
public <TR> TaskObserve<TR> map(Function<? super T, ? extends TR> f) {
this.emitters.add(new FunctionEmitter<T, TR>(f, observeOnScheduler));
return new TaskObserve<TR>(this);
}
map转换时,将 转换体Function
、当前 线程枚举
observeOnScheduler注入 FunctionEmitter
,添加到 转换队列
。
返回新的实例TaskObserve,丢弃当前TaskObserve实例,新实例线程枚举observeOnScheduler默认为默认线程
Step 3: Subscribe,才是开始!!!
核心思想
- 先执行
源任务
,返回值- 递归从
转换队列
取出FunctionEmitter
(含有转换体、线程枚举),Schedulers.switchThread(...)指定线程执行,转换返回值转换队列
执行尽,提交任务,任务结束
public void subscribe(final Observer<T> callback) {
// 指定源任务线程枚举
Schedulers.switchThread(taskEmitter.scheduler, () -> {
try {
// 执行源任务
Object t = taskEmitter.task.run();
// 转换队列是否为空
if (assertInterrupt(t)) {
// 转换队列空,提交本次任务,任务结束
submit(t, callback);
return;
}
// 转换队列不为空,继续转换
apply(t, emitters, callback);
} catch (Throwable e) {
// 任务流抛出异常,即时中断,任务结束
error(e, callback);
}
});
}
private boolean assertInterrupt(Object emitter) throws Exception {
if (emitter == null) {
// 转换返回值,不能为Null!!!
throw new RuntimeException("Apply output must not be null!");
}
return emitters.size() <= 0;
}
assertInterrupt判断当前转换队列,是否执行尽了
Step 3 - 1: Apply转换队列转换
private <E, F> void apply(final E o, final List<FunctionEmitter> emitters, final Observer<F> callback) {
// 依次从转换队列取出FunctionEmitter,然后移除
final FunctionEmitter<E, F> f = emitters.get(0);
emitters.remove(f);
// 指定当前转换线程枚举
Schedulers.switchThread(f.scheduler, () -> {
try {
// 转换,返回转换值
Object emitter = f.function.apply(o);
// 转换队列是否为空
if (assertInterrupt(emitter)) {
// 转换队列空,提交本次任务,任务结束
submit(emitter, callback);
return;
}
// 转换队列不为空,继续转换
apply(emitter, emitters, callback);
} catch (Throwable e) {
// 任务流抛出异常,即时中断,任务结束
error(e, callback);
}
});
}
Step 3 - 2: Submit提交
private <S> void submit(final Object result, final Observer<S> callback) {
// 指定当前转换线程枚举,即当前中间件线程枚举observeOnScheduler
Schedulers.switchThread(observeOnScheduler, () -> {
try {
if (callback != null) {
// 成功,任务结束
callback.onNext((S) result);
}
} catch (Throwable e) {
error(e, callback);
}
});
}
private <S> void error(final Throwable e, final Observer<S> callback) {
// 指定当前转换线程枚举,即当前中间件线程枚举observeOnScheduler
Schedulers.switchThread(observeOnScheduler, () -> {
if (callback != null) {
// 出错,任务结束
callback.onError(e);
}
});
}
小结:
泛型: java泛型属于类型擦除,无论T、F还是R...,最终都是Object,所以我们可以不用泛型,用Object。
设计: 这里的 任务流
实现方式为递归嵌套调用。
网友评论