美文网首页
RxJava 源码学习之最简单的Demo

RxJava 源码学习之最简单的Demo

作者: yoosir | 来源:发表于2016-11-01 01:01 被阅读0次

** 会看(我在这呢,X_X!!!) =》会用** =》 会说 =》会写 =》会模仿 =》**能new **

说明:本文章是依据 RxJava 1.x (1.2.1) 源码分析学习的。

这是开头

本文主要看源码分析一个RxJava最简单的使用示例。一步俩步,摩擦......

正文在这

首先,写个最简单的RxJava使用示例,如下:

//创建一个Observable 的对象
Observable<String> mObservable = Observable.create( 
   new Observable.OnSubscribe<String>(){ 
       @Override  
      public void call(Subscriber<? super String> subscriber) { 
           subscriber.onNext("Hello world!"); 
           subscriber.onCompleted(); 
       } 
   });
//创建一个Subscriber对象
Subscriber<String> mSubscriber = new Subscriber<String>() { 
   @Override 
   public void onCompleted() { 
       //完成  
  } 
   @Override  
  public void onError(Throwable e) { 
       //异常  
  }  
  @Override  
  public void onNext(String s) { 
       //下一步  
      System.out.print(s); 
   }
};
//关联 Observable 和 Subscriber
mObservable.subscribe(mSubscriber);

我们就根据上面最简单的使用示例,按下面步骤来学习一下RxJava的源码:

  • Observable(被观察者) 的创建
  • Observer(观察者)的创建
  • subscribe() 绑定

开始喽:

  • 1.Observable(被观察者) 的创建

Observable.create 入手

public static <T> Observable<T> create(OnSubscribe<T> f) { 
   return new Observable<T>(RxJavaHooks.onCreate(f));
} 
//.....
//相关代码:RxJavaHooks.onCreate(f)
//.....  
final OnSubscribe<T> onSubscribe;  
protected Observable(OnSubscribe<T> f) {
   this.onSubscribe = f;
}

create创建了一个Observable对象,并将OnSubscribe f参数赋值给了Observable 对象的成员变量 onSubscribe。当然,这中间调用了RxJavaHooks.onCreate(f),我们来看下它的源码。

/**
 * Hook to call when an Observable is created.
 * @param <T> the value type
 * @param onSubscribe the original OnSubscribe logic
 * @return the original or replacement OnSubscribe instance
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) { 
   Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate; 
   if (f != null) { 
       return f.call(onSubscribe); 
   } 
   return onSubscribe;
}

这里其实也就是返回了 onSubscribe 本身,RxJavaHooks是一个代理对象, 仅仅用作调试的时候可以插入一些测试代码,大家这样理解就行了。
Ok,上面说了这么久 Observable.OnSubscribe ,那这个 Observable.OnSubscribe 主要是干嘛的呢?,,看源码:

/**
   * Invoked when Observable.subscribe is called.
   * @param <T> the output value type
 */
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> { 
   // cover for generics insanity
}.
//Action1
public interface Action1<T> extends Action {  
   void call(T t);
}

可以看到 Observable.OnSubscribe是一个拥有 call方法的接口类而已。不过,注意到 Observable.subscribe(后面讲)被调用时,该接口方法会被调用。
好的,总结就是create()方法创建了一个Observable,且给Observable中OnSubscribe变量进行赋值。

  • 2.Observer(观察者)的创建

在文中,为了方便,所有的观察者(Observer)我将用 Subscriber(是实现Observer的接口的抽象类) 来代替,这个会在下面 subscribe()讲解中说明。先看下,Subscriber 的构造:

public abstract class Subscriber<T> implements Observer<T>, Subscription {  
    ...省略其他代码..
    protected Subscriber() {    this(null, false);}
    protected Subscriber(Subscriber<?> subscriber) {
      this(subscriber, true);
    }
    protected Subscriber(Subscriber<?> subscriber, boolean shareSubscriptions) {
      this.subscriber = subscriber;
      this.subscriptions = shareSubscriptions && subscriber != null ? subscriber.subscriptions : new SubscriptionList();
    }
    ...省略其他代码...
}

Subscriber是一个抽象类,主要实现了 ObserverSubscription接口类。

public interface Observer<T> {  
    //提示观察者,当被观察者的所有事件已经结束了。
    //与 onError 互斥,
    void onCompleted();  

