Retrofit
Retrofit发起一个简单地网络请求
导入retrofit
compile 'com.squareup.retrofit2:retrofit:2.3.0'
compile 'com.squareup.retrofit2:converter-gson:2.3.0'
compile ('com.squareup.retrofit2:adapter-rxjava:2.0.0'){
exclude group: 'io.reactivex', module: 'rxjava'
}//支持rxjava
compile ('io.reactivex:rxandroid:1.2.1'){
exclude group: 'io.reactivex', module: 'rxjava'
}
新建一个api
public interface Api {
@GET("api/v1/abc/def")
Observable<ResponseBody> getTotal();
@GET("api/v1/mnl/xyz")
Observable<ResponseBody> getMoneyTotal();
}
初始化Retrofit和Okhttp
初始化Okhttp
private OkHttpClient initOkHttpClient(){
OkHttpClient client = new OkHttpClient.Builder()
.addInterceptor(new Interceptor() {
@Override
public Response intercept(Chain chain) throws IOException {
Request r = chain.request().newBuilder()
.addHeader("x-device-info","HUAWEI/7.0/").build();
return chain.proceed(r);
}
}).build();
return client;
}
初始化Retrofit
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("http://****.com/")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.client(initOkHttpClient())
.build();
RxJavaCallAdapterFactory.createWithScheduler 接受Scheduler
可以在发起请求的时候指定Schedulers.io();
发起网络请求
Api api = retrofit.create(Api.class);
Observable<ResponseBody> observable = api.getTotal();
observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber<ResponseBody>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(ResponseBody responseBody) {
try {
Log.e(TAG, "call: "+ new String(responseBody.bytes()));
} catch (IOException e) {
e.printStackTrace();
}
}
});
看完上面代码产生以下疑问
- API 接口怎么映射到对应的请求
retrofit.create(Api.class)
-
ConverterFactory
和ConverterFactory
怎么工作的 - Retrofit怎么支持Rxjava的
解决问题最粗暴的方法断点跟踪,大致跟下代码。
Copy源代码到工程里面,导入对应的包
compile 'com.squareup.okhttp3:okhttp:3.2.0'
compile 'com.google.code.gson:gson:2.6.1'
compile 'io.reactivex:rxjava:1.2.1'
compile 'com.google.code.findbugs:jsr305:3.0.0'
流程图
- 先来看看create(Api.class)的代码
public <T> T create(final Class<T> service) {
return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[] { service },
new InvocationHandler() {
private final Platform platform = Platform.get();
@Override public Object invoke(Object proxy, Method method, @Nullable Object[] args)
throws Throwable {
Log.e(TAG, "invoke: "+ method.getName() );
// If the method is a method from Object then defer to normal invocation.
if (method.getDeclaringClass() == Object.class) {
return method.invoke(this, args);
}
ServiceMethod<Object, Object> serviceMethod =
(ServiceMethod<Object, Object>) loadServiceMethod(method);
OkHttpCall<Object> okHttpCall = new OkHttpCall<>(serviceMethod, args);
return serviceMethod.callAdapter.adapt(okHttpCall);
}
});
}
这里用到了动态代理模式Proxy.newProxyInstance
每次调用到Api
中的方法都会回调InvocationHandler
中的invoke
打印出来的log如下
image-55fe26-1514379291279)]
我们发现有
toString
方法也打印出来了,如果是Object的方法就直接返回当前方法自己
-
下来是loadMethod(Method method) 返回一个
Adapts an invocation of an interface method into an HTTP call
映射一个接口的调用到网络请求
看到这个基本就能想到loadMethod是通过method初始化一个ServiceMethodCallAdapter<R, T>
Adapts a {@link Call} with response type {@code R} into the type of {@code T}. Instances are
created by {@linkplain Factory a factory} which is
{@linkplain Retrofit.Builder#addCallAdapterFactory(Factory) installed} into the {@link Retrofit}
instance.//Retrofit.loadMethod result = new ServiceMethod.Builder<>(this, method).build();// this是retrofit
// ServiceMthod.Builder Builder(Retrofit retrofit, Method method) { this.retrofit = retrofit; this.method = method; this.methodAnnotations = method.getAnnotations(); //获取方法的参数类型 this.parameterTypes = method.getGenericParameterTypes(); //获取参数的注解 this.parameterAnnotationsArray = method.getParameterAnnotations(); }
build中主要完成以下功能
- 通过
createCallAdapter()
初始化callAdapter - 通过
createResponseConverter
初始化responseConverter - 解析方法上面的注解
parseMethodAnnotation
- httpMethod @GET @POST
- relativeUrl
- headers
- 解析参数的注解
parseParameter
- 通过
-
新建一个OkHttpCall extens Call
Call
An invocation of a Retrofit method that sends a request to a webserver and returns a response.
Each call yields its own HTTP request and response pair. Use {@link #clone} to make multiple
calls with the same parameters to the same webserver; this may be used to implement polling or
to retry a failed call.
<p>Calls may be executed synchronously with {@link #execute}, or asynchronously with {@link
enqueue}. In either case the call can be canceled at any time with {@link cancel}. A call that
is busy writing its request or reading its response may receive a {@link IOException}; this is
working as designed.
@param <T> Successful response body type.
public interface Call<T> extends Cloneable {
/**
* Synchronously send the request and return its response.
*
* @throws IOException if a problem occurred talking to the server.
* @throws RuntimeException (and subclasses) if an unexpected error occurs creating the request
* or decoding the response.
*/
Response<T> execute() throws IOException;
/**
* Asynchronously send the request and notify {@code callback} of its response or if an error
* occurred talking to the server, creating the request, or processing the response.
*/
void enqueue(Callback<T> callback);
/**
* Returns true if this call has been either {@linkplain #execute() executed} or {@linkplain
* #enqueue(Callback) enqueued}. It is an error to execute or enqueue a call more than once.
*/
boolean isExecuted();
/**
* Cancel this call. An attempt will be made to cancel in-flight calls, and if the call has not
* yet been executed it never will be.
*/
void cancel();
/** True if {@link #cancel()} was called. */
boolean isCanceled();
/**
* Create a new, identical call to this one which can be enqueued or executed even if this call
* has already been.
*/
Call<T> clone();
/** The original HTTP request. */
Request request();
}
重点注意execute
这个真正发起请求的方法(代码在下面),而request
仅仅是原生http请求时调用
OkHttpCall<Object> okHttpCall = new OkHttpCall<>(serviceMethod, args);
return serviceMethod.callAdapter.adapt(okHttpCall);
通过RxJavaCallAdapter.adapt
@Override public Object adapt(Call<R> call) {
OnSubscribe<Response<R>> callFunc = isAsync
? new CallEnqueueOnSubscribe<>(call)
: new CallExecuteOnSubscribe<>(call);
OnSubscribe<?> func;
if (isResult) {
func = new ResultOnSubscribe<>(callFunc);
} else if (isBody) {
func = new BodyOnSubscribe<>(callFunc);
} else {
func = callFunc;
}
Observable<?> observable = Observable.create(func);
if (scheduler != null) {
observable = observable.subscribeOn(scheduler);
}
if (isSingle) {
return observable.toSingle();
}
if (isCompletable) {
return observable.toCompletable();
}
return observable;
}
定义一个CallExecuteOnSubscribe
,当Observeable被订阅,回调call方法发起网络请求
同时这也可以为Observale指定线程类型
final class CallExecuteOnSubscribe<T> implements OnSubscribe<Response<T>> {
private final Call<T> originalCall;
CallExecuteOnSubscribe(Call<T> originalCall) {
this.originalCall = originalCall;
}
@Override public void call(Subscriber<? super Response<T>> subscriber) {
// Since Call is a one-shot type, clone it for each new subscriber.
Call<T> call = originalCall.clone();
CallArbiter<T> arbiter = new CallArbiter<>(call, subscriber);
subscriber.add(arbiter);
subscriber.setProducer(arbiter);
Response<T> response;
try {
response = call.execute();
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
arbiter.emitError(t);
return;
}
arbiter.emitResponse(response);
}
}
/**OKHttpCall**/
@Override public Response<T> execute() throws IOException {
okhttp3.Call call;
synchronized (this) {
if (executed) throw new IllegalStateException("Already executed.");
executed = true;
.......
call = rawCall;
if (call == null) {
call = rawCall = createRawCall();
}
}
if (canceled) {
call.cancel();
}
return parseResponse(call.execute());
}
private okhttp3.Call createRawCall() throws IOException {
Request request = serviceMethod.toRequest(args);
okhttp3.Call call = serviceMethod.callFactory.newCall(request);
return call;
}
这里的callFactory就是我们初始化的OkhttpClient
- response = call.execute() 是阻塞的,这个是在那个线程里面跑,这个线程什么时候开启的?
- CallArbiter deliverResponse中会分别调用 subscriber.onNext(response)和subscriber.onComplete()。按我的理解他应该直接回调MainActivity中的Subscriber.onNext
顺序如下
CallArbiter----deliverResponse---onNext
MainActivity---Subscriber--------onNext
CallArbiter----deliverResponse---onComplete
MainActivity---Subscriber--------onComplete
加上线程可以看出因为不在一个线程里面。
MainActivity------------------开始调用API--------------- ---------------------main
Retrofit.InvocationHandler----invoke 调用;getTotal---------------------------main
ServiceMethod.Builder---------createCallAdapter------------------------------main
ServiceMethod.Builder---------createResponseConverter------------------------main
ServiceMethod.Builder---------parseMethodAnnotation -------------------------main
RxJavaCallAdapter-------------adapt -----------------------------------------main
MainActivity------------------调用API结束-------------------------------------main
MainActivity------------------已经添加订阅者-----------------------------------main
CallExecuteOnSubscribe--------call -------------------------------RxIoScheduler-2
CallArbiter-------------------request---(state=0)-----------------RxIoScheduler-2
OkHttpCall--------------------execute ----------------------------RxIoScheduler-2
CallArbiter-------------------emitResponse----(state=1)-----------RxIoScheduler-2
CallArbiter-------------------request.deliverResponse.onNext -----RxIoScheduler-2
CallArbiter-------------------equest.deliverResponse.onCompleted---RxIoScheduler-2
MainActivity.Subscriber-------onNext-----------------------------------------main
MainActivity.Subscriber-------onCompleted------------------------------------main
看到这个log基本上面那个问题:什么时候切换的线程就清楚了,当Observable添加到订阅者后,切换线程。
- CallArbiter是干嘛的
final class CallArbiter<T> extends AtomicInteger implements Subscription, Producer {
CallArbiter(Call<T> call, Subscriber<? super Response<T>> subscriber) {
super(STATE_WAITING);
this.call = call;
this.subscriber = subscriber;
}
@Override
public void request(long amount) {
if (amount == 0) {
return;
}
while (true) {
int state = get();
switch (state) {
case STATE_WAITING:
if (compareAndSet(STATE_WAITING, STATE_REQUESTED)) {
return;
}
break; // State transition failed. Try again.
case STATE_HAS_RESPONSE:
if (compareAndSet(STATE_HAS_RESPONSE, STATE_TERMINATED)) {
deliverResponse(response);
return;
}
break; // State transition failed. Try again.
......
}
}
}
void emitResponse(Response<T> response) {
while (true) {
int state = get();
switch (state) {
case STATE_WAITING:
this.response = response;
if (compareAndSet(STATE_WAITING, STATE_HAS_RESPONSE)) {
return;
}
break; // State transition failed. Try again.
case STATE_REQUESTED:
if (compareAndSet(STATE_REQUESTED, STATE_TERMINATED)) {
deliverResponse(response);
return;
}
break; // State transition failed. Try again.
.......
}
}
}
private void deliverResponse(Response<T> response) {
try {
if (!isUnsubscribed()) {
subscriber.onNext(response);
}
}
......
try {
if (!isUnsubscribed()) {
subscriber.onCompleted();
}
} .......
}
}
这个类继承Producer,查看下这个接口的定义
Interface that establishes a request-channel between an Observable and a Subscriber and allows
the Subscriber to request a certain amount of items from the Observable (otherwise known as
backpressure).
大致意思是在Observable和Subscriber之间建立一个请求通道,允许订阅者从可观察对象处请求大量项目
当Observable注册到Subscriber上时,会调用request.
public interface Producer {
/**
* Request a certain maximum number of items from this Producer. This is a way of requesting backpressure.
* To disable backpressure, pass {@code Long.MAX_VALUE} to this method.
* <p>
* Requests are additive but if a sequence of requests totals more than {@code Long.MAX_VALUE} then
* {@code Long.MAX_VALUE} requests will be actioned and the extras <i>may</i> be ignored. Arriving at
* {@code Long.MAX_VALUE} by addition of requests cannot be assumed to disable backpressure. For example,
* the code below may result in {@code Long.MAX_VALUE} requests being actioned only.**/
void request(long n);
}//rxjava 中 Observeable是怎么注册到Subscriber上的
通过request把当前状态设置为STATE_REQUESTED
在手动调用emitResponse时,执行deliverResonse,完成回调
继承Subscription
是为了把Subscriber的取消映射到Call中
继承AtomicInteger
为了线程安全,按照Java编程思想里面说的,如果不是JVM的大神请不要使用这个类。这里就知道干嘛就好了
基本先这样吧,还有几个地方不太清楚,以后再补吧。
1.网络错误怎么处理的
2.超时了,怎么发起请求
网友评论