美文网首页 移动 前端 Python Android Java
Rxjava2 + Retrofit +DBflow + 自定义

Rxjava2 + Retrofit +DBflow + 自定义

作者: 梅林骑士 | 来源:发表于2017-08-30 23:29 被阅读49次

最喜欢干的事,莫过于拿着工资搭框架了。
其实这个框架已经出来很久了,并不是什么新鲜玩意儿了,只不过我一直没有尝试着去写一篇内容比较大的文章来分享,这次就卖弄一下,希望各种大神轻喷,有什么问题也希望各位大神不吝赐教。

Retrofit的接入

ApiService

首先Retrofit的框架架构搭建其实比较简单,因为Retrofit本身已经极致简单了。

/**
 * Author       : yizhihao (Merlin)
 * Create time  : 2017-08-23 15:48
 * contact      :
 * 562536056@qq.com || yizhihao.hut@gmail.com
 */
public interface ApiService {

    @GET("{url}")
    Observable<ResponseBody> executeGet(
            @Path("url") String url,
            @QueryMap Map<String, String> maps);


    @POST("{url}")
    Observable<ResponseBody> executePost(
            @Path("url") String url,
            @QueryMap Map<String, String> maps);

    @POST("{url}")
    Observable<ResponseBody> executeCachePost(
            @Path("url") String url,
            @QueryMap Map<String, String> maps);

    @POST("{url}")
    Observable<ResponseBody> uploadFiles(
            @Path("url") String url,
            @Path("headers") Map<String, String> headers,
            @Part("filename") String description,
            @PartMap()  Map<String, RequestBody> maps);

    @Streaming
    @GET
    Observable<ResponseBody> downloadFile(@Url String fileUrl);
}

上面的代码通过将接口返回类型通用化返回结合rxjava的Observable这样我们就可以愉快的用rxjava来处理线程切换了。

Retrofit接口对象
public static Retrofit retrofit() {
        return retrofit(sBaseUrl);
    }

    public static Retrofit retrofit(String baseUrl) {
        return new Retrofit.Builder()
                .baseUrl(baseUrl)
                .client(getInstance().getHttpClient())//添加自定义OkHttpClient
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                .addConverterFactory(GsonConverterFactory.create(GsonUtils.getInstance().getGson()))
                .build();
    }

    public OkHttpClient getHttpClient() {
        if (client == null) {
            client = new OkHttpClient.Builder()
                 //.addNetworkInterceptor(newCacheNetworkInterceptor())
                    //日志,可以配置 level 为 BASIC / HEADERS / BODY
                    .addInterceptor(new LoggingInterceptor()) 
                    .connectTimeout(DEFAULT_TIME_OUT, TimeUnit.SECONDS)
                    //.cache(provideCache())
                    .retryOnConnectionFailure(true)
                    .build();
        }
        return client;
    }

Retrofit管理类主要是整合okhttp进行必要的配置

缓存拦截器

细心的读者可能发现了CacheNetworkInterceptor这个注释的拦截器,它的职责本来要添加的NetworkInterceptor是为了做缓存Hook的。
但是查阅了一些资料,还有okhttp源码,其实okhttp本身是自带缓存逻辑的,这套逻辑完全遵守RFC协议进行缓存控制的。很多人都去hook掉了这步。其实查阅源码可以看到


image.png

从源码中不难看出,我们在自定义cache的时候,okhttp会把自己的internalCache给废弃掉,而我们在okhttp的内部拦截器中也会看到CacheInterceptor,这个类其实就是实现了okhttp的Cache-control。所以我并没有选择去拦截Response手动添加Cache-control进行缓存处理。当然大家要用我也拦不住,毕竟也挺方便的。

日志拦截器

