美文网首页Android技术知识
阅读 rxjava 源代码

阅读 rxjava 源代码

作者: 017830d97951 | 来源:发表于2016-11-20 01:47 被阅读0次

    介绍

    事件驱动异步调用是两种慢慢被大家接收的编程范式了。rxjava 库利用观察者模式,把事件驱动异步调用程序组合在一起。

    基于异步调用和事件驱动的程序,经常陷入回调陷阱,导致程序的可读性下降,写出来的程序像意大利面条(callback hell, callback spaghetti)。参考 http://callbackhell.com。

    本文从源代码级别上,介绍了 rxjava 中最重要的几个接口和类。

    Obserable

    final Observable<String> observable = Observable.empty();
    

    Observable 只有一个成员变量 onSubscribe

    onSubscribe 只有一个成员方法

    Observer

    class SimpleObserver implements Observer<String> {
         @Override
         public void onCompleted() {}
         @Override
         public void onError(Throwable e) {}
         @Override
         public void onNext(String s) {}
    }
    final Observer<String> observer = new SimpleObserver();
    

    observer 没有任何成员变量。

    subscribe

    observable.subscribe(observer);
    

    这一堆 subcribe 函数都落在了 Subscriber 参数的那个版本上了,返回值都是 Subscription

    image::images/observable-11bb0.png[]

    subscriber 其实就是 observer ,也是 subscription。

    subscription 用来停止订阅 unsubscribe

    subscribe 比较矫情的一段代码,简化如下。

    class Observable<T> {
      public final Subscription subscribe(Subscriber<? super T> subscriber) {
          onSubscribe.call(subscriber);
          return subscriber;
      }
    }
    
    1. subscriber 就是传进去的第一个参数 observable.subscribe(observer)subscriber 也是一个 Observer, 因为 Subcriber 继承 Observer
    2. 这个参数 subscriber 传递给对象 onSubscribecall 方法, onSubscribe.call(subscriber)
    3. subscriber 作为一个 Subscription 返回。 因为 Subscriber 继承 Subscription

    由此可见 Subscriber 这个类才是 rxjava 的核心。subscriber 对象不停的在各个类之间流转。 各种各样不同 OnSubscribe 接口的实现,可以去产生数据源,然后调用 subscriber.onNext(data)

    小结

    一个超级简化版的 rxjava 可以是下面这个样子。

    public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
    }
    public class Observable<T> {
        final OnSubscribe<T> onSubscribe;
    
        public Observable(OnSubscribe<T> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }
        public final Subscription subscribe(Subscriber<? super T> subscriber) {
            onSubscribe.call(subscriber);
            return subscriber;
        }
    
    }
    public interface Observer<T> {
        void onCompleted();
        void onError(Throwable e);
        void onNext(T t);
    }
    public interface Subscription {
        void unsubscribe();
        boolean isUnsubscribed();
    }
    public abstract class Subscriber<T> implements Observer<T>, Subscription {
    }
    

    尽管 Subscriber 类所占的篇幅很小,他确实核心的一个类。

    测试一下这段代码

        @Test
        public void main() throws Exception {
            final Observable<String> observable = new Observable<>(subscriber -> {
                subscriber.onNext("hello world");
                subscriber.onCompleted();
            });
            final Subscriber<String> subscriber = new Subscriber<String>() {
                @Override
                public void onCompleted() {
                    System.out.println("byebye");
                }
                @Override
                public void onError(Throwable e) {
                }
                @Override
                public void onNext(String s) {
                    System.out.println("> " + s);
                }
                @Override
                public void unsubscribe() {
                }
                @Override
                public boolean isUnsubscribed() {
                    return false;
                }
            };
            observable.subscribe(subscriber);
        }
    

    相关文章

      网友评论

        本文标题:阅读 rxjava 源代码

        本文链接:https://www.haomeiwen.com/subject/mywepttx.html