使用
1.简单使用(类似Rx1)
- 创建被观察者:
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("onNext");
e.onError(Exception);
e.onComplete();
}
});
- 创建观察者:
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
//这个地方可以拿到水管的开关d,可以在适当的时候用d.dispose();来切断连接
}
@Override
public void onNext(@NonNull String s) {
Log.e(TAG, "onNext: "+s);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
};
- 将观察者绑定到被观察者:
observable.subscribe(observer);
- 把代码连起来就是链式操作
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "subscribe");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "error");
}
@Override
public void onComplete() {
Log.d(TAG, "complete");
}
});
2.注意:
2.1. observable的onNext、onComplete、onError发送会在observer的onNext、onComplete、onError接受。
2.2. 当上游发送了一个onError或者oncomplete后, 上游onError或者oncomplete之后的事件将继续发送, 而下游不再继续接收事件。
2.3. subscribe有多个重载方法,可以选择观察者之关注onSubscribe、onNext、onComplete、onError哪一个或者那几个方法
public final Disposable subscribe() {}
public final Disposable subscribe(Consumer<? super T> onNext) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
public final void subscribe(Observer<? super T> observer) {}
使用起来就像这样
observable.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
}
});
关于线程
(默认上下游也就是观察者和被观察者实在同一个线程的)
1.RxJava提供的线程:
Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作
Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作
Schedulers.newThread() 代表一个常规的新线程
AndroidSchedulers.mainThread() 代表Android的主线程
2.改变线程的两个方法:
.subscribeOn(Schedulers.newThread())
- 指定上游(被观察者)的线程,多次调用subscribeOn() 只有第一次的有效, 其余的会被忽略.
.observeOn(AndroidSchedulers.mainThread())
- 指定下游(观察者)的线程,每调用一次observeOn() , 下游的线程就会切换一次.
3.例子:读写数据库:
public Observable<List<Record>> readAllRecords() {
return Observable.create(new ObservableOnSubscribe<List<Record>>() {
@Override
public void subscribe(ObservableEmitter<List<Record>> emitter) throws Exception {
Cursor cursor = null;
try {
cursor = getReadableDatabase().rawQuery("select * from " + TABLE_NAME, new String[]{});
List<Record> result = new ArrayList<>();
while (cursor.moveToNext()) {
result.add(Db.Record.read(cursor));
}
emitter.onNext(result);
emitter.onComplete();
} finally {
if (cursor != null) {
cursor.close();
}
}
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
}
操作符
其他操作符在本人githubRx下有一个Rxutils文件,那里面是rx1的操作符,2应该也有。
- FlatMap,
这个返回一个 ObservableSource(相当于一个Observable),将一个Observable的结果转换为另一个Observable
例子(结合retrofit先注册再登陆):
retrofit的接口操作
public interface Api {
@GET
Observable<LoginResponse> login(@Body LoginRequest request);
@GET
Observable<RegisterResponse> register(@Body RegisterRequest request);
}
----------
api.register(new RegisterRequest()) //发起注册请求
.subscribeOn(Schedulers.io()) //在IO线程进行网络请求
.observeOn(AndroidSchedulers.mainThread()) //回到主线程去处理请求注册结果
.doOnNext(new Consumer<RegisterResponse>() {
@Override
public void accept(RegisterResponse registerResponse) throws Exception {
//先根据注册的响应结果去做一些操作
}
})
.observeOn(Schedulers.io()) //回到IO线程去发起登录请求
.flatMap(new Function<RegisterResponse, ObservableSource<LoginResponse>>() {
@Override
public ObservableSource<LoginResponse> apply(RegisterResponse registerResponse) throws Exception {
return api.login(new LoginRequest());//这里返回的是一个Observable
}
})
.observeOn(AndroidSchedulers.mainThread()) //回到主线程去处理请求登录的结果
.subscribe(new Consumer<LoginResponse>() {
@Override
public void accept(LoginResponse loginResponse) throws Exception {
Toast.makeText(MainActivity.this, "登录成功", Toast.LENGTH_SHORT).show();
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Toast.makeText(MainActivity.this, "登录失败", Toast.LENGTH_SHORT).show();
}
});
2.zip操作符
将两个或多个Observable发来的事件转换为一个Observable,再发送出去。
应用场景:比如一个界面需要展示用户的一些信息, 而这些信息分别要从两个服务器接口中获取, 而只有当两个都获取到了之后才能进行展示, 这个时候就可以用Zip了。
例子:
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Thread.sleep(1000);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Thread.sleep(1000);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Thread.sleep(1000);
Log.d(TAG, "emit 4");
emitter.onNext(4);
Thread.sleep(1000);
Log.d(TAG, "emit complete1");
emitter.onComplete();
}
}).subscribeOn(Schedulers.io());
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "emit A");
emitter.onNext("A");
Thread.sleep(1000);
Log.d(TAG, "emit B");
emitter.onNext("B");
Thread.sleep(1000);
Log.d(TAG, "emit C");
emitter.onNext("C");
Thread.sleep(1000);
Log.d(TAG, "emit complete2");
emitter.onComplete();
}
}).subscribeOn(Schedulers.io());
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String value) {
Log.d(TAG, "onNext: " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
OOM水缸 Backpressure Flowable等
1.简介:这个问题的由来是因为上下游发送接收速度不匹配导致。
如果上下游在同一个线程,那么上游发一个要等下游处理完成以后才会继续发送。
如果上下游在不同的线程,那么如果上游一直发,下游却处理的非常慢,上游会把多余的事件放到一个水缸里面,当这个水缸满了以后就会OOM了。
手动解决:
一是从数量上进行治理, 减少发送进水缸里的事件:
可以用filter去滤掉一些数据,只让一部分进入水缸,但是这种,数据会丢失。
二是从速度上进行治理, 减缓事件发送进水缸的速度
可以让上游发送数据的时候,做一定的延时,给下游充分的处理时间。
2.Flowable与Backpressure
注意:使用Flowable时上下游从Observable和Observer变成了Flowable和Subscriber。
Backpressure策略:(Flowable的默认水缸大小为128)
BackpressureStrategy.ERROR:这种方式会在出现上下游流速不均衡的时候直接抛出一个异常,这个异常就是著名的MissingBackpressureException。
BackpressureStrategy.BUFFER:使用一个新水缸,这个水缸大小无限制。
BackpressureStrategy.DROP:当事件用不下时,多余的事件直接丢弃掉了。
BackpressureStrategy.LATEST:当水缸满了以后还是会存新的事件,但是相对较老的那个事件会被抛弃。(水缸就是一个队列)
对于Flowable.create可以在参数里面设置Backpressure策略,对于Flowable.interval这些没有这个参数可以
onBackpressureBuffer()
onBackpressureDrop()
onBackpressureLatest()
Subscription.cancel():切断水管和Disposable.dispose()效果产不多。
Subscription.request(Long.MAX_VALUE);表明下游想要处理的事件数量,这里穿进去的int值,上游就会从水缸中发出来对应的个数,然后发完以后如果下游要继续处理事件,还可以在onNext里面在request一次,这样上游又从水缸里面取出事件发过来。
emitter.requested()上游可以这样拿到下游请求的数量。上游每发送一个事件,这个数量就减一,下游每调用一次Subscription.request(n),就在这个数量的基础上加上n。
Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Log.d(TAG, "emit complete");
emitter.onComplete();
}
}, BackpressureStrategy.ERROR); //增加了一个参数
Subscriber<Integer> downstream = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
s.request(Long.MAX_VALUE); //注意这句代码
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
upstream.subscribe(downstream);
最后一个例子:异步线程读取文本文件,读一行打印一行。
public static void main(String[] args) {
practice1();
try {
Thread.sleep(10000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void practice1() {
Flowable
.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
try {
FileReader reader = new FileReader("test.txt");
BufferedReader br = new BufferedReader(reader);
String str;
while ((str = br.readLine()) != null && !emitter.isCancelled()) {
while (emitter.requested() == 0) {
if (emitter.isCancelled()) {
break;
}
}
emitter.onNext(str);
}
br.close();
reader.close();
emitter.onComplete();
} catch (Exception e) {
emitter.onError(e);
}
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
mSubscription = s;
s.request(1);
}
@Override
public void onNext(String string) {
System.out.println(string);
try {
Thread.sleep(2000);
mSubscription.request(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void onError(Throwable t) {
System.out.println(t);
}
@Override
public void onComplete() {
}
});
}
网友评论