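RxJava (with RxAndroid) is a third-party library for Android development built around chained calls on event streams. The resulting logic is concise and the API is simple to use.
Purpose: creating, transforming, and thread-scheduling event streams, among others. By using and combining operators, it can cover most requirements in asynchronous scenarios.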
Dependencies:
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
implementation 'io.reactivex.rxjava2:rxjava:2.0.7'
Example:
{
    // Create the upstream Observable
    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
            emitter.onComplete();
        }
    });
    // Create the downstream Observer
    Observer<Integer> observer = new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "subscribe");
        }

        @Override
        public void onNext(Integer value) {
            Log.d(TAG, "" + value);
        }

        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "error", e);
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "complete");
        }
    };
    // Connect upstream and downstream; nothing is emitted until subscribe() is called
    observable.subscribe(observer);
}
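The order in which the Observer callbacks fire can be sketched without the library. The following is a minimal plain-Java model, not RxJava code: `ObserverContractSketch`, `SimpleEmitter`, and `run` are hypothetical names introduced only for illustration. It assumes the usual contract that `onSubscribe` comes first, then zero or more `onNext` calls, then at most one terminal event, after which further emissions are ignored. RxJava itself reports a late `onError` to a global handler; the model simply drops it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of the event contract; the names here are hypothetical, not RxJava types.
final class ObserverContractSketch {

    // Guards the downstream log: once a terminal event is recorded, later calls are ignored.
    static final class SimpleEmitter<T> {
        private final List<String> log;
        private boolean done;

        SimpleEmitter(List<String> log) {
            this.log = log;
        }

        void onNext(T value) {
            if (!done) log.add(String.valueOf(value));
        }

        void onComplete() {
            if (!done) {
                done = true;
                log.add("complete");
            }
        }

        void onError(Throwable e) {
            if (!done) {
                done = true;
                log.add("error");
            }
        }
    }

    // Runs the source and returns what the downstream would have logged.
    static <T> List<String> run(Consumer<SimpleEmitter<T>> source) {
        List<String> log = new ArrayList<>();
        log.add("subscribe");                 // onSubscribe always comes first
        SimpleEmitter<T> emitter = new SimpleEmitter<>(log);
        try {
            source.accept(emitter);
        } catch (RuntimeException e) {
            emitter.onError(e);               // a throwing source becomes an error event
        }
        return log;
    }

    public static void main(String[] args) {
        List<String> log = ObserverContractSketch.<Integer>run(e -> {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onComplete();
            e.onNext(4);                      // ignored: the stream already completed
        });
        System.out.println(log);              // [subscribe, 1, 2, 3, complete]
    }
}
```

The printed list mirrors the Logcat lines the example above is expected to produce: "subscribe", the three values, then "complete".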
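Operators:
- ConcatMap: maps each item to an inner Observable and runs them one at a time, so results follow the order of the source.
- Map: converts each item from one form to another, e.g. String -> int.
- FlatMap: similar to ConcatMap, but the inner Observables are merged, so results are not guaranteed to follow the order of the source.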
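The ordering difference between these operators is easier to see with concrete values. The sketch below is a plain-Java model under stated assumptions, not RxJava's implementation: each inner stream is a list of values tagged with a made-up virtual emission time, and `OperatorOrderSketch`, `Timed`, and `resolve` are hypothetical names. The concatMap-like function drains inner streams in source order, while the flatMap-like function merges them by emission time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of operator ordering; not RxJava code.
final class OperatorOrderSketch {

    // A value tagged with the virtual time at which its inner stream emits it.
    static final class Timed {
        final long time;
        final String value;

        Timed(long time, String value) {
            this.time = time;
            this.value = value;
        }
    }

    // map: one input becomes one output, order preserved.
    static <T, R> List<R> map(List<T> source, Function<T, R> fn) {
        List<R> out = new ArrayList<>();
        for (T item : source) out.add(fn.apply(item));
        return out;
    }

    // concatMap-like: finish each inner stream before starting the next one.
    static List<String> concatMap(List<String> source, Function<String, List<Timed>> fn) {
        List<String> out = new ArrayList<>();
        for (String item : source) {
            for (Timed t : fn.apply(item)) out.add(t.value);
        }
        return out;
    }

    // flatMap-like: all inner streams are active at once; output follows emission time.
    static List<String> flatMap(List<String> source, Function<String, List<Timed>> fn) {
        List<Timed> merged = new ArrayList<>();
        for (String item : source) merged.addAll(fn.apply(item));
        merged.sort(Comparator.comparingLong((Timed t) -> t.time));
        List<String> out = new ArrayList<>();
        for (Timed t : merged) out.add(t.value);
        return out;
    }

    // Hypothetical inner stream: "A" is slow, "B" is fast, anything else is in between.
    static List<Timed> resolve(String host) {
        long start = host.equals("A") ? 30 : host.equals("B") ? 10 : 20;
        return Arrays.asList(new Timed(start, host + "1"), new Timed(start + 5, host + "2"));
    }

    public static void main(String[] args) {
        List<String> hosts = Arrays.asList("A", "B", "C");
        List<Integer> numbers = map(Arrays.asList("1", "2", "3"), Integer::parseInt);
        System.out.println(numbers);                                        // [1, 2, 3]
        System.out.println(concatMap(hosts, OperatorOrderSketch::resolve)); // [A1, A2, B1, B2, C1, C2]
        System.out.println(flatMap(hosts, OperatorOrderSketch::resolve));   // [B1, B2, C1, C2, A1, A2]
    }
}
```

When the order of results matters (for example, matching responses to the list of requests), the concatMap behaviour is the safer choice; when throughput matters more, the flatMap behaviour lets slow items stop blocking fast ones.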
{
    // Observable (the source)
    Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            String result = getResponseByOk();
            Log.d(TAG, "subscribe: result=" + result);
            emitter.onNext(result);
            emitter.onComplete();
        }
    }).subscribeOn(Schedulers.io()) // the source runs on the I/O thread pool
            .observeOn(AndroidSchedulers.mainThread()) // the Observer callbacks run on the main thread
            .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "subscribe");
                }

                @Override
                public void onNext(String value) {
                    Log.d(TAG, "" + value);
                }

                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "error", e);
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "complete");
                }
            });
}
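The thread hop in the example above (produce on a background thread, consume on another) can be illustrated with the JDK alone. This is only an analogy built on `CompletableFuture` and two single-thread executors, not how RxJava schedulers are implemented; `ThreadHopSketch` and the thread names "io-worker" and "ui-like" are hypothetical. `supplyAsync(..., io)` plays the role of `subscribeOn`, and `thenApplyAsync(..., ui)` plays the role of `observeOn`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// JDK-only analogy for subscribeOn/observeOn; not RxJava code.
final class ThreadHopSketch {

    // Returns {thread that produced the value, thread that consumed it}.
    static String[] run() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-worker"));
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui-like"));
        try {
            return CompletableFuture
                    // Producer side: runs on the "io" executor, like subscribeOn(Schedulers.io()).
                    .supplyAsync(() -> Thread.currentThread().getName(), io)
                    // Consumer side: runs on the "ui" executor, like observeOn(mainThread()).
                    .thenApplyAsync(producer ->
                            new String[] {producer, Thread.currentThread().getName()}, ui)
                    .join();
        } finally {
            io.shutdown();
            ui.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = run();
        // prints "produced on io-worker, consumed on ui-like"
        System.out.println("produced on " + threads[0] + ", consumed on " + threads[1]);
    }
}
```

On Android the consumer side matters most: views may only be touched from the main thread, which is why the callbacks are moved there before they run.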
Using flatMap (concatMap is used the same way). Note that this sample is written against the RxJava 1.x API (Func1, Action1, Subscriber), not the RxJava 2 artifacts listed under Dependencies.
// subscribe() returns a Subscription in RxJava 1.x, so that is what this method returns
private Subscription processUrlIpByOneFlatMap() {
    return Observable.just(
            "http://www.baidu.com/",
            "http://www.google.com/",
            "https://www.bing.com/")
            // Map each URL to an inner Observable that resolves its IP address
            .flatMap(new Func1<String, Observable<String>>() {
                @Override
                public Observable<String> call(String s) {
                    return createIpObservable(s);
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<String>() {
                @Override
                public void call(String s) {
                    printLog(tvLogs, "Consume Data <- ", s);
                }
            }, new Action1<Throwable>() {
                @Override
                public void call(Throwable throwable) {
                    printErrorLog(tvLogs, "throwable call()", throwable.getMessage());
                }
            });
}
// Resolve the IP address for a host
private Observable<String> createIpObservable(final String url) {
    return Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            try {
                String ip = getIPByUrl(url);
                subscriber.onNext(ip);
                printLog(tvLogs, "Emit Data -> ", url + " : " + ip);
            } catch (MalformedURLException | UnknownHostException e) {
                e.printStackTrace();
                // Emit null rather than calling onError, so that one bad URL
                // does not terminate the whole flatMap chain
                //subscriber.onError(e);
                subscriber.onNext(null);
            }
            subscriber.onCompleted();
        }
    });
}
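The sample above signals a failed lookup by emitting null, which RxJava 1.x permits. RxJava 2.x, the version listed under Dependencies, does not accept null values, so a port needs another way to say "no result". One option, sketched here in plain Java, is to wrap the value in `java.util.Optional`. `HostLookupSketch` and `hostOf` are hypothetical names; `hostOf` only stands in for the URL-parsing half of `getIPByUrl` (whose body the article does not show) and performs no network access. On Android, `Optional` additionally assumes API level 24 or a desugaring setup.

```java
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Optional;

// Sketch of a null-free result type; the names here are hypothetical.
final class HostLookupSketch {

    // Parses the host out of a URL string; an unparsable URL yields Optional.empty().
    static Optional<String> hostOf(String url) {
        try {
            return Optional.of(new URL(url).getHost());
        } catch (MalformedURLException e) {
            return Optional.empty();          // "no result" without resorting to null
        }
    }

    public static void main(String[] args) {
        System.out.println(hostOf("http://www.baidu.com/")); // Optional[www.baidu.com]
        System.out.println(hostOf("not a url"));             // Optional.empty
    }
}
```

An emitter could then pass the Optional downstream and let the consumer decide how to display a missing value; simply completing without emitting anything for a bad URL is another possibility.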
Reference: https://blog.csdn.net/johnny901114/article/details/51532776