LoggingInterceptor拦截器主要是为了打印请求发送和收到请求的Log.
·public class LoggingInterceptor implements Interceptor {

private boolean debugMode = DebugConstant.isDebug;

@Override
public Response intercept(Chain chain) throws IOException {
    if(!debugMode){
        return chain.proceed(chain.request());
    }
    //这个chain里面包含了request和response,所以你要什么都可以从这里拿
    Request request = chain.request();

    long t1 = System.nanoTime();//请求发起的时间
    LogUtils.e(String.format("发送请求 %s on %s%n%s", request.url(), chain.connection(), request.headers()));

    Response response = chain.proceed(request);

    long t2 = System.nanoTime();//收到响应的时间

    //这里不能直接使用response.body().string()的方式输出日志
    //因为response.body().string()之后,response中的流会被关闭,程序会报错,我们需要创建出一
    //个新的response给应用层处理
    ResponseBody responseBody = response.peekBody(1024 * 1024);

    LogUtils.d(String.format("接收响应: [%s]" +
                    "\n %n返回json:【%100s】 " +
                    "\n请求执行时间%.1fms" +
                    "\n%n%s",
            response.request().url(),
            responseBody.string(),
            (t2 - t1) / 1e6d,
            response.headers()));
    return response;
}


加上日志拦截器之后log如下图


image.png

看到打印出来的详细的log有木有感觉很酸爽。

Rxjava的封装

绑定Activity生命周期

对rxjava中的subcriber的封装,这里主要是将activity的生命周期和subcriber绑定联系起来,当activity被finish的时候我们的subcriber也应该dispose取消掉。

   private CompositeDisposable disposables2Stop;// 管理Stop取消订阅者者
    private CompositeDisposable disposables2Destroy;// 管理Destroy取消订阅者者

在baseaActivity中通过CompositeDisposable组合管理添加进来的Disposable。然后在ondestroy中进行统一取消,防止内存泄漏。

@Override
    protected void onDestroy() {
        super.onDestroy();
        if (disposables2Destroy == null) {
            throw new IllegalStateException(
                    "onDestroy called multiple times or onCreate not called");
        }
        disposables2Destroy.dispose();
        disposables2Destroy = null;
        if (mDelegate != null) {
            mDelegate.ondestroy();
            mDelegate = null;
        }
    }
基类订阅者BaseObserver

通用BaseObserver是继承于rxjava的Observer,在错误回调中的代码,前半部分是获取错误的堆栈进行打印的逻辑,后面是对各类错误的通用处理。

public void onError(Throwable e) {
        if (BuildConfig.DEBUG) {
            StringBuilder sb = new StringBuilder();
            StackTraceElement[] stacks = e.getStackTrace();
            sb.append(e.getMessage());
            sb.append("\n");
            for (StackTraceElement stack : stacks) {
                sb.append(stack.getMethodName());
                sb.append("(");
                sb.append(stack.getClassName());
                sb.append(".java:");
                sb.append(stack.getLineNumber());
                sb.append(")");
                sb.append("\n");
            }
            LogUtils.e("Retrofit", sb.toString());
        }
        mBaseImpl.dismissProgress();
        if (e instanceof HttpException) {                 //   HTTP错误
            onException(ExceptionReason.BAD_NETWORK);
        } else if (e instanceof ConnectException
                || e instanceof UnknownHostException) {   //   连接错误
            onException(ExceptionReason.CONNECT_ERROR);
        } else if (e instanceof InterruptedIOException) { //  连接超时
            onException(ExceptionReason.CONNECT_TIMEOUT);
        } else if (e instanceof JsonParseException
                || e instanceof JSONException
                || e instanceof ParseException) {         //  解析错误
            onException(ExceptionReason.PARSE_ERROR);
        } else {
            onException(ExceptionReason.UNKNOWN_ERROR);
        }
    }

Observer中另一个最重要的结果回调onNext中对errcode进行过滤,因为我自己封装的model层返回的BaseResponce是没有errorCode的,这个model后面会讲到,当然我也可以自己给通用BaseResponse加上200的code但是总感觉这两个逻辑还是不要耦合的好,万一code变了我model也要改,所以我在我在BaseResponce中设置了一个变量fromCache用于标记返回结果为缓存。代码如下:

@Override
    public void onNext(@NonNull T tBaseResponce) {
        LogUtils.d(tBaseResponce.errCode + " || from cache : " + tBaseResponce.fromCache);
        if (tBaseResponce.errCode == 200 || tBaseResponce.fromCache) {
            onSuccess(tBaseResponce);
        } else {
            onFail(tBaseResponce);
        }
    }
public class BaseResponse<T>{

    @SerializedName("code")
    public int errCode;

    @SerializedName("msg")
    public String errMsg;

    @SerializedName("data")
    public T realData;

    /**
     * 請求結果是否來自緩存
     */
    public boolean fromCache = false;

    public BaseResponse<T> setData(T data){
        realData = data;
        return this;
    }

    @Override
    public String toString() {
        return "BaseResponse{" +
                "errCode='" + errCode + '\'' +
                ", errMsg='" + errMsg + '\'' +
                ", data=" + realData +
                '}';
    }
}

另外BaseObserver引用的BaseImpl是activity的抽象接口,托管了进度条和绑定了activity的生命周期的逻辑。

public abstract class BaseObserver<T extends BaseResponse> implements Observer<T> {

    private BaseImpl mBaseImpl;
    //  Activity 是否在执行onStop()时取消订阅
    private boolean isAddInStop = false;
    private boolean needProgress = false;

    public BaseObserver(BaseImpl mBaseImpl,boolean needProgress) {
        this.needProgress = needProgress;
        this.mBaseImpl = mBaseImpl;
    }

    @Override
    public void onSubscribe(@NonNull Disposable d) {
        if(needProgress) mBaseImpl.showProgress("加载中");
        if (isAddInStop) {    //  在onStop中取消订阅
            mBaseImpl.addRxStop(d);
        } else { //  在onDestroy中取消订阅
            mBaseImpl.addRxDestroy(d);
        }
    }

    @Override
    public void onNext(@NonNull T tBaseResponce) {
        LogUtils.d(tBaseResponce.errCode + " || from cache : " + tBaseResponce.fromCache);
        if (tBaseResponce.errCode == 200 || tBaseResponce.fromCache) {
            onSuccess(tBaseResponce);
        } else {
            onFail(tBaseResponce);
        }
    }

    @Override
    public void onError(Throwable e) {
        if (BuildConfig.DEBUG) {
            StringBuilder sb = new StringBuilder();
            StackTraceElement[] stacks = e.getStackTrace();
            sb.append(e.getMessage());
            sb.append("\n");
            for (StackTraceElement stack : stacks) {
                sb.append(stack.getMethodName());
                sb.append("(");
                sb.append(stack.getClassName());
                sb.append(".java:");
                sb.append(stack.getLineNumber());
                sb.append(")");
                sb.append("\n");
            }
            LogUtils.e("Retrofit", sb.toString());
        }
        mBaseImpl.dismissProgress();
        if (e instanceof HttpException) {                 //   HTTP错误
            onException(ExceptionReason.BAD_NETWORK);
        } else if (e instanceof ConnectException
                || e instanceof UnknownHostException) {   //   连接错误
            onException(ExceptionReason.CONNECT_ERROR);
        } else if (e instanceof InterruptedIOException) { //  连接超时
            onException(ExceptionReason.CONNECT_TIMEOUT);
        } else if (e instanceof JsonParseException
                || e instanceof JSONException
                || e instanceof ParseException) {         //  解析错误
            onException(ExceptionReason.PARSE_ERROR);
        } else {
            onException(ExceptionReason.UNKNOWN_ERROR);
        }
    }

    @Override
    public void onComplete() {
        if(needProgress) mBaseImpl.dismissProgress();
    }

    /**
     * 请求成功
     *
     * @param response 服务器返回的数据
     */
    abstract public void onSuccess(T response);

    /**
     * 服务器返回数据,但响应码不为200
     *
     * @param response 服务器返回的数据
     */
    public void onFail(T response) {
        String message = response.errMsg;
        if (TextUtils.isEmpty(message)) {
            ToastUtils.showShort(R.string.response_return_error);
        } else {
            ToastUtils.showShort(message);
        }
    }

    /**
     * 请求异常
     *
     * @param reason
     */
    public void onException(ExceptionReason reason) {
        switch (reason) {
            case CONNECT_ERROR:
                ToastUtils.showShort(R.string.connect_error, Toast.LENGTH_SHORT);
                break;

            case CONNECT_TIMEOUT:
                ToastUtils.showShort(R.string.connect_timeout, Toast.LENGTH_SHORT);
                break;

            case BAD_NETWORK:
                ToastUtils.showShort(R.string.bad_network, Toast.LENGTH_SHORT);
                break;

            case PARSE_ERROR:
                ToastUtils.showShort(R.string.parse_error, Toast.LENGTH_SHORT);
                break;

            case UNKNOWN_ERROR:
            default:
                ToastUtils.showShort(R.string.unknown_error, Toast.LENGTH_SHORT);
                break;
        }
    }

    /**
     * 请求网络失败原因
     */
    public enum ExceptionReason {
        /**
         * 解析数据失败
         */
        PARSE_ERROR,
        /**
         * 网络问题
         */
        BAD_NETWORK,
        /**
         * 连接错误
         */
        CONNECT_ERROR,
        /**
         * 连接超时
         */
        CONNECT_TIMEOUT,
        /**
         * 未知错误
         */
        UNKNOWN_ERROR,
    }
}

Model层的封装

逻辑流程图

然后说下上面提到的model层,我定义了接口IRepository。
这个model的主要逻辑是 :


image.png

首先判断是否需要强制刷新,如果不需要强制刷新则去数据库缓存中查看是否含有对象的缓存,如果是网络获取判断是否需要缓存。这里的逻辑主要由客户端控制。

public interface IRepository<T> {

    /**
     * 用于gson解析,以及一些Logname的打印。
     * @return
     */
    Class getTClass();

    Observable<T> getEntry(final String url, Map<String, String> queryMap, final boolean needCache, boolean forceRefresh);

    Observable<T> getEntry(final String url, Map<String, String> queryMap);

    T getCache(String url) throws Exception;

    Observable<T> getEntryFromNet(String url, Map<String, String> queryMap, boolean needCache);

    void saveCache(String url, T baseBeanList);

    String getCacheKey(String url, Map<String, String> queryMap);

    void clearCache();

}
model的实现

拿目前公司的restful接口数据格式类型举例:

{
    "code":200,
    "msg":"请求成功",
    "data":{
        "count":10,
        "game_list":[
            {
                "gameid":362938
            }
        ]
    }
}

可以看出BaseResponce返回的泛型T对应的data数据还需要继续解析。所以以目前的IRepository<T>

public abstract class IDBFlowRespository<BeanContainer,DBBean> implements IRepository<BaseResponse<BeanContainer>>{

是代码是不能很好的封装满足需求的,所以我定义了抽象类继承IRepository。
定义了2个泛型BeanContainer和DBBean,数据库的相关操作基本由DBBean泛型实例完成,网络层的解析由BeanContainer完成。
各司其职。GameContainer对应的是上图json的data,gameList对应的是上图json的game_list。当然如果有其他类型的restful结构,我只需要在定义对应类型的repository抽象类就好了,毕竟现在返回的restful接口的json格式非常局限满世界也就那么几种,所以不用担心repository的扩展类太多的问题。

而真正的Repository实例代码非常少,只需要继承4个接口就能满足上述定义的model接口的功能,如下:

public class GameBeanRespository extends DBListRepository<GameContainerBean,GameContainerBean.GameListBean> {
    //用于Gson对泛型的解析
    @Override
    public Class getTClass() {
        return GameContainerBean.class;
    }

    //用于DB抽象类获取对数据库的引用
    @Override
    public Class getTableClass() {
        return GameContainerBean.GameListBean.class;
    }

    @Override
    public List<GameContainerBean.GameListBean> mapContainer(GameContainerBean beanContainer) {
        return beanContainer.gameList;
    }

    @Override
    public GameContainerBean mapTableBean(List<GameContainerBean.GameListBean> gameListBeen) {
        return new GameContainerBean(gameListBeen);
    }
}

BaseModel是我对实体的抽象继承的是DBflow的BaseModel可以进行数据库的增删改,很方便。

public abstract class BaseModel extends com.raizlabs.android.dbflow.structure.BaseModel{
    public static final String KEY = "keyUrl";
    @Column(name = KEY)
    public String keyUrl;
}

其中key是对每个bean对应的数据库增加的字段主要是用来根据url进行缓存查询的。
其中key是由Url拼接上queryMap的参数组成,逻辑如下:

public String getCacheKey(String url, Map<String, String> queryMap) {
        StringBuilder sb = new StringBuilder();
        sb.append(url);
        if (queryMap != null && !queryMap.isEmpty()) {
            Set<String> keys = queryMap.keySet();
            sb.append("?");
            for (String key : keys) {
                sb.append(key).append("=").append(queryMap.get(key));
            }
        }
        return sb.toString();
    }

有个小问题,因为网络数据获取是从我们定义的retrofit通用接口中返回,返回的对象是Obserable<ResponseBody>而我们的model接受的参数是Observable<BaseResponce<T>>,等于是承包了GsonConvertFactory的工作,我们把返回的Observer通过rxjava的map转成我们的Model对应的的Observer类型就行了。

@Override
    public Observable<BaseResponse<Container>> getEntryFromNet(String url, Map<String, String> queryMap, boolean needCache) {
        return HttpRequestFactory.retrofit().create(ApiService.class)
                .executeGet(url,queryMap).map(new Function<ResponseBody, BaseResponse<Container>>() {
                    @Override
                    public BaseResponse<Container> apply(@NonNull ResponseBody responseBody) throws Exception {
                        return GsonUtils.getInstance().fromJson(responseBody.string(), GsonUtils.type(BaseResponse.class,getTClass()));
                    }
                });
    }

获取model集合的的主要逻辑代码块如下:

@Override
    public Observable<BaseResponse<Container>> getEntry(final String url, Map<String, String> queryMap, final boolean needCache, boolean forceRefresh) {

        final String key = getCacheKey(url, queryMap);

        //get cache
        Observable<BaseResponse<Container>> fromCache = Observable.create(new ObservableOnSubscribe<BaseResponse<Container>>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<BaseResponse<Container>> e) throws Exception {
                final BaseResponse<Container> cacheResponce = getCache(key);
                if (cacheResponce != null) {
                    LogUtils.e("Cache hint  | key = " + key);
                    cacheResponce.fromCache = true;
                    e.onNext(cacheResponce);
                } else {
                    e.onComplete();
                }
            }
        });

        //save cache
        Observable<BaseResponse<Container>> fromNet = getEntryFromNet(url, queryMap ,needCache).map(new Function<BaseResponse<Container>, BaseResponse<Container>>() {
            @Override
            public BaseResponse<Container> apply(@NonNull BaseResponse<Container> tBaseResponse) throws Exception {
                if (needCache) saveCache(key, tBaseResponse);
                return tBaseResponse;
            }
        });

        if (forceRefresh) {
            return fromNet;
        }
        return Observable.concat(fromCache, fromNet)
                .subscribeOn(Schedulers.io())
                .unsubscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread());
    }

