文档网址:https://mcxiaoke.gitbooks.io/rxdocs/content/
// Seed mData with the sample values "1" through "7", in ascending order.
for (String sample : new String[]{"1", "2", "3", "4", "5", "6", "7"}) {
    mData.add(sample);
}
/**
 * Demonstrates the {@code window(3)} operator on {@code mData}.
 *
 * <p>Each emitted window is itself an Observable; a separator line is logged
 * when a window arrives, followed by every item that window emits.
 *
 * <p>Log output (tag "qwer") recorded for mData holding "1" through "7":
 * <pre>
 * ------------------
 * 1
 * 2
 * 3
 * ------------------
 * 4
 * 5
 * 6
 * ------------------
 * 7
 * </pre>
 */
private void window() {
    // Logs one item belonging to the current window.
    final Consumer<String> logItem = new Consumer<String>() {
        @Override
        public void accept(String item) throws Exception {
            Log.e("qwer", item);
        }
    };

    // Logs the separator for a freshly opened window, then drains its items.
    Consumer<Observable<String>> logWindow = new Consumer<Observable<String>>() {
        @Override
        public void accept(Observable<String> chunk) throws Exception {
            Log.e("qwer", "------------------");
            chunk.subscribe(logItem);
        }
    };

    Observable<Observable<String>> windows = Observable.fromIterable(mData).window(3);
    windows.subscribe(logWindow);
}
/**
 * Demonstrates the {@code scan} operator on {@code mData}: every item is
 * appended to the running concatenation and each intermediate result is logged.
 *
 * <p>Log output (tag "qwer") recorded for mData holding "1" through "7":
 * <pre>
 * 1
 * 12
 * 123
 * 1234
 * 12345
 * 123456
 * 1234567
 * </pre>
 */
private void scan() {
    // Accumulator: appends the next item to everything gathered so far.
    BiFunction<String, String, String> appendNext = new BiFunction<String, String, String>() {
        @Override
        public String apply(String accumulated, String next) throws Exception {
            return accumulated + next;
        }
    };

    // Sink: logs each running concatenation.
    Consumer<String> logRunningValue = new Consumer<String>() {
        @Override
        public void accept(String runningValue) throws Exception {
            Log.e("qwer", runningValue);
        }
    };

    Observable.fromIterable(mData)
            .scan(appendNext)
            .subscribe(logRunningValue);
}
/**
 * Demonstrates the {@code map} operator on {@code mData}: each string is
 * parsed as an integer {@code n} and mapped to the pair {@code {n, n + 100}}.
 *
 * <p>Each pair is logged as "n n+100" under the tag "qwer". Any failure in the
 * chain (for example a {@link NumberFormatException} when an item is not a
 * decimal integer) is logged instead of being silently dropped.
 */
private void map() {
    Observable.fromIterable(mData).map(new Function<String, Integer[]>() {
        @Override
        public Integer[] apply(String s) throws Exception {
            int value = Integer.parseInt(s);
            return new Integer[]{value, value + 100};
        }
    }).subscribe(new Observer<Integer[]>() {
        @Override
        public void onSubscribe(Disposable d) {
            // The Disposable is not retained in this demo.
        }

        @Override
        public void onNext(Integer[] integers) {
            // Make the mapping result visible, in the same format the other demos use.
            Log.e("qwer", integers[0] + " " + integers[1]);
        }

        @Override
        public void onError(Throwable e) {
            // Surface the failure rather than swallowing it.
            Log.e("qwer", "map failed", e);
        }

        @Override
        public void onComplete() {
            // Nothing to do on completion.
        }
    });
}
/**
 * Demonstrates the {@code groupBy} operator on {@code mData}: items whose
 * integer value is below 4 go to the group keyed 10, all others to the group
 * keyed 11. Every item is logged as "key item" under the tag "qwer".
 *
 * <p>Log output recorded for mData holding "1" through "7":
 * <pre>
 * 10 1
 * 10 2
 * 10 3
 * 11 4
 * 11 5
 * 11 6
 * 11 7
 * </pre>
 *
 * <p>Failures of the outer stream (for example a {@link NumberFormatException}
 * from the key selector) are logged instead of being silently dropped.
 */
private void groupBy() {
    Observable.fromIterable(mData).groupBy(new Function<String, Integer>() {
        @Override
        public Integer apply(String s) throws Exception {
            return Integer.parseInt(s) < 4 ? 10 : 11;
        }
    }).subscribe(new Observer<GroupedObservable<Integer, String>>() {
        @Override
        public void onSubscribe(Disposable d) {
            // The Disposable is not retained in this demo.
        }

        @SuppressLint("CheckResult")
        @Override
        public void onNext(GroupedObservable<Integer, String> group) {
            // One subscriber serves every group: it prefixes each item with the
            // group's own key instead of duplicating the code per hard-coded key.
            final Integer key = group.getKey();
            group.subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.e("qwer", key + " " + s);
                }
            });
        }

        @Override
        public void onError(Throwable e) {
            // Surface the failure rather than swallowing it.
            Log.e("qwer", "groupBy failed", e);
        }

        @Override
        public void onComplete() {
            // Nothing to do on completion.
        }
    });
}
/**
 * Demonstrates the {@code flatMap} operator on {@code mData}: each string is
 * parsed as an integer {@code n} and expanded into an inner Observable that
 * emits {@code n} and {@code n + 100}. Every flattened integer is logged
 * under the tag "qwer".
 *
 * <p>Any failure in the chain (for example a {@link NumberFormatException}
 * when an item is not a decimal integer) is logged instead of being silently
 * dropped.
 */
private void flatMap() {
    Observable.fromIterable(mData).flatMap(
            new Function<String, ObservableSource<Integer>>() {
                @Override
                public ObservableSource<Integer> apply(String s) {
                    int value = Integer.parseInt(s);
                    Integer[] expanded = new Integer[]{value, value + 100};
                    return Observable.fromArray(expanded);
                }
            }).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            // The Disposable is not retained in this demo.
        }

        @Override
        public void onNext(Integer integer) {
            Log.e("qwer", integer + "");
        }

        @Override
        public void onError(Throwable e) {
            // Surface the failure rather than swallowing it.
            Log.e("qwer", "flatMap failed", e);
        }

        @Override
        public void onComplete() {
            // Nothing to do on completion.
        }
    });
}
/**
 * Demonstrates the {@code buffer(3)} operator on {@code mData}: items are
 * collected into lists and the size of each emitted list is logged under the
 * tag "qwer".
 *
 * <p>Failures in the chain are logged instead of being silently dropped.
 */
private void buffer() {
    Observable.fromIterable(mData).buffer(3).subscribe(new Observer<List<String>>() {
        @Override
        public void onSubscribe(Disposable d) {
            // The Disposable is not retained in this demo.
        }

        @Override
        public void onNext(List<String> strings) {
            // Sizes recorded for mData holding "1" through "7":
            //   3
            //   3
            //   1
            Log.e("qwer", strings.size() + "");
        }

        @Override
        public void onError(Throwable e) {
            // Surface the failure rather than swallowing it.
            Log.e("qwer", "buffer failed", e);
        }

        @Override
        public void onComplete() {
            // Nothing to do on completion.
        }
    });
}
网友评论