RxJava
观看菜鸟窝视频RxJava2整理
我的github整理学习rxjava
RxJava是一种响应式编程
采用观察者模式
好处:异步、简洁
RxJava 在 GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。这就是 RxJava ,概括得非常精准。
观察者模式
观察者(Observer)模式:是对象的行为模式,又叫发布-订阅(Publish/Subscribe)模式、模型-视图(Model/View)模式、源-监听(Source/Listener)模式或者从属(Dependents)模式
观察者模式定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象,这个主题对象再状态上发生变化时,会通知所有观察者对象,使它们能够自动更新自己。
- 抽象主题(Subject)角色:
抽象主题角色把所有对观察者对象的引用保存在一个聚集(比如ArrayList对象)里,每个主题都可以有任何数量的观察者。抽象主题提供一个接口,可以增加和删除观察者对象,抽象主题角色又叫抽象被观察者角色。 - 具体主题(ConcreteSubject)角色:
将有关状态存入具体观察者对象;在具体主题的内部状态改变时,给所有登记过的观察者发出通知。具体主题角色又叫做具体被观察者角色。 - 抽象观察者(Observer)角色:
为所有的具体观察者定义一个接口,在得到主题通知时更新自己,这个接口叫做更新接口。 - 具体观察者(ConcreteObserver)角色:
存储与主题的状态自怡的状态。具体观察者角色实现抽象观察者角色所有要求的更新接口、以便使本身的状态与主题的状态协调。如果需要,具体观察者角色可以保持一个指向具体主题对象的引用。
基本使用
public class MainActivity extends AppCompatActivity {
private static final String TAG = "123===";
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
}
public void click(View view) {
//点击按钮
//第一种创建方法,最基本的使用
// firstMethod();
//第二种创建方法,简单的方法
secondMethod();
}
private void secondMethod() {
// Observable<String> observable = Observable.just("1", "2", "3");
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("1");
e.onNext("2");
e.onNext("3");
e.onComplete();
// e.onError(new Throwable("----onError"));//如果是错误结束,在subscribe订阅的时候必须加上第二个参数
}
});
observable.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: "+s);
}
}/*, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d(TAG, "accept: "+throwable.getLocalizedMessage());
}
}, new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "run: ");
}
}*/);
}
private void firstMethod() {
//第一步创建Observable, 被观察这
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
Log.d(TAG, "subscribe: 1--");
e.onNext("1");//发送数据
Log.d(TAG, "subscribe: 2--");
e.onNext("2");
Log.d(TAG, "subscribe: 3--");
e.onNext("3");
Log.d(TAG, "subscribe: 完成--");
e.onComplete();//完成是调用
// e.onError(new Throwable("我是错误信息"));//在错误的时候调用(完成或错误只能调用一个,不可以两个同时调用)
}
});
//第二部创建observer,观察者
Observer<String> observer = new Observer<String>() {
private Disposable dd;
@Override
public void onSubscribe(@NonNull Disposable d) {
//
dd = d;
// d.dispose();//移除订阅关系
// d.isDisposed();//是否移除订阅关系
Log.d(TAG, "onSubscribe: " + d.isDisposed());
}
@Override
public void onNext(@NonNull String s) {
//事件
Log.d(TAG, "onNext: " + s);
if ("2".equals(s)) {
dd.dispose();
}
}
@Override
public void onError(@NonNull Throwable e) {
//错误
Log.d(TAG, "onError: " + e.getLocalizedMessage());
}
@Override
public void onComplete() {
//完成
Log.d(TAG, "onComplete: ");
}
};
//实现订阅关系
observable.subscribe(observer);
}
}
Scheduler线程控制
- Schedulers.immediate():直接在当前线程运行,相当于不指定线程。这是默认的Scheduler。
- Schedulers.newThread():总是启用新线程,并在新线程执行操作。
- Schedulers.io():i/o操作(读写文件、读写数据库、网络信息交互等)所使用的Scheduler。行为模式和newThread()差不多,区别在于io()比newThread()更有效率。不要把计算工作放入io()中,可以避免创建不必要的线程。
- Schedulers.computatuion():计算所使用的Scheduler。这个计算指的是CPU密集型计算,即不会被i/o等操作限制性能的操作,例如图形的计算。这个Scheduler使用的时固定的线程池,大小为CPU核数。不要把I/O操作放在computation()中,否则I/O操作的等待时间会浪费CPU。
- AndroidSchedulers.mainThread():它指定的操作将在Android主线程运行。
Observable.create(new ObservableOnSubscribe<User>() {
@Override
public void subscribe(@NonNull ObservableEmitter<User> e) throws Exception {
//请求网络
User user = new User("com.thor.wdd", "1", "wdd");
e.onNext(user);
}
}).subscribeOn(Schedulers.io())//Observable切换到子线程
.observeOn(AndroidSchedulers.mainThread())//Observer切换到主线程
.subscribe(new Observer<User>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull User user) {
Log.d(TAG, "onNext: " + user.toString());
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
subscribeOn(Schedulers.io())//Observable、Observer切换到子线程
observeOn(AndroidSchedulers.mainThread())//Observer切换到主线程
操作符介绍
操作符分类跳转到官翻
创建操作符
creat;
just;//快速创建对象,最多发送十个对象
fromArray;//数组遍历
fromIterable;//遍历
empty;//仅发送Complete事件,直接通知完成
error;//仅发送Error事件,直接通知异常
never;//不发送任何事件
defer;//直到有观察者observer订阅时,才动态创建被观察者对象 并 发送事件
timer;//延迟指定时间后,调用一次onNext(0)
interval;//每隔指定事件发送事件,无限叠加发送,从0开始
intervalRange;//每隔指定时间 就发送事件,并指定发送事件数量
range;//连续发送一个事件序列,可指定次数
变换操作符
map;//数据转换
flatMap;//将被观察者发送的事件序列进行拆分,并且单独转换,再合并成一个新的事件序列,最后进行发送
concatMap;//类似FlatMap()操作符;区别:拆分并且重新合并生成的事件序列的顺序 等于 被观察者旧序列生产的顺序
buffer;//缓存区大小 = 每次从被观察者中获取的事件数量, 步长 = 从当前位置向后移动几位
组合/合并操作符
concat;//concat组合被观察者数量<=4,顺序执行
concatArray;//concatArray组合观察者数量>4,顺序执行
merge;//组合被观察者数量<=4,非顺序执行
mergeArray;//组合被观察者数量>4,非顺序执行
concatArrayDelayError;//第1个被观察者的Error事件将在第2个被观察者发送完事件后再继续发送,mergeDelayError()操作符同理
mergeDelayError;
zip;//严格按照原先事件序列 进行对位合并,最终合并的事件数量 = 多个被观察者(Observable)中数量最少的数量
combineLatest;//当两个Observables中的任何一个发送了数据后,将先发送了数据的Observables 的最新(最后)一个数据与 另外一个Observable发送的每个数据结合,最终基于该函数的结果发送数据,与reduce区别就是接收的事件是每次都有而reduce只有最后一次输出信息
reduce;//把前2个数据聚合,然后与后1个数据继续进行聚合,依次类推
collect;//将被观察者Observable发送的数据事件收集到一个数据结构里
startWith;//在一个被观察者发送事件前,追加发送一些数据/ 一个新的被观察者, 后调用的startWith先追加
startWithArray;
count;//统计被观察者发送事件的数量
功能性操作符
delay;//指定延迟时间
do;//在某个事件的生命周期中调用
retry;//重试,即出现错误,让被观察者重新发送数据
retryWhen;//出现错误后,判断是否需要重新发送数据
repeat;//重复不断地发送被观察者事件
repeatWhen;//有条件地、重复发送 被观察者事件
过滤操作符
filter;过滤 特定条件的事件
ofType;过滤 特定类型的数据
skip;跳过指定事件,也可跳过指定时间时间
skipLast;跳过指定最后事件,也可跳过最后时间时间
distinct;去重
distinctUntilChanged;连续重复才去重
take;接收事件数量
takeLast;只接收最后几个事件数量
throttleFirst;指定时间内只接收第一次事件
throttleLast/sample;指定时间内只接收最后一次次事件
throttleWithTimeout/debounce;采样频率,指定时间只接收最新事件
firstElement;//获取第一个事件
lastElement;//获取最后一个事件
elementAt;//获取指定位置事件,如果超出位置没有任何提示,如果需要提示则调用elementAtOrError(),如超出位置需要指定默认值则调用elementAt 2个参数的方法
布尔操作符
takeWhile;判断发送的每项数据是否满足设置的函数条件
skipWhile;跳过满足条件的那些数据,发送不满足那些条件的数据
takeUntil;Predicate参数时:执行到条件成立时,停止发送事件,但本次事件会发送出去;observable参数时:第二个observable开始发送数据时,原始的observable停止发送事件
sequenceEqual;判定两个Observables需要发送的数据是否相同
contains;判断发送的数据中是否包含指定数据
isEmpty;判断发送的数据是否为空
defaultIfEmpty;在不发送任何有效事件( Next事件)、仅发送了 Complete 事件的前提下,发送一个默认值
amb;当需要发送多个 Observable时,只发送 先发送数据的Observable的数据,而其余 Observable则被丢弃
网友评论