可以看到,Cache命中的时候会将里面BaseResponce的fromCache标记为true。这样就能和上面的提到的

BaseObserver对应的onNext逻辑相吻合了。

测试实例

new GameBeanRespository()
                .getEntry(new UrlConstant.Builder(false).shuffix().game().list().build()//参数url
                        ,new RxMap()
                                .put("page","2")
                                .put("offset","10")
                                .build())//参数query maps
                .subscribe(new BaseObserver<BaseResponse<GameContainerBean>>(this,false) {
                    @Override
                    public void onSuccess(BaseResponse<GameContainerBean> response) {
                        LogUtils.d(response.realData);
                    }
                });

GameRepository继承于DBlistRepository需要做的事情很少

public class GameBeanRespository extends DBListRepository<GameContainerBean,GameContainerBean.GameListBean> {

    @Override
    public Class getTClass() {
        return GameContainerBean.class;
    }

    @Override
    public Class getTableClass() {
        return GameContainerBean.GameListBean.class;
    }

    @Override
    public List<GameContainerBean.GameListBean> mapContainer(GameContainerBean beanContainer) {
        return beanContainer.gameList;
    }

    @Override
    public GameContainerBean mapTableBean(List<GameContainerBean.GameListBean> gameListBeen) {
        return new GameContainerBean(gameListBeen);
    }

}

