一.发送与回调
//正常的被观察者处理事件,观察者不完整回调
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d("rxfz", "发onNext:fz1 ");
emitter.onNext("fz1");
Log.d("rxfz", "发onNext:fz2 " );
emitter.onNext("fz2");
Log.d("rxfz", "发onNext:fz3 " );
emitter.onNext("fz3");
Log.d("rxfz", "发onComplete " );
emitter.onComplete();
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("rxfz", "收onNext: " + s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d("rxfz", "收onError: ");
}
}, new Action() {
@Override
public void run() throws Exception {
Log.d("rxfz", "收onComplete: " );
}
}
);
//观察者完整回调
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d("rxfz", "发onNext:fz1 ");
emitter.onNext("fz1");
Log.d("rxfz", "发onNext:fz2 " );
emitter.onNext("fz2");
Log.d("rxfz", "发onNext:fz3 " );
emitter.onNext("fz3");
Log.d("rxfz", "发onComplete " );
emitter.onComplete();
}
}).subscribe(new Observer<String>() {
Disposable disposable;
int i;
@Override
public void onSubscribe(Disposable d) {
disposable = d;
}
@Override
public void onNext(String s) {
Log.d("rxfz: ", "onNext=="+ s);
i++;
if (i == 1) {
Log.d("rxfz", "dispose");
disposable.dispose();
Log.d( "isDisposed : ", "=="+ disposable.isDisposed());
}
}
@Override
public void onError(Throwable e) {
Log.d("rxfz: ", "error");
}
@Override
public void onComplete() {
Log.d("rxfz: ", "onComplete");
}
}
);
二.Subject
1.参考:https://www.jianshu.com/p/1257c8ba7c0c
![S.AsyncSubject.png](https://img.haomeiwen.com/i4093303/afbe795df8aa5eca.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![S.BehaviorSubject.png](https://img.haomeiwen.com/i4093303/c1f80f8e7e8e4c8a.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![S.PublishSubject.png](https://img.haomeiwen.com/i4093303/cc51cfc352ac801f.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
三.操作符
A.创建操作符(create,just,from,defer,range,interval,repeat,start,timer)
//1.interval定时发送
Observable<Long> observable=Observable.interval(3, TimeUnit.SECONDS);
observable.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d("rxfz", "收2onNext: " + aLong);
}
});
//2.range发送指定范围整数序列,repeat重复次数
Observable<Integer> observable=Observable.range(0,5).repeat(2);
observable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("rxfz", "收2onNext: " + integer);
}
});
B.变换操作符(map,flatMap,concatMap,switchMap,flatMapIterable,buffer,groupBy,cast,window,scan)
//1.map
Observable.just("fz1","fz2").map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
return s+"==ok";
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String integer) throws Exception {
Log.d("rxfz", "收2onNext: " + integer);
}
});
//2.flatmap
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
}).flatMap(new Function<Integer, Observable<String>>() {
@Override
public Observable<String> apply(Integer integer) throws Exception {
ArrayList<String> arrayList = new ArrayList<>();
for (int i = 0; i < 5; i++) {
String iStr = "flatMap value" + integer;
arrayList.add(iStr);
}
return Observable.fromIterable(arrayList).delay(10, TimeUnit.MICROSECONDS);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d( "rxfz:收2onNext " , s);
}
});
//3.concatMap和flatMap的作用是一样的,它的结果是严格按照上游发送的顺序来发送的
//4.scan
Observable.just(1, 2, 3, 4, 5)
.scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept: " + integer);
}
});
//结果:1,3,6,10,15
//5.zip在Android中的使用,可以适用于如下场景,一个界面需要展示用户的一些信息,这些信息分别要从两个服务器接口中获取,只有当两个数据都获取后才能进行展示。这类同时的信息请求比较适用zip
Observable<UserBaseInfoResponse> observable1 =
api.getUserBaseInfo(new UserBaseInfoRequest()).subscribeOn(Schedulers.io());
Observable<UserExtraInfoResponse> observable2 =
api.getUserExtraInfo(new UserExtraInfoRequest()).subscribeOn(Schedulers.io());
Observable.zip(observable1, observable2,
new BiFunction<UserBaseInfoResponse, UserExtraInfoResponse, UserInfo>() {
@Override
public UserInfo apply(UserBaseInfoResponse baseInfo,
UserExtraInfoResponse extraInfo) throws Exception {
return new UserInfo(baseInfo, extraInfo);
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<UserInfo>() {
@Override
public void accept(UserInfo userInfo) throws Exception {
//do something;
}
});
在RxJava中, 已经内置了很多线程选项供我们选择, 例如有
Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作
Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作
Schedulers.newThread() 代表一个常规的新线程
AndroidSchedulers.mainThread() 代表Android的主线程
//retrofit
compile 'com.squareup.retrofit2:retrofit:2.1.0'
//Gson converter
compile 'com.squareup.retrofit2:converter-gson:2.1.0'
//RxJava2 Adapter
compile 'com.jakewharton.retrofit:retrofit2-rxjava2-adapter:1.0.0'
//okhttp
compile 'com.squareup.okhttp3:okhttp:3.4.1'
compile 'com.squareup.okhttp3:logging-interceptor:3.4.1'
网友评论