RxBus : 通过RxJava实现Rxbus。
目前大多数开发者使用EventBus或者Otto作为事件总线通信库,对于RxJava使用者来说,RxJava也可以轻松实现事件总线,因为它们都依据于观察者模式。
最近正在使用Rxjava,想想使用RxBus来代替EventBus应该是不错的选择,这样即可少complie 一个jar。
RxJava最核心的两个东西是Observables(被观察者,事件源)和Subscribers(观察者)。Observables发出一系列事件,Subscribers处理这些事件。
- 先分析一个RxBus类
getInstance()首先 这是一个单例模式
SerializedSubject extends Subject extends Observable implements Observer既是观察内容,又是观察者,起到桥梁/数据转发的作用
**PublishSubject **主题 含义是:在订阅者订阅的时间点之后的数据发送给观察者
** post** 方法发布一个 Event 对象给 bus,然后由 bus 转发给订阅者们。
** toObserverable ** 方法能够获得一个包含目标事件的 Observable,订阅者对其订阅即可响应。
**bus.ofType() ** 等效于 bus.filter(eventType::isInstance).cast(eventType) ,即先过滤事件类型,然后发射给订阅者。
public class RxBus {
private static volatile RxBus instance;//volatile 保证instance可见性 禁止指令重排
private final Subject<Object, Object> bus;
private RxBus() {
bus = new SerializedSubject<>(PublishSubject.create());
}
/**
* 单例RxBus
*
* @return RxBus
*/
public static RxBus getInstance() {
if (null == instance) {
synchronized (RxBus.class) {
if (null == instance) {
instance = new RxBus();
}
}
}
return instance;
}
/**
* 发送一个新事件
*
* @param o
*/
public void send(Object o) {
bus.onNext(o);
}
/**
* 返回特定类型的被观察者
*
* @param eventType eventType
* @param <T>
* @return
*/
public <T> Observable<T> toObservable(Class<T> eventType) {
return bus.ofType(eventType);
}
}
- MainActivity
CompositeSubscription该对象作为subscription的容器,方便统一取消订阅
** click** 点击事件 发送一个新事件
subscribeDownloadEvent() 订阅事件
destroy取消订阅事件
public class MainActivity extends AppCompatActivity {
/**
* 订阅事件
*/
private CompositeSubscription rxSubscriptions = new CompositeSubscription();
private TextView mTvResult;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
mTvResult = (TextView) findViewById(R.id.tv_result);
//订阅下载事件
subscribeDownloadEvent();
}
public void click(View view){
RxBus.getInstance().send(new MsgEvent("欢迎使用rxbus"));
}
/**
* RxBus subscribeDownloadEvent()
*/
private void subscribeDownloadEvent() {
rxSubscriptions.add(RxBus.getInstance().toObservable(MsgEvent.class)
.map(new Func1<Object, MsgEvent>() {
@Override
public MsgEvent call(Object o) {
return (MsgEvent) o;
}
})
.subscribe(new Action1<MsgEvent>() {
@Override
public void call(MsgEvent calculateEvent) {
mTvResult.setText(calculateEvent.getResult());
}
}));
}
@Override
protected void onDestroy() {
super.onDestroy();
//取消订阅,防止内存泄漏
if (!rxSubscriptions.isUnsubscribed()) {
rxSubscriptions.unsubscribe();
}
}
}
- MsgEvent 方便给观察者提供数据 通知
public class MsgEvent {
String result;
public MsgEvent(String result) {
this.result = result;
}
public String getResult() {
return result;
}
public void setResult(String result) {
this.result = result;
}
@Override
public String toString() {
return "MsgEvent{" +
"result='" + result + '\'' +
'}';
}
}
网友评论