上面看到的rxMap只是我写的一个链式调用的Map包装类,链式调用编写的效率和心情大家应该都理解 -3-

有兴趣的可以拿去用,也就是个小玩意儿。

public class RxMap<T,R>{

    Map<T,R> map;

    public static <T,R> RxMap<T,R> newInstance(){
        return new RxMap<>();
    }

    public RxMap() {
        this.map = new HashMap<>();
    }

    public RxMap(Map<T,R> map) {
        this.map = map;
    }

    public RxMap<T,R> put(T t, R r){
        map.put(t,r);
        return this;
    }

    public Map<T,R> build(){
        return map;
    }
}
DBflow

简单的说下DBflow,可能你直接看到了bean的实例进行了数据库的save操作,觉得很酸爽,确实很酸爽,而且DBflow继承了GreenDao和OrmLite各自的优点,简单易用上无可挑剔,自动生成数据Dao类,只需要类似于OrmLite利用注解声明各个bean之间的关系,另外继承BaseModel就让bean自己具备了增删改的能力了。

关于DBflow这个数据库的使用我就不多说了,因为太简单,学习成本低,推荐大家去用,用了感觉不爽来打我 - -!!,当然我不会告诉你我在哪里上班的。

后续我会抽出一个框架的demo的github地址补充在文章下面。

相关文章

网友评论

    本文标题:Rxjava2 + Retrofit +DBflow + 自定义

    本文链接:https://www.haomeiwen.com/subject/oyhfjxtx.html