美文网首页
使用Retrofit2+RxJava2+ProtoBuf实现网络

使用Retrofit2+RxJava2+ProtoBuf实现网络

作者: fomin | 来源:发表于2018-12-14 10:54 被阅读13次

    引言

    Retrofit 是一个用于 Android 和 Java 平台的类型安全的,底层使用OkHttp实现网络请求框架。Retrofit 通过将 API 抽象成 Java 接口而让我们连接到 REST web 服务变得很轻松。
    RxJava 提供一套异步编程的 API,这套 API 是基于观察者模式的,而且是链式调用的。
    Protocol Buffers 是一种轻便高效的结构化数据存储格式,可以用于结构化数据串行化,或者说序列化。它很适合做数据存储或 RPC 数据交换格式。
    主要讲解如何使用各个库封装网络请求,不讲解各库如何使用,具体可查看Rxjava2Retrofit2ProtoBuf

    依赖

    implementation 'com.squareup.retrofit2:retrofit:2.5.0'
    implementation 'io.reactivex.rxjava2:rxjava:2.2.4'
    implementation 'com.squareup.wire:wire-runtime:2.3.0-RC1'
    implementation 'com.squareup.retrofit2:retrofit-adapters:2.5.0'
    implementation 'com.squareup.retrofit2:converter-wire:2.5.0'
    implementation 'com.squareup.retrofit2:adapter-rxjava2:2.5.0'
    implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
    

    其中RxAndroid是为优雅地处理异步请求和解决线程调度问题;Wire用于将请求结果转换为实体类型,并且wire是生成ProtoBuf文件的一种,没有用官方的protobuf生成java文件,主要是为了解决64k限制,减少生成java的代码量,但需要注意的是wire生成的java文件需要判断null,而官方的protobuf生成java文件是有默认值的,无需判断null。

    proto文件

    在客户端可能需要相关客户端信息,例如设备信息、设备类型,app信息、网络信息等。

    // 设备类型
    enum PBDeviceType {
        DEVICE_ANDROID = 0;                      // 安卓
        DEVICE_IOS = 1;                          // 苹果
        DEVICE_PC = 2;                           // PC
    }
    
    // 设备
    message PBDevice {
        string deviceId = 1;                    // 设备ID
        string deviceOs = 2;                    // 设备操作系统
        string deviceModel = 3;                 // 设备模型
        PBDeviceType deviceType = 4;             // 设备类型,参考PBDeviceType
    }
    
    // 网络类型
    enum PBNetworkType {
        NET_UNKNOWN = 0;                         // 未知网络
        NET_WIFI = 1;                            // WIFI
        NET_2G = 2;                              // 2G网络
        NET_3G = 3;                              // 3G网络
        NET_4G = 4;                              // 4G网络
    }
    
    // APP信息
    message PBAppInfo {
        string versionName = 1;                 // 应用程序版本名
        uint32 versionCode = 2;                 // 应用程序版本号
        PBNetworkType network = 3;               // 网络信息
        PBDevice device = 4;                     // 设备信息
    }
    
    

    定义Request和Response

    // 消息请求包
    message PBRequest {
        uint32 type = 1;                        // 消息类型
        bytes messageData = 2;                  // 请求数据
        uint64 timestamp = 3;                   // 客户端时间戳
        PBAppInfo appInfo = 4;                   // APP信息
    }
    
    // 消息响应包
    message PBResponse {
        uint32 type = 1;                        // 消息类型
        bytes messageData = 2;                  // 返回数据
        uint32 resultCode = 3;                  // 返回的结果码
        string resultInfo = 4;                  // 返回的结果消息提示文本(用于错误提示)
    }
    

    使用命令生成相关的Java文件

    java -jar ${wire_compiler} --proto_path=${protoPath} --java_out=${modelPath} $1
    
    • wire_compiler:wire.jar的存放路径
    • protoPath:proto文件存放路径
    • modelPath:生成java文件存放路径


      图片.png

    请求接口

    定义一个Service的接口类,管理所有请求接口

    public interface Service {
    
        @POST("users/new")
        Observable<Response<PBResponse>> sendMessage(@Body PBRequest request);
    }
    

    请求处理

    定义一个RetrofitHandler类,处理retrofit的初始化和发送请求,该类使用静态单例进行初始化。

    private Service mService;           // 请求接口
    private static String mServiceUrl;  // 请求url
    
    private static class Holder {
        private static final RetrofitHandler INSTANCE = new RetrofitHandler();
    }
    
    private RetrofitHandler() {
        init();
    }
    
    public static RetrofitHandler getInstance(String serviceUrl) {
        mServiceUrl = serviceUrl;
        return Holder.INSTANCE;
    }
    

    retrofit的初始化,使用wire的factory进行数据转换,使用rxjava2进行适配。

    private void init() {
        assert mServiceUrl != null;
        Retrofit retrofit = new Retrofit.Builder()
                .baseUrl(mServiceUrl)
                .client(createClient())
                .addConverterFactory(WireConverterFactory.create())
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                .build();
        mService = retrofit.create(Service.class);
    }
    
    private OkHttpClient createClient() {
        return new OkHttpClient.Builder()
                .retryOnConnectionFailure(true)
                .connectionPool(new ConnectionPool(5, 1, TimeUnit.MINUTES))
                .connectTimeout(10, TimeUnit.SECONDS)
                .readTimeout(20, TimeUnit.SECONDS)
                .writeTimeout(5, TimeUnit.SECONDS)
                .build();
    
    }
    

    retrofit进行接口调用发送请求

    public Observable<PBResponse> send(final PBRequest request) {
        return mService.sendMessage(request)
                .map(new Function<Response<PBResponse>, PBResponse>() {
                    @Override
                    public PBResponse apply(Response<PBResponse> response) throws Exception {
                        if (response == null) {
                            return failed(request.type, 1001, "未知错误");
                        }
                        if (response.code() != 200 || response.body() == null) {
                            return failed(request.type, response.code(), response.message());
                        }
                        return response.body();
                    }
                });
    }
    
    private PBResponse failed(int type, int code, String info) {
        return new PBResponse.Builder()
                .type(type)
                .resultCode(code)
                .resultInfo(info)
                .build();
    }
    

    返回处理

    定义ApiResult的泛型返回模型,对返回数据进行二次处理

    public class ApiResult<T extends Message> {
        private int code;        // 返回码
        private String message;  // 返回信息
        private T response;      // 返回数据
    
        public ApiResult(int code, String message, T response) {
            this.code = code;
            this.message = message;
            this.response = response;
        }
    
        public int getCode() {
            return code;
        }
    
        public String getMessage() {
            return message;
        }
    
        public T getResponse() {
            return response;
        }
    }
    

    定义ApiCallback泛型回调处理类,给业务层进行相关的业务处理,如刷新UI等。

    public class ApiCallback<T extends Message> {
        private boolean isDisposed = false;  // 返回处理标志
    
        public boolean isDisposed() {
            return isDisposed;
        }
    
        public void setDisposed(boolean disposed) {
            isDisposed = disposed;
        }
    
        public void onStart() {
           // 请求开始
        }
    
        public void onSuccess(T response) {
    
        }
    
        public void onFailure(int code, String message) {
    
        }
    
        public void onCompleted() {
            // 请求完成
        }
    }
    

    定义IRquest接口,给业务端初始化相关客户端信息

    public interface IRequest {
        String getVersionName();
    
        Integer getVersionCode();
    
        PBNetworkType getNetworkType();
    
        PBDevice getDevice();
    
    }
    

    定义HttpManager类,使用单例进行实例化

    private static class Holder {
        private static final HttpManager INSTANCE = new HttpManager();
    }
    
    private HttpManager() {
    
    }
    
    public static HttpManager getInstance() {
        return HttpManager.Holder.INSTANCE;
    }
    

    定义一个初始化方法,可以在application进行初始化

    public void init(IRequest request, String url){
        mRequest = request;
        mHandler = RetrofitHandler.getInstance(url);
    }
    

    使用Rxjava2进行返回处理

    @SuppressLint("CheckResult")
    public <T extends Message, P extends Message> Disposable request(final T request, final int messageType,
                                                                     final Class<P> pClass, final ApiCallback<P> apiCallback) {
        
        Observable observable = mHandler.send(buildBody(request, messageType))  // 发送请求
                .map(new Function<PBResponse, ApiResult<? extends Message>>() {
                    @Override
                    public ApiResult<? extends Message> apply(PBResponse pbResponse) throws Exception {
                        return apiResult(pClass, pbResponse);
                    }
                });
        observable.doOnDispose(new Action() { // 业务端取消订阅时调用
            @Override
            public void run() throws Exception {
                apiCallback.setDisposed(true);
            }
        });
        return toSubscribe(observable, new Consumer<ApiResult<P>>() {
            @Override
            public void accept(ApiResult<P> apiResult) throws Exception {
                if (apiCallback == null || apiCallback.isDisposed()) return;
                try {
                    if (apiResult.getCode() == 200) {
                        apiCallback.onSuccess(apiResult.getResponse());
                    } else {
                        apiCallback.onFailure(apiResult.getCode(), apiResult.getMessage());
                    }
                } catch (Exception ex) {
                    apiCallback.onFailure(1004, "客户端处理异常");
                } finally {
                    apiCallback.onCompleted();
                }
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) {
                if (apiCallback == null || apiCallback.isDisposed()) return;
                //客户端本地的异常,如断网等
                try {
                    apiCallback.onFailure(1005, "网络连接错误");
                } catch (Exception e) {
                } finally {
                    apiCallback.onCompleted();
                }
            }
        }, new Action() {
            @Override
            public void run() {
                if (apiCallback == null || apiCallback.isDisposed()) return;
    
                //onCompleted will not be called when occurs network exception, like disconnected/timeout, replace invoking at onNext/onError
            }
        }, new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) {
                if (apiCallback == null || apiCallback.isDisposed()) return;
                apiCallback.onStart();
            }
        });
    }
    
    // 异步订阅
    private <T> Disposable toSubscribe(Observable<T> o, Consumer<T> onNext, Consumer<Throwable> onError, Action onComplete, Consumer<Disposable> onSubscribe) {
        return o.subscribeOn(Schedulers.io())// io线程处理
                .unsubscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())// 主线程处理
                .subscribe(onNext, onError, onComplete, onSubscribe);
    }
    

    使用

    初始化

    HttpManager.getInstance().init(new RequestInfo(), mServerUrl);
    

    使用MVP或者MVVM的架构,在P或者VM层调用请求

    HttpManager.getInstance().request(new PBAppInfo.Builder().build(), 1001, PBResponse.class, new ApiCallback<PBResponse>() {
    
        @Override
        public void onSuccess(PBResponse response) {
            super.onSuccess(response);
        }
    
        @Override
        public void onFailure(int code, String message) {
            super.onFailure(code, message);
        }
    });
    

    相关文章

      网友评论

          本文标题:使用Retrofit2+RxJava2+ProtoBuf实现网络

          本文链接:https://www.haomeiwen.com/subject/vovwhqtx.html