这篇就补上RxJavaAdapter的支持了,末尾附上笔者总结心得
⁄(⁄ ⁄•⁄ω⁄•⁄ ⁄)⁄
介绍
RxJava现在绝对是个Android开发的大杀器了,面试的时候你要是说用过实际项目里肯定能在一定程度上加分的(๑•̀ㅂ•́)و✧
这里附上扔物线大大的文章:给 Android 开发者的 RxJava 详解
完整依赖
compile 'com.squareup.retrofit2:retrofit:2.1.0'
compile 'com.squareup.retrofit2:converter-gson:2.1.0'
compile 'com.squareup.retrofit2:adapter-rxjava:2.1.0'
compile 'io.reactivex:rxandroid:1.2.1'
public interface API{
//一般用法
//@GET("users/search/{keyword}")
//Call<List<UserBean>> searchUserList(@Path("keyword") String keyword);
//Rx用法,Observable是rx包下的,不是android.database下的
@GET("users/search/{keyword}")
Observable<List<UserBean>> searchUserList(@Path("keyword") String keyword);
}
new Retrofit.Builder()
//多加这一句
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.build();
//发起一个网络请求
retrofit.create(API.class)
.searchUserList("张三")
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe(new Observer<List<UserBean>>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(List<UserBean> userBeen) {
}
});
入口
前文我们已经分析过,默认发起的Call
是封装成OkHttpCall
,然后由serviceMethod
里的adapter
负责处理,那么现在对应的CallAdapter
接口实现类是怎么处理呢?
其实这里我们就不是普通的司机来开了,而是一个精通排水沟漂移的老司机来开车,下面我们来领教下老司机的车技
public final class RxJavaCallAdapterFactory extends CallAdapter.Factory {
public static RxJavaCallAdapterFactory create() {
return new RxJavaCallAdapterFactory(null);
}
public static RxJavaCallAdapterFactory createWithScheduler(Scheduler scheduler) {
if (scheduler == null) throw new NullPointerException("scheduler == null");
return new RxJavaCallAdapterFactory(scheduler);
}
private final Scheduler scheduler;
private RxJavaCallAdapterFactory(Scheduler scheduler) {
this.scheduler = scheduler;
}
@Override
public CallAdapter<?> get(Type returnType, Annotation[] annotations, Retrofit retrofit) {
//可以看到这里对returnType做了一些校验
//只支持rx.Observable,rx.Single,rx.Completable
//然而笔者只认识Observable。。。
Class<?> rawType = getRawType(returnType);
String canonicalName = rawType.getCanonicalName();
boolean isSingle = "rx.Single".equals(canonicalName);
boolean isCompletable = "rx.Completable".equals(canonicalName);
if (rawType != Observable.class && !isSingle && !isCompletable) {
return null;
}
if (!isCompletable && !(returnType instanceof ParameterizedType)) {
String name = isSingle ? "Single" : "Observable";
throw new IllegalStateException(name + " return type must be parameterized"
+ " as " + name + "<Foo> or " + name + "<? extends Foo>");
}
if (isCompletable) {
return CompletableHelper.createCallAdapter(scheduler);
}
//看这里,一般情况直接到这里然后return
CallAdapter<Observable<?>> callAdapter = getCallAdapter(returnType, scheduler);
if (isSingle) {
return SingleHelper.makeSingle(callAdapter);
}
return callAdapter;
}
//这个方法实现了返回真正的CallAdapter
private CallAdapter<Observable<?>> getCallAdapter(Type returnType, Scheduler scheduler) {
//将returnType转换成内部Type
Type observableType = getParameterUpperBound(0, (ParameterizedType) returnType);
//获取到内部的Class
Class<?> rawObservableType = getRawType(observableType);
//Response的情况
if (rawObservableType == Response.class) {
if (!(observableType instanceof ParameterizedType)) {
throw new IllegalStateException("Response must be parameterized"
+ " as Response<Foo> or Response<? extends Foo>");
}
Type responseType = getParameterUpperBound(0, (ParameterizedType) observableType);
return new ResponseCallAdapter(responseType, scheduler);
}
//Result的情况
if (rawObservableType == Result.class) {
if (!(observableType instanceof ParameterizedType)) {
throw new IllegalStateException("Result must be parameterized"
+ " as Result<Foo> or Result<? extends Foo>");
}
Type responseType = getParameterUpperBound(0, (ParameterizedType) observableType);
return new ResultCallAdapter(responseType, scheduler);
}
//一般返回这个类型
return new SimpleCallAdapter(observableType, scheduler);
}
//中间暂时省略2个类。
static final class ResponseCallAdapter implements CallAdapter<Observable<?>> {
private final Type responseType;
private final Scheduler scheduler;
ResponseCallAdapter(Type responseType, Scheduler scheduler) {
this.responseType = responseType;
this.scheduler = scheduler;
}
@Override public Type responseType() {
return responseType;
}
@Override public <R> Observable<Response<R>> adapt(Call<R> call) {
Observable<Response<R>> observable = Observable.create(new CallOnSubscribe<>(call));
if (scheduler != null) {
return observable.subscribeOn(scheduler);
}
return observable;
}
}
static final class SimpleCallAdapter implements CallAdapter<Observable<?>> {
private final Type responseType;
private final Scheduler scheduler;
SimpleCallAdapter(Type responseType, Scheduler scheduler) {
this.responseType = responseType;
this.scheduler = scheduler;
}
@Override public Type responseType() {
return responseType;
}
@Override public <R> Observable<R> adapt(Call<R> call) {
Observable<R> observable = Observable.create(new CallOnSubscribe<>(call)) //
.lift(OperatorMapResponseToBodyOrError.<R>instance());
if (scheduler != null) {
return observable.subscribeOn(scheduler);
}
return observable;
}
}
static final class ResultCallAdapter implements CallAdapter<Observable<?>> {
private final Type responseType;
private final Scheduler scheduler;
ResultCallAdapter(Type responseType, Scheduler scheduler) {
this.responseType = responseType;
this.scheduler = scheduler;
}
@Override public Type responseType() {
return responseType;
}
@Override public <R> Observable<Result<R>> adapt(Call<R> call) {
Observable<Result<R>> observable = Observable.create(new CallOnSubscribe<>(call)) //
.map(new Func1<Response<R>, Result<R>>() {
@Override public Result<R> call(Response<R> response) {
return Result.response(response);
}
}).onErrorReturn(new Func1<Throwable, Result<R>>() {
@Override public Result<R> call(Throwable throwable) {
return Result.error(throwable);
}
});
if (scheduler != null) {
return observable.subscribeOn(scheduler);
}
return observable;
}
}
}
看了上面一坨源码(其实已经省略了2个类)不知道大家有没有晕,反正我是晕了(⌒▽⌒)
那么我们从入口方法开始看起
@Override
public CallAdapter<?> get(Type returnType, Annotation[] annotations, Retrofit retrofit) {
这个方法是CallAdapter.Factory
的抽象方法,那么这个抽象方法的调用入口在哪里呢,没错我们又回到了ServiceMethod
,在这个构建者模式的build方法中,对这个执行方法进行了判断。
public ServiceMethod build() {
callAdapter = createCallAdapter();
//以下省略。。。
}
private CallAdapter<?> createCallAdapter() {
Type returnType = method.getGenericReturnType();
//返回类型的校验
if (Utils.hasUnresolvableType(returnType)) {
throw methodError(
"Method return type must not include a type variable or wildcard: %s", returnType);
}
//返回是无类型的话
if (returnType == void.class) {
throw methodError("Service methods cannot return void.");
}
Annotation[] annotations = method.getAnnotations();
try {
//适配器模式
return retrofit.callAdapter(returnType, annotations);
} catch (RuntimeException e) { // Wide exception range because factories are user code.
throw methodError(e, "Unable to create call adapter for %s", returnType);
}
}
//Retrofit的方法
public CallAdapter<?> callAdapter(Type returnType, Annotation[] annotations) {
return nextCallAdapter(null, returnType, annotations);
}
//匹配到合适的Adapter
public CallAdapter<?> nextCallAdapter(CallAdapter.Factory skipPast, Type returnType,
Annotation[] annotations) {
checkNotNull(returnType, "returnType == null");
checkNotNull(annotations, "annotations == null");
int start = adapterFactories.indexOf(skipPast) + 1;
for (int i = start, count = adapterFactories.size(); i < count; i++) {
CallAdapter<?> adapter = adapterFactories.get(i).get(returnType, annotations, this);
if (adapter != null) {
return adapter;
}
}
//省略下面的异常抛出代码
}
当当当当,看官可以看到,这里对adapterFactories
进行了遍历,这个adapterFactories
就是在我们给Retrofit添加的Adapter支持库,默认带有一个ExecutorCallAdapterFactory
,好了那么调用的入口有了,我们也不用害怕了。

深入
ExecutorCallAdapterFactory
在我的上一篇文章中讲到内部就简单的用Handler Post到主线程来实现了。
在上文的get
方法我们略过rx.Single,rx.Completable,只分析rx.Observable才不是我不会呢,在最后有一句
CallAdapter<Observable<?>> callAdapter = getCallAdapter(returnType, scheduler);
getCallAdapter
的实现细节代码就不再次帖了,省得你们说我凑字数。在实际开发中我们都会对接口返回的数据做一层包装,例如咱家的后台给的,我们自然要一层封装
public class ResponseBean<T> {
private int status;
private String msg;
private T data;
private long time;
}
那么我们在API中会这样声明一个接口
public interface API{
//注意这个Call不是OkHttp3的Call
@GET("users/search/{keyword}")
Observable<ResponseBean<List<UserBean>>> searchUserList(@Path("keyword") String keyword);
}
在getCallAdapter
方法第一行我们看到了
Type observableType = getParameterUpperBound(0, (ParameterizedType) returnType);
这个方法呢,是将返回的类型中的泛型参数作为Type获取出来,
有关Type这个类笔者写了一个类来测试
public class Test {
public static void main(String[] args) {
Method method = null;
try {
Test t=new Test();
method = t.getClass().getMethod("demoApi");
} catch (NoSuchMethodException e) {
e.printStackTrace();
}
Observable<ApplePen<Apple, Pen>> observable = demoApi();
Type returnType = method.getGenericReturnType();
Type observableType = getParameterUpperBound(0, (ParameterizedType) returnType);
System.out.println("Method的返回类型:" + returnType);
System.out.println("Observable的类型" + observable.getClass().getGenericSuperclass());
System.out.println("Observable的内部类型" + observableType);
}
/**
* 获取参数化的类型
* @param index 位置索引
* @param type
* @return
*/
public static Type getParameterUpperBound(int index, ParameterizedType type) {
Type[] types = type.getActualTypeArguments();
if (index < 0 || index >= types.length) {
throw new IllegalArgumentException(
"Index " + index + " not in range [0," + types.length + ") for " + type);
}
Type paramType = types[index];
if (paramType instanceof WildcardType) {
//通配符类型 <?>这种
return ((WildcardType) paramType).getUpperBounds()[0];
}
return paramType;
}
/**
* 模拟API中的网络接口
* @return
*/
public static Observable<ApplePen<Apple, Pen>> demoApi() {
ApplePen<Apple, Pen> applePen = new ApplePen<>();
return Observable.just(applePen);
}
public static class Apple {
}
public static class Pen {
}
public static class ApplePen<T1, T2> {
}
}
最后输出的结果
Method的返回类型:rx.Observable<com.tk.test.Test.com.tk.test.Test$ApplePen<com.tk.test.Test$Apple, com.tk.test.Test$Pen>>
Observable的类型rx.Observable<T>
Observable的内部类型com.tk.test.Test.com.tk.test.Test$ApplePen<com.tk.test.Test$Apple, com.tk.test.Test$Pen>
相信到这里,大家应该也看懂了,getCallAdapter
这个方法本质是将我们接口定义的Type类型构造出了CallAdapter,在API中定义的类型匹配如下:
Observable<Response<T>>,对应ResponseCallAdapter
Observable<Result<T>>,对应ResultCallAdapter
Observable<其他类型>,对应SimpleCallAdapter
一般我们都是根据项目封装自己的实体Bean,那我们来看看SimpleCallAdapter
司机的开车方法adapt()
,可以在源码看到其实方法很简单。
@Override public <R> Observable<R> adapt(Call<R> call) {
Observable<R> observable = Observable.create(new CallOnSubscribe<>(call)) //
.lift(OperatorMapResponseToBodyOrError.<R>instance());
if (scheduler != null) {
return observable.subscribeOn(scheduler);
}
return observable;
}
默认scheduler
调度器为空,我们也可以这样来让所有API中的Rx接口统一在io线程中处理。
new Retrofit.Builder()
.addCallAdapterFactory(RxJavaCallAdapterFactory.createWithScheduler(Schedulers.io()))
.build();
那么我们顺藤摸瓜来看看
//RxJavaCallAdapterFactory的静态内部类
static final class CallOnSubscribe<T> implements Observable.OnSubscribe<Response<T>> {
private final Call<T> originalCall;
CallOnSubscribe(Call<T> originalCall) {
this.originalCall = originalCall;
}
@Override public void call(final Subscriber<? super Response<T>> subscriber) {
// Since Call is a one-shot type, clone it for each new subscriber.
Call<T> call = originalCall.clone();
// Wrap the call in a helper which handles both unsubscription and backpressure.
RequestArbiter<T> requestArbiter = new RequestArbiter<>(call, subscriber);
subscriber.add(requestArbiter);
subscriber.setProducer(requestArbiter);
}
}
注意这里需要一定的RxJava基础了,不过就既然能坚持看到这里应该都是dalao了(;¬_¬)
可以看出主要逻辑在CallOnSubscribe
这个类中实现,然后经过一个lift
操作符变换,当Http响应非[200,300)时抛出一个HttpException
异常,非常简单是不是,接下来我们来会会CallOnSubscribe
,其实它也就做了三件事:
-
clone 了原来的 call,因为 okhttp3.Call 只能用一次。
-
实例化了一个叫 RequestArbiter 的类,翻译出来叫请求仲裁者,内部实现了Subscription和Producer接口,继承于AtomicBoolean。
-
把producer设置进subscriber。
Ps:AtomicBoolean,用于线程更高并发的场景,对变量赋值和方法提供原子性操作,通俗来讲就是加锁,不知道有没有说错:-)
那么我们来看看RequestArbiter
的Producer
的具体方法实现
@Override
public void request(long n) {
if (n < 0) throw new IllegalArgumentException("n < 0: " + n);
if (n == 0) return; // Nothing to do when requesting 0.
if (!compareAndSet(false, true)) return; // Request was already triggered.
try {
//同步发起网络请求
Response<T> response = call.execute();
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(response);
}
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
if (!subscriber.isUnsubscribed()) {
subscriber.onError(t);
}
return;
}
if (!subscriber.isUnsubscribed()) {
subscriber.onCompleted();
}
}
Producer在RxJava1.0中是一个可以向上游请求数据的类,RxJava2已经出了,据说有不小的改动,笔者这里就照自己的理解抛砖引玉了。
大部分情况下Subscriber(比如订报纸的用户) 都是被动接受Observable(报刊)发出的数据,但要是Observable发得太快(新闻太多),Subscriber 处理不过来(用户看不过来),在应对这种场景,RxJava诞生了一种Subscriber主动pull的机制,通过给Subscriber设置Producer(感兴趣的内容推送),通过 Subscriber#setProducer 方法,Subscriber就会通过Producer向Observable根据自己的能力来请求数据,通过 Producer#request 方法,在Producer收到请求之后在推给上游,上游再根据请求的量给 Subscriber 发数据(比如对编程感兴趣)
总结
分析到这里其实Retrofit的源码已经可见一斑了,最后我们整体来过一遍,开始高能
- 定义API接口
- 配置Retrofit参数
- 发起动态代理请求
- 将接口的的所有定义赋值成ServiceMethod
- 在ServiceMethod#build方法对callAdapter,responseType,responseConverter以及其他参数的初始化
- 将调用接口的参数args和ServiceMethod打包成OkHttpCall
- 通过ServiceMethod的CallAdapter以适配器模式发起请求
- 适配器的匹配模式根据接口的返回类型比那里支持库来匹配
- 在网络响应后都会调用OkHttpCall的实现方法,execute同步,enqueue异步
- 在网络返回后OkHttpCall会调用parseResponse方法,本质是通过ServiceMethod的responseConverter来解析成泛型T
- 将对应的及结果返回给调用端
Retrofit完美的完成了OkHttp对其他扩展功能的松耦合,最后附上一张个人理解的图,如有错误,敬请指正!

“你们要进窄门。因为引到灭亡,那门是宽的,路是大的,进去的人也多;引到永生,那门是窄的,路是小的,找着的人也少。” (马太福音 7:13-14 和合本)
网友评论