(1)没有背压处理的RxBus
/**
 * Application-wide event bus backed by an RxJava {@code Subject}.
 *
 * <p>This variant exposes {@code Observable} streams and therefore offers no
 * backpressure handling. Events are delivered only to observers that are
 * subscribed at the moment {@link #post(Object)} is called; an event posted
 * while nobody is subscribed is dropped.
 */
public class RxBus {

    /**
     * The underlying bus. A {@code PublishSubject} is not thread-safe by itself,
     * so it is wrapped into its serialized form once, at construction time.
     */
    private final Subject<Object> bus;

    private RxBus() {
        bus = PublishSubject.create().toSerialized();
    }

    /**
     * Returns the shared bus instance.
     *
     * @return the single {@code RxBus} instance
     */
    public static RxBus getDefault() {
        return SingletonHolder.INSTANCE;
    }

    /**
     * Lazy holder for the singleton: the instance is created when this nested
     * class is first initialized, i.e. on the first {@link #getDefault()} call.
     * Class initialization already guarantees safe publication, so the field is
     * a plain {@code final} constant rather than a mutable {@code volatile} one.
     */
    private static class SingletonHolder {
        private static final RxBus INSTANCE = new RxBus();
    }

    /**
     * Publishes an event to every current observer.
     *
     * <p>If the bus has no observers the event is silently discarded.
     *
     * @param event the event object to publish
     */
    public void post(Object event) {
        if (bus.hasObservers()) {
            bus.onNext(event);
        }
    }

    /**
     * Observes every event posted to the bus, regardless of its type.
     *
     * <p>The stream is returned through {@code hide()} so that callers cannot
     * cast it back to the underlying {@code Subject} and emit or terminate the
     * shared bus directly.
     *
     * @return an {@code Observable} of all events
     */
    public Observable<Object> observe() {
        return bus.hide();
    }

    /**
     * Observes only the events that are instances of the given class.
     *
     * @param event the class of the events to receive
     * @param <T>   the event type
     * @return an {@code Observable} emitting only events of type {@code T}
     */
    public <T> Observable<T> observe(Class<T> event) {
        return bus.ofType(event);
    }
}
注册:
// Handler invoked for every CountBean delivered by the bus: prints its count.
Consumer<CountBean> printCount = new Consumer<CountBean>() {
    @Override
    public void accept(CountBean bean) throws Exception {
        System.out.println("count:" + bean.getCount());
    }
};
// Register the handler for CountBean events only.
RxBus.getDefault().observe(CountBean.class).subscribe(printCount);
发送数据:
// Build the event payload, then publish it on the shared bus.
final CountBean event = new CountBean();
event.setCount(1);
RxBus.getDefault().post(event);
(2)有背压处理的RxBus
/**
 * Application-wide event bus backed by an RxJava {@code FlowableProcessor}.
 *
 * <p>This variant exposes {@code Flowable} streams so that subscribers can
 * apply backpressure operators. Events are delivered only to subscribers that
 * are subscribed at the moment {@link #post(Object)} is called; an event posted
 * while nobody is subscribed is dropped.
 *
 * <p>NOTE(review): per the RxJava documentation a {@code PublishProcessor} does
 * not coordinate backpressure itself; a subscriber that has not requested
 * enough items is signalled a {@code MissingBackpressureException}. Slow
 * subscribers should apply an {@code onBackpressureXxx} operator to the
 * returned stream.
 */
public class RxBus {

    /**
     * The underlying bus. A {@code PublishProcessor} is not thread-safe by
     * itself, so it is wrapped into its serialized form once, at construction
     * time.
     */
    private final FlowableProcessor<Object> bus;

    private RxBus() {
        bus = PublishProcessor.create().toSerialized();
    }

    /**
     * Returns the shared bus instance.
     *
     * @return the single {@code RxBus} instance
     */
    public static RxBus getDefault() {
        return SingletonHolder.INSTANCE;
    }

    /**
     * Lazy holder for the singleton: the instance is created when this nested
     * class is first initialized, i.e. on the first {@link #getDefault()} call.
     * Class initialization already guarantees safe publication, so the field is
     * a plain {@code final} constant rather than a mutable {@code volatile} one.
     */
    private static class SingletonHolder {
        private static final RxBus INSTANCE = new RxBus();
    }

    /**
     * Publishes an event to every current subscriber.
     *
     * <p>If the bus has no subscribers the event is silently discarded.
     *
     * @param event the event object to publish
     */
    public void post(Object event) {
        if (bus.hasSubscribers()) {
            bus.onNext(event);
        }
    }

    /**
     * Observes every event posted to the bus, regardless of its type.
     *
     * <p>The stream is returned through {@code hide()} so that callers cannot
     * cast it back to the underlying processor and emit or terminate the shared
     * bus directly.
     *
     * @return a {@code Flowable} of all events
     */
    public Flowable<Object> observe() {
        return bus.hide();
    }

    /**
     * Observes only the events that are instances of the given class.
     *
     * @param event the class of the events to receive
     * @param <T>   the event type
     * @return a {@code Flowable} emitting only events of type {@code T}
     */
    public <T> Flowable<T> observe(Class<T> event) {
        return bus.ofType(event);
    }
}
网友评论