-
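Why use RxJava for Retrofit network requests
We walked through Retrofit's call flow in an earlier article. On Android, the vast majority of network requests end with a callback that updates the UI. A Retrofit request ultimately runs over a socket, and a synchronous call blocks until the response arrives or the request fails, so it has to run on a background thread. Take this line, for example: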
List<User> users = service.groupList(1001).execute().body();
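We have to run this code on a background thread and then call a method on the main thread to update the UI. That is not hard to implement, but it is not elegant, and we have to maintain the cross-thread communication ourselves.
RxJava is an asynchronous programming library built on the observer pattern. Combined with Retrofit, it handles request callbacks cleanly.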
-
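Usage scenario: calling from a ViewModel
In real projects, to reduce coupling, we usually put the network request code in the ViewModel under the MVVM pattern. The code below could be written entirely in Kotlin or entirely in Java; this project simply happens to use both. Here is the calling code: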
BaseRequest req = new BaseRequest(); MyServiceImpl.getData(req, lifecycleProvider) .subscribe(new HttpObserver<MyResponse, BaseRequest>(req, RxApiManager.HttpTaskId.TASK_ID_FLIGHT_DETAILS_FLIGHTWEATHERINFO, getIViewModelService()) { @Override public void onSuccess(@NonNull MyResponse response) { dataBeanData.postValue(response); } @Override public void onFailure(@NotNull BaseResponse response) { dataBeanData.postValue(null); } });
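getData is covered below; for now it is enough to know that it returns an Observable, which subscribe connects to an Observer. dataBeanData is a MutableLiveData; its postValue method dispatches the value to the main thread.
Here is getData: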
public static Observable<MyResponse> getData(BaseRequest request, @NonNull LifecycleProvider lifecycle) { return RxObservable.INSTANCE.getObservable(new MyApi().getData(request), lifecycle); }
RxObservable.INSTANCE.getObservable:
fun <T> getObservable(@NonNull apiObservable: Observable<T>, @NonNull lifecycle: LifecycleProvider<*>): Observable<T> { return apiObservable .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .compose<T>(lifecycle.bindToLifecycle()) }
What is apiObservable? It comes from new MyApi().getData(request):
fun getData(request: BaseRequest): Observable<MyResponse> { return mApi.getData(getRequestBody(request)) }
mApi is:
private var mApi: MyService = getApiService(MyService::class.java)
getApiService is a method in its parent class:
fun <T> getApiService(apiService: Class<T>): T { return HttpCall.getInstance().client.httpClient.create(apiService) }
HttpCall is a custom utility class:
public class HttpCall { private static HttpCall call; private HttpClient client; private HttpCall() { client = new HttpClient() { @Override protected String setBaseUrl() { LogUtils.v(this, "client.getUrl() = "+HttpUrlUtil.getInstance().getUrl()); return HttpUrlUtil.getInstance().getUrl(); } }; } public static HttpCall getInstance() { if (call == null) { synchronized (HttpCall.class) { if (call == null) { call = new HttpCall(); } } } return call; } public HttpClient getClient() { return client; } }
Its client is a custom HttpClient:
abstract class HttpClient { private lateinit var client: Retrofit val httpClient: Retrofit get() { if (!::client.isInitialized) { val okHttpClientBuilder = OkHttpClient.Builder() .retryOnConnectionFailure(true) .addInterceptor(HeaderInterceptor()) .addInterceptor(LoggingInterceptor()) .addInterceptor(TransactionIdInterceptor()) .addInterceptor(EncodeRequestInterceptor()) .addInterceptor(DecodeResponseInterceptor()) .addInterceptor(DynamicTimeoutInterceptor()) .hostnameVerifier(SafeHostnameVerifier()) val sslSocketFactory = SslContextFactory.getSSLSocketFactory() if (sslSocketFactory != null) { okHttpClientBuilder.sslSocketFactory(sslSocketFactory, CustomTrustManager()) } val okHttpClient = okHttpClientBuilder.build() client = Retrofit.Builder() .baseUrl(setBaseUrl()) .client(okHttpClient) .addConverterFactory(GsonConverterFactory.create(CustomGson.buildGson())) .addCallAdapterFactory(RxJava2CallAdapterFactory.create()) .build() } return client } /** * base url 统一以/http * * @return String */ protected abstract fun setBaseUrl(): String }
As you can see, this class encapsulates the creation of the Retrofit object.
So mApi is the proxy object that Retrofit creates for MyService, and mApi.getData is the getData method declared in MyService:
@POST(Api.RELATIVE_URL) Observable<MyResponse> getData(@Body RequestBody requestBody);
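Api.RELATIVE_URL is a relative URL that is resolved against the baseUrl set earlier to form the full request URL.
At this point, Retrofit's invoke method would normally have finished and returned a Call. So why does this method return an Observable<MyResponse> instead?
Recall the addCallAdapterFactory API from our earlier analysis of the Retrofit source. Here we pass RxJava2CallAdapterFactory.create(), which returns an RxJava2CallAdapterFactory, and that is the reason.
To see why, go back to the Retrofit source. HttpServiceMethod.parseAnnotations returns a CallAdapted, whose constructor takes a callAdapter. That callAdapter is created by createCallAdapter, which calls retrofit.callAdapter(...) and ends up in nextCallAdapter: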
public CallAdapter<?, ?> nextCallAdapter( @Nullable CallAdapter.Factory skipPast, Type returnType, Annotation[] annotations) { Objects.requireNonNull(returnType, "returnType == null"); Objects.requireNonNull(annotations, "annotations == null"); int start = callAdapterFactories.indexOf(skipPast) + 1; for (int i = start, count = callAdapterFactories.size(); i < count; i++) { CallAdapter<?, ?> adapter = callAdapterFactories.get(i).get(returnType, annotations, this); if (adapter != null) { return adapter; } } StringBuilder builder = new StringBuilder("Could not locate call adapter for ").append(returnType).append(".\n"); if (skipPast != null) { builder.append(" Skipped:"); for (int i = 0; i < start; i++) { builder.append("\n * ").append(callAdapterFactories.get(i).getClass().getName()); } builder.append('\n'); } builder.append(" Tried:"); for (int i = start, count = callAdapterFactories.size(); i < count; i++) { builder.append("\n * ").append(callAdapterFactories.get(i).getClass().getName()); } throw new IllegalArgumentException(builder.toString()); }
callAdapterFactories.get(i).get(returnType, annotations, this) returns a CallAdapter. The list itself is assembled in the build method of Retrofit.Builder:
List<CallAdapter.Factory> callAdapterFactories = new ArrayList<>(this.callAdapterFactories); callAdapterFactories.addAll(platform.defaultCallAdapterFactories(callbackExecutor));
this.callAdapterFactories, in turn, is populated by addCallAdapterFactory:
public Builder addCallAdapterFactory(CallAdapter.Factory factory) { callAdapterFactories.add(Objects.requireNonNull(factory, "factory == null")); return this; }
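The lookup order described above (user-added factories first, platform defaults last, first non-null adapter wins) can be sketched with plain Java. This is only an illustration: Factory, FakeObservable, FakeCall, and the string "adapters" are hypothetical stand-ins, not Retrofit types.

```java
import java.util.ArrayList;
import java.util.List;

class AdapterLookupSketch {
    // Hypothetical marker types standing in for Observable and Call.
    static final class FakeObservable {}
    static final class FakeCall {}

    // Hypothetical stand-in for CallAdapter.Factory:
    // returns null when it cannot handle the given return type.
    interface Factory {
        String get(Class<?> returnType);
    }

    // Mirrors the shape of nextCallAdapter: ask each factory in order,
    // return the first non-null result, fail if nobody claims the type.
    static String nextAdapter(List<Factory> factories, Class<?> returnType) {
        for (Factory factory : factories) {
            String adapter = factory.get(returnType);
            if (adapter != null) {
                return adapter;
            }
        }
        throw new IllegalArgumentException("Could not locate call adapter for " + returnType);
    }

    static List<Factory> buildFactories() {
        List<Factory> factories = new ArrayList<>();
        // Added by the user, like addCallAdapterFactory(...).
        factories.add(type -> type == FakeObservable.class ? "rx-adapter" : null);
        // Appended afterwards, like the platform default factories.
        factories.add(type -> type == FakeCall.class ? "default-adapter" : null);
        return factories;
    }

    public static void main(String[] args) {
        System.out.println(nextAdapter(buildFactories(), FakeObservable.class));
        System.out.println(nextAdapter(buildFactories(), FakeCall.class));
    }
}
```

Because each factory returns null for types it does not recognize, a service method that returns a plain Call still falls through to the default adapter even when an Rx factory is registered.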
So callAdapterFactories contains the RxJava2CallAdapterFactory, and calling its get method yields a CallAdapter:
@Override public @Nullable CallAdapter<?, ?> get( Type returnType, Annotation[] annotations, Retrofit retrofit) { //getRawType的作用:譬如List[]、List<>等形式的类型会得到List.class Class<?> rawType = getRawType(returnType); if (rawType == Completable.class) { // Completable is not parameterized (which is what the rest of this method deals with) so it // can only be created with a single configuration. return new RxJava2CallAdapter( Void.class, scheduler, isAsync, false, true, false, false, false, true); } boolean isFlowable = rawType == Flowable.class; boolean isSingle = rawType == Single.class; boolean isMaybe = rawType == Maybe.class; if (rawType != Observable.class && !isFlowable && !isSingle && !isMaybe) { return null; } boolean isResult = false; boolean isBody = false; Type responseType; //如果rawType不是前面的五种则必须是ParameterizedType(T<>这种泛型类型) if (!(returnType instanceof ParameterizedType)) { String name = isFlowable ? "Flowable" : isSingle ? "Single" : isMaybe ? "Maybe" : "Observable"; throw new IllegalStateException( name + " return type must be parameterized" + " as " + name + "<Foo> or " + name + "<? extends Foo>"); } //得到泛型参数中的上界(<? extends String>会得到String的Type,如果不是WildcardType这中<?>通配符的类型的则直接返回returnType) //前两个if-elseif直接按照Observable<Foo<Foo2>>这样理解,如果取到的上界类型是Response或Result则必须上界类型也是ParameterizedType,因为这两个类的定义里有泛型要求(Response<T>、Result<T>) Type observableType = getParameterUpperBound(0, (ParameterizedType) returnType); Class<?> rawObservableType = getRawType(observableType); if (rawObservableType == Response.class) { if (!(observableType instanceof ParameterizedType)) { throw new IllegalStateException( "Response must be parameterized" + " as Response<Foo> or Response<? extends Foo>"); } responseType = getParameterUpperBound(0, (ParameterizedType) observableType); } else if (rawObservableType == Result.class) { if (!(observableType instanceof ParameterizedType)) { throw new IllegalStateException( "Result must be parameterized" + " as Result<Foo> or Result<? 
extends Foo>"); } responseType = getParameterUpperBound(0, (ParameterizedType) observableType); isResult = true; } else { responseType = observableType; isBody = true; } return new RxJava2CallAdapter( responseType, scheduler, isAsync, isResult, isBody, isFlowable, isSingle, isMaybe, false); }
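The type inspection in get relies on two helpers, getRawType and getParameterUpperBound. The sketch below is a simplified reimplementation of both using only java.lang.reflect, to show what they extract from a generic return type. FakeService is hypothetical, and Optional and List are stand-ins for Observable and Response; Retrofit's own helpers also handle cases such as arrays and type variables that are omitted here.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.WildcardType;
import java.util.List;
import java.util.Optional;

class ReturnTypeSketch {
    // Hypothetical service interface used only for reflection.
    interface FakeService {
        Optional<String> body();
        Optional<List<String>> wrapped();
        Optional<? extends Number> wildcard();
    }

    // Simplified getRawType: List<String> -> List.class, String -> String.class.
    static Class<?> rawType(Type type) {
        if (type instanceof Class<?>) {
            return (Class<?>) type;
        }
        if (type instanceof ParameterizedType) {
            return (Class<?>) ((ParameterizedType) type).getRawType();
        }
        throw new IllegalArgumentException("Unsupported type in this sketch: " + type);
    }

    // Simplified getParameterUpperBound: <? extends Number> -> Number, <String> -> String.
    static Type upperBound(int index, ParameterizedType type) {
        Type argument = type.getActualTypeArguments()[index];
        if (argument instanceof WildcardType) {
            return ((WildcardType) argument).getUpperBounds()[0];
        }
        return argument;
    }

    // Describes a FakeService method as "Outer<InnerRaw>".
    static String describe(String methodName) {
        try {
            Type returnType = FakeService.class.getMethod(methodName).getGenericReturnType();
            Type inner = upperBound(0, (ParameterizedType) returnType);
            return rawType(returnType).getSimpleName() + "<" + rawType(inner).getSimpleName() + ">";
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(describe("body"));
        System.out.println(describe("wrapped"));
        System.out.println(describe("wildcard"));
    }
}
```

In the factory's terms, the outer raw type selects Observable, Flowable, Single, or Maybe, and the inner raw type decides between the Response, Result, and body cases.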
So callAdapter.adapt is in fact RxJava2CallAdapter's adapt method:
@Override public Object adapt(Call<R> call) { Observable<Response<R>> responseObservable = isAsync ? new CallEnqueueObservable<>(call) : new CallExecuteObservable<>(call); Observable<?> observable; if (isResult) { observable = new ResultObservable<>(responseObservable); } else if (isBody) { observable = new BodyObservable<>(responseObservable); } else { observable = responseObservable; } if (scheduler != null) { observable = observable.subscribeOn(scheduler); } if (isFlowable) { return observable.toFlowable(BackpressureStrategy.LATEST); } if (isSingle) { return observable.singleOrError(); } if (isMaybe) { return observable.singleElement(); } if (isCompletable) { return observable.ignoreElements(); } return RxJavaPlugins.onAssembly(observable); }
That is why the MyService method above returns an Observable.
We now have the apiObservable passed to RxObservable.INSTANCE.getObservable. Here it is again:
fun <T> getObservable(@NonNull apiObservable: Observable<T>, @NonNull lifecycle: LifecycleProvider<*>): Observable<T> { return apiObservable .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .compose<T>(lifecycle.bindToLifecycle()) }
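subscribeOn(Schedulers.io()) runs the network request on a thread from RxJava's I/O scheduler, a background pool intended for blocking I/O. observeOn(AndroidSchedulers.mainThread()) delivers the callbacks on the main thread. lifecycleProvider is a reference to the lifecycle of an Activity or Fragment, and compose binds the RxJava request to it so that the subscription ends automatically along with the component's lifecycle. I will cover that in a separate article.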
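To make the two thread hops concrete, here is a minimal sketch that uses plain executors in place of the schedulers. It is not how RxJava implements subscribeOn or observeOn; the executors named io-sim and main-sim, and the placeholder "response", are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadHopSketch {
    // Returns the recorded steps in order, each tagged with the thread it ran on.
    static List<String> demo() {
        // Hypothetical stand-ins for Schedulers.io() and AndroidSchedulers.mainThread().
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-sim"));
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-sim"));
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);

        // "subscribeOn": the blocking request runs on the io executor.
        io.execute(() -> {
            log.add("request@" + Thread.currentThread().getName());
            String response = "response"; // placeholder for a blocking network call
            // "observeOn": the result is handed to the main executor for delivery.
            main.execute(() -> {
                log.add("onNext(" + response + ")@" + Thread.currentThread().getName());
                done.countDown();
            });
        });

        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
            main.shutdown();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The point of the operators is that this hand-written nesting disappears: the chain declares where work runs and where results are delivered, and the library does the handoff.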
That completes MyServiceImpl.getData(req, lifecycleProvider). Next comes the actual network request.
-
Call flow
The subscribe call in the ViewModel is the starting point. Its argument, HttpObserver, is a custom class that implements the Observer interface:
@SuppressWarnings("all") abstract class HttpObserver<T : BaseResponse, K : BaseRequest> : Observer<T> { private var mIViewModelService: IViewModelService? = null private var showErrorMessage = true private var showLoading = true private var manuelFinishLoading = false private var needSuccessResultMsg = true private var showSuccessMessage = false private var httpTaskId: RxApiManager.HttpTaskId private var request: K /** * 默认只有1个任务处理 */ private var finishCount = 1 /** * @param httpTaskId 注意:HttpTaskId 非同一请求不可使用相同ID * @param mIViewModelService IViewModelService */ constructor(request: K, httpTaskId: RxApiManager.HttpTaskId, mIViewModelService: IViewModelService?) { this.request = request this.mIViewModelService = mIViewModelService this.httpTaskId = httpTaskId LogUtils.e(this, "HttpObserver transactionId = "+request.transactionId) } /** * @param httpTaskId 注意:HttpTaskId 非同一请求不可使用相同ID * @param mIViewModelService IViewModelService */ constructor(request: K, httpTaskId: RxApiManager.HttpTaskId, mIViewModelService: IViewModelService?, showErrorMessage: Boolean = true) { this.mIViewModelService = mIViewModelService this.showErrorMessage = showErrorMessage this.httpTaskId = httpTaskId this.request = request } /** * @param httpTaskId 注意:HttpTaskId 非同一请求不可使用相同ID * @param mIViewModelService IViewModelService */ constructor(request: K, httpTaskId: RxApiManager.HttpTaskId, mIViewModelService: IViewModelService?, showErrorMessage: Boolean = true, showLoading: Boolean = true) { this.mIViewModelService = mIViewModelService this.showErrorMessage = showErrorMessage this.showLoading = showLoading this.httpTaskId = httpTaskId this.request = request } constructor(request: K, httpTaskId: RxApiManager.HttpTaskId, mIViewModelService: IViewModelService?, showErrorMessage: Boolean = true, showLoading: Boolean = true, showSuccessMessage: Boolean = false) { this.mIViewModelService = mIViewModelService this.showErrorMessage = showErrorMessage this.showSuccessMessage = showSuccessMessage 
this.showLoading = showLoading this.httpTaskId = httpTaskId this.request = request } /** * 手动调用结束loading * * @param manuelFinishLoading */ fun setManuelFinishLoading(manuelFinishLoading: Boolean): HttpObserver<T, K> { this.manuelFinishLoading = manuelFinishLoading return this } /** * 多个任务情况下,根据任务数量结束loading * * @param finishCount * @return */ fun setFinishCount(finishCount: Int): HttpObserver<T, K> { this.finishCount = finishCount return this } /** * 设置是否显示loading * * @param isShowLoading * @return */ fun setShowLoading(isShowLoading: Boolean): HttpObserver<T, K> { this.showLoading = isShowLoading return this } /** * 是否显示错误信息 * * @param showErrorMessage * @return */ fun setShowErrorMsg(showErrorMessage: Boolean): HttpObserver<T, K> { this.showErrorMessage = showErrorMessage return this } /** * 是否默认设置"成功" */ fun setNeedSuccessResultMsg(needSuccessResultMsg: Boolean): HttpObserver<T, K> { this.needSuccessResultMsg = needSuccessResultMsg return this } override fun onComplete() { RxApiManager.get().cancel(this.httpTaskId) if (!manuelFinishLoading && showLoading && finishCount == 1) { mIViewModelService?.finishLoading() } } override fun onSubscribe(d: Disposable) { RxApiManager.get().add(httpTaskId, d) if (showLoading) { mIViewModelService?.preLoading() } } /** * 根据具体的Api 业务逻辑去重写 onSuccess 方法! 
* http==0&&code==0 回调 * 必须实现 * * @param response response */ abstract fun onSuccess(response: T) /** * 选择性重写 * 如果只需要code跟msg重写此方法 */ open fun onFailure(response: BaseResponse) { } /** * 选择性重写 * 如果需要用到code、msg、data重写此方法 */ open fun onServerFailure(response: T) { onError(ServerException(response.resultCode, response.resultMsg)) } /** * Gio埋点处理 */ open fun onGioHttpErrorInfoCollect(response: T){ //两个都为null的时候不埋 if(response.resultCode.isEmpty() && response.resultMsg.isEmpty()){ return } val gioHttpErrorInfo = GioHttpErrorInfo(response.resultCode, response.resultMsg, response.transactionId) CollectDataUtil.setGioEvent(CollectDataUtil.GioEventKey.ERRORMESSAGE, gioHttpErrorInfo) } /** * 处理公共逻辑 * 包含显示公共弹框方法和逻辑 */ open fun onServerHandle(response: T) { val hasPublicDialog = response.publicDialogResponse != null && !response.publicDialogResponse?.content.isNullOrEmpty() val hasPublicDialogs = response.publicDialogResponses?.dialogs != null if (hasPublicDialog || hasPublicDialogs) { //显示公共弹框使用dialog和对应的值 val publicDialogBean = PublicDialogBean() .setPublicDialogResponse(response.publicDialogResponse) .setPublicDialogResponses(response.publicDialogResponses) .setHttpObserverPublicDialogListener(HttpObserverPublicDialogListener { _: PublicDialogBean, _: T, _: Byte -> }) ARouterManager.startDialogActivityPublic(response, publicDialogBean) //使用透明activity弹对话框(暂时使用延时跳转,防止跳转页面时候被覆盖) // Handler().postDelayed({ // ARouterManager.startDialogActivityPublic(response, publicDialogBean) // }, 500) // //使用普通dialog模式 // mIViewModelService?.showPublicDialog(publicDialogBean, response, HttpObserverPublicDialogListener { // publicDialogBean : PublicDialogBean, response : T , clickType : Byte-> // mIViewModelService?.publicDialogExcute(publicDialogBean, response, clickType) // }) } } override fun onNext(t: T) { if (finishCount > 1) { finishCount-- } when { HttpErrorCode.SUCCESS_CODE == t.resultCode -> { CacheManager.saveResponse(httpTaskId, t) if (TextUtils.isEmpty(t.resultMsg)) { 
//成功情况若后台没有msg且自定义设置默认,则默认"成功" if (needSuccessResultMsg) { t.resultMsg = BaseApp.getBaseApplication().getString(R.string.success) } } if (showSuccessMessage) { if (TextUtils.isEmpty(t.resultMsg)) { t.resultMsg = BaseApp.getBaseApplication().getString(R.string.success) } mIViewModelService?.showSuccessMessage(t.resultMsg) } onSuccess(t) onServerHandle(t) } HttpErrorCode.SESSION_OUT_CODE == t.resultCode -> { //session验证不通过,会报会话超时,客户端与服务重新握手,获取session,并删除本地用户信息 AppDataManager.mAppSpService.cleanPublicDialog() AppDataManager.mAppSpService.setLogout() JPushLocalUtil.instance.cleanAliasAndTag() AppDataManager.mAppSpService.appSessionId = "" if (mIViewModelService != null) { if (BuildConfig.DEBUG && TextUtils.isEmpty(t.resultMsg)) { t.resultMsg = BaseApp.getAppString(R.string.http_session_error) } //GIO Http错误信息埋点 onGioHttpErrorInfoCollect(t) mIViewModelService?.onFailure(true, t.resultMsg) } getTimeOut() } else -> { //GIO Http错误信息埋点 onGioHttpErrorInfoCollect(t) if (BuildConfig.DEBUG && TextUtils.isEmpty(t.resultMsg)) { //服务器报错时如果没有错误信息,则默认未知错误 t.resultMsg = BaseApp.getBaseApplication().getString(R.string.http_un_known_error) } onServerFailure(t) onServerHandle(t) } } if (!manuelFinishLoading && showLoading && finishCount == 1) { mIViewModelService?.finishLoading() } } override fun onError(t: Throwable) { LogUtils.e(this, t) var code = HttpErrorCode.CONNECT_ERROR_CODE var errorMessage = "" if (t is NetErrorException) { errorMessage = t.errorMessage } else if (t is HttpException) { errorMessage = t.message ?: "" } else if (t is JsonParseException || t is JSONException || t is ParseException || t is MalformedJsonException) { //解析数据错误 errorMessage = BaseApp.getBaseApplication().getString(R.string.http_analytic_data_error) } else if (t is SocketTimeoutException) { errorMessage = BaseApp.getBaseApplication().getString(R.string.http_service_time_out_error) } else if (t is ConnectException) { errorMessage = BaseApp.getBaseApplication().getString(R.string.http_connect_error) } 
else if (t is UnknownHostException) { errorMessage = BaseApp.getBaseApplication().getString(R.string.http_analytic_server_error) } else if (t is UnknownServiceException) { errorMessage = BaseApp.getBaseApplication().getString(R.string.http_un_known_error) } else if (t is ServerException) { //服务器非0 code = t.code errorMessage = t.message ?: "" } if (TextUtils.isEmpty(errorMessage)) { errorMessage = BaseApp.getBaseApplication().getString(R.string.http_un_known_error) } //showLoading为true时,防止静默加载接口finishLoading,影响其他同时调用的线程 if (!manuelFinishLoading && showLoading) { mIViewModelService?.finishLoading() } mIViewModelService?.onFailure(showErrorMessage, errorMessage) val baseResponse = BaseResponse() baseResponse.resultCode = code baseResponse.resultMsg = errorMessage onFailure(baseResponse) } /** * session验证不通过,会报会话超时,客户端与服务重新握手,获取session * 完成后跳转首页 */ private fun getTimeOut() { LibApi().getTimeout(TimeoutRequest()) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(object : Observer<TimeoutResponse> { override fun onSubscribe(d: Disposable) { RxApiManager.get().add(RxApiManager.HttpTaskId.TASK_ID_GET_TIME_OUT, d) } override fun onNext(timeoutResponse: TimeoutResponse) { if (HttpErrorCode.SUCCESS_CODE == timeoutResponse.resultCode && timeoutResponse.data != null) { AppDataManager.mAppSpService.appSessionId = timeoutResponse.data?.appSessionId NbsUtil.setNbsUserIdentifier() } } override fun onError(e: Throwable) { RxApiManager.get().cancel(RxApiManager.HttpTaskId.TASK_ID_GET_TIME_OUT) } override fun onComplete() { mIViewModelService?.finishLoading() if ("MainActivity" != ActivityStackManager.getManager().lastActivityClassName?.simpleName) { ARouterManager.startToMainActivity(AppConstant.MAIN_TAB_INDEX_HOME) } RxApiManager.get().cancel(RxApiManager.HttpTaskId.TASK_ID_GET_TIME_OUT) } }) } }
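This is the observer, which subscribe binds to the Observable obtained earlier. By RxJava's design, subscribe ultimately calls the observable's subscribeActual method. To find that observable, go back to RxJava2CallAdapter.adapt. From RxJava2CallAdapterFactory.get we know that the rawType of our return type is not Flowable, Maybe, Single, or Completable, and the rawType of its first type argument is neither Response nor Result, so isBody is true and adapt returns a BodyObservable. The BodyObservable holds an Observable<Response<R>>. Because we created the factory with RxJava2CallAdapterFactory.create() rather than RxJava2CallAdapterFactory.createAsync(), isAsync is false, so that Observable<Response<R>> is a CallExecuteObservable. Its subscribeActual looks like this: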
@Override protected void subscribeActual(Observer<? super Response<T>> observer) { // Since Call is a one-shot type, clone it for each new observer. Call<T> call = originalCall.clone(); CallDisposable disposable = new CallDisposable(call); observer.onSubscribe(disposable); if (disposable.isDisposed()) { return; } boolean terminated = false; try { Response<T> response = call.execute(); if (!disposable.isDisposed()) { observer.onNext(response); } if (!disposable.isDisposed()) { terminated = true; observer.onComplete(); } } catch (Throwable t) { Exceptions.throwIfFatal(t); if (terminated) { RxJavaPlugins.onError(t); } else if (!disposable.isDisposed()) { try { observer.onError(t); } catch (Throwable inner) { Exceptions.throwIfFatal(inner); RxJavaPlugins.onError(new CompositeException(t, inner)); } } } }
CallDisposable is:
private static final class CallDisposable implements Disposable { private final Call<?> call; private volatile boolean disposed; CallDisposable(Call<?> call) { this.call = call; } @Override public void dispose() { disposed = true; call.cancel(); } @Override public boolean isDisposed() { return disposed; } }
Following the observer chain, observer.onSubscribe ends up in HttpObserver's onSubscribe:
override fun onSubscribe(d: Disposable) { RxApiManager.get().add(httpTaskId, d) if (showLoading) { mIViewModelService?.preLoading() } }
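It registers the request's Disposable with RxApiManager under its taskId and, if required, shows the loading dialog.
RxApiManager manages the Disposables keyed by taskId: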
public class RxApiManager { private static RxApiManager sInstance = null; private ArrayMap<HttpTaskId, Disposable> maps = new ArrayMap<>(); private ArrayMap<LogicTaskId, Disposable> logicMaps = new ArrayMap<>(); public static RxApiManager get() { if (sInstance == null) { synchronized (RxApiManager.class) { if (sInstance == null) { sInstance = new RxApiManager(); } } } return sInstance; } public boolean hasHttpTaskById(HttpTaskId tag) { if (tag == null) { return false; } if (maps.containsKey(tag)) { return true; } return false; } public void add(HttpTaskId tag, Disposable disposable) { if (tag == null) { return; } if (maps.containsKey(tag)) { Disposable mapDisposable = maps.get(tag); if (mapDisposable != null && !mapDisposable.isDisposed()) { mapDisposable.dispose(); maps.remove(tag); } } maps.put(tag, disposable); } public void add(LogicTaskId tag, Disposable disposable) { if (tag == null) { return; } if (logicMaps.containsKey(tag)) { Disposable mapDisposable = logicMaps.get(tag); if (mapDisposable != null && !mapDisposable.isDisposed()) { mapDisposable.dispose(); logicMaps.remove(tag); } } logicMaps.put(tag, disposable); } public void remove(HttpTaskId tag) { if (!maps.isEmpty()) { maps.remove(tag); } } public void removeAll() { maps.clear(); logicMaps.clear(); } public void cancel(HttpTaskId tag) { if (tag == null) { return; } if (maps.isEmpty()) { return; } Disposable disposable = maps.get(tag); if (disposable == null) { return; } if (!disposable.isDisposed()) { disposable.dispose(); maps.remove(tag); } } public void cancel(LogicTaskId tag) { if (tag == null) { return; } if (logicMaps.isEmpty()) { return; } Disposable disposable = logicMaps.get(tag); if (disposable == null) { return; } if (!disposable.isDisposed()) { disposable.dispose(); logicMaps.remove(tag); } } public void cancel(HttpTaskId... 
tags) { HttpTaskId[] taskIds = tags; if (tags == null) { return; } if (maps.isEmpty()) { return; } for (HttpTaskId tag : taskIds) { Disposable disposable = maps.get(tag); if (disposable == null) { continue; } if (!disposable.isDisposed()) { disposable.dispose(); maps.remove(tag); } } } public void cancelAll() { if (maps.isEmpty()) { return; } Set<HttpTaskId> keys = maps.keySet(); for (HttpTaskId apiKey : keys) { Disposable disposable = maps.get(apiKey); if (disposable == null) { continue; } if (!disposable.isDisposed()) { disposable.dispose(); } } maps.clear(); } public void cancelLogicAll() { if (logicMaps.isEmpty()) { return; } Set<LogicTaskId> keys = logicMaps.keySet(); for (LogicTaskId apiKey : keys) { Disposable disposable = logicMaps.get(apiKey); if (disposable == null) { continue; } if (!disposable.isDisposed()) { disposable.dispose(); } } logicMaps.clear(); } }
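The core idea of RxApiManager, one live Disposable per task id with a new request replacing and disposing the previous one, can be condensed into the sketch below. Handle and the String task ids are hypothetical simplifications. Unlike the original, cancel here always removes the entry, and the class is not thread-safe.

```java
import java.util.HashMap;
import java.util.Map;

class TaskRegistrySketch {
    // Hypothetical minimal stand-in for io.reactivex.disposables.Disposable.
    static final class Handle {
        private boolean disposed;

        void dispose() {
            disposed = true;
        }

        boolean isDisposed() {
            return disposed;
        }
    }

    private final Map<String, Handle> tasks = new HashMap<>();

    // Registering a task id that is already in use disposes the older request first.
    void add(String taskId, Handle handle) {
        Handle previous = tasks.put(taskId, handle);
        if (previous != null && !previous.isDisposed()) {
            previous.dispose();
        }
    }

    // Disposes and forgets the request registered under taskId, if any.
    void cancel(String taskId) {
        Handle handle = tasks.remove(taskId);
        if (handle != null && !handle.isDisposed()) {
            handle.dispose();
        }
    }

    boolean has(String taskId) {
        return tasks.containsKey(taskId);
    }

    public static void main(String[] args) {
        TaskRegistrySketch registry = new TaskRegistrySketch();
        Handle first = new Handle();
        Handle second = new Handle();
        registry.add("TASK_A", first);
        registry.add("TASK_A", second); // replaces and disposes `first`
        System.out.println(first.isDisposed() + " " + second.isDisposed());
    }
}
```

This replacement behavior is why the HttpObserver comments warn that different requests must not share a task id: the second request would silently cancel the first.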
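IViewModelService is passed in when the HttpObserver is constructed. The custom base classes for Activity and Fragment implement this interface, and every Activity and Fragment extends those base classes, so mIViewModelService is an Activity or Fragment instance. The interface provides the loading indicator and message prompts that accompany a network request: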
public interface IViewModelService { /*** 预加载*/ void preLoading(); /** * 完成加载 */ void finishLoading(); /** * 加载失败 * @param showErrorMessage 是否显示错误信息 * @param errorMessage 错误信息 */ void onFailure(boolean showErrorMessage, String errorMessage); /** * 成功msg * * @param message message */ void showSuccessMessage(String message); /** * 显示紧急公共弹框 * @param publicDialogBean PublicDialogBean * @param listener HttpObserverPublicDialogListener */ <T extends BaseResponse> void showPublicDialog(PublicDialogBean publicDialogBean, BaseResponse response, HttpObserverPublicDialogListener<T> listener); /** * 公共弹框点击处理回调 * @param publicDialogBean * @param response * @param clickType */ void publicDialogExcute(PublicDialogBean publicDialogBean, BaseResponse response, byte clickType); }
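Next comes call.execute(). call is the OkHttpCall constructed in HttpServiceMethod's invoke, so from here on we are in Retrofit territory.
After call.execute() returns the response, observer.onNext(response) is called. onNext then branches on resultCode. Note that onSuccess() is the override supplied by the anonymous HttpObserver subclass; it is abstract, so it must be overridden. As described earlier, onSuccess then uses MutableLiveData's postValue to notify the UI components to update.
Also note that once onNext returns, observer.onComplete() is called. HttpObserver's onComplete asks RxApiManager to cancel the task, which disposes the Disposable registered for it. When that disposal reaches the CallDisposable, its dispose() calls call.cancel(), which cancels the underlying request if it is still in flight.
If an exception is thrown, control reaches the catch block, which calls observer.onError() to handle the failure.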
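The event order that CallExecuteObservable produces (onSubscribe, then onNext and onComplete on success, or onError on failure) can be reproduced with a few lines of plain Java. MiniObserver is a hypothetical interface, not RxJava's Observer, and the error handling is simplified: the real code also tracks a terminated flag and routes late errors to RxJavaPlugins.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

class ExecuteFlowSketch {
    // Hypothetical, reduced observer; `dispose` plays the role of the Disposable.
    interface MiniObserver<T> {
        void onSubscribe(Runnable dispose);
        void onNext(T value);
        void onError(Throwable error);
        void onComplete();
    }

    // Simplified shape of CallExecuteObservable.subscribeActual:
    // run the blocking call synchronously and check the disposed flag before each event.
    static <T> void subscribe(Callable<T> call, MiniObserver<T> observer) {
        boolean[] disposed = {false};
        observer.onSubscribe(() -> disposed[0] = true);
        if (disposed[0]) {
            return; // disposed inside onSubscribe: never execute the call
        }
        try {
            T response = call.call();
            if (!disposed[0]) {
                observer.onNext(response);
            }
            if (!disposed[0]) {
                observer.onComplete();
            }
        } catch (Throwable t) {
            if (!disposed[0]) {
                observer.onError(t);
            }
        }
    }

    // Records the events emitted for one call.
    static List<String> record(Callable<String> call) {
        List<String> events = new ArrayList<>();
        subscribe(call, new MiniObserver<String>() {
            @Override public void onSubscribe(Runnable dispose) { events.add("onSubscribe"); }
            @Override public void onNext(String value) { events.add("onNext:" + value); }
            @Override public void onError(Throwable error) { events.add("onError:" + error.getMessage()); }
            @Override public void onComplete() { events.add("onComplete"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(record(() -> "ok"));
        System.out.println(record(() -> { throw new java.io.IOException("timeout"); }));
    }
}
```

A failed call never reaches onNext or onComplete, which is why HttpObserver has to finish the loading dialog in onError as well.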
-
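Communication between threads
So far we have seen how the call and the callbacks work together. But where does the thread switching take effect, with the call moved to a background thread and the callbacks moved back to the main thread?
Remember RxObservable.INSTANCE.getObservable? The Observable it returns is the one that subscribe is actually called on, so what this method sets up is the key to thread switching: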
fun <T> getObservable(@NonNull apiObservable: Observable<T>, @NonNull lifecycle: LifecycleProvider<*>): Observable<T> { return apiObservable .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .compose<T>(lifecycle.bindToLifecycle()) }
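Tracing the source, the chain assembled by this code is:
BodyObservable(CallExecuteObservable)->ObservableSubscribeOn->ObservableObserveOn->ObservableTakeUntil (the result of compose)
compose passes the transformer's result through Observable.wrap, which wraps it in an ObservableFromUnsafeSource only when that result is not already an Observable. When it is wrapped, the entry point is ObservableFromUnsafeSource's subscribeActual, which simply forwards to its source: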
@Override protected void subscribeActual(Observer<? super T> observer) { source.subscribe(observer); }
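Here source is the object produced by lifecycle.bindToLifecycle(). This touches on binding to the UI component's lifecycle to prevent memory leaks, which is complex enough that I will analyze it in a separate article. For now, all we need to know is that the object is an ObservableTakeUntil, so its subscribeActual method is what gets called: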
@Override public void subscribeActual(Observer<? super T> child) { TakeUntilMainObserver<T, U> parent = new TakeUntilMainObserver<T, U>(child); child.onSubscribe(parent); other.subscribe(parent.otherObserver); source.subscribe(parent); }
other.subscribe(parent.otherObserver) handles the lifecycle check; source.subscribe(parent) is where the part we care about starts.
This source is the ObservableObserveOn that compose was called on. Its subscribeActual method:
@Override protected void subscribeActual(Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } }
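This creates an ObserveOnObserver that wraps the downstream observer (which ultimately delivers to the HttpObserver above) together with a Worker created by scheduler.createWorker(). The scheduler is AndroidSchedulers.mainThread(), that is: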
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler( new Callable<Scheduler>() { @Override public Scheduler call() throws Exception { return MainHolder.DEFAULT; } });
Moving further upstream, we reach ObservableSubscribeOn's subscribeActual:
@Override public void subscribeActual(final Observer<? super T> observer) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer); observer.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); }
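Following observer.onSubscribe(parent) downstream leads to HttpObserver's onSubscribe, where the loading dialog and related logic run. At this point the real network request has not started, and we are still on the main thread. Then scheduler.scheduleDirect(new SubscribeTask(parent)) runs, where scheduler is Schedulers.io():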
@NonNull public static Scheduler io() { return RxJavaPlugins.onIoScheduler(IO); }
IO = RxJavaPlugins.initIoScheduler(new IOTask());
static final class IOTask implements Callable<Scheduler> { @Override public Scheduler call() throws Exception { return IoHolder.DEFAULT; } }
initIoScheduler:
@NonNull public static Scheduler initIoScheduler(@NonNull Callable<Scheduler> defaultScheduler) { ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null"); Function<? super Callable<Scheduler>, ? extends Scheduler> f = onInitIoHandler; if (f == null) { return callRequireNonNull(defaultScheduler); } return applyRequireNonNull(f, defaultScheduler); }
callRequireNonNull:
@NonNull static Scheduler callRequireNonNull(@NonNull Callable<Scheduler> s) { try { return ObjectHelper.requireNonNull(s.call(), "Scheduler Callable result can't be null"); } catch (Throwable ex) { throw ExceptionHelper.wrapOrThrow(ex); } }
s.call() therefore returns IoHolder.DEFAULT:
static final class IoHolder { static final Scheduler DEFAULT = new IoScheduler(); }
The scheduleDirect call:
@NonNull public Disposable scheduleDirect(@NonNull Runnable run) { return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS); }
@NonNull public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { final Worker w = createWorker(); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); DisposeTask task = new DisposeTask(decoratedRun, w); w.schedule(task, delay, unit); return task; }
The Worker is created by createWorker(), which here is IoScheduler's createWorker method:
@NonNull @Override public Worker createWorker() { return new EventLoopWorker(pool.get()); }
The schedule call that follows is therefore EventLoopWorker's schedule method:
@NonNull @Override public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) { if (tasks.isDisposed()) { // don't schedule, we are unsubscribed return EmptyDisposable.INSTANCE; } return threadWorker.scheduleActual(action, delayTime, unit, tasks); }
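threadWorker is obtained in EventLoopWorker's constructor by calling get() on the CachedWorkerPool it receives, and that pool is the result of pool.get() in createWorker. The pool field used in createWorker is set up in the IoScheduler constructor: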
public IoScheduler(ThreadFactory threadFactory) { this.threadFactory = threadFactory; this.pool = new AtomicReference<CachedWorkerPool>(NONE); start(); }
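The AtomicReference initially holds NONE, a placeholder pool that is shut down as soon as it is created:
NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY); NONE.shutdown();
The constructor then calls start(), which swaps NONE for an active CachedWorkerPool, so pool.get() in createWorker returns that active pool rather than NONE. CachedWorkerPool's own get method is: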
ThreadWorker get() { if (allWorkers.isDisposed()) { return SHUTDOWN_THREAD_WORKER; } while (!expiringWorkerQueue.isEmpty()) { ThreadWorker threadWorker = expiringWorkerQueue.poll(); if (threadWorker != null) { return threadWorker; } } // No cached worker found, so create a new one. ThreadWorker w = new ThreadWorker(threadFactory); allWorkers.add(w); return w; }
So threadWorker is a ThreadWorker instance. Its scheduleActual method, inherited from NewThreadWorker, is:
@NonNull public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent); if (parent != null) { if (!parent.add(sr)) { return sr; } } Future<?> f; try { //没有延迟执行,直接submit if (delayTime <= 0) { f = executor.submit((Callable<Object>)sr); } else { f = executor.schedule((Callable<Object>)sr, delayTime, unit); } sr.setFuture(f); } catch (RejectedExecutionException ex) { if (parent != null) { parent.remove(sr); } RxJavaPlugins.onError(ex); } return sr; }
executor is created by SchedulerPoolFactory.create(threadFactory), and the threadFactory is easy to find:
WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority)
SchedulerPoolFactory.create:
public static ScheduledExecutorService create(ThreadFactory factory) { final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory); tryPutIntoPool(PURGE_ENABLED, exec); return exec; }
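Here is the familiar Executors.newScheduledThreadPool, so executor is a ScheduledThreadPoolExecutor. From here on we are inside the JDK's thread pool implementation, so we will only skim it to find where the thread is started. Its submit method: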
public <T> Future<T> submit(Callable<T> task) { return schedule(task, 0, NANOSECONDS); }
Everything ends up in the schedule method:
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { if (callable == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<V> t = decorateTask(callable, new ScheduledFutureTask<V>(callable, triggerTime(delay, unit), sequencer.getAndIncrement())); delayedExecute(t); return t; }
private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) reject(task); else { super.getQueue().add(task); if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else ensurePrestart(); } }
ensurePrestart here is a method of the parent class, ThreadPoolExecutor:
void ensurePrestart() { int wc = workerCountOf(ctl.get()); if (wc < corePoolSize) addWorker(null, true); else if (wc == 0) addWorker(null, false); }
The addWorker code:
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
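Here t is w.thread and w is new Worker(firstTask). The thread w.thread is created in the Worker constructor, where the Worker passes itself to the thread factory as the Runnable: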
Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}
As established earlier, the threadFactory is an RxThreadFactory:
@Override
public Thread newThread(Runnable r) {
    StringBuilder nameBuilder = new StringBuilder(prefix).append('-').append(incrementAndGet());

    // if (CREATE_TRACE) {
    //     nameBuilder.append("\r\n");
    //     for (StackTraceElement se :Thread.currentThread().getStackTrace()) {
    //         String s = se.toString();
    //         if (s.contains("sun.reflect.")) {
    //             continue;
    //         }
    //         if (s.contains("junit.runners.")) {
    //             continue;
    //         }
    //         if (s.contains("org.gradle.internal.")) {
    //             continue;
    //         }
    //         if (s.contains("java.util.concurrent.ThreadPoolExecutor")) {
    //             continue;
    //         }
    //         nameBuilder.append(s).append("\r\n");
    //     }
    // }

    String name = nameBuilder.toString();
    Thread t = nonBlocking ? new RxCustomThread(r, name) : new Thread(r, name);
    t.setPriority(priority);
    t.setDaemon(true);
    return t;
}
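When nonBlocking is true this returns an RxCustomThread, otherwise a plain Thread. Either way daemon is set to true, which means the thread does not keep the process alive on its own: the JVM can exit while daemon threads are still running.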
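To make the naming and daemon behaviour concrete, here is a stripped-down factory in the same spirit. It is only a sketch: NamedDaemonThreadFactory and the prefix used in main are hypothetical names, and it leaves out the nonBlocking distinction of the real RxThreadFactory.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

class NamedDaemonThreadFactory implements ThreadFactory {
    private final String prefix;
    private final int priority;
    private final AtomicLong counter = new AtomicLong();

    NamedDaemonThreadFactory(String prefix, int priority) {
        this.prefix = prefix;
        this.priority = priority;
    }

    @Override
    public Thread newThread(Runnable r) {
        // Thread name is "<prefix>-<n>", with n starting at 1.
        Thread t = new Thread(r, prefix + '-' + counter.incrementAndGet());
        t.setPriority(priority);
        // Daemon threads do not prevent the JVM from exiting.
        t.setDaemon(true);
        return t;
    }

    public static void main(String[] args) {
        ThreadFactory factory = new NamedDaemonThreadFactory("DemoScheduler", Thread.NORM_PRIORITY);
        Thread t = factory.newThread(() -> { });
        // The thread is only created here, not started.
        System.out.println(t.getName() + " daemon=" + t.isDaemon());
    }
}
```

Note that the factory only builds the Thread object. Starting it is left to the caller, which in the pool is addWorker.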
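RxCustomThread does not override start, so t.start() is the start method inherited from Thread (the Android version is shown here):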
public synchronized void start() {
    /**
     * This method is not invoked for the main method thread or "system"
     * group threads created/set up by the VM. Any new functionality added
     * to this method in the future may have to also be added to the VM.
     *
     * A zero status value corresponds to state "NEW".
     */
    // Android-changed: throw if 'started' is true
    if (threadStatus != 0 || started)
        throw new IllegalThreadStateException();

    /* Notify the group that this thread is about to be started
     * so that it can be added to the group's list of threads
     * and the group's unstarted count can be decremented. */
    group.add(this);

    started = false;
    try {
        nativeCreate(this, stackSize, daemon);
        started = true;
    } finally {
        try {
            if (!started) {
                group.threadStartFailed(this);
            }
        } catch (Throwable ignore) {
            /* do nothing. If start0 threw a Throwable then
               it will be passed up the call stack */
        }
    }
}
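The status check at the top of start is the reason a Thread can only be started once, which is also why addWorker prechecks t.isAlive() before adding the worker. A small sketch on a standard JVM (the class name ThreadStartOnceDemo is made up for illustration):

```java
class ThreadStartOnceDemo {

    /** Returns true if the second call to start() is rejected. */
    static boolean secondStartIsRejected() {
        Thread t = new Thread(() -> { });
        t.start();
        try {
            // The thread has left the NEW state, so start() must refuse.
            t.start();
            return false;
        } catch (IllegalThreadStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("second start rejected: " + secondStartIsRejected());
    }
}
```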
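start calls nativeCreate to create the native thread, which then runs the run method of its Runnable. That Runnable is the Worker itself: its run method calls runWorker, which keeps taking tasks from the queue. The task that ultimately gets executed is the SubscribeTask passed in by scheduler.scheduleDirect(new SubscribeTask(parent)) in the subscribeActual method of ObservableSubscribeOn: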
final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }

    @Override
    public void run() {
        source.subscribe(parent);
    }
}
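The idea behind SubscribeTask, wrapping the subscribe call in a Runnable and handing it to another thread, can be reproduced without RxJava. The sketch below is a deliberately simplified model: the Source interface, the subscribeOn helper and the thread name "demo-io-1" are all hypothetical stand-ins, not RxJava types.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

class SubscribeOnSketch {

    /** Hypothetical stand-in for an upstream Observable. */
    interface Source<T> {
        void subscribe(Consumer<T> observer);
    }

    /** Defers source.subscribe(observer) to the executor, like SubscribeTask.run(). */
    static <T> Source<T> subscribeOn(Source<T> source, ExecutorService executor) {
        return observer -> executor.execute(() -> source.subscribe(observer));
    }

    /** Returns the name of the thread on which the upstream subscribe logic ran. */
    static String threadThatRanSubscribe() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "demo-io-1");
            t.setDaemon(true);
            return t;
        });
        try {
            // The upstream emits the name of whichever thread subscribed to it.
            Source<String> upstream = observer -> observer.accept(Thread.currentThread().getName());
            CompletableFuture<String> result = new CompletableFuture<>();
            subscribeOn(upstream, io).subscribe(result::complete);
            return result.join();
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("subscribe ran on: " + threadThatRanSubscribe());
    }
}
```

In this model the caller of subscribe returns immediately, and everything the upstream does inside subscribe, such as a blocking network call, happens on the executor's thread.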
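The key call in SubscribeTask.run is source.subscribe(parent). Here source is the topmost CallExecuteObservable, so this invokes its subscribeActual(), which ties back to what we saw earlier. The onNext, onComplete and onError calls made in CallExecuteObservable's subscribeActual are then passed down the downstream chain one step at a time until they reach the schedule method of ObserveOnObserver inside ObservableObserveOn: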
void schedule() {
    if (getAndIncrement() == 0) {
        worker.schedule(this);
    }
}
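As mentioned earlier, this worker was created from the main-thread scheduler. It works on the same principle as Schedulers.io() above, except that tasks handed to this Worker run on the main thread. The argument passed to schedule is this, because ObserveOnObserver implements the Runnable interface: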
@Override
public void run() {
    if (outputFused) {
        drainFused();
    } else {
        drainNormal();
    }
}
Since the conditions for fusion are not met in this chain, requestFusion is never called and outputFused stays false, so drainNormal is executed every time:
void drainNormal() {
    int missed = 1;

    final SimpleQueue<T> q = queue;
    final Observer<? super T> a = downstream;

    for (;;) {
        if (checkTerminated(done, q.isEmpty(), a)) {
            return;
        }

        for (;;) {
            boolean d = done;
            T v;

            try {
                v = q.poll();
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                disposed = true;
                upstream.dispose();
                q.clear();
                a.onError(ex);
                worker.dispose();
                return;
            }
            boolean empty = v == null;

            if (checkTerminated(d, empty, a)) {
                return;
            }

            if (empty) {
                break;
            }

            a.onNext(v);
        }

        missed = addAndGet(-missed);
        if (missed == 0) {
            break;
        }
    }
}
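The interplay between schedule (getAndIncrement() == 0) and the missed counter in drainNormal is easier to follow in isolation. The following is a reduced sketch of that queue-drain pattern with made-up names (DrainLoopSketch, wip, target). It leaves out termination, disposal and error handling, and it uses an Executor that merely records tasks so the run is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class DrainLoopSketch<T> implements Runnable {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger wip = new AtomicInteger(); // "work in progress" counter
    private final Executor target;
    private final Consumer<T> downstream;

    DrainLoopSketch(Executor target, Consumer<T> downstream) {
        this.target = target;
        this.downstream = downstream;
    }

    void onNext(T value) {
        queue.offer(value);
        schedule();
    }

    private void schedule() {
        // Only the caller that moves the counter from 0 to 1 submits the drain task.
        if (wip.getAndIncrement() == 0) {
            target.execute(this);
        }
    }

    @Override
    public void run() {
        int missed = 1;
        for (;;) {
            T v;
            while ((v = queue.poll()) != null) {
                downstream.accept(v);
            }
            // Subtract the schedule() calls already accounted for; a non-zero
            // result means more arrived while draining, so loop again.
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }

    static String runDemo() {
        List<Runnable> pending = new ArrayList<>();   // stands in for the target thread
        List<Integer> received = new ArrayList<>();
        DrainLoopSketch<Integer> drain = new DrainLoopSketch<Integer>(pending::add, received::add);
        drain.onNext(1);
        drain.onNext(2);
        drain.onNext(3);
        int scheduled = pending.size();
        pending.forEach(Runnable::run);
        return "scheduled=" + scheduled + " received=" + received;
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints "scheduled=1 received=[1, 2, 3]"
    }
}
```

Three values produce a single scheduled task, and that one run delivers all of them in order. This is the property that lets the observer hop threads without submitting one task per item.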
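At this point, the overall flow of making the request on a worker thread and delivering the callback on the main thread is complete.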