(1)Cold Observable
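Define two observers:
    // RxJava 2 import (assumed from the accept(...) throws Exception signature)
    import io.reactivex.functions.Consumer;

    Consumer<CountBean> consumer1 = new Consumer<CountBean>() {
        @Override
        public void accept(CountBean countBean) throws Exception {
            // Print the producing thread's name and the identity of the received bean
            System.out.println("consumer1:" + "threadname:" + countBean.getThreadName()
                    + "-----hashcode:" + countBean.hashCode());
        }
    };
    Consumer<CountBean> consumer2 = new Consumer<CountBean>() {
        @Override
        public void accept(CountBean countBean) throws Exception {
            System.out.println("consumer2:" + "threadname:" + countBean.getThreadName()
                    + "-----hashcode:" + countBean.hashCode());
        }
    };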
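Define the observable:
    // RxJava 2 imports (assumed from the RxJava 2 style API used here)
    import io.reactivex.Observable;
    import io.reactivex.ObservableEmitter;
    import io.reactivex.ObservableOnSubscribe;
    import io.reactivex.schedulers.Schedulers;

    Observable<CountBean> observable = Observable.create(new ObservableOnSubscribe<CountBean>() {
        @Override
        public void subscribe(ObservableEmitter<CountBean> emitter) {
            // Runs once per subscriber, so each subscriber gets a new CountBean
            CountBean countBean = new CountBean();
            countBean.setThreadName(Thread.currentThread().getName());
            emitter.onNext(countBean);
        }
    }).observeOn(Schedulers.newThread());
    observable.subscribe(consumer1);
    observable.subscribe(consumer2);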
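The code above is a typical Cold Observable.
[Figure: execution output] The output shows that the CountBean hash codes differ between the two subscriptions, i.e. each observer operates on a different CountBean object.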
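To make the cold behaviour concrete without pulling in RxJava, here is a minimal plain-Java sketch. ColdSourceSketch and ColdDemo are hypothetical names invented for illustration, and this is a simplified model of the idea, not how RxJava implements Observable.create.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical model of a cold source; not RxJava's implementation.
final class ColdSourceSketch<T> {
    private final Supplier<T> producer;

    ColdSourceSketch(Supplier<T> producer) {
        this.producer = producer;
    }

    // Every subscription re-runs the producer, so every observer gets its own item.
    void subscribe(Consumer<T> observer) {
        observer.accept(producer.get());
    }
}

final class ColdDemo {
    // Subscribes twice and returns the items the two observers received.
    static List<Object> run() {
        ColdSourceSketch<Object> cold = new ColdSourceSketch<>(Object::new);
        List<Object> received = new ArrayList<>();
        cold.subscribe(received::add);
        cold.subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        List<Object> items = run();
        System.out.println("same instance: " + (items.get(0) == items.get(1)));
    }
}
```

Running main prints "same instance: false", which mirrors the differing hash codes in the output above.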
(2)Hot Observable
Define two observers (the same as in the Cold Observable example):
    Consumer<CountBean> consumer1 = new Consumer<CountBean>() {
        @Override
        public void accept(CountBean countBean) throws Exception {
            System.out.println("consumer1:" + "threadname:" + countBean.getThreadName()
                    + "-----hashcode:" + countBean.hashCode());
        }
    };
    Consumer<CountBean> consumer2 = new Consumer<CountBean>() {
        @Override
        public void accept(CountBean countBean) throws Exception {
            System.out.println("consumer2:" + "threadname:" + countBean.getThreadName()
                    + "-----hashcode:" + countBean.hashCode());
        }
    };
Define the observable (this is where it differs from the Cold Observable):
- Use the publish operator
    ConnectableObservable<CountBean> observable = Observable.create(new ObservableOnSubscribe<CountBean>() {
        @Override
        public void subscribe(ObservableEmitter<CountBean> emitter) {
            CountBean countBean = new CountBean();
            countBean.setThreadName(Thread.currentThread().getName());
            emitter.onNext(countBean);
        }
    }).observeOn(Schedulers.newThread()).publish();
    // Subscribe first so that neither observer can miss the item
    observable.subscribe(consumer1);
    observable.subscribe(consumer2);
    observable.connect(); // required: nothing is emitted until connect() is called
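The role of connect() can be modelled in a few lines of plain Java. The sketch below is only an illustration under simplifying assumptions (synchronous delivery, a single item, no disposal); ConnectableSketch and ConnectableDemo are hypothetical names, not RxJava classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical model of a connectable source; not RxJava's implementation.
final class ConnectableSketch<T> {
    private final Supplier<T> producer;
    private final List<Consumer<T>> observers = new ArrayList<>();

    ConnectableSketch(Supplier<T> producer) {
        this.producer = producer;
    }

    // Subscribing only registers the observer; nothing is produced yet.
    void subscribe(Consumer<T> observer) {
        observers.add(observer);
    }

    // connect() runs the producer once and hands the same item to every observer.
    void connect() {
        T item = producer.get();
        for (Consumer<T> observer : observers) {
            observer.accept(item);
        }
    }
}

final class ConnectableDemo {
    // Registers two observers, connects, and returns what they received.
    static List<Object> run() {
        ConnectableSketch<Object> hot = new ConnectableSketch<>(Object::new);
        List<Object> received = new ArrayList<>();
        hot.subscribe(received::add);
        hot.subscribe(received::add);
        hot.connect();
        return received;
    }

    public static void main(String[] args) {
        List<Object> items = run();
        System.out.println("same instance: " + (items.get(0) == items.get(1)));
    }
}
```

Here main prints "same instance: true". In this model an observer that registers after connect() receives nothing, which is why the order of subscribing and connecting matters.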
- Use the publish and refCount operators
    Observable<CountBean> observable = Observable.create(new ObservableOnSubscribe<CountBean>() {
        @Override
        public void subscribe(ObservableEmitter<CountBean> emitter) {
            CountBean countBean = new CountBean();
            countBean.setThreadName(Thread.currentThread().getName());
            emitter.onNext(countBean);
        }
    }).observeOn(Schedulers.newThread()).publish().refCount();
    // The first subscribe() connects to the source; the second observer only
    // sees the item if it subscribes before the item is delivered
    observable.subscribe(consumer1);
    observable.subscribe(consumer2);
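This is much simpler than the first approach.
refCount essentially maintains a reference counter behind the scenes: it connects to the source when the first observer subscribes and disconnects once the last observer has unsubscribed or been disposed.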
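The counting logic described above can be sketched roughly as follows. RefCountSketch is a hypothetical class written for this post; it only records when it would connect and disconnect and does not reproduce RxJava's actual refCount code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of the reference counting behind refCount.
final class RefCountSketch {
    private int subscribers = 0;
    private boolean connected = false;
    private final List<String> log = new ArrayList<>();

    // Registers one subscriber and returns a handle that disposes it.
    synchronized Runnable subscribe() {
        if (++subscribers == 1) {
            connected = true;          // first subscriber: connect to the source
            log.add("connect");
        }
        AtomicBoolean disposed = new AtomicBoolean(false);
        return () -> {
            // Disposing twice must not decrement the counter twice.
            if (disposed.compareAndSet(false, true)) {
                release();
            }
        };
    }

    private synchronized void release() {
        if (--subscribers == 0) {
            connected = false;         // last subscriber gone: disconnect
            log.add("disconnect");
        }
    }

    synchronized boolean isConnected() {
        return connected;
    }

    synchronized List<String> log() {
        return new ArrayList<>(log);
    }
}

final class RefCountDemo {
    public static void main(String[] args) {
        RefCountSketch refCount = new RefCountSketch();
        Runnable first = refCount.subscribe();
        Runnable second = refCount.subscribe();
        first.run();
        second.run();
        System.out.println(refCount.log());
    }
}
```

The connection is opened once for the first subscriber and closed only after both handles have been disposed, so main prints [connect, disconnect].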
- Use the share operator
    Observable<CountBean> observable = Observable.create(new ObservableOnSubscribe<CountBean>() {
        @Override
        public void subscribe(ObservableEmitter<CountBean> emitter) {
            CountBean countBean = new CountBean();
            countBean.setThreadName(Thread.currentThread().getName());
            emitter.onNext(countBean);
        }
    }).observeOn(Schedulers.newThread()).share();
    observable.subscribe(consumer1);
    observable.subscribe(consumer2);
Looking at the source of share shows that share is simply publish().refCount().
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Observable<T> share() {
        return publish().refCount();
    }
- Use a Subject
    Observable<CountBean> observable = Observable.create(new ObservableOnSubscribe<CountBean>() {
        @Override
        public void subscribe(ObservableEmitter<CountBean> emitter) {
            CountBean countBean = new CountBean();
            countBean.setThreadName(Thread.currentThread().getName());
            emitter.onNext(countBean);
        }
    }).observeOn(Schedulers.newThread());
    PublishSubject<CountBean> subject = PublishSubject.create();
    // Register the observers on the subject before it starts receiving items
    subject.subscribe(consumer1);
    subject.subscribe(consumer2);
    // The subject acts as the observer of the cold source and relays its items
    observable.subscribe(subject);
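PublishSubject's parent class is Subject. The source of Subject shows that it extends Observable and also implements Observer; in other words, a Subject is both an observer and an observable. With that in mind, the code above is much easier to understand.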
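The dual role of a Subject can be illustrated with a small plain-Java sketch. SimpleObserver, SimpleSource, SubjectSketch and SubjectDemo are hypothetical stand-ins invented here, with only an onNext signal and synchronous delivery; they are not RxJava's types.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal interfaces standing in for an observer and an observable.
interface SimpleObserver<T> {
    void onNext(T item);
}

interface SimpleSource<T> {
    void subscribe(SimpleObserver<T> observer);
}

// A subject plays both roles: others can subscribe to it,
// and it can itself be subscribed to another source.
final class SubjectSketch<T> implements SimpleObserver<T>, SimpleSource<T> {
    private final List<SimpleObserver<T>> observers = new ArrayList<>();

    @Override
    public void subscribe(SimpleObserver<T> observer) {
        observers.add(observer);
    }

    @Override
    public void onNext(T item) {
        // Relay the same instance to every registered observer.
        for (SimpleObserver<T> observer : observers) {
            observer.onNext(item);
        }
    }
}

final class SubjectDemo {
    // Bridges a cold source through a subject and returns what two observers received.
    static List<Object> run() {
        // Cold source: creates a fresh object for whoever subscribes.
        SimpleSource<Object> cold = observer -> observer.onNext(new Object());

        SubjectSketch<Object> subject = new SubjectSketch<>();
        List<Object> received = new ArrayList<>();
        subject.subscribe(received::add);
        subject.subscribe(received::add);
        // The subject is the cold source's only subscriber, so the source runs once.
        cold.subscribe(subject);
        return received;
    }

    public static void main(String[] args) {
        List<Object> items = run();
        System.out.println("same instance: " + (items.get(0) == items.get(1)));
    }
}
```

Because the cold source sees a single subscriber (the subject), it produces one object, and the subject forwards that one object to both observers, so main prints "same instance: true". Delivery in this sketch is synchronous, which is why the observers are registered before the subject is attached to the source.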
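These are four ways to write a Hot Observable, and they all produce the same result:
[Figure: execution output] In other words, no matter how many observers subscribe, they all operate on the same object, and it is thread-safe.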