美文网首页
RxJava 源码学习之最简单的Demo

RxJava 源码学习之最简单的Demo

作者: yoosir | 来源:发表于2016-11-01 01:01 被阅读0次

    ** 会看(我在这呢,X_X!!!) =》会用** =》 会说 =》会写 =》会模仿 =》**能new **

    说明:本文章是依据 RxJava 1.x (1.2.1) 源码分析学习的。

    这是开头

    本文主要看源码分析一个RxJava最简单的使用示例。一步俩步,摩擦......

    正文在这

    首先,写个最简单的RxJava使用示例,如下:

    //创建一个Observable 的对象
    Observable<String> mObservable = Observable.create( 
       new Observable.OnSubscribe<String>(){ 
           @Override  
          public void call(Subscriber<? super String> subscriber) { 
               subscriber.onNext("Hello world!"); 
               subscriber.onCompleted(); 
           } 
       });
    //创建一个Subscriber对象
    Subscriber<String> mSubscriber = new Subscriber<String>() { 
       @Override 
       public void onCompleted() { 
           //完成  
      } 
       @Override  
      public void onError(Throwable e) { 
           //异常  
      }  
      @Override  
      public void onNext(String s) { 
           //下一步  
          System.out.print(s); 
       }
    };
    //关联 Observable 和 Subscriber
    mObservable.subscribe(mSubscriber);
    

    我们就根据上面最简单的使用示例,按下面步骤来学习一下RxJava的源码:

    • Observable(被观察者) 的创建
    • Observer(观察者)的创建
    • subscribe() 绑定

    开始喽:

    • 1.Observable(被观察者) 的创建

    Observable.create 入手

    public static <T> Observable<T> create(OnSubscribe<T> f) { 
       return new Observable<T>(RxJavaHooks.onCreate(f));
    } 
    //.....
    //相关代码:RxJavaHooks.onCreate(f)
    //.....  
    final OnSubscribe<T> onSubscribe;  
    protected Observable(OnSubscribe<T> f) {
       this.onSubscribe = f;
    }
    

    create创建了一个Observable对象,并将OnSubscribe f参数赋值给了Observable 对象的成员变量 onSubscribe。当然,这中间调用了RxJavaHooks.onCreate(f),我们来看下它的源码。

    /**
     * Hook to call when an Observable is created.
     * @param <T> the value type
     * @param onSubscribe the original OnSubscribe logic
     * @return the original or replacement OnSubscribe instance
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) { 
       Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate; 
       if (f != null) { 
           return f.call(onSubscribe); 
       } 
       return onSubscribe;
    }
    

    这里其实也就是返回了 onSubscribe 本身,RxJavaHooks是一个代理对象, 仅仅用作调试的时候可以插入一些测试代码,大家这样理解就行了。
    Ok,上面说了这么久 Observable.OnSubscribe ,那这个 Observable.OnSubscribe 主要是干嘛的呢?,,看源码:

    /**
       * Invoked when Observable.subscribe is called.
       * @param <T> the output value type
     */
    public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> { 
       // cover for generics insanity
    }.
    //Action1
    public interface Action1<T> extends Action {  
       void call(T t);
    }
    

    可以看到 Observable.OnSubscribe是一个拥有 call方法的接口类而已。不过,注意到 Observable.subscribe(后面讲)被调用时,该接口方法会被调用。
    好的,总结就是create()方法创建了一个Observable,且给Observable中OnSubscribe变量进行赋值。

    • 2.Observer(观察者)的创建

    在文中,为了方便,所有的观察者(Observer)我将用 Subscriber(是实现Observer的接口的抽象类) 来代替,这个会在下面 subscribe()讲解中说明。先看下,Subscriber 的构造:

    public abstract class Subscriber<T> implements Observer<T>, Subscription {  
        ...省略其他代码..
        protected Subscriber() {    this(null, false);}
        protected Subscriber(Subscriber<?> subscriber) {
          this(subscriber, true);
        }
        protected Subscriber(Subscriber<?> subscriber, boolean shareSubscriptions) {
          this.subscriber = subscriber;
          this.subscriptions = shareSubscriptions && subscriber != null ? subscriber.subscriptions : new SubscriptionList();
        }
        ...省略其他代码...
    }
    

    Subscriber是一个抽象类,主要实现了 ObserverSubscription接口类。

    public interface Observer<T> {  
        //提示观察者,当被观察者的所有事件已经结束了。
        //与 onError 互斥,
        void onCompleted();  
    
        //提示观察者,当有一个异常发生时。
        //与 onCompleted互斥,
        void onError(Throwable e);  
    
        /** 为观察者提供了一个新的项目观察
         *  这个方法可能被调用 0 ~ N 次。
         *  当 onCompleted 或 onError 被调用后,onNext 不会在被调用
        */
        void onNext(T t);
    }
    .....
    public interface Subscription {   
        //观察者与被观察者解绑,不再对它进行观察
        void unsubscribe();
        //检查是否已经解绑    
        boolean isUnsubscribed();
    }
    

    上面的代码注释已经很清楚了写明了 Subscriber 的相关方法作用。

    3.subscribe() 绑定

    subscribe() 实现 被观察者Observable 和 观察者**Subscriber ** 的绑定关系,也是代码开始执行的地方。现在来看下这个方法的源码,我们先找到 subscribe(final Observer<? super T> observer)看下,如下:

    public final Subscription subscribe(final Observer<? super T> observer) {  
      if (observer instanceof Subscriber) {  
          return subscribe((Subscriber<? super T>)observer);  
      }  
      return subscribe(new ObserverSubscriber<T>(observer));
    }
    .....
    //再查看下 ObserverSubscriber这个类
    public final class ObserverSubscriber<T> extends Subscriber<T> { 
       final Observer<? super T> observer; 
       public ObserverSubscriber(Observer<? super T> observer) {   
         this.observer = observer;  
      }    
      @Override  
      public void onNext(T t) {   
         observer.onNext(t);  
      }   
      @Override  
      public void onError(Throwable e) {    
        observer.onError(e);  
      }     
     @Override  
      public void onCompleted() {   
         observer.onCompleted();  
      }
    }
    

    从上面代码中,我们看到在 subscribe()中最终就是将 Observer 转成了 Subscriber 进行后续代码执行的。这也就说明了之前的用Subsciber代替Observer的原因。
    好的,我们现在继续查看 subscribe()方法的源码:

    public final Subscription subscribe(Subscriber<? super T> subscriber) { 
       return Observable.subscribe(subscriber, this);
    }
    static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) { 
     ....省略无关代码...
     subscriber.onStart();
     ...省略无关代码.....
     hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);  
     return hook.onSubscribeReturn(subscriber); 
    }
    

    subscriber.onStart(); (观察者Subscriber的onStart 方法可以做些预先操作,比如发起请求前,显示进度条等),最主要的是 执行了这一句hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);,ok,我们先看下 hook.onSubscribeStart的实现:

    public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {  
      // pass through by default 
      return onSubscribe;
    }
    

    竟然,就是直接返回了 onSubscribe这个对象,什么事也没干,而我们知道这个 onSubscribe就是我们创建 Observable时传进入的参数,所以 subscribe()简化如下

    //subscriber 就是我们的观察者
    onSubscribe.call(subscriber);
    

    然后,就是在 call() 方法中执行 Subscriber的 onNext(),onCompleted(),或 onError()相关方法了。

    Observable<String> mObservable = Observable.create( 
       new Observable.OnSubscribe<String>(){ 
           @Override  
          public void call(Subscriber<? super String> subscriber) { 
              //调用 subscriber 的相关方法
               subscriber.onNext("Hello world!"); 
               subscriber.onCompleted(); 
           } 
       });
    

    我们来画一个流程图,如下:


    Paste_Image.png

    结合图,我们整理下思路:

    • 首先,通过 create() 方法创建ObservableA对象,并赋值其变量OnSubscribeA。
    • 创建观察者对象Subscriber。
    • 然后,通过 subscribe() 绑定 ObservableA 和 Subsciber,并调用 OnSubcribeA.call()。
    • 最后,就是在 call()方法中,执行调用 Subscriber 的 onNext / onCompleted / onError 的对应方法。

    尾巴呢?

    到这里,我们算是把RxJava最简单的部分弄清楚了,如果文中有什么差错或者不懂的地方,欢迎大家指正与交流!

    接下来呢,我们将向高级部分冲刺,比如 各种操作符lift的原理Scheduler线程控制的原理 等等,我们将一一探索其源码的奥秘,(●'◡'●)

    相关文章

      网友评论

          本文标题:RxJava 源码学习之最简单的Demo

          本文链接:https://www.haomeiwen.com/subject/kcnputtx.html