概述
RxJava 操作符的类型有多种,如:创建、变换、过滤、组合、错误处理、辅助、条件和布尔操作符等,还有许多延伸操作符,这里接单记录常用操作符。
操作符官方文档
创建操作符
- create (unfaseCreate)
create 操作符创建一个Observable(被观察者)
Observable observable = Observable.unsafeCreate(new Observable.OnSubscribe() {
@Override
public void call(Object o) {
subscriber.onNext("令狐冲");
subscriber.onNext("练会了独孤九剑");
subscriber.onCompleted();
}
});
subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
Log.e("zpan", "onCompleted =====");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.e("zpan", "onNext =" + s);
}
};
observable.subscribe(subscriber);
E/zpan: onNext =令狐冲
onNext =练会了独孤九剑
E/zpan: onCompleted =====
- just
just 操作符创建一个依次将数据发射出去的Observable。
注意:最多只能是10条数据。
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e("zpan", " just=" + integer);
}
});
- from
from 操作符将集合/数组依次发射出去,没有数据数的限制。
ArrayList<Integer> ints = new ArrayList<>();
ints.add(1);
ints.add(2);
ints.add(3);
Observable.from(ints)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e("zpan", " from=" + integer);
}
});
- interval
interval 创建一个按固定时间间隔发射整数序列的Observable,相当于定时器。
interval()函数有两个参数:第一个是两次发射的时间间隔,第二个是用到的时间单位。
public void setInterval(View view) {
Observable.interval(2, TimeUnit.SECONDS)
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
Log.e("zpan", " interval = " + aLong);
}
});
}
- rang
range 发射指定范围的有序的整数序列的Observable,可以替代for循环。
range()函数有两个参数:第一个是起始值(不小于0),第二个是结束值
public void setRange(View view) {
Observable.range(1, 5)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e("zpan", " range = " + integer);
}
});
}
- repeat
repeat 创建一个N次重复发射特定数据的Observable
repea()函数有一个参数:重复次数
public void setRepeat(View view) {
Observable.range(0, 3)
.repeat(2) // 重复 2 次
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e("zpan", " repeat = " + integer);
}
});
}
- defer
只有当订阅者订阅才创建Observable,为每个订阅创建一个新的Observable。
String[] strings1 = {"Hello", "World"};
String[] strings2 = {"Hello", "RxJava"};
Observable<String> observable = Observable.defer(new Func0<Observable<String>>() {
@Override
public Observable<String> call() {
return Observable.from(strings1);
}
});
strings1 = strings2; //订阅之前把 strings1 修改了
observable.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e("zpan", "defer = " + s);
}
});
E/zpan: defer = Hello
defer = RxJava
- empty
创建一个发射空数据的Observable
Observable.empty()
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
Log.e("zpan", "有参数 - " + o);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
Log.e("zpan", "报错 - " + throwable.getMessage());
}
}, new Action0() {
@Override
public void call() {
Log.e("zpan", "无参数");
}
});
E/zpan: 无参数
- error
创建一个发射error事件的Observable
Observable.error(new Exception("错误信息"))
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
Log.e("zpan", "有参数 - " + o);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
Log.e("zpan", "报错 - " + throwable.getMessage());
}
}, new Action0() {
@Override
public void call() {
Log.e("zpan", "无参数");
}
});
E/zpan: 报错 - 错误信息
- never
创建一个不发射任何事件也不会结束的Observable - timer
创建一个在给定的延时之后发射数据为0的Observable
Observable.timer(1000, TimeUnit.MILLISECONDS)
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
Log.e("zpan", " timer = " + aLong);
}
});
变换操作符
变换操作符是对Observable发射的数据按照一定规则做变换,然后将变换后的数据发射出去。
- Map
map 操作符通过指定一个Func对象,将Observable转换成一个新的Observable对象并发射
Observable.just("123456")
.map(new Func1<String, String>() {
@Override
public String call(String s) {
return s + "456789";
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e("zpan", " map = " + s);
}
});
E/zpan: map = 123456456789
- flatMap
flatMap 操作符将Observable发射的数据集合变换为Observable集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable。作用比Map强大。
注意:flatmap 的合并允许交叉,也就是说可能会交错发送事件,顺序可能会错乱。
concatMap 操作符功能与flatmap操作符一致;不过,它解决了flatmap交叉问题。用法同flatmap - cast
cast 操作符的作用是强制将Observable发射的所有数据转换为指定类型。
List<String> list = new ArrayList<>();
list.add("11111111");
list.add("22222222");
list.add("33333333");
list.add("44444444");
Observable.from(list)
.flatMap(new Func1<String, Observable<?>>() {
@Override
public Observable<?> call(String s) {
return Observable.just("新增" + s);
}
})
.cast(String.class)
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e("zpan", " flatmap = " + s);
}
});
- flatMapIterable
flatMapIterable 操作符可以将数据包装成Iterable,在Iterable中我们就可以对数据进行处理了
Observable.just(1, 2, 3)
.flatMapIterable(new Func1<Integer, Iterable<?>>() {
@Override
public Iterable<?> call(Integer integer) {
List<Integer> list = new ArrayList<>();
list.add(integer + 2);
return list;
}
})
.cast(Integer.class)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e("zpan", " flatMapIterable =" + integer);
}
});
- buffer
buffer 操作符将原Observable变换为一个新的Observable,这个新的Observable每次发射一组列表值而不是一个一个发射。
Observable.just(1, 2, 3, 4, 5, 6)
.buffer(3) // 缓存容量是 3
.subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> integers) {
for (Integer integer : integers) {
Log.e("zpan", " buffer =" + integer);
}
Log.e("zpan", "=====华丽的分界线=====");
}
});
E/zpan: buffer =1
buffer =2
buffer =3
=====华丽的分界线=====
buffer =4
buffer =5
buffer =6
=====华丽的分界线=====
- window
window操作符和buffer操作符类似,只不过window操作符发射的是Observable而不是数据列表。
Observable.just(1, 2, 3, 4, 5, 6)
.window(3)
.subscribe(new Action1<Observable<Integer>>() {
@Override
public void call(Observable<Integer> integerObservable) {
}
});
- groupBy
groupBy 操作符用于分组元素,将原Observable变换成一个发射Observable的新Observable(分组后的)
BookInfo b1 = new BookInfo("九阳神功", "A");
BookInfo b2 = new BookInfo("西游记", "H");
BookInfo b3 = new BookInfo("语文", "G");
BookInfo b4 = new BookInfo("葵花宝典", "A");
BookInfo b5 = new BookInfo("三国演义", "H");
BookInfo b6 = new BookInfo("数学", "G");
BookInfo b7 = new BookInfo("九阴真经", "A");
Observable<GroupedObservable<String, BookInfo>> groupedObservable
= Observable.just(b1, b2, b3, b4, b5, b6, b7)
.groupBy(new Func1<BookInfo, String>() {
@Override
public String call(BookInfo bookInfo) {
return bookInfo.bookId;
}
});
// concat 组合操作符
Observable.concat(groupedObservable)
.subscribe(new Action1<BookInfo>() {
@Override
public void call(BookInfo bookInfo) {
Log.e("zpan", "groupBy = " + bookInfo.bookName + " - " + bookInfo.bookId);
Log.e("zpan", "========");
}
});
E/zpan: groupBy = 九阳神功 - A
========
groupBy = 葵花宝典 - A
========
groupBy = 九阴真经 - A
========
groupBy = 西游记 - H
========
groupBy = 三国演义 - H
========
groupBy = 语文 - G
========
groupBy = 数学 - G
========
网友评论