Preface
These are study notes based on the English version by Froussios.
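1. publish()
Normally an Observable starts emitting items when it is subscribed to. After publish(), subscribing no longer triggers emission: the resulting ConnectableObservable starts emitting only when connect() is called. In the example below, connect() is called before anyone subscribes, so the first subscriber misses the items emitted during the first 300 ms.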
// Imports assume RxJava 2.x (io.reactivex packages).
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observables.ConnectableObservable;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;

public class PublishDemo {
    public static void main(String[] args) throws InterruptedException {
        // publish() wraps the interval source in a ConnectableObservable.
        ConnectableObservable<Long> publish = Observable
                .interval(100, TimeUnit.MILLISECONDS, Schedulers.computation())
                .publish();
        // connect() starts the emissions even though nobody has subscribed yet.
        publish.connect();
        System.out.println("connect");
        Thread.sleep(300);
        // The first subscriber joins late and only sees items from now on.
        Disposable subscribe = publish.subscribe(aLong -> System.out.println("first: " + aLong));
        Thread.sleep(300);
        // The second subscriber joins later still and shares the same stream.
        Disposable secondDis = publish.subscribe(aLong -> System.out.println("second: " + aLong));
        Thread.sleep(300);
        System.out.println("first dispose ");
        subscribe.dispose();
        Thread.sleep(300);
        System.out.println("second dispose ");
        secondDis.dispose();
    }
}
Output:
connect
first: 3
first: 4
first: 5
first: 6
second: 6
first: 7
second: 7
first: 8
second: 8
first dispose
second: 9
second: 10
second: 11
second dispose
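The run above calls connect() before anyone subscribes, so it does not directly show that subscribing alone emits nothing. The following is a minimal sketch of that case, assuming RxJava 2.x (the io.reactivex packages, consistent with the Disposable and Consumer types used in these notes). The class name PublishBeforeConnect and the helper run() are hypothetical, and a synchronous Observable.just source replaces interval so that the order of events does not depend on timing.

```java
import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; assumes RxJava 2.x on the classpath.
public class PublishBeforeConnect {

    // Records events in the order they happen and returns them.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        ConnectableObservable<Integer> published = Observable.just(1, 2, 3).publish();

        // Subscribing does not start the source yet.
        published.subscribe(v -> log.add("first: " + v));
        published.subscribe(v -> log.add("second: " + v));
        log.add("before connect");

        // Only now does the source run; both subscribers share each item.
        published.connect();
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because "before connect" is logged ahead of any item, the sketch suggests that both subscriptions stayed idle until connect() was called, and that each item is then delivered to both subscribers in turn.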
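2. replay()
replay() ensures that every observer receives the same sequence of items, even if it subscribes after the Observable has already started emitting.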
// Imports assume RxJava 2.x (io.reactivex packages).
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observables.ConnectableObservable;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;

public class ReplayDemo {
    public static void main(String[] args) throws InterruptedException {
        // replay() also returns a ConnectableObservable, but it buffers past items.
        ConnectableObservable<Long> replay = Observable
                .interval(100, TimeUnit.MILLISECONDS, Schedulers.computation())
                .replay();
        // connect() starts the emissions before anyone has subscribed.
        replay.connect();
        System.out.println("connect");
        Thread.sleep(300);
        // The first subscriber receives the buffered items first, then live ones.
        Disposable subscribe = replay.subscribe(aLong -> System.out.println("first: " + aLong));
        Thread.sleep(300);
        // The second subscriber is also replayed everything from the start.
        Disposable secondDis = replay.subscribe(aLong -> System.out.println("second: " + aLong));
        Thread.sleep(300);
        System.out.println("first dispose ");
        subscribe.dispose();
        Thread.sleep(300);
        System.out.println("second dispose ");
        secondDis.dispose();
    }
}
Output:
connect
first: 0
first: 1
first: 2
first: 3
first: 4
first: 5
second: 0
second: 1
second: 2
second: 3
second: 4
second: 5
first: 6
second: 6
first: 7
second: 7
first: 8
second: 8
first dispose
second: 9
second: 10
second: 11
second dispose
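If this is changed to replay(2), a subscriber that joins late receives only the last two items emitted before it subscribed, followed by every item emitted afterwards.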
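The effect of the buffer size can be checked with a small sketch, again assuming RxJava 2.x. The class name ReplayTwoDemo and the helper run() are hypothetical, and a PublishSubject is used as the source (an assumption made for determinism, not something the notes use) so that items can be pushed by hand instead of relying on a timer.

```java
import io.reactivex.observables.ConnectableObservable;
import io.reactivex.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; assumes RxJava 2.x on the classpath.
public class ReplayTwoDemo {

    // Returns what a late subscriber receives from replay(2).
    static List<Integer> run() {
        PublishSubject<Integer> source = PublishSubject.create();
        ConnectableObservable<Integer> replayed = source.replay(2);
        replayed.connect();

        // Three items are emitted before anyone subscribes.
        source.onNext(0);
        source.onNext(1);
        source.onNext(2);

        // The late subscriber is replayed only the last two buffered items.
        List<Integer> late = new ArrayList<>();
        replayed.subscribe(late::add);

        // Items emitted after subscribing arrive as usual.
        source.onNext(3);
        return late;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [1, 2, 3]
    }
}
```

Item 0 has been dropped from the buffer by the time the subscriber arrives, so only 1 and 2 are replayed before the live item 3.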
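3. cache()
cache() behaves much like replay(), but it returns an ordinary Observable rather than a ConnectableObservable: there is no connect() call, and the source starts emitting only when the first subscriber subscribes.
Observable<Long> cache = Observable
        .interval(100, TimeUnit.MILLISECONDS, Schedulers.computation())
        .cache();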
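A minimal sketch of this behavior, assuming RxJava 2.x: the class name CacheDemo, the helper run(), and the subscription counter are hypothetical additions for illustration. A synchronous Observable.just source stands in for interval so that the result is deterministic, and doOnSubscribe counts how many times the underlying source is actually subscribed to.

```java
import io.reactivex.Observable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; assumes RxJava 2.x on the classpath.
public class CacheDemo {

    // Records events in the order they happen and returns them.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        AtomicInteger sourceSubscriptions = new AtomicInteger();

        Observable<Integer> cached = Observable.just(1, 2, 3)
                // Counts subscriptions made to the underlying source.
                .doOnSubscribe(d -> sourceSubscriptions.incrementAndGet())
                .cache();

        // Nothing has subscribed yet, so the source has not been started.
        log.add("source subscriptions: " + sourceSubscriptions.get());

        // The first subscriber starts the source; the second gets the cached items.
        cached.subscribe(v -> log.add("first: " + v));
        cached.subscribe(v -> log.add("second: " + v));

        log.add("source subscriptions: " + sourceSubscriptions.get());
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The counter goes from 0 to 1 and stays there, which is consistent with the source being started by the first subscriber and the second subscriber being served from the cache.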