First, create an observable that issues a network request.
Observable code
Retrofit retrofit = new Retrofit.Builder()
        .baseUrl(Urls.baseUrl)
        .addConverterFactory(GsonConverterFactory.create())
        .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
        .build();
GetRequest request = retrofit.create(GetRequest.class);
Observable<Translation> observable = request.getCall1();
GetRequest code
public interface GetRequest {
    @GET(Urls.test)
    Observable<Translation> getCall();

    @GET(Urls.test1)
    Observable<Translation> getCall1();
}
Translation code
public class Translation {
    private int status;
    private content content;

    private static class content {
        private String from;
        private String to;
        private String vendor;
        private String out;
        private int errNo;
    }

    // Logs the translated text and returns it
    public String show() {
        Log.e("yzh", content.out);
        return content.out;
    }
}
If you are not familiar with Retrofit, read an introduction to Retrofit first.
1 Simulating repeated polling with RxJava's interval operator
// The outer stream is the polling loop: first tick after 2 seconds, then one tick per second
Observable.interval(2, 1, TimeUnit.SECONDS)
        .doOnNext(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                Log.e("yzh", "第" + aLong + "次轮询"); // logs "poll number <n>"
                // The request is executed here; note the thread switching
                observable.subscribeOn(Schedulers.io())
                        .observeOn(AndroidSchedulers.mainThread())
                        .subscribe(new Observer<Translation>() {
                            @Override
                            public void onSubscribe(Disposable d) {
                            }

                            @Override
                            public void onNext(Translation translation) {
                                translation.show();
                            }

                            @Override
                            public void onError(Throwable e) {
                                Log.e("yzh", "Request failed");
                            }

                            @Override
                            public void onComplete() {
                            }
                        });
            }
        }).subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Long aLong) {
            }

            @Override
            public void onError(Throwable e) {
                Log.e("yzh", "Responding to the Error event");
            }

            @Override
            public void onComplete() {
                Log.e("yzh", "Responding to the Complete event");
            }
        });
Log output
// 03-12 14:56:16.528 8733-8760/com.example.issuser.rxtest E/yzh: 第0次轮询
// 03-12 14:56:16.886 8733-8733/com.example.issuser.rxtest E/yzh: 嗨世界
// 03-12 14:56:17.528 8733-8760/com.example.issuser.rxtest E/yzh: 第1次轮询
// 03-12 14:56:17.702 8733-8733/com.example.issuser.rxtest E/yzh: 嗨世界
// 03-12 14:56:18.529 8733-8760/com.example.issuser.rxtest E/yzh: 第2次轮询
// 03-12 14:56:18.675 8733-8733/com.example.issuser.rxtest E/yzh: 嗨世界
// 03-12 14:56:19.528 8733-8760/com.example.issuser.rxtest E/yzh: 第3次轮询
// 03-12 14:56:19.674 8733-8733/com.example.issuser.rxtest E/yzh: 嗨世界
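The two numeric arguments of interval(2, 1, TimeUnit.SECONDS) are the initial delay and the period, and each tick carries a counter that starts at 0. As a rough illustration of those semantics only, here is a sketch that uses a plain ScheduledExecutorService instead of RxJava so it runs on any JVM. FixedRatePoller and runTicks are hypothetical names, and the millisecond values are chosen just to keep the demo short.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

// Hypothetical helper: a plain-JDK analog of interval(initialDelay, period, unit).
final class FixedRatePoller {

    // Calls onTick with 0, 1, 2, ... at a fixed rate and returns after `ticks` calls.
    static long runTicks(long initialDelayMs, long periodMs, int ticks, LongConsumer onTick) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicLong counter = new AtomicLong();
        CountDownLatch done = new CountDownLatch(ticks);
        scheduler.scheduleAtFixedRate(() -> {
            // Ignore any tick that fires after the requested number has been delivered
            if (done.getCount() > 0) {
                onTick.accept(counter.getAndIncrement());
                done.countDown();
            }
        }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while polling", e);
        } finally {
            // Stop the timer; otherwise it would keep ticking forever
            scheduler.shutdownNow();
        }
        return counter.get();
    }

    public static void main(String[] args) {
        // First tick after 200 ms, then one tick every 100 ms, three ticks in total
        runTicks(200, 100, 3, n -> System.out.println("poll number " + n));
    }
}
```

Unlike this sketch, the interval stream in the example above is never stopped, so it keeps polling until the subscription is disposed.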
2 Conditional polling with RxJava's repeatWhen
The observable's network request is the same as above.
// i is a counter field of the enclosing class (its declaration is not shown here)
observable.repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
    @Override
    public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
        return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(Object o) throws Exception {
                // Polling condition
                if (i > 3) {
                    // Ends the stream with onError; the message means "polling finished"
                    return Observable.error(new Throwable("轮询结束"));
                } else {
                    // Emitting an item triggers a re-subscription, here after a 2-second delay
                    return Observable.just(1).delay(2, TimeUnit.SECONDS);
                }
            }
        });
    }
}).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<Translation>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("yzh", "onSubscribe");
            }

            @Override
            public void onNext(Translation translation) {
                translation.show();
                i++;
            }

            @Override
            public void onError(Throwable e) {
                Log.e("yzh", "onError--" + e.toString());
            }

            @Override
            public void onComplete() {
            }
        });
Log output
03-12 15:25:00.524 12910-12910/com.example.issuser.rxtest E/yzh: onSubscribe
03-12 15:25:00.715 12910-12910/com.example.issuser.rxtest E/yzh: hi china
03-12 15:25:02.852 12910-12910/com.example.issuser.rxtest E/yzh: hi china
03-12 15:25:05.092 12910-12910/com.example.issuser.rxtest E/yzh: hi china
03-12 15:25:07.244 12910-12910/com.example.issuser.rxtest E/yzh: hi china
03-12 15:25:09.382 12910-12910/com.example.issuser.rxtest E/yzh: hi china
03-12 15:25:09.395 12910-12910/com.example.issuser.rxtest E/yzh: onError--java.lang.Throwable: 轮询结束
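The polling rule in section 2 boils down to: run the request, check a stop condition, and otherwise wait and run it again. The sketch below expresses that rule as an ordinary blocking loop, not as RxJava, so that the counting is easy to follow. ConditionalPoller and poll are hypothetical names, and the request is a stand-in supplier rather than a real network call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;
import java.util.function.Supplier;

// Hypothetical helper: a synchronous analog of "repeat the request until a condition holds".
final class ConditionalPoller {

    // Runs request repeatedly; after each result, shouldStop receives the number of
    // results collected so far. Sleeps delayMillis between runs.
    static <T> List<T> poll(Supplier<T> request, IntPredicate shouldStop, long delayMillis) {
        List<T> results = new ArrayList<>();
        while (true) {
            results.add(request.get());
            if (shouldStop.test(results.size())) {
                return results;
            }
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while polling", e);
            }
        }
    }

    public static void main(String[] args) {
        // Same condition as the example above (count > 3), with a short delay for the demo
        List<String> results = poll(() -> "hi china", count -> count > 3, 200);
        System.out.println(results.size()); // prints 4
    }
}
```

Here the count is updated and checked on the same thread, so the loop stops after exactly four results. In the RxJava version, i is incremented in onNext on the main thread while the condition is evaluated upstream, which may be why the log above shows five results; that is my reading of the log, not something the original states.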
3 Reconnecting after a network error. This is a common case but hard to simulate, so no log output is shown.
It uses RxJava's retryWhen.
// currentRetryCount, maxConnectCount and waitRetryTime are fields of the enclosing class (declarations not shown here)
observable.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
    @Override
    public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
        return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(Throwable throwable) throws Exception {
                Log.e("yzh", "Exception occurred: " + throwable.toString());
                if (throwable instanceof IOException) {
                    Log.e("yzh", "I/O exception, retrying");
                    if (currentRetryCount < maxConnectCount) {
                        currentRetryCount++;
                        Log.e("yzh", "Retry count: " + currentRetryCount);
                        // Wait longer after each failed attempt: 2 s, 3 s, 4 s, ...
                        waitRetryTime = 1000 + currentRetryCount * 1000;
                        Log.e("yzh", "Wait time (ms): " + waitRetryTime);
                        // Emitting an item triggers the retry once the delay has elapsed
                        return Observable.just(1).delay(waitRetryTime, TimeUnit.MILLISECONDS);
                    } else {
                        return Observable.error(new Throwable("Retry limit reached (" + currentRetryCount + " retries), giving up"));
                    }
                } else {
                    return Observable.error(new Throwable("A non-network (non-I/O) exception occurred"));
                }
            }
        });
    }
}).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<Translation>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Translation translation) {
                Log.e("yzh", "Request succeeded");
                translation.show();
            }

            @Override
            public void onError(Throwable e) {
                Log.e("yzh", e.toString());
            }

            @Override
            public void onComplete() {
            }
        });
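Since this case is hard to reproduce on a device, the retry decision itself can be pulled out and checked in isolation. The sketch below restates the rule from the retryWhen block as a pure function: retry only on IOException, stop at the retry limit, and wait 1000 ms plus 1000 ms per attempt. RetryPolicy, nextDelayMillis and NO_RETRY are hypothetical names introduced for illustration.

```java
import java.io.IOException;

// Hypothetical helper that mirrors the retry rule used in the retryWhen example.
final class RetryPolicy {

    // Returned when the error should be passed on instead of retried
    static final long NO_RETRY = -1L;

    // retriesSoFar is the number of retries already made before this failure.
    static long nextDelayMillis(Throwable error, int retriesSoFar, int maxRetries) {
        if (!(error instanceof IOException)) {
            return NO_RETRY; // not a network (I/O) problem
        }
        if (retriesSoFar >= maxRetries) {
            return NO_RETRY; // retry limit reached
        }
        int attempt = retriesSoFar + 1;
        return 1000L + attempt * 1000L;
    }

    public static void main(String[] args) {
        System.out.println(nextDelayMillis(new IOException("timeout"), 0, 3)); // prints 2000
    }
}
```

Keeping the rule in a plain function like this is one possible design; the original keeps it inline and tracks the count in fields.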
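4 Processing the data returned by a network request further, using RxJava's flatMap
A common use case is registering and then logging in immediately. Note the thread switching.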
observable.subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .doOnNext(new Consumer<Translation>() {
            @Override
            public void accept(Translation translation) throws Exception {
                Log.e("yzh", "doOnNextThread--" + Thread.currentThread().getName());
                Log.e("yzh", "第一次网络请求成功"); // logs "first network request succeeded"
                translation.show();
            }
        })
        // Switch the observer's thread to an I/O thread for the second request
        .observeOn(Schedulers.io())
        .flatMap(new Function<Translation, ObservableSource<Translation>>() {
            @Override
            public ObservableSource<Translation> apply(Translation translation) throws Exception {
                Log.e("yzh", "flatMapThread--" + Thread.currentThread().getName());
                Log.e("yzh", "flatmap--");
                translation.show();
                // observable1 is another network request; its definition is omitted here
                return observable1;
            }
        })
        // Switch the observer's thread back to the main thread
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<Translation>() {
            @Override
            public void accept(Translation translation) throws Exception {
                Log.e("yzh", "acceptThread--" + Thread.currentThread().getName());
                Log.e("yzh", "accept--");
                translation.show();
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                // Errors are ignored in this example
            }
        });
Log output. Note how the observer's thread switches for the second request.
03-12 11:50:40.681 11667-11667/com.example.issuser.rxtest E/yzh: doOnNextThread--main
03-12 11:50:40.681 11667-11667/com.example.issuser.rxtest E/yzh: 第一次网络请求成功
03-12 11:50:40.681 11667-11667/com.example.issuser.rxtest E/yzh: 嗨世界
03-12 11:50:40.681 11667-11720/com.example.issuser.rxtest E/yzh: flatMapThread--RxCachedThreadScheduler-2
03-12 11:50:40.681 11667-11720/com.example.issuser.rxtest E/yzh: flatmap--
03-12 11:50:40.681 11667-11720/com.example.issuser.rxtest E/yzh: 嗨世界
03-12 11:50:40.949 11667-11667/com.example.issuser.rxtest E/yzh: acceptThread--main
03-12 11:50:40.949 11667-11667/com.example.issuser.rxtest E/yzh: accept--
03-12 11:50:40.949 11667-11667/com.example.issuser.rxtest E/yzh: hi china
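The register-then-login flow mentioned in section 4 is a chain in which the second request starts only after the first one has produced a result. As a loose analog that runs without Android, Retrofit or RxJava, the sketch below builds the same chain with CompletableFuture, where thenCompose plays the role that flatMap plays above. RegisterThenLogin, register and login are hypothetical stand-ins, not real network calls.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical example: chaining two dependent asynchronous calls.
final class RegisterThenLogin {

    // Stand-in for the first request
    static CompletableFuture<String> register(String name) {
        return CompletableFuture.supplyAsync(() -> "registered:" + name);
    }

    // Stand-in for the second request, which needs the first result
    static CompletableFuture<String> login(String registrationResult) {
        return CompletableFuture.supplyAsync(() -> "logged-in after " + registrationResult);
    }

    // Starts login only once register has completed, then waits for the final result
    static String registerAndLogin(String name) {
        return register(name)
                .thenCompose(RegisterThenLogin::login)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(registerAndLogin("yzh")); // prints logged-in after registered:yzh
    }
}
```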
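5 Combining multiple conditions, for forms such as login or registration where several fields must be filled in
a. RxTextView.textChanges() observes changes to a widget's text; it requires the dependency: compile 'com.jakewharton.rxbinding2:rxbinding:2.0.0'
b. skip(1) skips the initial emission, when the widget has no input yet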
Observable<CharSequence> nameObservable = RxTextView.textChanges(et_name).skip(1);
Observable<CharSequence> ageObservable = RxTextView.textChanges(et_age).skip(1);
Observable<CharSequence> jobObservable = RxTextView.textChanges(et_job).skip(1);
// combineLatest emits only after every source has emitted at least once
Observable.combineLatest(nameObservable, ageObservable, jobObservable, new Function3<CharSequence, CharSequence, CharSequence, Boolean>() {
    @Override
    public Boolean apply(CharSequence charSequence, CharSequence charSequence2, CharSequence charSequence3) throws Exception {
        boolean isUserNameValid = !TextUtils.isEmpty(et_name.getText().toString());
        boolean isUserAgeValid = !TextUtils.isEmpty(et_age.getText().toString());
        boolean isUserJobValid = !TextUtils.isEmpty(et_job.getText().toString());
        return isUserNameValid && isUserAgeValid && isUserJobValid;
    }
}).subscribe(new Consumer<Boolean>() {
    @Override
    public void accept(Boolean aBoolean) throws Exception {
        Log.e("yzh", "Submit button enabled: " + aBoolean);
    }
});
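The combining function above only has to answer one question: are all fields non-empty? A minimal sketch of that check as a standalone function follows, so it can be verified off-device. FormValidator and allFilled are hypothetical names, and the null-or-empty test is written by hand because android.text.TextUtils is not available on a plain JVM.

```java
// Hypothetical helper: the "all fields filled in" rule from the combineLatest example.
final class FormValidator {

    // Returns true only if every field is non-null and has at least one character
    static boolean allFilled(CharSequence... fields) {
        for (CharSequence field : fields) {
            if (field == null || field.length() == 0) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(allFilled("Tom", "18", "developer")); // prints true
        System.out.println(allFilled("Tom", "", "developer"));   // prints false
    }
}
```

With a helper like this, the Function3 body could pass its three CharSequence arguments straight through instead of reading the EditText widgets again; that is a suggestion, not what the original does.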
6 Watch out for memory leaks when using RxJava, for example when a page finishes before an asynchronous operation has completed
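// Prevents problems when the Activity finishes before asynchronous work completes
private final CompositeDisposable disposables = new CompositeDisposable();

// Add each subscription's Disposable, for example the one passed to onSubscribe
// or returned by subscribe(Consumer); "disposable" is a placeholder name
disposables.add(disposable);

@Override
protected void onDestroy() {
    super.onDestroy();
    // Dispose of all subscriptions
    disposables.clear();
}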