RxJava和java8的Stream类似 都是对数据流进行操作
基于rxJava版本rxjava2-2.0.1
Rxjava常用操作符
创建
create
通过创建回调函数自定义subscribe发送
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onNext(3);
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.print(integer + ",");
}
});
//结果
1,2,3,
just
将单个item作为输入,然后单项发送给subscribe
Observable.just(1, 2, 3).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.print(integer + ",");
}
});
//结果
1,2,3,
from
将数组作为输入,然后依次单个发送给subscribe
Observable.from(Arrays.asList(1, 2, 3)).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.print(integer + ",");
}
});
//结果
1,2,3,
Empty/Never/Throw
Empty:创建一个不发射任何数据但是正常终止的Observable(调用complete)
Never:创建一个不发射数据也不终止的Observable(不调用complete)
Throw:创建一个不发射数据以一个错误终止的Observable(调用error)
System.out.println("\nEmpty:");
Observable.empty().subscribe(new Subscriber<Object>() {
@Override
public void onCompleted() {
System.out.print("onCompleted");
}
@Override
public void onError(Throwable e) {
System.out.print("onError");
}
@Override
public void onNext(Object o) {
System.out.print("onNext");
}
});
System.out.println("\nNever:");
Observable.never().subscribe(new Subscriber<Object>() {
@Override
public void onCompleted() {
System.out.print("onCompleted");
}
@Override
public void onError(Throwable e) {
System.out.print("onError");
}
@Override
public void onNext(Object o) {
System.out.print("onNext");
}
});
//结果
Empty:
onCompleted
Never:
Interval
创建一个按固定时间间隔发射整数序列的Observable(异步的)
Timer效果与其类似,不建议使用
Observable.interval(1, TimeUnit.MILLISECONDS).subscribe(new Subscriber<Object>() {
@Override
public void onCompleted() {
System.out.print("onCompleted");
}
@Override
public void onError(Throwable e) {
System.out.print("onError");
}
@Override
public void onNext(Object o) {
System.out.print("onNext");
}
});
//结果 每隔一秒生成一个next
Range
Range操作符发射一个范围内的有序整数序列,你可以指定范围的起始和长度。
Observable.range(7, 3).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.print(integer + ",");
}
});
//结果
7,8,9,
repeat
重复数据发送n遍
repeatWhen指定条件才重复发送
Observable.just(1, 2)
.repeat(2)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.print(integer + " ");
}
});
//结果
1 2 1 2
转换
Buffer
定期收集Observable的数据放进一个数据包裹,然后发射这些数据集合。(感觉应用场景:将数组按照拆分成大小相同的多个子数组)
Observable.range(2, 6).buffer(3)
.subscribe(new Subscriber<List<Integer>>() {
@Override
public void onCompleted() {
System.out.printf("onCompleted\n");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(List<Integer> integers) {
System.out.printf(integers.toString());
}
});
//结果
[2, 3, 4][5, 6, 7]onCompleted
window
定期将来自原始Observable的数据分解为一个Observable窗口,发射这些窗口,而不是每次发射一项数据
Observable.range(2, 6).window(3)
.subscribe(new Subscriber<Observable<Integer>>() {
@Override
public void onCompleted() {
System.out.printf("w-Completed\n");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Observable<Integer> integerObservable) {
integerObservable.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.printf("inner-Completed\n");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
System.out.printf(integer + ",");
}
});
}
});
//结果
2,3,4,inner-Completed
5,6,7,inner-Completed
w-Completed
cast
在Observable发射数据前将所有数据类型进行强转为指定类型
Observable.just(2.5F, 3, 5F, 0.6F).cast(Object.class).subscribe(new Action1<Object>() {
@Override
public void call(Object s) {
System.out.print(s + ",");
}
});
//结果
2.5,3,5.0,0.6,
Map
仅仅是将数据进行转换
Observable.range(2, 5).map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return integer + "call";
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.printf(s + ",");
}
});
//结果
2call,3call,4call,5call,6call,
FlatMap
将Observable发射的数据变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable,内部采用merge合并。(就是将数据转化为Observable,实际应用场景:将数组数据项通过from转成Observable,然后通过Merge将Observables中的数据项都平展开到新的Observable中,成为了将多维数组降维的目的)
Observable.create(new Observable.OnSubscribe<List<Integer>>() {
@Override
public void call(Subscriber<? super List<Integer>> subscriber) {
subscriber.onNext(Arrays.asList(1, 2, 3, 4, 5));
}
}).flatMap(new Func1<List<Integer>, Observable<Integer>>() {
@Override
public Observable<Integer> call(List<Integer> integers) {
return Observable.from(integers);
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.printf(integer + ",");
}
});
//结果
1,2,3,4,5,
GroupBy
将Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每一个Observable发射一组不同的数据。(其实就是将数据项对应生成Key处理后转为一个Observable)
Observable.range(0, 10).groupBy(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer integer) {
return integer % 3;
}
}).subscribe(new Action1<GroupedObservable<Integer, Integer>>() {
@Override
public void call(final GroupedObservable<Integer, Integer> item) {
item.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.print(item.getKey() + ":" + "onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
System.out.printf(item.getKey() + ":" + integer + ",");
}
});
}
});
//结果
0:0,1:1,2:2,0:3,1:4,2:5,0:6,1:7,2:8,0:9,0:onCompleted1:onCompleted2:onCompleted
Scan
对Observable发射的每一项数据应用一个函数,然后按顺序依次发射每一个值。
Observable.range(0, 5).scan(new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer sum, Integer item) {
return sum + item;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.printf(integer + ",");
}
});
//结果
0,1,3,6,10,
过滤
Debounce
过滤特定时间内最近的数据发送,其他数据不发送。
Observable.just(1, 2, 3)
.debounce(400, TimeUnit.MILLISECONDS)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.print("integer=" + integer);
}
});
//结果:过滤400毫秒后最近的一个
integer=3
Sample
周期性发送最近的数据
Observable.interval(300, TimeUnit.MILLISECONDS)
.sample(500, TimeUnit.MILLISECONDS)
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.print("s=" + aLong + ",");
}
});
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//结果
s=0,s=2,s=4,
Distinct
过滤排除重复的数据
Observable.just(1, 2, 4, 1, 2, 1)
.distinct()
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.print("integer=" + integer + ",");
}
});
//结果
integer=1,integer=2,integer=4,
ElementAt
只过滤发送Observable中指定位置的数据
Observable.just(1, 2, 3, 4, 5)
.elementAt(3)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.print("integer=" + integer + ",");
}
});
//结果
ElementAt
integer=4,
Filter
定义过滤函数来过滤发送的数据
Observable.just(1, 2, 3, 4)
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer == 3||integer==4;
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.print("integer=" + integer + ",");
}
});
//结果
integer=3,integer=4,
First
过滤第一个数据或者符合过滤条件的第一个数据
Observable.just(1, 2, 3, 4)
.first(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer % 2 == 0;
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.print("integer=" + integer + ",");
}
});
//结果
integer=2,
Last
过滤最后一个数据或者符合过滤条件的最后一个数据
Observable.just(1, 2, 3, 4)
.last(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer % 2 == 0;
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.print("integer=" + integer + ",");
}
});
//结果
integer=4,
IgnoreElements
忽略不让原Observable所有数据发送,但允许发送终止通知。如onError和onComplete
Observable
.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onCompleted();
subscriber.onNext(2);
subscriber.onNext(3);
}
})
.ignoreElements()
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.print("onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
System.out.print("integer=" + integer + ",");
}
});
//结果
onCompleted
Skip
跳过头到第n个数据发送
Observable.range(0, 10)
.skip(5)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.print("integer=" + integer + ",");
}
});
//结果
integer=5,integer=6,integer=7,integer=8,integer=9,
SkipLast
跳过从尾数到第n个数据发送
Observable.range(0, 10)
.skipLast(5)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.print("integer=" + integer + ",");
}
});
//结果
integer=0,integer=1,integer=2,integer=3,integer=4,
Take
从头开始过滤出n个数据发送
Observable.range(0, 10)
.take(5)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.print("integer=" + integer + ",");
}
});
//结果
integer=0,integer=1,integer=2,integer=3,integer=4,
TakeLast
从尾开始过滤出n个数据发送
Observable.range(0, 10)
.takeLast(5)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.print("integer=" + integer + ",");
}
});
//结果
integer=5,integer=6,integer=7,integer=8,integer=9,
组合
merge
合并两个Observable的数据结果,顺序不定(猜测是异步发送)
Observable<Long> interval1 = Observable.interval(100, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong - 5L;
}
}).take(5);//抑制定时整数序列数量5个
Observable<Long> interval2 = Observable.interval(100, TimeUnit.MILLISECONDS).take(5);//抑制定时整数序列数量5个
Observable.merge(interval1, interval2).subscribe(new Action1<Number>() {
@Override
public void call(Number number) {
System.out.print(number + " ");
}
});
//结果
0 -5 -4 1 -3 2 -2 3 -1 4
Concat
合并两个Observable的数据结果,顺序前后确定(同步发送,第一个发送完毕才会发送第二个)
Observable<Long> interval1 = Observable.interval(100, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong - 5L;
}
}).take(5);//抑制定时整数序列数量5个
Observable<Long> interval2 = Observable.interval(100, TimeUnit.MILLISECONDS).take(5);//抑制定时整数序列数量5个
Observable.concat(interval1, interval2).subscribe(new Action1<Number>() {
@Override
public void call(Number number) {
System.out.print(number + " ");
}
});
//结果
-5 -4 -3 -2 -1 0 1 2 3 4
startWith
在observable的数据源前面插入数据
Observable<Integer> range = Observable.range(0, 10);
range.startWith(-2, -1).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.print(integer + " ");
}
});
//结果
-2 -1 0 1 2 3 4 5 6 7 8 9
zip
合并Observables的数据,使用发射的顺序作为合并的标记
Javadoc: zip(Iterable,FuncN)
Javadoc: zip(Observable,FuncN)
Javadoc: zip(Observable,Observable,Func2) (最多可以有九个Observables参数)
Observable<Long> interval5 = Observable.interval(100, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong - 5L;
}
}).take(6);//抑制定时整数序列数量5个
Observable<Long> interval6 = Observable.interval(100, TimeUnit.MILLISECONDS).take(5);//抑制定时整数序列数量5个
Observable.zip(interval5, interval6, new Func2<Long, Long, String>() {
@Override
public String call(Long aLong, Long aLong2) {
return aLong + "~" + aLong2;
}
}).subscribe(new Action1<String>() {
@Override
public void call(String item) {
System.out.print(item + " ");
}
});
//结果
-5~0 -4~1 -3~2 -2~3 -1~4
CombineLatest
根据每个Observable的发送时间作为合并标记,任何一个Observable数据发送之后就和其他的Observable最近发送的数据进行合并。
Observable<Long> interval7 = Observable.interval(100, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong - 5L;
}
}).take(6);//抑制定时整数序列数量5个
Observable<Long> interval8 = Observable.interval(200, TimeUnit.MILLISECONDS).take(5);//抑制定时整数序列数量5个
Observable.combineLatest(interval7, interval8, new Func2<Long, Long, String>() {
@Override
public String call(Long aLong, Long aLong2) {
return aLong + "~" + aLong2;
}
}).subscribe(new Action1<String>() {
@Override
public void call(String item) {
System.out.print(item + " ");
}
});
//结果
-4~0 -3~0 -3~1 -2~1 -1~1 -1~2 0~2 0~3 0~4
join
合并的两个Observable的数据,合并标记基于对Observable数据定义的生命周期,如果数据过了生命周期则就不会和其他Observable的数据进行合并,否则就排列组合形式合并。(如果不懂建议点击链接直接看图)
Observable<Long> observable1 = Observable.interval(300, TimeUnit.MILLISECONDS).delay(400, TimeUnit.MILLISECONDS).take(5);//延迟1个多身位
Observable<Long> observable2 = Observable.interval(300, TimeUnit.MILLISECONDS).take(5);
observable1.join(observable2, new Func1<Long, Observable<Long>>() {
@Override
public Observable<Long> call(Long aLong) {
return Observable.just(aLong).delay(100, TimeUnit.MILLISECONDS);//每个数据100毫秒生命
}
}, new Func1<Long, Observable<Long>>() {
@Override
public Observable<Long> call(Long aLong) {
return Observable.just(aLong).delay(150, TimeUnit.MILLISECONDS);//每个数据150毫秒生命
}
}, new Func2<Long, Long, String>() {
@Override
public String call(Long aLong, Long aLong2) {
return "A-" + aLong + ":B-" + aLong2;
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.print(s+" ");
}
});
//结果
A-0:B-1 A-1:B-2 A-2:B-3 A-3:B-4
错误处理
catch
拦截原始Observable的onError通知,将它替换为其它的数据项或数据序列,让产生的Observable能够正常终止或者根本不终止。
-
onErrorReturn
让Observable遇到错误时发射一个特殊的项并且正常终止。
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onError(new IllegalAccessException("no access"));
}
}).onErrorReturn(new Func1<Throwable, Integer>() {
@Override
public Integer call(Throwable throwable) {
return 10001;
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
System.out.print("onError");
}
@Override
public void onNext(Integer integer) {
System.out.print("error_code:" + integer);
}
});
//结果
error_code:10001
-
onErrorResumeNext
让Observable在遇到错误时开始发射指定的第二个Observable的数据序列。
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onError(new IllegalAccessException("no access"));
}
}).onErrorResumeNext(new Func1<Throwable, Observable<? extends Integer>>() {
@Override
public Observable<? extends Integer> call(Throwable throwable) {
return Observable.just(1, 2);
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
System.out.print("onError");
}
@Override
public void onNext(Integer integer) {
System.out.print(",value:" + integer);
}
});
//结果
,value:1,value:2
-
onExceptionResumeNext
和onErrorResumeNext类似,onExceptionResumeNext方法返回一个备用Observable,不同的是,如果onError收到的Throwable不是一个Exception,它会将错误传递给观察者的onError方法,不会使用备用的Observable。
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onError(null);
}
}).onExceptionResumeNext(Observable.just(1, 2, 3, 4))
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
System.out.print("onError:");
}
@Override
public void onNext(Integer integer) {
System.out.print(",value:" + integer);
}
});
//结果
onError:
重试
如果原始Observable遇到错误,重新订阅它期望它能正常终止,数据会有部分重复。
- retry
Javadoc: retry()
Javadoc: retry(long)错误次数
Javadoc: retry(Func2) 异常判断
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onError(new IllegalAccessException("IllegalAccessException"));
}
}).retry(new Func2<Integer, Throwable, Boolean>() {
@Override
public Boolean call(Integer integer, Throwable throwable) {
System.out.println("num:" + integer + " throwable:" + throwable.getMessage());
return !(throwable instanceof IllegalAccessException);
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
System.out.println("onError:" + e.getMessage());
}
@Override
public void onNext(Integer integer) {
}
});
//结果
num:1 throwable:IllegalAccessException
onError:IllegalAccessException
-
retrywhen
不太理解...(主要作用可以和retorfit一起用)
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
System.out.println("subscribing");
subscriber.onError(new RuntimeException("always fails"));
}
}).retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
return observable.zipWith(Observable.range(1, 3), new Func2<Throwable, Integer, Integer>() {
@Override
public Integer call(Throwable throwable, Integer integer) {
return integer;
}
}).flatMap(new Func1<Integer, Observable<?>>() {
@Override
public Observable<?> call(Integer integer) {
System.out.println("delay retry by " + integer + " second(s)");
return Observable.timer(integer, TimeUnit.SECONDS);
}
});
}
}).toBlocking().forEach(new Action1<Object>() {
@Override
public void call(Object o) {
System.out.println("foreach:"+o);
}
});
//结果
subscribing
delay retry by 1 second(s)
subscribing
delay retry by 2 second(s)
subscribing
delay retry by 3 second(s)
subscribing
除了这些还有辅助、条件和布尔、算术和聚合、连接、转换、操作符决策树操作符
其他操作符太零散了,直接参考下面
中文翻译
官网-操作符介绍
操作符doc文档
网友评论