美文网首页
RxJava和Retrofit2的统一处理单个请求

RxJava和Retrofit2的统一处理单个请求

作者: 林祖朋 | 来源:发表于2018-03-09 11:44 被阅读185次

前言
RxJava和Retrofit2用了一段时间了,写个小例子,分享出来,有什么不对的地方还请大神在评论区指正。

发现问题
最近在帮兄弟公司做一个资讯类的项目,使用了RxJava和Retrofit2这对黄金组合。在编写代码的过程中发现,几乎每一个网络请求都要重复写一遍.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).onErrorReturn()的处理。为了避免这种重复,需要把这部分逻辑统一封装起来。
解决问题

import android.util.Log;

import com.wei.caiqiwang.data.entity.BaseResponse;

import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Func1;
import rx.schedulers.Schedulers;

public class RxNet {

    /** Log tag used when a request fails. */
    private static final String TAG = "LinNetError";

    /** Value of {@code BaseResponse.getCode()} that is treated as success. */
    private static final String SUCCESS_CODE = "200";

    /**
     * Runs a single request and routes its outcome to {@code callBack}.
     *
     * <p>The observable is subscribed on {@link Schedulers#io()} and the callback is
     * invoked on the Android main thread. A response whose code equals {@code "200"}
     * is delivered through {@link RxNetCallBack#onSuccess(Object)} with its data; any
     * other code is delivered through {@link RxNetCallBack#onFailure(String)} with the
     * response message. Stream errors (and exceptions thrown while handling the
     * response) are logged, translated by {@code ExceptionHandle.handleException} and
     * delivered through {@code onFailure}.
     *
     * @param observable the request to execute
     * @param callBack   receiver of the result
     * @param <T>        type of the payload carried by the response
     * @return the subscription, which the caller may unsubscribe to cancel the request
     */
    public static <T> Subscription request(Observable<BaseResponse<T>> observable, final RxNetCallBack<T> callBack) {
        return observable
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<BaseResponse<T>>() {
                    @Override
                    public void onCompleted() {
                        // Nothing to do: the result has already been delivered in onNext.
                    }

                    @Override
                    public void onError(Throwable e) {
                        // Failures are reported here instead of being turned into a
                        // null item with onErrorReturn, which made onNext crash.
                        reportError(e, callBack);
                    }

                    @Override
                    public void onNext(BaseResponse<T> baseResponse) {
                        if (baseResponse == null) {
                            // An empty body cannot be interpreted; treat it as a failure.
                            reportError(new IllegalStateException("empty response body"), callBack);
                        } else if (SUCCESS_CODE.equals(baseResponse.getCode())) {
                            callBack.onSuccess(baseResponse.getData());
                        } else {
                            callBack.onFailure(baseResponse.getMsg());
                        }
                    }
                });

    }

    /**
     * Runs a single request whose response carries no data payload, only a message.
     *
     * <p>Threading and error handling are the same as in
     * {@link #request(Observable, RxNetCallBack)}; on success the response message
     * itself is passed to {@link RxNetCallBack#onSuccess(Object)}.
     *
     * @param observable the request to execute
     * @param callBack   receiver of the result message
     * @return the subscription, which the caller may unsubscribe to cancel the request
     */
    public static Subscription requestWithoutBody(Observable<BaseResponse> observable, final RxNetCallBack<String> callBack) {
        return observable
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<BaseResponse>() {
                    @Override
                    public void onCompleted() {
                        // Nothing to do: the result has already been delivered in onNext.
                    }

                    @Override
                    public void onError(Throwable e) {
                        reportError(e, callBack);
                    }

                    @Override
                    public void onNext(BaseResponse baseResponse) {
                        if (baseResponse == null) {
                            // An empty body cannot be interpreted; treat it as a failure.
                            reportError(new IllegalStateException("empty response body"), callBack);
                        } else if (SUCCESS_CODE.equals(baseResponse.getCode())) {
                            callBack.onSuccess(baseResponse.getMsg());
                        } else {
                            callBack.onFailure(baseResponse.getMsg());
                        }
                    }
                });

    }

    /**
     * Logs {@code throwable} and forwards its user-facing message to {@code callBack}.
     */
    private static void reportError(Throwable throwable, RxNetCallBack<?> callBack) {
        // Many exceptions have a null message and Log.v rejects a null message,
        // so the text is normalised with String.valueOf first.
        Log.v(TAG, String.valueOf(throwable.getMessage()));
        callBack.onFailure(ExceptionHandle.handleException(throwable));
    }
}

回调就是普通的回调

public interface RxNetCallBack<T> {
    /**
     * Called when the request succeeded.
     *
     * @param data the data obtained by the request
     */
    void onSuccess(T data);

    /**
     * Called when the request failed.
     *
     * @param msg text describing the failure
     */
    void onFailure(String msg);
}

错误异常处理(可能不全):

import android.net.ParseException;

import com.google.gson.JsonParseException;


