Implementing RxJava Yourself to Understand Its Call Chain

Author: 风风风筝 | Published: 2016-10-17 07:59

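    RxJava has a large API surface and intricate internal logic. Complex material is usually best learned from the whole down to the details, so to study how RxJava works I wrote a simplified RxJava by hand, using its source code as a reference. The goal is to understand the call chain.

    Before reading on, I recommend downloading the code, LittleRx. Reading code in an IDE is much clearer than on a web page, and you can also look at the printed logs.

    The four most important classes: Observable, OnSubscribe, Operator, Subscriber

    1. The simplest case: create an Observable, then subscribe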

    Observable
            .create(new OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    subscriber.onNext(1);
                }
            })
            .subscribe(new Subscriber<Integer>() {
                @Override
                public void onNext(Integer integer) {
                    System.out.println(integer);
                }
            });
    
    public class Observable<T> {
        private OnSubscribe<T> onSubscribe;
    
        private Observable(OnSubscribe<T> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }
    
        public final void subscribe(Subscriber<? super T> subscriber) {
            onSubscribe.call(subscriber);
        }
    
        public static <T> Observable<T> create(OnSubscribe<T> onSubscribe) {
            return new Observable<>(onSubscribe);
        }
    }
    

    This shows that subscribe(subscriber) --> onSubscribe.call(subscriber), so OnSubscribe.call() is never triggered unless something subscribes.
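
    The laziness described above can be checked with a small self-contained sketch. The class name LazySubscribeDemo and the log list are illustrative assumptions, not part of LittleRx; the Observable is a trimmed copy of the one shown earlier, and a Java 8 lambda stands in for the anonymous OnSubscribe.

```java
import java.util.ArrayList;
import java.util.List;

final class LazySubscribeDemo {

    interface OnSubscribe<T> { void call(Subscriber<? super T> subscriber); }

    abstract static class Subscriber<T> { public abstract void onNext(T t); }

    static final class Observable<T> {
        private final OnSubscribe<T> onSubscribe;

        private Observable(OnSubscribe<T> onSubscribe) { this.onSubscribe = onSubscribe; }

        static <T> Observable<T> create(OnSubscribe<T> onSubscribe) {
            return new Observable<>(onSubscribe);
        }

        void subscribe(Subscriber<? super T> subscriber) { onSubscribe.call(subscriber); }
    }

    /** Records what happens before and after subscribe() is called. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Observable<Integer> observable = Observable.create(subscriber -> {
            log.add("call");
            subscriber.onNext(1);
        });
        log.add("created"); // call() has not run yet at this point
        observable.subscribe(new Subscriber<Integer>() {
            @Override public void onNext(Integer value) { log.add("onNext " + value); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [created, call, onNext 1]
    }
}
```

    "created" is logged before "call", which confirms that create() only stores the OnSubscribe and that subscribe() is what runs it.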

    2. map and lift

    Observable
            .create(new OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    subscriber.onNext(1);
                }
            })
            .map(new Func1<Integer, String>() {
                @Override
                public String call(Integer integer) {
                    return "map" + integer;
                }
            })
            .subscribe(new Subscriber<String>() {
                @Override
                public void onNext(String s) {
                    System.out.println(s);
                }
            });
    

    The non-chained form (anonymous-class bodies omitted for brevity)

    OnSubscribe<Integer> onSubscribe = new OnSubscribe<>();
    Observable<Integer> observable = Observable.create(onSubscribe);
    Func1<Integer, String> func = new Func1<>();
    Observable<String> observable2 = observable.map(func);
    Subscriber<String> subscriber = new Subscriber<>();
    observable2.subscribe(subscriber);
    

    create() is the same as before, so what does map() do?

    public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        return lift(new OperatorMap<T, R>(func));
    }
    
    public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
    }
    

    lift() takes the previous observable's onSubscribe, wraps it in a new OnSubscribeLift, and returns a new observable (observable2). As noted above, subscribe(subscriber) --> onSubscribe.call(subscriber), so next we look at OnSubscribeLift.

    public class OnSubscribeLift<T, R> implements OnSubscribe<R> {
    
        final OnSubscribe<T> parent;
        final Operator<? extends R, ? super T> operator;
    
        public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
            this.parent = parent;
            this.operator = operator;
        }
    
        // Don't worry about the implementation yet; we come back to it below
        @Override
        public void call(Subscriber<? super R> r) {
            Subscriber<? super T> st = operator.call(r); // this operator is the OperatorMap
            parent.call(st); // parent is the first observable's onSubscribe
        }
    }
    

    Now look at OperatorMap

    public final class OperatorMap<T, R> implements Operator<R, T> {
    
        final Func1<? super T, ? extends R> transformer;
    
        public OperatorMap(Func1<? super T, ? extends R> transformer) {
            this.transformer = transformer;
        }
    
        // Don't worry about the implementation yet; we come back to it below
        @Override
        public Subscriber<? super T> call(final Subscriber<? super R> o) {
            return new MapSubscriber<T, R>(o, transformer);
        }
        
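        // static: the nested class declares its own T and R, so it should not capture (and shadow) the outer ones
        private static final class MapSubscriber<T, R> extends Subscriber<T> {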
    
            private Subscriber<? super R> actual;
            private Func1<? super T, ? extends R> transformer;
    
            public MapSubscriber(Subscriber<? super R> o, Func1<? super T, ? extends R> transformer) {
                this.actual = o;
                this.transformer = transformer;
            }
    
            // Don't worry about the implementation yet; we come back to it below
            @Override
            public void onNext(T t) {
                R r = transformer.call(t);
                actual.onNext(r);
            }
        }
    
    }
    

    Now remove both map() and lift() and build the same thing from the most basic classes (anonymous-class bodies omitted for brevity)

    OnSubscribe<Integer> onSubscribe = new OnSubscribe<>();
    Observable<Integer> observable = new Observable<>(onSubscribe);
    Func1<Integer, String> func = new Func1<>();
    
    OperatorMap<Integer, String> operatorMap = new OperatorMap<>(func);
    
    OnSubscribeLift<Integer, String> onSubscribe2 = new OnSubscribeLift<>(onSubscribe, operatorMap);
    
    Observable<String> observable2 = new Observable<>(onSubscribe2);
    
    Subscriber<String> subscriber = new Subscriber<>();
    observable2.subscribe(subscriber);
    

    At this point it is clear how the first Observable<Integer> is converted into an Observable<String>, including the relationship between OnSubscribe<Integer> onSubscribe and OnSubscribeLift<Integer, String> onSubscribe2.

    So how does the final subscribe() reach the first onSubscribe.call(Subscriber<Integer>), and how does the Subscriber<Integer>.onNext(Integer) invoked inside it reach the final subscriber's Subscriber<String>.onNext(String)?

    observable2.subscribe(subscriber) -->

    1. onSubscribe2.call(subscriber), i.e. OnSubscribeLift.call(subscriber) -->
    2. Subscriber<Integer> st = operatorMap.call(subscriber), i.e.
    3. Subscriber<Integer> st = new MapSubscriber<Integer, String>(subscriber, func)
    4. parent.call(st), i.e. onSubscribe.call(st) -->
    5. st.onNext(1), i.e. MapSubscriber.onNext(1) -->
    6. String string = func.call(1)
    7. subscriber.onNext(string)

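    That completes the analysis of the call chain for Observable.create().map().subscribe().
    Many operators are essentially lift() under the hood, so the same reasoning extends to a chain that calls lift() twice.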
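
    To make the two-lift case concrete, here is a minimal sketch that chains two map() calls and records the order of events. The class name DoubleLiftDemo, the extra name and log parameters on map(), and the simplified generics (no wildcards on Operator and Func1) are assumptions made for tracing, not the LittleRx signatures.

```java
import java.util.ArrayList;
import java.util.List;

final class DoubleLiftDemo {
    interface OnSubscribe<T> { void call(Subscriber<? super T> subscriber); }
    interface Func1<T, R> { R call(T t); }
    interface Operator<R, T> { Subscriber<? super T> call(Subscriber<? super R> subscriber); }
    abstract static class Subscriber<T> { public abstract void onNext(T t); }

    static final class Observable<T> {
        final OnSubscribe<T> onSubscribe;
        Observable(OnSubscribe<T> onSubscribe) { this.onSubscribe = onSubscribe; }

        void subscribe(Subscriber<? super T> subscriber) { onSubscribe.call(subscriber); }

        <R> Observable<R> lift(Operator<R, T> operator) {
            OnSubscribe<T> parent = onSubscribe;
            // Same shape as OnSubscribeLift.call(): wrap the downstream subscriber, then call the parent.
            return new Observable<R>(downstream -> parent.call(operator.call(downstream)));
        }

        // name and log are hypothetical tracing parameters added for this sketch
        <R> Observable<R> map(String name, Func1<T, R> func, List<String> log) {
            return this.<R>lift(downstream -> {
                log.add(name + " wraps downstream");
                return new Subscriber<T>() {
                    @Override public void onNext(T t) {
                        log.add(name + " onNext(" + t + ")");
                        downstream.onNext(func.call(t));
                    }
                };
            });
        }
    }

    /** Runs create().map().map().subscribe() and returns the ordered trace. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Observable<Integer> source = new Observable<Integer>(subscriber -> {
            log.add("source call");
            subscriber.onNext(1);
        });
        source.map("map1", i -> i + 1, log)
              .map("map2", i -> "value" + i, log)
              .subscribe(new Subscriber<String>() {
                  @Override public void onNext(String s) { log.add("subscriber got " + s); }
              });
        return log;
    }

    public static void main(String[] args) {
        for (String line : run()) {
            System.out.println(line);
        }
        // prints, one per line:
        // map2 wraps downstream, map1 wraps downstream, source call,
        // map1 onNext(1), map2 onNext(2), subscriber got value2
    }
}
```

    Subscription travels from the last operator back to the source (map2, then map1, then the source), while values travel from the source forward (map1, then map2, then the subscriber).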

    3. subscribeOn

    Scheduler is fairly complicated internally, so we simplify: subscribeOn(Scheduler) becomes subscribeOnIO().

    Observable
            .create(new OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    subscriber.onNext(1);
                }
            })
            .subscribeOnIO()
            .subscribe(new Subscriber<Integer>() {
                @Override
                public void onNext(Integer integer) {
                    System.out.println(integer);
                }
            });
    

    How can subscribeOnIO() make the first observable's onSubscribe run on a worker thread?

    public final Observable<T> subscribeOnIO() {
        return create(new OnSubscribeOnIO<T>(this));
    }
    
    public final class OnSubscribeOnIO<T> implements OnSubscribe<T> {
    
        private static ExecutorService threadPool = Executors.newCachedThreadPool();
    
        final Observable<T> source;
    
        public OnSubscribeOnIO(Observable<T> source) {
            this.source = source;
        }
    
        @Override
        public void call(final Subscriber<? super T> subscriber) {
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    source.subscribe(subscriber); // --> onSubscribe.call(subscriber) --> subscriber.onNext(1)
                }
            };
            threadPool.submit(runnable);
        }
    }
    

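    As shown above, subscribeOnIO() wraps the subscription in a Runnable and submits it to a cached thread pool (Executors.newCachedThreadPool()), which creates or reuses a worker thread. The upstream Observable is subscribed on that worker thread, and all subsequent calls complete on it.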
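
    A minimal sketch of this thread hop follows. It assumes a few departures from the article's code: the class name SubscribeOnIoDemo is made up, the pool is passed in as a parameter (so the demo can shut it down) instead of living in a static field, execute() is used in place of submit(), and a CountDownLatch makes the caller wait for the result.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class SubscribeOnIoDemo {
    interface OnSubscribe<T> { void call(Subscriber<? super T> subscriber); }
    abstract static class Subscriber<T> { public abstract void onNext(T t); }

    static final class Observable<T> {
        final OnSubscribe<T> onSubscribe;
        Observable(OnSubscribe<T> onSubscribe) { this.onSubscribe = onSubscribe; }

        void subscribe(Subscriber<? super T> subscriber) { onSubscribe.call(subscriber); }

        // Same idea as OnSubscribeOnIO: subscribe to this (upstream) observable on a pool thread.
        Observable<T> subscribeOnIO(ExecutorService pool) {
            return new Observable<T>(subscriber -> pool.execute(() -> subscribe(subscriber)));
        }
    }

    /** Returns {caller thread, thread seen in call(), thread seen in onNext()}. */
    static String[] run() {
        ExecutorService pool = Executors.newCachedThreadPool();
        String[] seen = new String[3];
        CountDownLatch done = new CountDownLatch(1);
        seen[0] = Thread.currentThread().getName();

        new Observable<Integer>(subscriber -> {
            seen[1] = Thread.currentThread().getName();
            subscriber.onNext(1);
        }).subscribeOnIO(pool).subscribe(new Subscriber<Integer>() {
            @Override public void onNext(Integer value) {
                seen[2] = Thread.currentThread().getName();
                done.countDown(); // publishes the writes above to the waiting caller
            }
        });

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown(); // let the JVM exit once the worker is idle
        return seen;
    }

    public static void main(String[] args) {
        // The last two names match each other and differ from the first.
        System.out.println(Arrays.toString(run()));
    }
}
```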

    Now consider something slightly more complex by adding map()

    Observable
            .create(new OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    System.out.println(Thread.currentThread());
                    subscriber.onNext(1);
                }
            })
            .map(new Func1<Integer, String>() {
                @Override
                public String call(Integer integer) {
                    System.out.println(Thread.currentThread());
                    return "map" + integer;
                }
            })
            .subscribeOnIO()
            .subscribe(new Subscriber<String>() {
                @Override
                public void onNext(String s) {
                    System.out.println(Thread.currentThread());
                    System.out.println(s);
                }
            });
    

    The non-chained form (anonymous-class bodies omitted for brevity)

    OnSubscribe<Integer> onSubscribe = new OnSubscribe<>();
    Observable<Integer> observable = new Observable<>(onSubscribe);
    Func1<Integer, String> func = new Func1<>();
    OperatorMap<Integer, String> operatorMap = new OperatorMap<>(func);
    OnSubscribeLift<Integer, String> onSubscribe2 = new OnSubscribeLift<>(onSubscribe, operatorMap);
    Observable<String> observable2 = new Observable<>(onSubscribe2);
    
    OnSubscribeOnIO<String> onSubscribe3 = new OnSubscribeOnIO<>(observable2);
    Observable<String> observable3 = new Observable<>(onSubscribe3);
    
    Subscriber<String> subscriber = new Subscriber<>();
    observable3.subscribe(subscriber);
    

    observable3.subscribe(subscriber) -->

    1. onSubscribe3.call(subscriber), i.e. OnSubscribeOnIO.call(subscriber) -->
    2. worker thread: new Runnable(){} --> observable2.subscribe(subscriber) -->
    3. onSubscribe2.call(subscriber), i.e. OnSubscribeLift.call(subscriber) -->
    4. Subscriber<Integer> st = operatorMap.call(subscriber), i.e.
    5. Subscriber<Integer> st = new MapSubscriber<Integer, String>(subscriber, func)
    6. parent.call(st), i.e. onSubscribe.call(st) -->
    7. st.onNext(1), i.e. MapSubscriber.onNext(1) -->
    8. String string = func.call(1)
    9. subscriber.onNext(string)

    What if map() and subscribeOnIO() swap positions?

    OnSubscribe<Integer> onSubscribe = new OnSubscribe<>();
    Observable<Integer> observable = new Observable<>(onSubscribe);
    
    OnSubscribeOnIO<Integer> onSubscribe2 = new OnSubscribeOnIO<>(observable);
    Observable<Integer> observable2 = new Observable<>(onSubscribe2);
    
    Func1<Integer, String> func = new Func1<>();
    OperatorMap<Integer, String> operatorMap = new OperatorMap<>(func);
    OnSubscribeLift<Integer, String> onSubscribe3 = new OnSubscribeLift<>(onSubscribe2, operatorMap);
    Observable<String> observable3 = new Observable<>(onSubscribe3);
    Subscriber<String> subscriber = new Subscriber<>();
    observable3.subscribe(subscriber);
    

    observable3.subscribe(subscriber) -->

    1. onSubscribe3.call(subscriber), i.e. OnSubscribeLift.call(subscriber) -->
    2. Subscriber<Integer> st = operatorMap.call(subscriber), i.e.
    3. Subscriber<Integer> st = new MapSubscriber<Integer, String>(subscriber, func)
    4. parent.call(st), i.e. onSubscribe2.call(st), i.e. OnSubscribeOnIO.call(st) -->
    5. worker thread: new Runnable(){} --> observable.subscribe(st) -->
    6. onSubscribe.call(st) -->
    7. st.onNext(1), i.e. MapSubscriber.onNext(1) -->
    8. String string = func.call(1)
    9. subscriber.onNext(string)

    As you can see, wherever subscribeOnIO() sits in the chain, the first onSubscribe.call() always runs on the worker thread.
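
    Both orderings can be compared with a short sketch that records the thread name at each stage. The class name SubscribeOnPositionDemo and the seen map are assumptions for illustration; map() is inlined without lift() to keep the example short, and the pool is passed as a parameter rather than held in a static field.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class SubscribeOnPositionDemo {
    interface OnSubscribe<T> { void call(Subscriber<? super T> s); }
    interface Func1<T, R> { R call(T t); }
    abstract static class Subscriber<T> { public abstract void onNext(T t); }

    static final class Observable<T> {
        final OnSubscribe<T> onSubscribe;
        Observable(OnSubscribe<T> onSubscribe) { this.onSubscribe = onSubscribe; }
        void subscribe(Subscriber<? super T> s) { onSubscribe.call(s); }

        // map() inlined without lift()/Operator to keep the sketch short
        <R> Observable<R> map(Func1<T, R> func) {
            OnSubscribe<T> parent = onSubscribe;
            return new Observable<R>(downstream -> parent.call(new Subscriber<T>() {
                @Override public void onNext(T t) { downstream.onNext(func.call(t)); }
            }));
        }

        Observable<T> subscribeOnIO(ExecutorService pool) {
            return new Observable<T>(s -> pool.execute(() -> subscribe(s)));
        }
    }

    /** Runs one ordering and returns the thread names seen at "source", "map" and "onNext". */
    static Map<String, String> run(boolean mapFirst) {
        ExecutorService pool = Executors.newCachedThreadPool();
        Map<String, String> seen = new ConcurrentHashMap<>();
        CountDownLatch done = new CountDownLatch(1);

        Observable<Integer> source = new Observable<Integer>(s -> {
            seen.put("source", Thread.currentThread().getName());
            s.onNext(1);
        });
        Func1<Integer, String> func = i -> {
            seen.put("map", Thread.currentThread().getName());
            return "map" + i;
        };
        Observable<String> chain = mapFirst
                ? source.map(func).subscribeOnIO(pool)
                : source.subscribeOnIO(pool).map(func);
        chain.subscribe(new Subscriber<String>() {
            @Override public void onNext(String s) {
                seen.put("onNext", Thread.currentThread().getName());
                done.countDown();
            }
        });

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("map() first:           " + run(true));
        System.out.println("subscribeOnIO() first: " + run(false));
    }
}
```

    In both runs all three stages report the same pool thread, never the caller's thread. What differs is only where the hop happens: before MapSubscriber is created in one ordering, after it in the other.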

    4. observeOn

    First, the final form of the demo

    Handler handler = new Handler();
    
    Observable
            .create(new OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    subscriber.onNext(1);
                }
            })
            .observeOn(handler)
            .map(new Func1<Integer, String>() {
                @Override
                public String call(Integer integer) {
                    return "map" + integer;
                }
            })
            .subscribeOnIO()
            .subscribe(new Subscriber<String>() {
                @Override
                public void onNext(String s) {
                    System.out.println(s);
                }
            });
    
    handler.loop(); // blocks the current thread while the queue is empty, until a new message arrives
    

    Likewise, we implement our own simple observeOn(Handler) that can switch back to the main thread

    public class Observable<T> {
        ...
        public final Observable<T> observeOn(Handler handler) {
            return lift(new OperatorObserveOn<T>(handler));
        }
    }
    

    OperatorObserveOn

    public final class OperatorObserveOn<T> implements Operator<T, T> {
        private Handler handler;
    
        public OperatorObserveOn(Handler handler) {
            this.handler = handler;
        }
    
        @Override
        public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
            Subscriber<T> s = new Subscriber<T>() {
                @Override
                public void onNext(final T t) {
                    handler.post(new Runnable() {
                        @Override
                        public void run() {
                            subscriber.onNext(t);
                        }
                    });
                }
            };
            return s;
        }
    }
    

    A custom Handler

    import java.util.concurrent.ArrayBlockingQueue;

    public class Handler {
    
        private ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(10);
    
        public void loop() {
            for (; ; ) {
                Runnable runnable;
                try {
                    runnable = queue.take(); // blocks while the queue is empty; wakes up when a task arrives
                } catch (InterruptedException e) {
                    return;
                }
                if (runnable == null) {
                    return;
                }
                runnable.run();
            }
        }
    
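        public void post(Runnable runnable) {
            try {
                queue.put(runnable); // blocks while the queue is full, until space frees up
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            }
        }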
    }
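
    The key property of this Handler is that a task runs on whichever thread called loop(), not on the thread that called post(). A minimal sketch of that behavior follows. The class name HandlerDemo and the thread name "worker" are assumptions, and because the Handler has no quit method, the sketch stops the loop by posting a task that interrupts the looping thread, which is only one possible way to end it.

```java
import java.util.concurrent.ArrayBlockingQueue;

final class HandlerDemo {

    static final class Handler {
        private final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(10);

        void loop() {
            for (;;) {
                Runnable task;
                try {
                    task = queue.take();   // blocks until a task is available
                } catch (InterruptedException e) {
                    return;                // an interrupt ends the loop
                }
                task.run();
            }
        }

        void post(Runnable task) {
            try {
                queue.put(task);           // blocks while the queue is full
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Returns {thread that posted the task, thread that ran it}. */
    static String[] run() {
        Handler handler = new Handler();
        String[] seen = new String[2];

        Thread worker = new Thread(() -> {
            seen[0] = Thread.currentThread().getName();
            handler.post(() -> seen[1] = Thread.currentThread().getName());
            // Assumed stop signal: this task runs on the looping thread and interrupts it,
            // so the next queue.take() throws InterruptedException and loop() returns.
            handler.post(() -> Thread.currentThread().interrupt());
        }, "worker");
        worker.start();

        handler.loop(); // runs posted tasks on the calling thread until interrupted
        return seen;
    }

    public static void main(String[] args) {
        String[] seen = run();
        System.out.println("posted from " + seen[0] + ", ran on " + seen[1]);
    }
}
```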
    

    The non-chained form (anonymous-class bodies omitted for brevity)

    OnSubscribe<Integer> onSubscribe = new OnSubscribe<>();
    Observable<Integer> observable = new Observable<>(onSubscribe);
    
    OperatorObserveOn<Integer> operatorObserveOn = new OperatorObserveOn<>(handler);
    OnSubscribeLift<Integer, Integer> onSubscribe2 = new OnSubscribeLift<>(onSubscribe, operatorObserveOn);
    
    Func1<Integer, String> func = new Func1<>();
    OperatorMap<Integer, String> operatorMap = new OperatorMap<>(func);
    OnSubscribeLift<Integer, String> onSubscribe3 = new OnSubscribeLift<>(onSubscribe2, operatorMap);
    Observable<String> observable2 = new Observable<>(onSubscribe3);
    OnSubscribeOnIO<String> onSubscribe4 = new OnSubscribeOnIO<>(observable2);
    Observable<String> observable3 = new Observable<>(onSubscribe4);
    Subscriber<String> subscriber = new Subscriber<>();
    observable3.subscribe(subscriber);
    

    observable3.subscribe(subscriber) -->

    1. onSubscribe4.call(subscriber), i.e. OnSubscribeOnIO.call(subscriber) -->
    2. worker thread: new Runnable(){} --> observable2.subscribe(subscriber) -->
    3. onSubscribe3.call(subscriber), i.e. OnSubscribeLift.call(subscriber) -->
    4. Subscriber<Integer> st = operatorMap.call(subscriber), i.e.
    5. Subscriber<Integer> st = new MapSubscriber<Integer, String>(subscriber, func)
    6. parent.call(st), i.e. onSubscribe2.call(st), i.e. OnSubscribeLift.call(st) -->
    7. Subscriber<Integer> st2 = operatorObserveOn.call(st) -->
    8. parent.call(st2), i.e. onSubscribe.call(st2) -->
    9. st2.onNext(1) --> // onNext() switches to the Handler's thread
    10. st.onNext(1) -->
    11. String string = func.call(1)
    12. subscriber.onNext(string)

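    5. Miscellaneous

    Overall, the call chains are somewhat complex but manageable. Any single chain becomes clear after a bit of thought. The catch is that every new chain still takes me a while, and I cannot yet work one out in a few seconds, so more practice is the only way. As an exercise, consider the chain above: what happens if observeOn(handler) is placed after map()?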
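
    One way to explore that question is to run both orderings and record which thread executes the map function. The sketch below is an illustration under assumptions: the class name ObserveOnPositionDemo and the seen map are made up, the generics are simplified (no wildcards on Operator and Func1), Java 8 lambdas replace most anonymous classes, the pool is passed as a parameter, and the loop is stopped by interrupting the looping thread from inside onNext(), since the Handler has no quit method.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ObserveOnPositionDemo {
    interface OnSubscribe<T> { void call(Subscriber<? super T> s); }
    interface Func1<T, R> { R call(T t); }
    interface Operator<R, T> { Subscriber<? super T> call(Subscriber<? super R> s); }
    abstract static class Subscriber<T> { public abstract void onNext(T t); }

    static final class Handler {
        private final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(10);
        void loop() {
            try {
                for (;;) queue.take().run(); // run tasks on the thread that called loop()
            } catch (InterruptedException e) {
                // assumption: an interrupt is this sketch's stop signal
            }
        }
        void post(Runnable r) {
            try { queue.put(r); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
    }

    static final class Observable<T> {
        final OnSubscribe<T> onSubscribe;
        Observable(OnSubscribe<T> onSubscribe) { this.onSubscribe = onSubscribe; }
        void subscribe(Subscriber<? super T> s) { onSubscribe.call(s); }

        <R> Observable<R> lift(Operator<R, T> operator) {
            OnSubscribe<T> parent = onSubscribe;
            return new Observable<R>(s -> parent.call(operator.call(s)));
        }
        <R> Observable<R> map(Func1<T, R> func) {
            return this.<R>lift(s -> new Subscriber<T>() {
                @Override public void onNext(T t) { s.onNext(func.call(t)); }
            });
        }
        Observable<T> observeOn(Handler handler) {
            return this.<T>lift(s -> new Subscriber<T>() {
                @Override public void onNext(T t) { handler.post(() -> s.onNext(t)); }
            });
        }
        Observable<T> subscribeOnIO(ExecutorService pool) {
            return new Observable<T>(s -> pool.execute(() -> subscribe(s)));
        }
    }

    /** Returns the thread names seen inside the map function ("map") and in onNext() ("onNext"). */
    static Map<String, String> run(boolean observeOnAfterMap) {
        Handler handler = new Handler();
        ExecutorService pool = Executors.newCachedThreadPool();
        Map<String, String> seen = new ConcurrentHashMap<>();

        Observable<Integer> source = new Observable<Integer>(s -> s.onNext(1));
        Func1<Integer, String> func = i -> {
            seen.put("map", Thread.currentThread().getName());
            return "map" + i;
        };
        Observable<String> chain = observeOnAfterMap
                ? source.map(func).observeOn(handler)
                : source.observeOn(handler).map(func);
        chain.subscribeOnIO(pool).subscribe(new Subscriber<String>() {
            @Override public void onNext(String s) {
                seen.put("onNext", Thread.currentThread().getName());
                Thread.currentThread().interrupt(); // ends handler.loop() after this task
            }
        });
        handler.loop(); // the calling thread plays the role of the main thread
        pool.shutdown();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("observeOn before map: " + run(false));
        System.out.println("observeOn after map:  " + run(true));
    }
}
```

    With observeOn(handler) before map(), the map function runs on the looping thread. With observeOn(handler) after map(), the map function stays on the pool thread and only the final onNext() is delivered on the looping thread. In other words, observeOn() affects only what sits downstream of it.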

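      Reader comments

      • 4b5624f4a07c: Every time I read RxJava code I get lost, so thanks for providing an example to learn from!
        Could you share your experience with reading source code?
        风风风筝: It is like learning to draw a tree: learn to draw the trunk and branches first, and don't fixate on the veins of the leaves.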

      Article link: https://www.haomeiwen.com/subject/qufoyttx.html