RxJava mini

作者: Dsiner | 来源:发表于2018-07-19 17:02 被阅读70次

    背景

    文章通过接口层表象来实现一个简版且稳定的Rxjava mini。给予一个台阶,当你读完文章的末尾,希望你有一探RxJava欲望与信心。

    目标

    1. TaskScheduler.executeMain(...); //主线程, 执行任务
    2. TaskScheduler.executeTask(...); //子线程, 线程池执行任务
    3. TaskScheduler.executeSingle(...); //子线程, 单线程执行任务
    4. TaskScheduler.create(...); //任务调度

    项目

    设计

    1. .func(...).func(...).func(...)...顺序流执行
    2. .observeOn(...)线程切换

    效果图

            TaskScheduler.create(new Task<List<String>>() {
                @Override
                public List<String> run() {
                    ...do something in io thread
                    return new ArrayList<>();
                }
            }).subscribeOn(Schedulers.io())
                    .observeOn(Schedulers.newThread())
                    .map(new Function<List<String>, String>() {
                        @Override
                        public String apply(@NonNull List<String> strings) throws Exception {
                            ...do something in new thread, such as time-consuming, map conversion, etc.
                            return "";
                        }
                    })
                    .observeOn(Schedulers.io())
                    .map(new Function<String, Boolean>() {
                        @Override
                        public Boolean apply(@NonNull String s) throws Exception {
                            ...do something in io thread, such as time-consuming, map conversion, etc.
                            return true;
                        }
                    })
                    ...
                    .observeOn(Schedulers.mainThread())
                    .subscribe(new Observer<Boolean>() {
                        @Override
                        public void onNext(@NonNull Boolean result) {
                            ...do something in main thread
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            ...do something in main thread
                        }
                    });
    

    分析

    1. 线程
    2. 线程切换
    3. 任务调度

    1. 线程

    public class TaskManager {
        private static TaskManager ins;
    
        private Handler mainHandler;
        private ExecutorService cachedThreadPool;
        private ExecutorService singleThreadExecutor;
    
        private TaskManager() {
            mainHandler = new Handler(Looper.getMainLooper());
            cachedThreadPool = Executors.newCachedThreadPool();
            singleThreadExecutor = Executors.newSingleThreadExecutor();
        }
    
        static TaskManager getIns() {
            if (ins == null) {
                synchronized (TaskManager.class) {
                    if (ins == null) {
                        ins = new TaskManager();
                    }
                }
            }
            return ins;
        }
    
        /**
         * Execute sync task in main thread
         */
        void executeMain(Runnable runnable) { mainHandler.post(runnable); }
    
        /**
         * Execute async task in cached thread pool
         */
        void executeTask(Runnable runnable) { cachedThreadPool.execute(runnable); }
    
        /**
         * Execute async task in single thread pool
         */
        void executeSingle(Runnable runnable) { singleThreadExecutor.execute(runnable); }
    
        /**
         * Execute async task in a new thread
         */
        void executeNew(Runnable runnable) { new Thread(runnable).start(); }
    }
    

    线程切换的方法:抛runnable到相应线程,由线程来调度执行runnable,runnable中的方法即在相应线程中执行。
    如无这样的显式切换线程,代码流(无论多少次方法递归调用)将在当前线程一直执行下去。同一线程,代码总是顺序的执行。

    Log.d("Current Thread", Thread.currentThread().getId() + "--NAME--" + Thread.currentThread().getName());
    

    通过这行代码可以打印出当前在那一个线程。主线程的getName是main。

            new Thread(() -> {
                    // Code block 1
                    Log.d("Current Thread", Thread.currentThread().getId() + "--NAME--" + Thread.currentThread().getName());
                    ...
                    new Handler(Looper.getMainLooper()).post(new Runnable() {
                        @Override
                        public void run() {
                            // Code block 2
                            Log.d("Current Thread", Thread.currentThread().getId() + "--NAME--" + Thread.currentThread().getName());
                            ...
                        }
                    });
            }).start();
    

    这是一个通常的代码形式
    Code block 1处在一个子线程中执行代码,通过new Handler(Looper.getMainLooper()).post(...)向主线程抛入一个runnable,runnable进入主线程消息队列,然后等主线程消息队列取出该runnable执行时,Code line 2处代码即在主线程中执行。
    Code block 1与Code block 2在时间上并行执行。线程池同理。

    public class TaskScheduler<T> {
        public static void executeMain(Runnable runnable) { TaskManager.getIns().executeMain(runnable); }
    
        public static void executeTask(Runnable runnable) { TaskManager.getIns().executeTask(runnable); }
    
        public static void executeSingle(Runnable runnable) { TaskManager.getIns().executeSingle(runnable); }
    
        ...
    }
    

    通过单例简单包装,实现目标1、2、3

    2. 线程切换

        /**
         * Switch thread
         * scheduler 线程枚举,int类型: defaultThread、newThread、io、mainThread
         */
        public static void switchThread(@Scheduler int scheduler, final Runnable runnable) {
            if (scheduler == NEW_THREAD) {
                new Thread(() -> {
                        if (runnable != null) {
                            runnable.run();
                        }
                }).start();
                return;
            } else if (scheduler == IO) {
                TaskScheduler.executeTask(() -> {
                        if (runnable != null) {
                            runnable.run();
                        }
                });
                return;
            } else if (scheduler == MAIN_THREAD) {
                if (!isMainThread()) {
                    TaskScheduler.executeMain(() -> {
                            if (runnable != null) {
                                runnable.run();
                            }
                    });
                    return;
                }
            }
            if (runnable != null) {
                runnable.run();
            }
        }
    
        public static boolean isMainThread() {
            return Looper.getMainLooper().getThread() == Thread.currentThread();
        }
    

    3. 任务调度

    3.1 开始前的准备

    我们先来定义3个接口

    interface.png

    然后是2个对应的包装类,后面会用到

    Task -> TaskEmitter
    Function -> FunctionEmitter

    public class Emitter {
        public int scheduler;
    }
    
    public class TaskEmitter<T> extends Emitter {
        public Task<T> task;
    
        public TaskEmitter(Task<T> task, @Schedulers.Scheduler int scheduler) {
            this.task = task;
            this.scheduler = scheduler;
        }
    }
    
    public class FunctionEmitter<T, R> extends Emitter {
        public Function<? super T, ? extends R> function;
    
        public FunctionEmitter(Function<? super T, ? extends R> function, @Schedulers.Scheduler int scheduler) {
            this.function = function;
            this.scheduler = scheduler;
        }
    }
    

    3.2 Create

    开始前,我们知道一些开源库如Glide,惯用.with(...)形式,这种方式实质:静态方法 + return new Instance(),
    这里我们也用这种模式来开始create(...)。

    实现分三步走

    Step 1: Create

        public static <T> TaskScheduler<T> create(final Task<T> task) {
            TaskScheduler<T> schedulers = new TaskScheduler<T>();
            schedulers.task = task;
            return schedulers;
        }
    

    创建TaskScheduler实例,持有 源任务task

        public TaskObserve<T> subscribeOn(@Schedulers.Scheduler int scheduler) {
            this.subscribeScheduler = scheduler;
            return new TaskObserve<T>(new TaskEmitter<T>(task, subscribeScheduler));
        }
    

    指定 源任务task 执行所在线程,丢弃当前TaskScheduler实例。
    源任务task线程枚举 注入TaskEmitter后,返回新的实例TaskObserve,后续逻辑全由TaskObserve处理

    Step 2: TaskObserve中间件

    public static class TaskObserve<T> {
            private TaskEmitter taskEmitter;
            private List<FunctionEmitter> emitters;
            private int observeOnScheduler = Schedulers.defaultThread();
    
            TaskObserve(TaskEmitter<T> taskEmitter) {
                this.taskEmitter = taskEmitter;
                this.emitters = new ArrayList<>();
            }
    
            ...
    }
    

    TaskObserve: 中间件,初始和map转换时生成,包含以下成员
    taskEmitter: 源任务
    emitters: 转换队列,map转换时递增
    observeOnScheduler: 线程枚举,observeOn观察者所在线程,可重复调用,当然只保留最后一次指定的线程

            TaskObserve(TaskObserve middle) {
                this.taskEmitter = middle.taskEmitter;
                this.observeOnScheduler = middle.observeOnScheduler;
                this.emitters = middle.emitters;
            }
    
            public <TR> TaskObserve<TR> map(Function<? super T, ? extends TR> f) {
                this.emitters.add(new FunctionEmitter<T, TR>(f, observeOnScheduler));
                return new TaskObserve<TR>(this);
            }
    

    map转换时,将 转换体Function 、当前 线程枚举 observeOnScheduler注入 FunctionEmitter ,添加到 转换队列
    返回新的实例TaskObserve,丢弃当前TaskObserve实例,新实例线程枚举observeOnScheduler默认为默认线程

    Step 3: Subscribe,才是开始!!!

    核心思想

    1. 先执行 源任务 ,返回值
    2. 递归从 转换队列 取出 FunctionEmitter (含有转换体、线程枚举),Schedulers.switchThread(...)指定线程执行,转换返回值
    3. 转换队列 执行尽,提交任务,任务结束
            public void subscribe(final Observer<T> callback) {
                // 指定源任务线程枚举
                Schedulers.switchThread(taskEmitter.scheduler, () -> {
                        try {
                            // 执行源任务
                            Object t = taskEmitter.task.run();
                            // 转换队列是否为空
                            if (assertInterrupt(t)) {
                                // 转换队列空,提交本次任务,任务结束
                                submit(t, callback);
                                return;
                            }
                            // 转换队列不为空,继续转换
                            apply(t, emitters, callback);
                        } catch (Throwable e) {
                            // 任务流抛出异常,即时中断,任务结束
                            error(e, callback);
                        }
                });
            }
    
            private boolean assertInterrupt(Object emitter) throws Exception {
                if (emitter == null) {
                    // 转换返回值,不能为Null!!!
                    throw new RuntimeException("Apply output must not be null!");
                }
                return emitters.size() <= 0;
            }
    

    assertInterrupt判断当前转换队列,是否执行尽了

    Step 3 - 1: Apply转换队列转换

            private <E, F> void apply(final E o, final List<FunctionEmitter> emitters, final Observer<F> callback) {
                // 依次从转换队列取出FunctionEmitter,然后移除
                final FunctionEmitter<E, F> f = emitters.get(0);
                emitters.remove(f);
                // 指定当前转换线程枚举
                Schedulers.switchThread(f.scheduler, () -> {
                        try {
                            // 转换,返回转换值
                            Object emitter = f.function.apply(o);
                            // 转换队列是否为空
                            if (assertInterrupt(emitter)) {
                                // 转换队列空,提交本次任务,任务结束
                                submit(emitter, callback);
                                return;
                            }
                            // 转换队列不为空,继续转换
                            apply(emitter, emitters, callback);
                        } catch (Throwable e) {
                            // 任务流抛出异常,即时中断,任务结束
                            error(e, callback);
                        }
                });
            }
    

    Step 3 - 2: Submit提交

            private <S> void submit(final Object result, final Observer<S> callback) {
                // 指定当前转换线程枚举,即当前中间件线程枚举observeOnScheduler
                Schedulers.switchThread(observeOnScheduler, () -> {
                        try {
                            if (callback != null) {
                                // 成功,任务结束
                                callback.onNext((S) result);
                            }
                        } catch (Throwable e) {
                            error(e, callback);
                        }
                });
            }
    
            private <S> void error(final Throwable e, final Observer<S> callback) {
                // 指定当前转换线程枚举,即当前中间件线程枚举observeOnScheduler
                Schedulers.switchThread(observeOnScheduler, () -> {
                        if (callback != null) {
                            // 出错,任务结束
                            callback.onError(e);
                        }
                });
            }
    

    小结:

    泛型: java泛型属于类型擦除,无论T、F还是R...,最终都是Object,所以我们可以不用泛型,用Object。
    设计: 这里的 任务流 实现方式为递归嵌套调用。

    相关文章

      网友评论

        本文标题:RxJava mini

        本文链接:https://www.haomeiwen.com/subject/lygtmftx.html