public class RxBus {
private volatile static RxBus mDefaultInstance;
private final Subject<Object> mBus;
private final Map<Class<?>, Object> mStickyEventMap;
public RxBus() {
mBus = PublishSubject.create().toSerialized();
mStickyEventMap = new ConcurrentHashMap<>();
}
public static RxBus getInstance() {
if (mDefaultInstance == null) {
synchronized (RxBus.class) {
if (mDefaultInstance == null) {
mDefaultInstance = new RxBus();
}
}
}
return mDefaultInstance;
}
/**
* 发送事件
*/
public void post(Object event) {
mBus.onNext(event);
}
/**
* 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者
*/
public <T> Observable<T> toObservable(final Class<T> eventType) {
return mBus.ofType(eventType);
}
/**
* 判断是否有订阅者
*/
public boolean hasObservers() {
return mBus.hasObservers();
}
public void reset() {
mDefaultInstance = null;
}
/**
* Stciky 相关
*/
/**
* 发送一个新Sticky事件
*/
public void postSticky(Object event) {
synchronized (mStickyEventMap) {
mStickyEventMap.put(event.getClass(), event);
}
post(event);
}
/**
* 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者
*/
public <T> Observable<T> toObservableSticky(final Class<T> eventType) {
synchronized (mStickyEventMap) {
Observable<T> observable = mBus.ofType(eventType);
final Object event = mStickyEventMap.get(eventType);
if (event != null) {
return observable.mergeWith(Observable.create(new ObservableOnSubscribe<T>() {
@Override
public void subscribe(ObservableEmitter<T> subscriber) throws Exception {
subscriber.onNext(eventType.cast(event));
}
}));
} else {
return observable;
}
}
}
/**
* 根据eventType获取Sticky事件
*/
public <T> T getStickyEvent(Class<T> eventType) {
synchronized (mStickyEventMap) {
return eventType.cast(mStickyEventMap.get(eventType));
}
}
/**
* 移除指定eventType的Sticky事件
*/
public <T> T removeStickyEvent(Class<T> eventType) {
synchronized (mStickyEventMap) {
return eventType.cast(mStickyEventMap.remove(eventType));
}
}
/**
* 移除所有的Sticky事件
*/
public void removeAllStickyEvents() {
synchronized (mStickyEventMap) {
mStickyEventMap.clear();
}
}
/**
* 普通订阅解绑
* @param disposable
*/
public static void rxBusUnbund(CompositeDisposable disposable){
if (null != disposable && !disposable.isDisposed()) {
disposable.clear();
}
}
}
消息发送
RxBus.getInstance().post(new SetShoppingGoodsCount(Run.goodsCounts));
消息接收
compositeDisposable = new CompositeDisposable();
RxBus.getInstance().toObservable(SetShoppingGoodsCount.class)
.subscribe(new Observer<SetShoppingGoodsCount>() {
@Override
public void onSubscribe(Disposable d) {
compositeDisposable.add(d);
}
@Override
public void onNext(SetShoppingGoodsCount setShoppingGoodsCount) {
setShoppingCarCountWithoutAnimation(setShoppingGoodsCount.getCount());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
解绑
@Override
protected void onDestroy() {
super.onDestroy();
RxBus.rxBusUnbund(compositeDisposable);
}
volatile
https://www.jianshu.com/p/71ab00a2677b
https://blog.csdn.net/senkai123/article/details/78202217
https://blog.csdn.net/donkor_/article/details/79709366
网友评论