Using RxBus:
1. Sending a message:
RxBus.getDefault().post(type, object);
2. Receiving a message:
Disposable disposable = RxBus.getDefault().register(type, object.class)
        .subscribe(o -> {
            // TODO: handle the received object
        });
3. Unregistering:
RxBus.getDefault().unregister(disposable);
4. The RxBus wrapper:
private static volatile RxBus mInstance;
// Serialized so that post() can be called safely from several threads.
private final FlowableProcessor<Object> bus = PublishProcessor.create().toSerialized();

public static RxBus getDefault() {
    if (mInstance == null) {
        synchronized (RxBus.class) {
            if (mInstance == null) {
                mInstance = new RxBus();
            }
        }
    }
    return mInstance;
}
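getDefault() above is a double-checked locking singleton, and the volatile modifier on mInstance is what makes the unsynchronized first check safe. A minimal plain-Java sketch of the same pattern follows. LazyBus and distinctInstances are hypothetical names used only for illustration; they stand in for RxBus so that the example runs without RxJava or Android.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for RxBus; it only illustrates the locking pattern.
final class LazyBus {
    // volatile: other threads never observe a partially constructed instance.
    private static volatile LazyBus instance;

    private LazyBus() {
    }

    static LazyBus getDefault() {
        LazyBus local = instance;              // unsynchronized fast path
        if (local == null) {
            synchronized (LazyBus.class) {
                local = instance;              // re-check while holding the lock
                if (local == null) {
                    local = new LazyBus();
                    instance = local;
                }
            }
        }
        return local;
    }

    // Calls getDefault() from several threads and counts the distinct instances seen.
    static int distinctInstances(int threads) {
        Set<LazyBus> seen = ConcurrentHashMap.newKeySet();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> seen.add(getDefault()));
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println(distinctInstances(8)); // 1
    }
}
```

The private constructor in the sketch is an addition: the listing in this post does not show one, and without it callers could still create extra instances with new.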
/**
 * @param tag    must be identical on the posting and the receiving side
 * @param object the payload; its type must match the class passed to register(),
 *               otherwise the message is not delivered
 */
public void post(@NonNull String tag, @NonNull Object object) {
    RxBusEntity entity = new RxBusEntity();
    entity.setTag(tag);
    entity.setObject(object);
    bus.onNext(entity);
}
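/**
 * Registers for messages on the bus; results are observed on the Android main thread.
 *
 * @param tag    the tag to listen for
 * @param tClass the expected payload class
 * @param <T>    the payload type
 * @return a Flowable that emits the matching payloads
 */
public <T> Flowable<T> register(@NonNull final String tag, @NonNull final Class<T> tClass) {
    return register(tag, tClass, AndroidSchedulers.mainThread());
}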

public <T> Flowable<T> register(@NonNull final String tag, final Class<T> tClass, Scheduler scheduler) {
    return bus.filter(o -> {
        // Only entities with a non-empty, matching tag and a payload of the requested class pass.
        if (o instanceof RxBusEntity) {
            RxBusEntity entity = (RxBusEntity) o;
            if (TextUtils.isEmpty(entity.getTag())) {
                return false;
            }
            if (entity.getTag().equals(tag)) {
                return tClass.isInstance(entity.getObject());
            }
        }
        return false;
    }).map(o -> ((RxBusEntity) o).getObject()).cast(tClass).observeOn(scheduler);
}
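The filter in register() explains why the tag and the payload type must both match. The sketch below replays that predicate in plain Java, without RxJava, to show which posted messages a given (tag, class) pair would receive. Envelope, TagTypeFilter, accepts, deliver and sample are hypothetical names introduced for this illustration; Envelope stands in for RxBusEntity, and the null-or-empty check stands in for TextUtils.isEmpty().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class TagTypeFilter {
    // Same decision as the filter in register(): non-empty tag, equal tag, payload instance of type.
    static boolean accepts(Envelope e, String tag, Class<?> type) {
        if (e.tag == null || e.tag.isEmpty()) {
            return false;
        }
        return e.tag.equals(tag) && type.isInstance(e.payload);
    }

    // Returns the payloads a subscriber registered with (tag, type) would receive.
    static <T> List<T> deliver(List<Envelope> posted, String tag, Class<T> type) {
        List<T> received = new ArrayList<>();
        for (Envelope e : posted) {
            if (accepts(e, tag, type)) {
                received.add(type.cast(e.payload));
            }
        }
        return received;
    }

    // A few example posts, including one with an empty tag.
    static List<Envelope> sample() {
        return Arrays.asList(
                new Envelope("login", "alice"),
                new Envelope("login", 42),
                new Envelope("logout", "bob"),
                new Envelope("", "ignored"));
    }

    public static void main(String[] args) {
        List<Envelope> posted = sample();
        System.out.println(deliver(posted, "login", String.class));  // [alice]
        System.out.println(deliver(posted, "login", Integer.class)); // [42]
        System.out.println(deliver(posted, "login", Number.class));  // [42]
        System.out.println(deliver(posted, "login", int.class));     // []
    }
}

// Hypothetical stand-in for RxBusEntity: a tag plus an untyped payload.
final class Envelope {
    final String tag;
    final Object payload;

    Envelope(String tag, Object payload) {
        this.tag = tag;
        this.payload = payload;
    }
}
```

Two details follow from Class.isInstance(): a superclass such as Number.class also receives subclass payloads, and a primitive class such as int.class never matches, so the boxed class (Integer.class) should be passed to register().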

public void unregister(Disposable... disposables) {
    if (disposables == null) {
        return;
    }
    for (Disposable disposable : disposables) {
        if (disposable == null || disposable.isDisposed()) {
            continue;
        }
        disposable.dispose();
    }
}
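unregister() tolerates a null array, null elements, and handles that are already disposed, so it is safe to call more than once, for example from several lifecycle callbacks. The sketch below checks that behaviour in plain Java. Handle, CountingHandle and UnregisterDemo are hypothetical names; Handle is an assumed stand-in that copies only the two methods of RxJava's Disposable used here.

```java
final class UnregisterDemo {
    // Same shape as RxBus.unregister(): skip nulls and handles that are already disposed.
    static void unregister(Handle... handles) {
        if (handles == null) {
            return;
        }
        for (Handle handle : handles) {
            if (handle == null || handle.isDisposed()) {
                continue;
            }
            handle.dispose();
        }
    }

    public static void main(String[] args) {
        CountingHandle a = new CountingHandle();
        CountingHandle b = new CountingHandle();
        unregister(a, null, b);      // the null element is skipped
        unregister(a);               // a is already disposed, so dispose() is not called again
        unregister((Handle[]) null); // a null array is ignored
        System.out.println(a.disposeCalls + " " + b.disposeCalls); // 1 1
    }
}

// Hypothetical stand-in for Disposable, reduced to the two methods used above.
interface Handle {
    void dispose();

    boolean isDisposed();
}

// Records how many times dispose() was actually invoked.
final class CountingHandle implements Handle {
    private boolean disposed;
    int disposeCalls;

    @Override
    public void dispose() {
        disposeCalls++;
        disposed = true;
    }

    @Override
    public boolean isDisposed() {
        return disposed;
    }
}
```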
5. RxBusEntity:
private String mTag;
private Object mObject;

public String getTag() {
    return mTag;
}

public void setTag(String tag) {
    this.mTag = tag;
}

public Object getObject() {
    return mObject;
}

public void setObject(Object object) {
    this.mObject = object;
}
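6. That is the whole RxBus wrapper. It is built on Flowable and PublishProcessor, so it is backpressure-aware, and it is very easy to use, with no unnecessary frills: message delivery is stable, usage is flexible, and the receiving thread can be switched by passing a Scheduler to register().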