美文网首页
Retrofit 2.1 + Rxjava 源码解析(一)

Retrofit 2.1 + Rxjava 源码解析(一)

作者: innovatorCL | 来源:发表于2017-05-05 15:37 被阅读325次

    1.创建Retrofit对象

    
    OkHttpClient.Builder okHttpClient = new OkHttpClient.Builder();
    retrofit = new Retrofit.Builder()
                    .client(okHttpClient.build())
                    .addConverterFactory(GsonConverterFactory.create())
                    .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
                    .baseUrl(base_url)
                    .build();
    

    这里是普通的 Retrofit 对象创建过程,传入一些必要的参数:okHttpClientconverterFactorycallAdapterFactory(不搭配 Rxjava 的时候使用 Retrofit 默认的 callAdapterFactory,什么都不做),baseUrl 。

    这里特别要注意的是传入了 RxJavaCallAdapterFactory.create() 这个RxjavaCallAdapter 对象,这个对象将彻底改变 Retrofit 的使用。使得 Retrofit 搭配 Rxjava 变成可能,不得不佩服 Retrofit 作者的编程功底,开放 CallAdapterFactory 这个接口,使 Retrofit 的灵活性更高。

    2.创建接口的动态代理对象

    给出实验的接口

    public interface NetApiService {
    
        //post请求
        @FormUrlEncoded
        @POST("{url}")
        Observable<ResponseBody> executePost(
                @Path("url") String url,
                @Field("params") String params,
                @Field("signature") String signature
        );
    
    }
    
    netApiService = retrofit.create(NetApiService .class);  //返回一个动态代理对象
    

    这里也是 Retrofit 神奇的地方,传入一个接口,就可以生成实现了这个接口的对象,当然这个只是 Java 代码生成的动态代理对象。下面我们进
    create() 方法看看。

    public <T> T create(final Class<T> service) {
        Utils.validateServiceInterface(service);  //验证外部传进的“服务”接口是否合法
        if (validateEagerly) {
          eagerlyValidateMethods(service);  //根据validateEagerly判断是否对接口中的全部方法进行缓存
        }
        //使用Proxy工厂类返回一个泛型动态代理实例。
        return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[] { service },
            new InvocationHandler() {
              private final Platform platform = Platform.get();
    
              @Override public Object invoke(Object proxy, Method method, Object... args)
                  throws Throwable {
                // If the method is a method from Object then defer to normal invocation.
                if (method.getDeclaringClass() == Object.class) {
                  return method.invoke(this, args);
                }
                if (platform.isDefaultMethod(method)) {
                  return platform.invokeDefaultMethod(method, service, proxy, args);
                }
                ServiceMethod serviceMethod = loadServiceMethod(method);
                OkHttpCall okHttpCall = new OkHttpCall<>(serviceMethod, args);
                return serviceMethod.callAdapter.adapt(okHttpCall);
              }
            });
      }
    

    在这里我们首先对传入的接口进行检验是否是接口,然后根据 validateEagerly 判断是否对接口中的全部方法进行缓存,最后我们用 java.lang.reflect.Proxy; 创建一个泛型的动态代理对象,返回这个对象。(不懂 java动态代理技术 的同学别着急,我会在文末给出参考资料)

    3.创建Observable

    Observable<ResponseBody> observable = netApiService.executePost(url, params, signature);
    

    调用动态代理对象的接口方法,这时候会调用

    new InvocationHandler() {
              private final Platform platform = Platform.get();
    
              @Override public Object invoke(Object proxy, Method method, Object... args)
                  throws Throwable {
                // If the method is a method from Object then defer to normal invocation.
                if (method.getDeclaringClass() == Object.class) {
                  return method.invoke(this, args);
                }
                if (platform.isDefaultMethod(method)) {
                  return platform.invokeDefaultMethod(method, service, proxy, args);
                }
                ServiceMethod serviceMethod = loadServiceMethod(method);
                OkHttpCall okHttpCall = new OkHttpCall<>(serviceMethod, args);
                return serviceMethod.callAdapter.adapt(okHttpCall);
              }
            }
    

    InvocationHandlerinvoke() 方法,在这里我们可以看到有三个参数:
    proxy 表示通过 Proxy.newProxyInstance() 生成的代理类对象。
    method 表示代理对象被调用的函数。
    args 表示代理对象被调用的函数的参数。
    调用代理对象的每个函数实际最终都是调用了 InvocationHandlerinvoke 函数。

    由于这个是接口的方法,所以不会进第一个 if ,因为也不是默认方法,所以也不会进第二个 if 。这样就可以看到我们的代理对象在调用了接口的方法后实际上是 new 了一个 okHttpCall<> 对象,然后将这个对象作为参数传进了 callAdapter.adapt(); 方法中

    由于我们之前传入的是 RxJavaCallAdapterFactory.create() ,所以我们深入 RxJavaCallAdapterFactory.java 看看构造 Observable 的方法,在adapt()可以看到:

    static final class ResponseCallAdapter implements CallAdapter<Observable<?>> {
        private final Type responseType;
        private final Scheduler scheduler;
    
        ResponseCallAdapter(Type responseType, Scheduler scheduler) {
          this.responseType = responseType;
          this.scheduler = scheduler;
        }
    
        @Override public Type responseType() {
          return responseType;
        }
    
        @Override public <R> Observable<Response<R>> adapt(Call<R> call) {
          Observable<Response<R>> observable = Observable.create(new CallOnSubscribe<>(call));
          if (scheduler != null) {
            return observable.subscribeOn(scheduler);
          }
          return observable;
        }
      }
    

    在这里我们看到,这里将传入的 okHttp 对象 作为参数,构造了 CallOnSubscribe 对象
    CallOnSubscribe 是何方神圣???按照 Rxjava 构造 Observable 方法来说,这个 CallOnSubscribe 应该是一个实现了Observable.OnSubscribe<T> 接口的对象。

    我们看看源码,果然如此。

    static final class CallOnSubscribe<T> implements Observable.OnSubscribe<Response<T>> {
        private final Call<T> originalCall;
    
        CallOnSubscribe(Call<T> originalCall) {
          this.originalCall = originalCall;
        }
    
        @Override public void call(final Subscriber<? super Response<T>> subscriber) {
          // Since Call is a one-shot type, clone it for each new subscriber.
          Call<T> call = originalCall.clone();
    
          // Wrap the call in a helper which handles both unsubscription and backpressure.
          RequestArbiter<T> requestArbiter = new RequestArbiter<>(call, subscriber);
          subscriber.add(requestArbiter);
          subscriber.setProducer(requestArbiter);
        }
      }
    

    看到这里,相信你大概也懂了为什么调用生成的动态代理对象的接口方法不像只使用 Retrofit 那样返回一个 okHttpCall<> 对象,而是返回一个 Observable<ResponseBody> 对象。其实这就是
    RxJavaCallAdapterFactory 做的转换。

    仔细看 CallOnSubscribe<T>call() 方法,我们发现这里的 subscriber (其实是调用 subscribe() 方法时传进来的 subscriber ,就是外部的观察者)添加了一个 requestArbiter 对象。这个对象很重要,在 subscriber.setProducer(requestArbiter); 时,它会控制
    okHttpCall对象 直接联网获取数据,然后回调给观察者 subscriber

    4.observable.subscribe(subscriber);订阅

    这里的代码不多,就一行 observable.subscribe(subscriber); 。我们仔细看看在 subscribe 方法里面发生了什么神奇的事?(提前剧透一下,Observable.OnSubscribe<T>对象 很棒,它就相当于桥梁,将 Observable 和 Observer 连接起来)

     public final Subscription subscribe(Subscriber<? super T> subscriber) {
            return Observable.subscribe(subscriber, this);
        }
        
        static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
         // validate and proceed
        //判断传进来的参数,即是观察者对象,被观察者对象,是否为空。
            if (subscriber == null) {
                throw new IllegalArgumentException("observer can not be null");
            }
            if (observable.onSubscribe == null) {
                throw new IllegalStateException("onSubscribe function can not be null.");
                /*
                 * the subscribe function can also be overridden but generally that's not the appropriate approach
                 * so I won't mention that in the exception
                 */
            }
            
            // new Subscriber so onStart it
           //重要的操作,可以在订阅之前做一些准备工作
            subscriber.onStart();
            
            /*
             * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
             * to user code from within an Observer"
             */
            // if not already wrapped
            if (!(subscriber instanceof SafeSubscriber)) {
                // assign to `observer` so we return the protected version
                subscriber = new SafeSubscriber<T>(subscriber);
            }
    
            // The code below is exactly the same an unsafeSubscribe but not used because it would 
            // add a significant depth to already huge call stacks.
            try {
                // allow the hook to intercept and/or decorate
                hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
                return hook.onSubscribeReturn(subscriber);
            } catch (Throwable e) {
                // special handling for certain Throwable/Error/Exception types
                Exceptions.throwIfFatal(e);
                // in case the subscriber can't listen to exceptions anymore
                if (subscriber.isUnsubscribed()) {
                    RxJavaPluginUtils.handleException(hook.onSubscribeError(e));
                } else {
                    // if an unhandled error occurs executing the onSubscribe we will propagate it
                    try {
                        subscriber.onError(hook.onSubscribeError(e));
                    } catch (Throwable e2) {
                        Exceptions.throwIfFatal(e2);
                        // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                        // so we are unable to propagate the error correctly and will just throw
                        RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                        // TODO could the hook be the cause of the error in the on error handling.
                        hook.onSubscribeError(r);
                        // TODO why aren't we throwing the hook's return value.
                        throw r;
                    }
                }
                return Subscriptions.unsubscribed();
            }
        }
    

    在这段代码中,我们看到 subscribe() 过程中,会先调用 onStart() ,一般这个方法在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行), onStart()就不适用了(因为它总是在 subscribe 所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用 doOnSubscribe()
    方法)(先记住这个知识点,请保留,现在先不管线程切换)

    高能来了!!!
    我们重点看看这句代码:

    hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
                return hook.onSubscribeReturn(subscriber);
    
     public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {
            // pass through by default
            return onSubscribe;
        }
    

    其实 onSubscribeStart() 方法直接返回了 onSubscribe 对象,然后直接调用 onSubscribecall(subscriber) 方法。记得我们刚刚分析,这里其实是调用了 CallOnSubscribe<T>对象call() 方法。也就是在这里进行了联网获取数据,然后回调 Subscriber 观察者的方法。(具体的代码就是 call() 方法的subscriber.setProducer(requestArbiter);)。

    public void setProducer(Producer p) {
            long toRequest;
            boolean passToSubscriber = false;
            synchronized (this) {
                toRequest = requested;
                producer = p;
                if (subscriber != null) {
                    // middle operator ... we pass through unless a request has been made
                    if (toRequest == NOT_SET) {
                        // we pass through to the next producer as nothing has been requested
                        passToSubscriber = true;
                    }
                }
            }
            // do after releasing lock
            if (passToSubscriber) {
                subscriber.setProducer(producer);
            } else {
                // we execute the request with whatever has been requested (or Long.MAX_VALUE)
                if (toRequest == NOT_SET) {
                    producer.request(Long.MAX_VALUE);
                } else {
                    producer.request(toRequest);
                }
            }
        }
    

    最后会调用 producer.request(toRequest); 方法。
    这个 request() 方法,就是 RequestArbiter<T> requestArbiter = new RequestArbiter<>(call, subscriber);request()

    static final class RequestArbiter<T> extends AtomicBoolean implements Subscription, Producer {
        private final Call<T> call;
        private final Subscriber<? super Response<T>> subscriber;
    
        RequestArbiter(Call<T> call, Subscriber<? super Response<T>> subscriber) {
          this.call = call;
          this.subscriber = subscriber;
        }
    
        @Override public void request(long n) {
          if (n < 0) throw new IllegalArgumentException("n < 0: " + n);
          if (n == 0) return; // Nothing to do when requesting 0.
          if (!compareAndSet(false, true)) return; // Request was already triggered.
    
          try {
            Response<T> response = call.execute();
            if (!subscriber.isUnsubscribed()) {
              subscriber.onNext(response);
            }
          } catch (Throwable t) {
            Exceptions.throwIfFatal(t);
            if (!subscriber.isUnsubscribed()) {
              subscriber.onError(t);
            }
            return;
          }
    
          if (!subscriber.isUnsubscribed()) {
            subscriber.onCompleted();
          }
        }
    
        @Override public void unsubscribe() {
          call.cancel();
        }
    
        @Override public boolean isUnsubscribed() {
          return call.isCanceled();
        }
      }
    

    Response<T> response = call.execute(); 这里进行了网络请求;
    if (!subscriber.isUnsubscribed()) { subscriber.onNext(response); } 这里进行回调。

    接下来的 onError()onCompleted() 方法的回调一样的,就不分析了。

    至此,我们就完整地了解了 Retrofit + Rxjava 中从创建 Observable 和 Observer 到 Observable 订阅 Observer 的流程,以及中间隐藏的联网和回调的过程。

    参考资料

    Retrofit 2.1 源码分析
    Java 动态代理技术
    Rxjava 源码分析一

    相关文章

      网友评论

          本文标题:Retrofit 2.1 + Rxjava 源码解析(一)

          本文链接:https://www.haomeiwen.com/subject/tzvjtxtx.html