    //提示观察者,当有一个异常发生时。
    //与 onCompleted互斥,
    void onError(Throwable e);  

    /** 为观察者提供了一个新的项目观察
     *  这个方法可能被调用 0 ~ N 次。
     *  当 onCompleted 或 onError 被调用后,onNext 不会在被调用
    */
    void onNext(T t);
}
.....
public interface Subscription {   
    //观察者与被观察者解绑,不再对它进行观察
    void unsubscribe();
    //检查是否已经解绑    
    boolean isUnsubscribed();
}

上面的代码注释已经很清楚了写明了 Subscriber 的相关方法作用。

3.subscribe() 绑定

subscribe() 实现 被观察者Observable 和 观察者**Subscriber ** 的绑定关系,也是代码开始执行的地方。现在来看下这个方法的源码,我们先找到 subscribe(final Observer<? super T> observer)看下,如下:

public final Subscription subscribe(final Observer<? super T> observer) {  
  if (observer instanceof Subscriber) {  
      return subscribe((Subscriber<? super T>)observer);  
  }  
  return subscribe(new ObserverSubscriber<T>(observer));
}
.....
//再查看下 ObserverSubscriber这个类
public final class ObserverSubscriber<T> extends Subscriber<T> { 
   final Observer<? super T> observer; 
   public ObserverSubscriber(Observer<? super T> observer) {   
     this.observer = observer;  
  }    
  @Override  
  public void onNext(T t) {   
     observer.onNext(t);  
  }   
  @Override  
  public void onError(Throwable e) {    
    observer.onError(e);  
  }     
 @Override  
  public void onCompleted() {   
     observer.onCompleted();  
  }
}

从上面代码中,我们看到在 subscribe()中最终就是将 Observer 转成了 Subscriber 进行后续代码执行的。这也就说明了之前的用Subsciber代替Observer的原因。
好的,我们现在继续查看 subscribe()方法的源码:

public final Subscription subscribe(Subscriber<? super T> subscriber) { 
   return Observable.subscribe(subscriber, this);
}
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) { 
 ....省略无关代码...
 subscriber.onStart();
 ...省略无关代码.....
 hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);  
 return hook.onSubscribeReturn(subscriber); 
}

subscriber.onStart(); (观察者Subscriber的onStart 方法可以做些预先操作,比如发起请求前,显示进度条等),最主要的是 执行了这一句hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);,ok,我们先看下 hook.onSubscribeStart的实现:

public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {  
  // pass through by default 
  return onSubscribe;
}

竟然,就是直接返回了 onSubscribe这个对象,什么事也没干,而我们知道这个 onSubscribe就是我们创建 Observable时传进入的参数,所以 subscribe()简化如下

//subscriber 就是我们的观察者
onSubscribe.call(subscriber);

然后,就是在 call() 方法中执行 Subscriber的 onNext(),onCompleted(),或 onError()相关方法了。

Observable<String> mObservable = Observable.create( 
   new Observable.OnSubscribe<String>(){ 
       @Override  
      public void call(Subscriber<? super String> subscriber) { 
          //调用 subscriber 的相关方法
           subscriber.onNext("Hello world!"); 
           subscriber.onCompleted(); 
       } 
   });

我们来画一个流程图,如下:


Paste_Image.png

结合图,我们整理下思路:

  • 首先,通过 create() 方法创建ObservableA对象,并赋值其变量OnSubscribeA。
  • 创建观察者对象Subscriber。
  • 然后,通过 subscribe() 绑定 ObservableA 和 Subsciber,并调用 OnSubcribeA.call()。
  • 最后,就是在 call()方法中,执行调用 Subscriber 的 onNext / onCompleted / onError 的对应方法。

尾巴呢?

到这里,我们算是把RxJava最简单的部分弄清楚了,如果文中有什么差错或者不懂的地方,欢迎大家指正与交流!

接下来呢,我们将向高级部分冲刺,比如 各种操作符lift的原理Scheduler线程控制的原理 等等,我们将一一探索其源码的奥秘,(●'◡'●)

相关文章

网友评论

      本文标题:RxJava 源码学习之最简单的Demo

      本文链接:https://www.haomeiwen.com/subject/kcnputtx.html