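RxJava and Retrofit are two widely used frameworks, and many developers combine them as the basis of their networking layer. I do too. Below I show how to use RxJava to implement simple network request callbacks (onStart, onSuccess and so on) and to provide a cancelRequest() method for cancelling a request. I'll cover the Retrofit configuration in detail another time; the key points are that RxJava2CallAdapterFactory must be set when building Retrofit, and that the request interface methods return an Observable.
This article focuses on the callback design and on request cancellation, mainly the observer side at the RxJava level. Straight to the code: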
import android.app.Dialog;
import android.content.Context;
import android.os.Handler;
import android.os.Looper;
import android.support.annotation.NonNull;
import com.dev.kit.basemodule.BuildConfig;
import com.dev.kit.basemodule.R;
import com.dev.kit.basemodule.View.NetProgressDialog;
import com.dev.kit.basemodule.netRequest.Configs.Config;
import com.dev.kit.basemodule.netRequest.subscribers.NetRequestCallback;
import com.dev.kit.basemodule.netRequest.util.OnNetProgressCancelListener;
import com.dev.kit.basemodule.result.BaseResult;
import com.dev.kit.basemodule.util.ToastUtil;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import retrofit2.HttpException;
/**
 * Subscriber (observer) for network requests
 * Created by cuiyan on 16/6/2 14:09
 */
public class NetRequestSubscriber<T> implements Observer<T> {
    private Dialog progressDialog;
    private Disposable disposable;
    private NetRequestCallback<T> netRequestCallback;
    private Context context;

    /**
     * @param netRequestCallback network request callback
     */
    public NetRequestSubscriber(@NonNull NetRequestCallback<T> netRequestCallback, Context context) {
        this(netRequestCallback, context, false, null);
    }

    /**
     * @param netRequestCallback network request callback
     * @param showProgress       whether to show the loading dialog during the request
     * @param progressTip        text shown in the loading dialog
     * @see NetProgressDialog
     */
    public NetRequestSubscriber(@NonNull final NetRequestCallback<T> netRequestCallback, Context context, boolean showProgress, String progressTip) {
        this.netRequestCallback = netRequestCallback;
        this.context = context;
        if (showProgress) {
            progressDialog = NetProgressDialog.getInstance(context, progressTip, new OnNetProgressCancelListener() {
                @Override
                public void onCancelRequest() {
                    cancelRequest();
                }
            });
        }
    }

    /**
     * @param netRequestCallback network request callback
     * @param progressDialog     custom dialog to show while the request is running
     */
    public NetRequestSubscriber(@NonNull NetRequestCallback<T> netRequestCallback, Context context, @NonNull Dialog progressDialog) {
        this.netRequestCallback = netRequestCallback;
        this.context = context;
        this.progressDialog = progressDialog;
    }

    @Override
    public void onSubscribe(@NonNull Disposable d) {
        this.disposable = d;
        showProgress();
        onRequestStart();
    }

    @Override
    public synchronized void onNext(final T t) {
        if (t == null) {
            onRequestResultNull();
        } else {
            if (t instanceof BaseResult && !Config.REQUEST_SUCCESS_CODE.equals(((BaseResult) t).getCode())) {
                ToastUtil.showToast(context, ((BaseResult) t).getMessage());
            }
            onRequestSuccess(t);
        }
    }

    @Override
    public synchronized void onError(Throwable throwable) {
        dismissProgress();
        onRequestError(throwable);
        if (throwable instanceof HttpException) {
            ToastUtil.showToast(context, ((HttpException) throwable).message() + ((HttpException) throwable).code());
        } else {
            if (BuildConfig.DEBUG) {
                ToastUtil.showToast(context, "error:" + throwable.getMessage());
            } else {
                ToastUtil.showToast(context, context.getString(R.string.error_net_request_failed));
            }
        }
    }

    /**
     * This method and {@link NetRequestSubscriber#onError(Throwable)} are mutually exclusive.
     * See {@link Observer#onError(Throwable)} and {@link Observer#onComplete()}.
     */
    @Override
    public void onComplete() {
        dismissProgress();
        netRequestCallback.onFinish();
    }

    private void onRequestStart() {
        if (Looper.myLooper() != context.getMainLooper()) {
            Handler handler = new Handler(context.getMainLooper());
            handler.post(new Runnable() {
                @Override
                public void run() {
                    netRequestCallback.onStart();
                }
            });
        } else {
            netRequestCallback.onStart();
        }
    }

    private void onRequestSuccess(final T t) {
        if (Looper.myLooper() != context.getMainLooper()) {
            Handler handler = new Handler(context.getMainLooper());
            handler.post(new Runnable() {
                @Override
                public void run() {
                    netRequestCallback.onSuccess(t);
                }
            });
        } else {
            netRequestCallback.onSuccess(t);
        }
    }

    private void onRequestResultNull() {
        if (Looper.myLooper() != context.getMainLooper()) {
            Handler handler = new Handler(context.getMainLooper());
            handler.post(new Runnable() {
                @Override
                public void run() {
                    netRequestCallback.onResultNull();
                }
            });
        } else {
            netRequestCallback.onResultNull();
        }
    }

    private void onRequestError(final Throwable throwable) {
        throwable.printStackTrace();
        if (Looper.myLooper() != context.getMainLooper()) {
            Handler handler = new Handler(context.getMainLooper());
            handler.post(new Runnable() {
                @Override
                public void run() {
                    netRequestCallback.onError(throwable);
                    netRequestCallback.onFinish();
                }
            });
        } else {
            netRequestCallback.onError(throwable);
            netRequestCallback.onFinish();
        }
    }

    /**
     * Shows the progress dialog if one was configured and it is not already showing.
     */
    private void showProgress() {
        if (progressDialog != null && !progressDialog.isShowing()) {
            progressDialog.show();
        }
    }

    private void dismissProgress() {
        if (progressDialog != null && progressDialog.isShowing()) {
            progressDialog.dismiss();
        }
    }

    public void cancelRequest() {
        dismissProgress();
        if (disposable != null && !disposable.isDisposed()) {
            disposable.dispose();
        }
        netRequestCallback.onCancel();
        netRequestCallback.onFinish();
    }
}
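The four onRequestXxx helpers in the class above all repeat the same check: run the callback directly if already on the main thread, otherwise post it through a Handler. Here is a minimal sketch of how that check could be factored out, written in plain Java so it runs without Android. CallbackDispatcher and its fields are hypothetical names that do not appear in the original project; a plain Thread and an Executor stand in for the main thread and for Handler.post.

```java
import java.util.concurrent.Executor;

/**
 * Hypothetical helper (not part of the original project).
 * Runs a task inline when the caller is already on the target thread,
 * otherwise hands the task to the executor that feeds that thread.
 */
final class CallbackDispatcher {
    // Stands in for the Android main thread.
    private final Thread targetThread;
    // Stands in for Handler.post on the main looper.
    private final Executor targetExecutor;

    CallbackDispatcher(Thread targetThread, Executor targetExecutor) {
        this.targetThread = targetThread;
        this.targetExecutor = targetExecutor;
    }

    void dispatch(Runnable task) {
        if (Thread.currentThread() == targetThread) {
            // Already on the right thread: no hop needed.
            task.run();
        } else {
            // Wrong thread: queue the task for the target thread.
            targetExecutor.execute(task);
        }
    }
}
```

With something like this, each helper would shrink to a single dispatch(...) call. On Android the two constructor arguments could plausibly be Looper.getMainLooper().getThread() and a method reference to post on a main-looper Handler, but that wiring is an assumption and is not exercised by this sketch.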
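NetRequestSubscriber implements the Observer interface. The following four methods provided by Observer are enough to implement simple network request callbacks. These callbacks rely on RxJava2CallAdapterFactory having been added when building Retrofit; I'll go over that part another time.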
/**
 * Provides the Observer with the means of cancelling (disposing) the
 * connection (channel) with the Observable in both
 * synchronous (from within {@link #onNext(Object)}) and asynchronous manner.
 * @param d the Disposable instance whose {@link Disposable#dispose()} can
 * be called anytime to cancel the connection
 * @since 2.0
 */
void onSubscribe(@NonNull Disposable d);

/**
 * Provides the Observer with a new item to observe.
 * <p>
 * The {@link Observable} may call this method 0 or more times.
 * <p>
 * The {@code Observable} will not call this method again after it calls either {@link #onComplete} or
 * {@link #onError}.
 *
 * @param t
 *          the item emitted by the Observable
 */
void onNext(@NonNull T t);

/**
 * Notifies the Observer that the {@link Observable} has experienced an error condition.
 * <p>
 * If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or
 * {@link #onComplete}.
 *
 * @param e
 *          the exception encountered by the Observable
 */
void onError(@NonNull Throwable e);

/**
 * Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
 * <p>
 * The {@link Observable} will not call this method if it calls {@link #onError}.
 */
void onComplete();
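The API documentation above already describes clearly what these four methods do and when they are called, so I won't repeat it. One thing to note: onError and onComplete are mutually exclusive. Once onError fires, onComplete will not be called.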
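Because onError and onComplete never both fire, a "finish" callback can be invoked from each of them and still run only once per request. Cancellation adds a third way for a request to end, though, so it may be worth guarding the finish call explicitly. The sketch below is one possible way to do that in plain Java. SimpleCallback, TerminalGuard and RecordingCallback are hypothetical, simplified stand-ins and are not the article's NetRequestCallback or NetRequestSubscriber.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical, simplified stand-in for a request callback. */
interface SimpleCallback<T> {
    void onSuccess(T value);
    void onError(Throwable error);
    void onCancel();
    void onFinish();
}

/** Funnels the three ways a request can end into a single onFinish call. */
final class TerminalGuard<T> {
    private final SimpleCallback<T> callback;
    private final AtomicBoolean finished = new AtomicBoolean(false);

    TerminalGuard(SimpleCallback<T> callback) {
        this.callback = callback;
    }

    void next(T value) {
        // Values arriving after a terminal event are dropped.
        if (!finished.get()) {
            callback.onSuccess(value);
        }
    }

    void complete() {
        if (finished.compareAndSet(false, true)) {
            callback.onFinish();
        }
    }

    void error(Throwable e) {
        if (finished.compareAndSet(false, true)) {
            callback.onError(e);
            callback.onFinish();
        }
    }

    void cancel() {
        if (finished.compareAndSet(false, true)) {
            callback.onCancel();
            callback.onFinish();
        }
    }
}

/** Records every callback invocation so the order can be inspected. */
final class RecordingCallback implements SimpleCallback<String> {
    final List<String> events = new ArrayList<>();

    @Override public void onSuccess(String value) { events.add("success:" + value); }
    @Override public void onError(Throwable error) { events.add("error"); }
    @Override public void onCancel() { events.add("cancel"); }
    @Override public void onFinish() { events.add("finish"); }
}
```

Whichever terminal method wins the compareAndSet reports its event followed by a single finish; later terminal calls are ignored.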
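Now let's look at cancelRequest(). As the name suggests, it cancels the network request, but how does it actually do that? I'll only give a brief introduction here and point out the key classes and methods; explaining every detail would be tedious and take up a lot of space.
As you can see, cancelRequest() calls disposable.dispose() to cancel the request. So what on earth is this disposable? It is an interface provided by RxJava. Its source:
/**
 * Represents a disposable resource.
 */
public interface Disposable {
    /**
     * Dispose the resource, the operation should be idempotent.
     */
    void dispose();

    /**
     * Returns true if this resource has been disposed.
     * @return true if this resource has been disposed
     */
    boolean isDisposed();
}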
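Looking at the Disposable source alone still leaves us in the dark: nothing in it shows how a network request gets cancelled. But think about it for a moment: cancelling a network request is not something RxJava itself can do. That is the networking framework's job, and Disposable is just a handy assistant.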
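One way to picture this division of labour is a disposable that owns no networking logic at all and simply forwards to whatever object can really cancel the request. The sketch below is plain Java under stated assumptions: SimpleDisposable copies only the shape of RxJava's Disposable so the example compiles on its own, and FakeCall and CallBackedDisposable are hypothetical names invented for illustration.

```java
/** Same two methods as RxJava's Disposable, redeclared so the sketch is standalone. */
interface SimpleDisposable {
    void dispose();
    boolean isDisposed();
}

/** Hypothetical stand-in for a network call that knows how to cancel itself. */
final class FakeCall {
    private volatile boolean canceled;

    void cancel() {
        // A real HTTP client would also abort the underlying connection here.
        canceled = true;
    }

    boolean isCanceled() {
        return canceled;
    }
}

/** The disposable only delegates; all real work belongs to the call. */
final class CallBackedDisposable implements SimpleDisposable {
    private final FakeCall call;

    CallBackedDisposable(FakeCall call) {
        this.call = call;
    }

    @Override
    public void dispose() {
        call.cancel();
    }

    @Override
    public boolean isDisposed() {
        return call.isCanceled();
    }
}
```

Calling dispose() more than once is harmless in this sketch, which matches the "should be idempotent" note in the Disposable documentation quoted above.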
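As mentioned earlier, building Retrofit requires configuring RxJava2CallAdapterFactory so that network requests can be observed as an Observable. So how does RxJava2CallAdapterFactory come into play, and how is it connected to the request being cancelled after Disposable.dispose() is called? Let's take a brief look at the RxJava2CallAdapterFactory class.
Instances of this class are created through static factory methods (create(), createAsync() and createWithScheduler(Scheduler)). It has a single private constructor:
private RxJava2CallAdapterFactory(@Nullable Scheduler scheduler, boolean isAsync) {
    this.scheduler = scheduler;
    this.isAsync = isAsync;
}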
The core method of RxJava2CallAdapterFactory:
public CallAdapter<?, ?> get(Type returnType, Annotation[] annotations, Retrofit retrofit) {
    Class<?> rawType = getRawType(returnType);
    // ... (a good deal of code omitted here)
    return new RxJava2CallAdapter(responseType, scheduler, isAsync, isResult, isBody, isFlowable,
            isSingle, isMaybe, false);
}
Here comes the key part: RxJava2CallAdapter and the isAsync parameter. Let's go straight to the core code of RxJava2CallAdapter:
@Override
public Object adapt(Call<R> call) {
    // ******************************** key part ********************************
    Observable<Response<R>> responseObservable = isAsync ? new CallEnqueueObservable<>(call) : new CallExecuteObservable<>(call);
    // **************************************************************************
    Observable<?> observable;
    if (isResult) {
        observable = new ResultObservable<>(responseObservable);
    } else if (isBody) {
        observable = new BodyObservable<>(responseObservable);
    } else {
        observable = responseObservable;
    }
    if (scheduler != null) {
        observable = observable.subscribeOn(scheduler);
    }
    if (isFlowable) {
        return observable.toFlowable(BackpressureStrategy.LATEST);
    }
    if (isSingle) {
        return observable.singleOrError();
    }
    if (isMaybe) {
        return observable.singleElement();
    }
    if (isCompletable) {
        return observable.ignoreElements();
    }
    return observable;
}
Look at the key part between the two rows of asterisks. It leads us to two key classes, CallEnqueueObservable and CallExecuteObservable. Let's take a rough look at each:
1. CallEnqueueObservable.java source:
final class CallEnqueueObservable<T> extends Observable<Response<T>> {
    private final Call<T> originalCall;

    CallEnqueueObservable(Call<T> originalCall) {
        this.originalCall = originalCall;
    }

    @Override protected void subscribeActual(Observer<? super Response<T>> observer) {
        // Since Call is a one-shot type, clone it for each new observer.
        Call<T> call = originalCall.clone();
        CallCallback<T> callback = new CallCallback<>(call, observer);
        observer.onSubscribe(callback);
        call.enqueue(callback);
    }

    private static final class CallCallback<T> implements Disposable, Callback<T> {
        private final Call<?> call;
        private final Observer<? super Response<T>> observer;
        boolean terminated = false;

        CallCallback(Call<?> call, Observer<? super Response<T>> observer) {
            this.call = call;
            this.observer = observer;
        }

        @Override public void onResponse(Call<T> call, Response<T> response) {
            if (call.isCanceled()) return;
            try {
                observer.onNext(response);
                if (!call.isCanceled()) {
                    terminated = true;
                    observer.onComplete();
                }
            } catch (Throwable t) {
                if (terminated) {
                    RxJavaPlugins.onError(t);
                } else if (!call.isCanceled()) {
                    try {
                        observer.onError(t);
                    } catch (Throwable inner) {
                        Exceptions.throwIfFatal(inner);
                        RxJavaPlugins.onError(new CompositeException(t, inner));
                    }
                }
            }
        }

        @Override public void onFailure(Call<T> call, Throwable t) {
            if (call.isCanceled()) return;
            try {
                observer.onError(t);
            } catch (Throwable inner) {
                Exceptions.throwIfFatal(inner);
                RxJavaPlugins.onError(new CompositeException(t, inner));
            }
        }

        @Override public void dispose() {
            call.cancel();
        }

        @Override public boolean isDisposed() {
            return call.isCanceled();
        }
    }
}
See the dispose() method? ^_^ It calls call.cancel(), and that is the crux of the matter:
@Override
public void dispose() {
    call.cancel();
}
2. CallExecuteObservable.java source:
final class CallExecuteObservable<T> extends Observable<Response<T>> {
    private final Call<T> originalCall;

    CallExecuteObservable(Call<T> originalCall) {
        this.originalCall = originalCall;
    }

    @Override protected void subscribeActual(Observer<? super Response<T>> observer) {
        // Since Call is a one-shot type, clone it for each new observer.
        Call<T> call = originalCall.clone();
        observer.onSubscribe(new CallDisposable(call));
        boolean terminated = false;
        try {
            Response<T> response = call.execute();
            if (!call.isCanceled()) {
                observer.onNext(response);
            }
            if (!call.isCanceled()) {
                terminated = true;
                observer.onComplete();
            }
        } catch (Throwable t) {
            Exceptions.throwIfFatal(t);
            if (terminated) {
                RxJavaPlugins.onError(t);
            } else if (!call.isCanceled()) {
                try {
                    observer.onError(t);
                } catch (Throwable inner) {
                    Exceptions.throwIfFatal(inner);
                    RxJavaPlugins.onError(new CompositeException(t, inner));
                }
            }
        }
    }

    private static final class CallDisposable implements Disposable {
        private final Call<?> call;

        CallDisposable(Call<?> call) {
            this.call = call;
        }

        @Override public void dispose() {
            call.cancel();
        }

        @Override public boolean isDisposed() {
            return call.isCanceled();
        }
    }
}
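The key is the third line of subscribeActual: observer.onSubscribe(new CallDisposable(call));. CallDisposable is a nested class defined in CallExecuteObservable that implements the Disposable interface, and its dispose() method likewise calls call.cancel() to cancel the network request.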
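Both classes share the same pattern: hand the canceller to the observer first, then run the call (enqueue or execute), and check isCanceled() before forwarding anything. The following is a toy model of that pattern in plain Java, not Retrofit code. StubCall, CallAdapterSketch and completeNow() are hypothetical names; completeNow() is a manual hook that plays the role of the network thread delivering the response, so the sketch needs no real threads.

```java
import java.util.function.Consumer;

/** Hypothetical call that returns a fixed body and can be canceled. */
final class StubCall {
    private final String body;
    private boolean canceled;
    private Consumer<String> pendingCallback;

    StubCall(String body) {
        this.body = body;
    }

    /** Synchronous style: the response comes back on the calling thread. */
    String execute() {
        return body;
    }

    /** Asynchronous style: remember the callback, deliver later. */
    void enqueue(Consumer<String> callback) {
        pendingCallback = callback;
    }

    /** Manual hook simulating the moment the network response arrives. */
    void completeNow() {
        if (pendingCallback != null) {
            pendingCallback.accept(body);
        }
    }

    void cancel() {
        canceled = true;
    }

    boolean isCanceled() {
        return canceled;
    }
}

final class CallAdapterSketch {
    /**
     * Mirrors the isAsync switch: choose enqueue or execute,
     * and drop the result if the call was canceled in the meantime.
     */
    static void subscribe(StubCall call, boolean isAsync,
                          Consumer<Runnable> onSubscribe, Consumer<String> onNext) {
        // Hand out the canceller before starting, like observer.onSubscribe(...).
        onSubscribe.accept(call::cancel);
        if (isAsync) {
            call.enqueue(body -> {
                if (!call.isCanceled()) {
                    onNext.accept(body);
                }
            });
        } else {
            String body = call.execute();
            if (!call.isCanceled()) {
                onNext.accept(body);
            }
        }
    }
}
```

In this model a subscriber that runs the canceller it receives, whether immediately or before completeNow() is invoked, never sees a value. That corresponds to the effect the isCanceled() checks have in the two Retrofit classes above.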
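Appendix: important Java classes related to this article
RxJava:
Observer.java
Observable.java
Disposable.java
Retrofit:
Retrofit.java (focus on the create method)
OkHttpCall.java
RxJava2CallAdapterFactory.java (focus on the get method)
RxJava2CallAdapter.java (focus on the adapt method)
CallEnqueueObservable.java (focus on the subscribeActual method and the CallCallback nested class)
CallExecuteObservable.java (focus on the subscribeActual method)
This article does not walk through the full flow in detail, and since my own knowledge is limited it may contain errors or inaccuracies. If you happen to read it, corrections are welcome.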
A complete example is available at: https://github.com/670832188/TestApp