美文网首页Android技术知识Android开发经验谈Android开发
Android开源库Retrofit中RxJava的工作流程分析

Android开源库Retrofit中RxJava的工作流程分析

作者: 左手木亽 | 来源:发表于2018-07-06 16:00 被阅读41次

    写这么一篇文章主要是为了解惑,我们都知道Retrofit可以配合RxJava一起使用,而且那种链式的调用简直了,但是一直有个疑惑:

    getObservable().subscribe(new Observer<String>() {
           @Override
           public void onNext(String value) {
               // ... 得到数据
           }
       })
    

    看上面那段伪代码之后我们都知道Observable是需要subscribe才会真正执行的,那么Retrofit是怎么实现这个流程的呢?不然老是能得到数据却不懂的怎么来的,所以为了解读这一脸的懵逼只能从源码中去寻找答案。

    简单使用

    val mRetrofit: Retrofit = Retrofit.Builder()
            .baseUrl(HttpUrl.parse(Constant.URL))
            .client(okHttpBuilder.build())
            .addCallAdapterFactory(RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io()))
            .addConverterFactory(GsonConverterFactory.create(GsonBuilder().create()))
            .build()
    

    嗯嗯,只要加了RxJava2CallAdapterFactory.createWithScheduler之后就能愉快的结合RxJava一起使用了:

    GankHttpRequest.instance.mApiService.getAndroidData()
            .compose(Constant.transformer())
            .subscribe(object : Consumer<MutableList<AndroidResult>> {
                override fun accept(t: MutableList<AndroidResult>?) {
                    callback?.onHttpSuccess(t)
                }
            }, object : Consumer<Throwable> {
                override fun accept(t: Throwable?) {
                }
            })
    

    更多例子源码查看:https://github.com/Neacy/GankKotlin
    总是很佩服Square开源的项目,因为解决了很多难题,以上就是RetrofitRxJava的简单效果。

    开始分析

    我们直接奔主题进入口开始分析mRetrofit.create(ApiService::class.java)也就是Retrofit中的create方法:

      public <T> T create(final Class<T> service) {
        return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[] { service },
            new InvocationHandler() {
              private final Platform platform = Platform.get();
              @Override public Object invoke(Object proxy, Method method, @Nullable Object[] args)
                  throws Throwable {
                ServiceMethod<Object, Object> serviceMethod =
                    (ServiceMethod<Object, Object>) loadServiceMethod(method);
                OkHttpCall<Object> okHttpCall = new OkHttpCall<>(serviceMethod, args);
                return serviceMethod.callAdapter.adapt(okHttpCall);
              }
            });
      }
    

    这里重点分析loadServiceMethod方法,点进源码可以看到主要执行new ServiceMethod.Builder<>(this, method).build()经过一系列折腾最后回到Retrofit中的nextCallAdapter方法:

    public CallAdapter<?, ?> nextCallAdapter(@Nullable CallAdapter.Factory skipPast, Type returnType,
        Annotation[] annotations) {
      checkNotNull(returnType, "returnType == null");
      checkNotNull(annotations, "annotations == null");
    
      int start = adapterFactories.indexOf(skipPast) + 1;
      for (int i = start, count = adapterFactories.size(); i < count; i++) {
        CallAdapter<?, ?> adapter = adapterFactories.get(i).get(returnType, annotations, this);
        if (adapter != null) {
          return adapter;
        }
      }
    }
    

    这里主要是调用adapterFactories.get(returnType, annotations, this)这里的adapterFactories就是我们初始化传进来的RxJava2CallAdapterFactory类所以很自然get方法执行之后返回的是RxJava2CallAdapter,很好;终于看到跟主题相关的Rx开头的类了。

    执行完ServiceMethod的初始化后代码继续走:

    OkHttpCall<Object> okHttpCall = new OkHttpCall<>(serviceMethod, args);
    return serviceMethod.callAdapter.adapt(okHttpCall);
    

    首先我们明白一点serviceMethod.callAdapter也就是我们前面返回的RxJava2CallAdapter对象,所以自然进入该类中的adapt方法:

      @Override public Object adapt(Call<R> call) {
        Observable<Response<R>> responseObservable = isAsync
            ? new CallEnqueueObservable<>(call)
            : new CallExecuteObservable<>(call);
    
        Observable<?> observable;
        if (isResult) {
          observable = new ResultObservable<>(responseObservable);
        } else if (isBody) {
          observable = new BodyObservable<>(responseObservable);
        } else {
          observable = responseObservable;
        }
    
        if (scheduler != null) {
          observable = observable.subscribeOn(scheduler);
        }
    
        if (isFlowable) {
          return observable.toFlowable(BackpressureStrategy.LATEST);
        }
        if (isSingle) {
          return observable.singleOrError();
        }
        if (isMaybe) {
          return observable.singleElement();
        }
        if (isCompletable) {
          return observable.ignoreElements();
        }
        return observable;
      }
    

    Observable好刺眼的词,这不就是RxJava的类嘛,有点慌仿佛就要结束了。
    这里简单点我们只挑CallExecuteObservable来分析,这个类的代码不长直接贴上来看看:

    final class CallExecuteObservable<T> extends Observable<Response<T>> {
      private final Call<T> originalCall;
    
      CallExecuteObservable(Call<T> originalCall) {
        this.originalCall = originalCall;
      }
    
      @Override protected void subscribeActual(Observer<? super Response<T>> observer) {
        // Since Call is a one-shot type, clone it for each new observer.
        Call<T> call = originalCall.clone();
        observer.onSubscribe(new CallDisposable(call));
    
        boolean terminated = false;
        try {
          Response<T> response = call.execute();
          if (!call.isCanceled()) {
            observer.onNext(response);
          }
          if (!call.isCanceled()) {
            terminated = true;
            observer.onComplete();
          }
        } catch (Throwable t) {
          Exceptions.throwIfFatal(t);
          if (terminated) {
            RxJavaPlugins.onError(t);
          } else if (!call.isCanceled()) {
            try {
              observer.onError(t);
            } catch (Throwable inner) {
              Exceptions.throwIfFatal(inner);
              RxJavaPlugins.onError(new CompositeException(t, inner));
            }
          }
        }
      }
    
      private static final class CallDisposable implements Disposable {
        private final Call<?> call;
    
        CallDisposable(Call<?> call) {
          this.call = call;
        }
    
        @Override public void dispose() {
          call.cancel();
        }
    
        @Override public boolean isDisposed() {
          return call.isCanceled();
        }
      }
    }
    

    那什么时候开始执行呢?
    这时候我们需要回头看一下Retrofit.create这里用到了动态代理所以再invokeserviceMethod.callAdapter.adapt(okHttpCall)就是把RxJava2CallAdapter中的Observable返回回去,所以:

    当我们代码中调用subscribe的时候会执行Observable.subscribeActual,回头看看这方法中做了什么:

    Response<T> response = call.execute();// 使用OkHttp执行接口请求
    if (!call.isCanceled()) {
      observer.onNext(response);
    }
    if (!call.isCanceled()) {
      terminated = true;
      observer.onComplete();
    }
    

    很轻松地我们界面就得到了Retrofit中的Observable发射出来的数据了然后我们就可以做任何处理了。
    我们再回头看看一下RxJava2CallAdapter.adapt方法:

    Observable<?> observable;
    if (isResult) {
      observable = new ResultObservable<>(responseObservable);
    } else if (isBody) {
      observable = new BodyObservable<>(responseObservable);
    } else {
      observable = responseObservable;
    }
    
    if (scheduler != null) {
      observable = observable.subscribeOn(scheduler);
    }
    

    看到observable被各种条件进行赋值,不过我们知道了CallExecuteObservable这个是怎么发射数据了现在再回头看已经很清晰了。

    综上:
    整个流程其实就是RxJava2CallAdapterFactory -> RxJava2CallAdapter -> xxxxxxObservable -> onNext

    例子的源码地址: https://github.com/Neacy/GankKotlin

    相关文章

      网友评论

        本文标题:Android开源库Retrofit中RxJava的工作流程分析

        本文链接:https://www.haomeiwen.com/subject/iahbuftx.html