RxPermissions总结
Rxjava中知识点总结
rxjava2.pngSubscriber和Observer接口的区别:
在RxJava2版本中2.1.11中没有区别
<--特别注意:2种方法的区别,即Subscriber 抽象类与Observer 接口的区别 -->
1、 相同点:
二者基本使用方式完全一致(实质上,在RxJava的 subscribe 过程中,Observer总是会先被转换成Subscriber再使用)
2、不同点:
Observer.pngSubscriber.png
Subscriber抽象类对 Observer 接口进行了扩展,新增了两个方法:
// 1. onStart():在还未响应事件前调用,用于做一些初始化工作
// 2. unsubscribe():用于取消订阅。在该方法被调用后,观察者将不再接收 & 响应事件
// 调用该方法前,先使用 isUnsubscribed() 判断状态,确定被观察者Observable是否还持有观察者Subscriber的引用,如果引用不能及时释放,就会出现内存泄露
3、观察者实现方式:
<--方式1:采用Observer 接口 -->
// 1. 创建观察者 (Observer )对象
Observer<Integer> observer = new Observer<Integer>() {
// 2. 创建对象时通过对应复写对应事件方法 从而 响应对应事件
// 观察者接收事件前,默认最先调用复写 onSubscribe()
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
// 当被观察者生产Next事件 & 观察者接收到时,会调用该复写方法 进行响应
@Override
public void onNext(Integer value) {
Log.d(TAG, "对Next事件作出响应" + value);
}
// 当被观察者生产Error事件& 观察者接收到时,会调用该复写方法 进行响应
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
// 当被观察者生产Complete事件& 观察者接收到时,会调用该复写方法 进行响应
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
};
<--方式2:采用Subscriber 抽象类 -->
// 说明:Subscriber类 = RxJava 内置的一个实现了 Observer 的抽象类,对 Observer 接口进行了扩展
// 1. 创建观察者 (Observer )对象
Subscriber<String> subscriber = new Subscriber<Integer>() {
// 2. 创建对象时通过对应复写对应事件方法 从而 响应对应事件
// 观察者接收事件前,默认最先调用复写 onSubscribe(),默认都会进行调用一次
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "开始采用subscribe连接");
}
// 当被观察者生产Next事件 & 观察者接收到时,会调用该复写方法 进行响应
@Override
public void onNext(Integer value) {
Log.d(TAG, "对Next事件作出响应" + value);
}
// 当被观察者生产Error事件& 观察者接收到时,会调用该复写方法 进行响应
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
// 当被观察者生产Complete事件& 观察者接收到时,会调用该复写方法 进行响应
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
};
创建观察者最先开始调用的是onSubscribe()方法
<-- Observable.subscribe(Subscriber) 的内部实现 -->
public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
// 步骤1中 观察者 subscriber抽象类复写的方法,用于初始化工作
onSubscribe.call(subscriber);
// 通过该调用,从而回调观察者中的对应方法从而响应被观察者生产的事件
// 从而实现被观察者调用了观察者的回调方法 & 由被观察者向观察者的事件传递,即观察者模式
// 同时也看出:Observable只是生产事件,真正的发送事件是在它被订阅的时候,即当 subscribe() 方法执行时
}
观察者 Observer的subscribe()具备多个重载的方法
public final Disposable subscribe() {}
// 表示观察者不对被观察者发送的事件作出任何响应(但被观察者还是可以继续发送事件)
public final Disposable subscribe(Consumer<? super T> onNext) {}
// 表示观察者只对被观察者发送的Next事件作出响应
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
// 表示观察者只对被观察者发送的Next事件 & Error事件作出响应
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
// 表示观察者只对被观察者发送的Next事件、Error事件 & Complete事件作出响应
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
// 表示观察者只对被观察者发送的Next事件、Error事件 、Complete事件 & onSubscribe事件作出响应
public final void subscribe(Observer<? super T> observer) {}
// 表示观察者对被观察者发送的任何事件都作出响应
采用 Disposable.dispose() 切断观察者 与 被观察者 之间的连接
操作符subscribeOn() & observeOn()简介
Schedulers.png常用的操作符:
创建的操作符
create icon操作符的总结:以下是一个常用的操作符和参数的介绍
// 参数说明:
// 参数1 = 第1次延迟时间;
// 参数2 = 间隔时间数字;
// 参数3 = 时间单位;
Observable.interval(3,1,TimeUnit.SECONDS)
// 该例子发送的事件序列特点:延迟3s后发送事件,每隔1秒产生1个数字(从0开始递增1,无限个)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
// 默认最先调用复写的 onSubscribe()
@Override
public void onNext(Long value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
// 注:interval默认在computation调度器上执行
// 也可自定义指定线程调度器(第3个参数):interval(long,TimeUnit,Scheduler)
使用flatMap操作符进行多次网络请求封装
demo 注册登录功能的案例:
// 定义Observable接口类型的网络请求对象
Observable<Translation1> observable1;
final Observable<Translation2> observable2;
// 步骤1:创建Retrofit对象
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("http://fy.iciba.com/") // 设置 网络请求 Url
.addConverterFactory(GsonConverterFactory.create()) //设置使用Gson解析(记得加入依赖)
.addCallAdapterFactory(RxJava2CallAdapterFactory.create()) // 支持RxJava
.build();
// 步骤2:创建 网络请求接口 的实例
GetRequest_Interface request = retrofit.create(GetRequest_Interface.class);
// 步骤3:采用Observable<...>形式 对 2个网络请求 进行封装
observable1 = request.getCall1();
observable2 = request.getCall_2();
observable1.subscribeOn(Schedulers.io()) // (初始被观察者)切换到IO线程进行网络请求1
.observeOn(AndroidSchedulers.mainThread()) // (新观察者)切换到主线程 处理网络请求1的结果
.doOnNext(new Consumer<Translation1>() {
@Override
public void accept(Translation1 result) throws Exception {
System.out.println("第1次网络请求成功");
System.out.println(result.getContent().getWord_mean().get(0));
// 对第1次网络请求返回的结果进行操作 = 显示翻译结果
}
})
.observeOn(Schedulers.io()) // (新被观察者,同时也是新观察者)切换到IO线程去发起登录请求
// 特别注意:因为flatMap是对初始被观察者作变换,所以对于旧被观察者,它是新观察者,所以通过observeOn切换线程
// 但对于初始观察者,它则是新的被观察者
.flatMap(new Function<Translation1, ObservableSource<Translation2>>() { // 作变换,即作嵌套网络请求
@Override
public ObservableSource<Translation2> apply(Translation1 result) throws Exception {
// 将网络请求1转换成网络请求2,即发送网络请求2
return observable2;
}
})
.observeOn(AndroidSchedulers.mainThread()) // (初始观察者)切换到主线程 处理网络请求2的结果
.subscribe(new Consumer<Translation2>() {
@Override
public void accept(Translation2 result) throws Exception {
System.out.println("第2次网络请求成功");
System.out.println(result.getContent().getWord_mean().get(0));
// 对第2次网络请求返回的结果进行操作 = 显示翻译结果
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
System.out.println("登录失败");
}
});
使用map操作符进行相关的结果集的转换:
Observable.just("1","2","3").map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
return s + "xxx" ;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("s : "+ s ) ;
}
});
使用interval字段进行网络无限次请求操作(项目中一般会使用有调条件的拉取数据的操作)
/**
* 实现网络无限次请求的操作
*/
private void getIntervalData() {
//进行网络轮训请求的操作
//该例子发送的事件特点:延迟2s后发送事件,每隔1秒产生1个数字(从0开始递增1,无限个)
Observable.interval(2, 2, TimeUnit.SECONDS)
.doOnNext(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
//在此处为无限次的调用此方法中的代码
System.out.println("第 " + aLong + " 次轮询");
Log.d(TAG, "第 " + aLong + " 次轮询");
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("http://fy.iciba.com/") // 设置 网络请求 Url
.addConverterFactory(GsonConverterFactory.create()) //设置使用Gson解析(记得加入依赖)
.addCallAdapterFactory(RxJava2CallAdapterFactory.create()) // 支持RxJava
.build();
// b. 创建 网络请求接口 的实例
GetRequest_Interface request = retrofit.create(GetRequest_Interface.class);
Observable<Translation> observable = request.getCall();
observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Translation>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Translation value) {
Log.d(TAG,value.getContent().getWord_mean().get(0));
System.out.println(value.getContent().getWord_mean().get(0));
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "请求失败");
}
@Override
public void onComplete() {
}
});
}
}).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long value) {
Log.d(TAG, "onNext: subscribe : " + value);
System.out.println("onNext: subscribe : " + value);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
组合多个被观察者的使用:concat 和concatArray
- 作用
组合多个被观察者一起发送数据,合并后 按发送顺序串行执行
二者区别:组合被观察者的数量,即concat()组合被观察者数量≤4个,而concatArray()则可>4个
combineLatest()
- 作用
当两个Observables中的任何一个发送了数据后,将先发送了数据的Observables 的最新(最后)一个数据 与 另外一个Observable发送的每个数据结合,最终基于该函数的结果发送数据
若Observable.observeOn()多次指定观察者 接收 & 响应事件的线程,则每次指定均有效,即每指定一次,就会进行一次线程的切换
若Observable.subscribeOn()多次指定被观察者 生产事件的线程,则只有第一次指定有效,其余的指定线程无效
Rxjava2中的监听取消订阅的方法doOnDispose(),当取消后会执行这个监听的回调
Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS);
subscribe = observable.doOnDispose(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "run: 进行取消了相关的订阅事件!");
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
textView.setText(aLong + "");
}
});
取消相关定于的逻辑代码,包括RxBinding的使用
RxView.clicks(btnAtrlDis).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
if (subscribe!=null && !subscribe.isDisposed()) {
Toast.makeText(TestRxLifeActivity.this, "取消了相关消息", Toast.LENGTH_SHORT).show();
subscribe.dispose();
}
}
});
RxBinding的使用
监听一个Button的点击事件及事件防抖
RxView.clicks(btnAmRxbindIng).throttleFirst(2,TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
startActivity(new Intent(MainActivity.this,TestRxLifeActivity.class));
}
});
throttleFirst(),方法则表示在2秒钟之内,只相应第一次点击事件
如果将水龙头拧紧直到水是以水滴的形式流出,那你会发现每隔一段时间,就会有一滴水流出。也就是会说预先设定一个执行周期,当调用动作的时刻大于等于执行周期则执行该动作,然后进入下一个新周期。
监听CheckBox的状态的改变
CheckBox checkBox = findViewById(R.id.cb_am);
RxCompoundButton.checkedChanges(checkBox).subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
btnAmRxbindIng.setEnabled(aBoolean);
btnAmRxbindIng.setBackgroundColor(aBoolean? getResources().getColor(R.color.colorAccent) : getResources().getColor(R.color.colorPrimary));
}
});
监听EditText的输入
RxTextView.textChanges(etAmTest).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<CharSequence>() {
@Override
public void accept(CharSequence charSequence) throws Exception {
btnAmRxbindIng.setText(charSequence.toString());
}
});
debounce的操作(去抖的操作)
如果特定的时间跨度已经过去而没有发射另一个物品,则只从Observable发射物品
定义:
如果用手指一直按住一个弹簧,它将不会弹起直到你松手为止。也就是说当调用动作n毫秒后,才会执行该动作,若在这n毫秒内又调用此动作则将重新计算执行时间。
RxTextView.textChanges(etAmTest)
.debounce(5000, TimeUnit.MILLISECONDS)
.map(new Function<CharSequence, String>() {
@Override
public String apply(CharSequence charSequence) throws Exception {
return charSequence.toString();
}
}).observeOn(Schedulers.io())
.map(new Function<String, List<String>>() {
@Override
public List<String> apply(String s) throws Exception {
ArrayList<String> list=new ArrayList<>();
if (mDatas.contains(s)){
list.add(s);
return list;
}else {
return mDatas;
}
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<List<String>>() {
@Override
public void accept(List<String> strings) throws Exception {
if (strings.size()==1) {
mDatas.add(0,strings.get(0));
if (adapter != null) {
adapter.notifyDataSetChanged();
}
Toast.makeText(MainActivity.this, "检索到:" + strings.get(0) +"已添加到第一项 " , Toast.LENGTH_SHORT).show();
}else{
Toast.makeText(MainActivity.this, "not find anything!", Toast.LENGTH_SHORT).show();
}
}
});
网友评论