本篇文章只对RxJava的实际用法做一个简单的说明,并不会对其原理进行详细分析,如果想了解更多,可以去看一下扔物线的文章。给 Android 开发者的 RxJava 详解
在本文中与RxJava一起使用的网络框架是Retrofit,如果有不了解的同学可以先去学习一下。我会把网络请求分别用Retrofit和Retrofit+RxJava请求,让大家看到Rxjava的优势。(有一种被人喂了翔的感觉,RxJava刚整明白,居然又出RxJava2了,好多类都没有了)
public static Retrofit getInatance(String url) {
if (retrofit == null) {
synchronized (RetrofitHelper.class) {
if (retrofit == null) {
OkHttpClient.Builder builder = new OkHttpClient.Builder();
builder.connectTimeout(15, TimeUnit.SECONDS);
retrofit = new Retrofit.Builder().baseUrl(url)
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.client(builder.build())
.build();
}
}
}
return retrofit;
}
public static ApiManger getManger() {
return getInatance(ApiManger.BASE_URL).create(ApiManger.class);
}
public interface ApiManger {
String BASE_URL = "http://198.56.247.61:8089/";
@GET()
Call<StateModel> getState();
@GET()
Call<ChargeModel> getCharge();
}
上面是用到的一些封装以及模拟的Restful接口。
ok,先来一种简单的请求,在我的项目中,有一个是请求对方在线状态的,如果只用Retrofit需要这么写:
ApiManger apiManger = RetrofitHelper.getManger();
apiManger.getState().enqueue(new Callback<StateModel>() {
@Override
public void onResponse(Call<StateModel> call, Response<StateModel> response) {
StateModel stateModel = response.body();
if (stateModel.getState() == 0) {
Logger.e("online");
} else {
Logger.e("offline");
}
}
@Override
public void onFailure(Call<StateModel> call, Throwable t) {
}
});
这里的response.body()是Retrofit的固定用法,在这里暂且不提,貌似这个形式和我们之前用过的xUtils或者okHttp没有什么不同。确实如此,接下来我们结合RxJava使用一下,使用RxJava时,Retrofit的返回类型需要把Call换成Observable。
ApiManger apiManger = RetrofitHelper.getManger();
apiManger.getState()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<StateModel>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(StateModel stateModel) {
if (stateModel.getState() == 0) {
Logger.e("online");
} else {
Logger.e("offline");
}
}
});
好像区别还是不很大,只不过回调中多了一个方法,我们先来分析一下这段代码的意思,首先调用获取状态的方法,返回的是Observable<StateModel>,RxJava的思想就是everything is a stream,个人理解调用这个方法就好比启动一条流水线,而这条流水线传递的就是StateModel这个类型的对象,这个对象在订阅之后会传递到onNext方法中,subscribeOn指定了前面方法运行所在的线程,后面的observeOn指定了后面观察的线程,如果请求成功则会先执行onNext方法,之后再执行onCompleted,如果失败则直接执行onError。
ApiManger apiManger = RetrofitHelper.getManger();
apiManger.getState()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.filter(new Func1<StateModel, Boolean>() {
@Override
public Boolean call(StateModel stateModel) {
return stateModel.getCode() == 0;
}
})
.subscribe(new Subscriber<StateModel>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(StateModel stateModel) {
if (stateModel.getState() == 0) {
Logger.e("online");
} else {
Logger.e("offline");
}
}
});
为了向大家更好的说明流水线这个概念,我们可以再添加一个操作符filter,即便filter中Func1的方法返回值是boolean类型,但是调用filter方法后返回的类型依然还是Observable<StateModel>,传递的类型依然是StateModel,只不过是把state非0的对象过滤了一下。这就好像流水线中的产品一步一步向后传递,中间经过了多条工序一样。
或许到这里你并不认为RxJava有什么优点,我们再加大一下业务难度,前面我们是查询了对方在线状态,然后用户给对方打电话,但我们的app是需要余额充值的,所以检查成功并且对方是在线状态的话我们需要继续查询用户的余额,如果用Retrofit我们需要这么做:
ApiManger apiManger = RetrofitHelper.getManger();
apiManger.getState().enqueue(new Callback<StateModel>() {
@Override
public void onResponse(Call<StateModel> call, Response<StateModel> response) {
StateModel stateModel = response.body();
if (stateModel.getState() == 0) {
apiManger.getCharge().enqueue(new Callback<ChargeModel>() {
@Override
public void onResponse(Call<ChargeModel> call, Response<ChargeModel> response) {
//拨打电话
//hideLoading();
}
@Override
public void onFailure(Call<ChargeModel> call, Throwable t) {
//hideLoading();
}
});
} else {
//hideLoading();
Logger.e("offline");
}
}
@Override
public void onFailure(Call<StateModel> call, Throwable t) {
//hideLoading();
}
});
看到这么多的缩进是不是有点凌乱,如果想要体验好一些的话,我们还需要加上正在等待的对话框,而隐藏的时候也会很麻烦,那我们看一下RxJava是怎么优雅地处理这个问题的。
ApiManger apiManger = RetrofitHelper.getManger();
apiManger.getState()
.flatMap(new Func1<StateModel, Observable<ChargeModel>>() {
@Override
public Observable<ChargeModel> call(StateModel stateModel) {
return stateModel.getState() != 0 ? Observable.empty() : apiManger.getCharge();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<ChargeNodel>() {
@Override
public void onCompleted() {
//hideLoading();
}
@Override
public void onError(Throwable e) {
//hideLoading();
}
@Override
public void onNext(ChargeModel chargeModel) {
if(chargeModel.getCharge() > 0){
//拨打电话
}
}
});
看一下代码的简洁性,高下立判。当然YY之后我们还是需要分析一下这段代码是什么意思,首先调用getState方法启动流水线,获取对方状态,获取之后将产品(stateModel对象)向后传递,产品传递到flatMap方法中(flatMap的意思个人理解为重新启动了一条流水线并接着之前的流水线继续传递),我们拿到getState这条流水线的产品判断一下他是否符合标准(状态为在线),如果在线那就请求getCharge并重新发射一个流(重新启动流水线),如果不在线就什么都不干,这里的Observable.empty()方法就是直接结束流水线最后会调用onCompleted方法。重新发射流之后流水线的产品就会变换为新的产品,所以onNext中的对象是chargeNodel。
但有人说RxJava代码量太大,其实如果对RxJava比较熟悉之后我们会发现,各个操作符之间传递的就是我们需要的对象,我们可以不需要了解Func1等内部类的逻辑,所以我们可以用retrolambda简化一下,简化之后的代码是这个样子的:
ApiManger apiManger = RetrofitHelper.getManger();
apiManger.getState()
.flatMap(stateModel -> stateModel.getState() != 0 ? Observable.empty() : apiManger.getCharge())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(chargeModel -> {
if(chargeModel.getCharge() > 0){
//拨打电话
}
}, throwable -> {
});
由于个人表达能力有限,所以只能提供代码来说明,希望能给正在学习RxJava的同学提供一些帮助,当然也希望抛砖引玉能够得到更多大神指点。
网友评论