写这么一篇文章主要是为了解惑,我们都知道Retrofit
可以配合RxJava
一起使用,而且那种链式的调用简直了,但是一直有个疑惑:
getObservable().subscribe(new Observer<String>() {
@Override
public void onNext(String value) {
// ... 得到数据
}
})
看上面那段伪代码之后我们都知道Observable
是需要subscribe
才会真正执行的,那么Retrofit
是怎么实现这个流程的呢?不然老是能得到数据却不懂的怎么来的,所以为了解读这一脸的懵逼只能从源码中去寻找答案。
简单使用
val mRetrofit: Retrofit = Retrofit.Builder()
.baseUrl(HttpUrl.parse(Constant.URL))
.client(okHttpBuilder.build())
.addCallAdapterFactory(RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io()))
.addConverterFactory(GsonConverterFactory.create(GsonBuilder().create()))
.build()
嗯嗯,只要加了RxJava2CallAdapterFactory.createWithScheduler
之后就能愉快的结合RxJava
一起使用了:
GankHttpRequest.instance.mApiService.getAndroidData()
.compose(Constant.transformer())
.subscribe(object : Consumer<MutableList<AndroidResult>> {
override fun accept(t: MutableList<AndroidResult>?) {
callback?.onHttpSuccess(t)
}
}, object : Consumer<Throwable> {
override fun accept(t: Throwable?) {
}
})
更多例子源码查看:https://github.com/Neacy/GankKotlin
总是很佩服Square
开源的项目,因为解决了很多难题,以上就是Retrofit
和RxJava
的简单效果。
开始分析
我们直接奔主题进入口开始分析mRetrofit.create(ApiService::class.java)
也就是Retrofit
中的create
方法:
public <T> T create(final Class<T> service) {
return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[] { service },
new InvocationHandler() {
private final Platform platform = Platform.get();
@Override public Object invoke(Object proxy, Method method, @Nullable Object[] args)
throws Throwable {
ServiceMethod<Object, Object> serviceMethod =
(ServiceMethod<Object, Object>) loadServiceMethod(method);
OkHttpCall<Object> okHttpCall = new OkHttpCall<>(serviceMethod, args);
return serviceMethod.callAdapter.adapt(okHttpCall);
}
});
}
这里重点分析loadServiceMethod
方法,点进源码可以看到主要执行new ServiceMethod.Builder<>(this, method).build()
经过一系列折腾最后回到Retrofit
中的nextCallAdapter
方法:
public CallAdapter<?, ?> nextCallAdapter(@Nullable CallAdapter.Factory skipPast, Type returnType,
Annotation[] annotations) {
checkNotNull(returnType, "returnType == null");
checkNotNull(annotations, "annotations == null");
int start = adapterFactories.indexOf(skipPast) + 1;
for (int i = start, count = adapterFactories.size(); i < count; i++) {
CallAdapter<?, ?> adapter = adapterFactories.get(i).get(returnType, annotations, this);
if (adapter != null) {
return adapter;
}
}
}
这里主要是调用adapterFactories.get(returnType, annotations, this)
这里的adapterFactories
就是我们初始化传进来的RxJava2CallAdapterFactory
类所以很自然get
方法执行之后返回的是RxJava2CallAdapter
,很好;终于看到跟主题相关的Rx开头的类了。
执行完ServiceMethod
的初始化后代码继续走:
OkHttpCall<Object> okHttpCall = new OkHttpCall<>(serviceMethod, args);
return serviceMethod.callAdapter.adapt(okHttpCall);
首先我们明白一点serviceMethod.callAdapter
也就是我们前面返回的RxJava2CallAdapter
对象,所以自然进入该类中的adapt
方法:
@Override public Object adapt(Call<R> call) {
Observable<Response<R>> responseObservable = isAsync
? new CallEnqueueObservable<>(call)
: new CallExecuteObservable<>(call);
Observable<?> observable;
if (isResult) {
observable = new ResultObservable<>(responseObservable);
} else if (isBody) {
observable = new BodyObservable<>(responseObservable);
} else {
observable = responseObservable;
}
if (scheduler != null) {
observable = observable.subscribeOn(scheduler);
}
if (isFlowable) {
return observable.toFlowable(BackpressureStrategy.LATEST);
}
if (isSingle) {
return observable.singleOrError();
}
if (isMaybe) {
return observable.singleElement();
}
if (isCompletable) {
return observable.ignoreElements();
}
return observable;
}
Observable
好刺眼的词,这不就是RxJava
的类嘛,有点慌仿佛就要结束了。
这里简单点我们只挑CallExecuteObservable
来分析,这个类的代码不长直接贴上来看看:
final class CallExecuteObservable<T> extends Observable<Response<T>> {
private final Call<T> originalCall;
CallExecuteObservable(Call<T> originalCall) {
this.originalCall = originalCall;
}
@Override protected void subscribeActual(Observer<? super Response<T>> observer) {
// Since Call is a one-shot type, clone it for each new observer.
Call<T> call = originalCall.clone();
observer.onSubscribe(new CallDisposable(call));
boolean terminated = false;
try {
Response<T> response = call.execute();
if (!call.isCanceled()) {
observer.onNext(response);
}
if (!call.isCanceled()) {
terminated = true;
observer.onComplete();
}
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
if (terminated) {
RxJavaPlugins.onError(t);
} else if (!call.isCanceled()) {
try {
observer.onError(t);
} catch (Throwable inner) {
Exceptions.throwIfFatal(inner);
RxJavaPlugins.onError(new CompositeException(t, inner));
}
}
}
}
private static final class CallDisposable implements Disposable {
private final Call<?> call;
CallDisposable(Call<?> call) {
this.call = call;
}
@Override public void dispose() {
call.cancel();
}
@Override public boolean isDisposed() {
return call.isCanceled();
}
}
}
那什么时候开始执行呢?
这时候我们需要回头看一下Retrofit.create
这里用到了动态代理
所以再invoke
中serviceMethod.callAdapter.adapt(okHttpCall)
就是把RxJava2CallAdapter
中的Observable
返回回去,所以:
当我们代码中调用subscribe
的时候会执行Observable.subscribeActual
,回头看看这方法中做了什么:
Response<T> response = call.execute();// 使用OkHttp执行接口请求
if (!call.isCanceled()) {
observer.onNext(response);
}
if (!call.isCanceled()) {
terminated = true;
observer.onComplete();
}
很轻松地我们界面就得到了Retrofit
中的Observable
发射出来的数据了然后我们就可以做任何处理了。
我们再回头看看一下RxJava2CallAdapter.adapt
方法:
Observable<?> observable;
if (isResult) {
observable = new ResultObservable<>(responseObservable);
} else if (isBody) {
observable = new BodyObservable<>(responseObservable);
} else {
observable = responseObservable;
}
if (scheduler != null) {
observable = observable.subscribeOn(scheduler);
}
看到observable被各种条件进行赋值,不过我们知道了CallExecuteObservable
这个是怎么发射数据了现在再回头看已经很清晰了。
综上:
整个流程其实就是RxJava2CallAdapterFactory
-> RxJava2CallAdapter
-> xxxxxxObservable
-> onNext
例子的源码地址: https://github.com/Neacy/GankKotlin
网友评论