import org.apache.http.conn.ConnectTimeoutException;
import org.json.JSONException;

import java.net.ConnectException;

import retrofit2.HttpException;

public class ExceptionHandle {

    /**
     * Translates a failure into a short message suitable for showing to the user.
     *
     * @param e the failure to translate
     * @return the message for the matching failure category; an HTTP failure also
     *         carries its status code after a colon
     */
    public static String handleException(Throwable e) {
        if (e instanceof HttpException) {
            // Every HTTP status maps to the same generic text; only the appended
            // status code differs.
            return "网络错误" + ":" + ((HttpException) e).code();
        }
        if (isParseFailure(e)) {
            return "解析错误";
        }
        if (e instanceof ConnectException) {
            return "连接失败";
        }
        if (e instanceof javax.net.ssl.SSLHandshakeException) {
            return "证书验证失败";
        }
        if (isTimeout(e)) {
            return "连接超时";
        }
        return "未知错误";
    }

    /** Tells whether {@code e} is one of the JSON/parse exception types. */
    private static boolean isParseFailure(Throwable e) {
        return e instanceof JsonParseException
                || e instanceof JSONException
                || e instanceof ParseException;
    }

    /** Tells whether {@code e} is a connect or socket timeout. */
    private static boolean isTimeout(Throwable e) {
        return e instanceof ConnectTimeoutException
                || e instanceof java.net.SocketTimeoutException;
    }
}

然后就是ApiManager:

import android.util.Log;


import com.wei.demo.data.AppConstants;

import java.util.concurrent.TimeUnit;

import okhttp3.OkHttpClient;
import okhttp3.logging.HttpLoggingInterceptor;
import retrofit2.Retrofit;
import retrofit2.adapter.rxjava.RxJavaCallAdapterFactory;
import retrofit2.converter.gson.GsonConverterFactory;

public class ApiManager {

    /** Lazily created API instance shared by all callers. */
    private static volatile DemoApi INSTANCE;

    private Retrofit client;

    /**
     * Builds the Retrofit client against the test base URL, using the RxJava call
     * adapter and the Gson converter.
     */
    private ApiManager() {
        Retrofit.Builder retrofitBuilder = new Retrofit.Builder();
        retrofitBuilder.baseUrl(AppConstants.Base_Url_Api_Test);
        retrofitBuilder.client(initClient());
        retrofitBuilder.addCallAdapterFactory(RxJavaCallAdapterFactory.create());
        retrofitBuilder.addConverterFactory(GsonConverterFactory.create());
        client = retrofitBuilder.build();
    }

    /**
     * Returns the shared {@code DemoApi}, creating it on first use.
     *
     * @return the single {@code DemoApi} instance
     */
    public static DemoApi getInstance() {
        DemoApi api = INSTANCE;
        if (api != null) {
            return api;
        }
        synchronized (ApiManager.class) {
            // Re-read under the lock: another thread may have created it meanwhile.
            api = INSTANCE;
            if (api == null) {
                api = new ApiManager().getApi();
                INSTANCE = api;
            }
            return api;
        }
    }

    /** Creates the Retrofit implementation of {@code DemoApi}. */
    private DemoApi getApi() {
        return client.create(DemoApi.class);
    }

    /**
     * Creates the OkHttp client: a body-level logging interceptor that writes to the
     * "NetLog" tag, plus 10-second connect, read and write timeouts.
     */
    private static OkHttpClient initClient() {
        // Logging interceptor that forwards every message to Logcat.
        HttpLoggingInterceptor.Logger logcatLogger = new HttpLoggingInterceptor.Logger() {
            @Override
            public void log(String message) {
                Log.v("NetLog", message);
            }
        };
        HttpLoggingInterceptor loggingInterceptor = new HttpLoggingInterceptor(logcatLogger);
        // Log level.
        loggingInterceptor.setLevel(HttpLoggingInterceptor.Level.BODY);

        OkHttpClient.Builder okHttpBuilder = new OkHttpClient.Builder();
        okHttpBuilder.addInterceptor(loggingInterceptor);
        // Timeouts.
        okHttpBuilder.connectTimeout(10, TimeUnit.SECONDS);
        okHttpBuilder.readTimeout(10, TimeUnit.SECONDS);
        okHttpBuilder.writeTimeout(10, TimeUnit.SECONDS);
        return okHttpBuilder.build();
    }
}

怎么用?

 // Example: issue one request and receive the outcome through the callback.
 RxNet.request(ApiManager.getInstance().getUserMsg(map), new RxNetCallBack<List<MsgBean>>() {
            @Override
            public void onSuccess(List<MsgBean> data) {
              // Handle the data
            }

            @Override
            public void onFailure(String msg) {
                // An error occurred
                showToast(msg);
              
            }
        });

Demo https://github.com/FriendLin/NetRequestDemo

相关文章

网友评论

      本文标题:RxJava和Retrofit2的统一处理单个请求

      本文链接:https://www.haomeiwen.com/subject/rbaefftx.html