目前大多数开发者使用EventBus或者Otto作为事件总线通信库,对于RxJava使用者来说,RxJava也可以轻松实现事件总线,因为它们都依据于观察者模式。
不多说,上代码
/**
* RxBus
* Created by YoKeyword on 2015/6/17.
*/
public class RxBus {
private static volatile RxBus defaultInstance;
private final Subject<Object, Object> bus;
// PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者
public RxBus() {
bus = new SerializedSubject<>(PublishSubject.create());
}
// 单例RxBus
public static RxBus getDefault() {
if (defaultInstance == null) {
synchronized (RxBus.class) {
if (defaultInstance == null) {
defaultInstance = new RxBus();
}
}
}
return defaultInstance ;
}
// 发送一个新的事件
public void post (Object o) {
bus.onNext(o);
}
// 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者
public <T> Observable<T> toObservable (Class<T> eventType) {
return bus.ofType(eventType);
// 这里感谢小鄧子的提醒: ofType = filter + cast
// return bus.filter(new Func1<Object, Boolean>() {
// @Override
// public Boolean call(Object o) {
// return eventType.isInstance(o);
// }
// }) .cast(eventType);
}
}
注:
1、Subject同时充当了Observer和Observable的角色,Subject是非线程安全的,要避免该问题,需要将 Subject转换为一个 SerializedSubject
,上述RxBus类中把线程非安全的PublishSubject包装成线程安全的Subject。
2、PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。
3、ofType操作符只发射指定类型的数据,其内部就是filter+cast(这里非常感谢@小鄧子 的提醒)
public final <R> Observable<R> ofType(final Class<R> klass) {
return filter(new Func1<T, Boolean>() {
@Override
public final Boolean call(T t) {
return klass.isInstance(t);
}
}).cast(klass);
}
filter操作符可以使你提供一个指定的测试数据项,只有通过测试的数据才会被“发射”。
cast操作符可以将一个Observable转换成指定类型的Observable。
分析:
RxBus工作流程图1、首先创建一个可同时充当Observer和Observable的Subject;
2、在需要接收事件的地方,订阅该Subject(此时Subject是作为Observable),在这之后,一旦Subject接收到事件,立即发射给该订阅者;
3、在我们需要发送事件的地方,将事件post至Subject,此时Subject作为Observer接收到事件(onNext),然后会发射给所有订阅该Subject的订阅者。
对于RxBus的使用,就和普通的RxJava订阅事件很相似了。
先看发送事件的代码:
RxBus.getDefault().post(new UserEvent (1, "yoyo"));
userEvent是要发送的事件,如果你用过EventBus, 很容易理解,UserEvent的代码:
public class UserEvent {
long id;
String name;
public UserEvent(long id,String name) {
this.id= id;
this.name= name;
}
public long getId() {
return id;
}
public String getName() {
return name;
}
}
再看接收事件的代码:
// rxSubscription是一个Subscription的全局变量,这段代码可以在onCreate/onStart等生命周期内
rxSubscription = RxBus.getDefault().toObserverable(UserEvent.class)
.subscribe(new Action1<UserEvent>() {
@Override
public void call(UserEvent userEvent) {
long id = userEvent.getId();
String name = userEvent.getName();
...
}
},
new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
// TODO: 处理异常
}
});
最后,一定要记得在生命周期结束的地方取消订阅事件,防止RxJava可能会引起的内存泄漏问题。
@Override
protected void onDestroy() {
super.onDestroy();
if(!rxSubscription.isUnsubscribed()) {
rxSubscription.unsubscribe();
}
}
这样,一个简单的Event Bus就实现了!如果你的项目已经开始使用RxJava,也许可以考虑替换掉EventBus或Otto,减小项目体积。
RxBus、EventBus因为解耦太彻底,滥用的话,项目可维护性会越来越低;一些简单场景更推荐用回调、Subject来代替事件总线。
感兴趣的可以阅读我另外2篇深入RxBus的文章:
[深入RxBus:[支持Sticky事件]](http://www.jianshu.com/p/71ab00a2677b)
深入RxBus:[异常处理]
参考:
http://nerds.weddingpartyapp.com/tech/2014/12/24/implementing-an-event-bus-with-rxjava-rxbus/
网友评论
不断执行onNext,然后在几千次后变得很卡的情况。
我查过是因为publishSubject中的subjectObserver在每次onNext都会变多,导致每次执行那个循环时间都增加了,但是自己想不出怎么解决。
if(mSubscription.isUnsubscribed()){
mSubscription.unsubscribe();
}
把isUnsubscribed理解成未取消订阅了
public User Event(long id,String name) {
this.id= id;
this.name= name;
}
你是写成这样的。返回是User,而且还没return,所以我理解你这个是要写构造函数,那也不应该是Event,应该是UserEvent,是不是这样
public UserEvent(long id,String name){
this.id = id;
this.name = name;
}
public void post(Object object) {
bus.onNext(object);
}
这个方法所在的线程是什么个情况,在那个线程都可以使用吗?
public static RxBus getDefault() {
if (defaultInstance == null) {
synchronized (RxBus.class) {
if (defaultInstance == null) {
defaultInstance = new RxBus();
}
}
}
return rxBus;
}
return 的rxBus哪里来的?
public void getRegister(String name, String pass) {
NetWork.getApi().getSignUp(name,pass)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<RegisterBean>() {
@Override
public void call(RegisterBean bean) {
LogUtils.d("Regis", bean.getMessage());
LogUtils.d("Regis", bean.getStatus() + "");
if (bean.getStatus() == 2000) {
RxBus.getDefaultInstance().post(new UserEvent(bean.getData().getUserId()));
mView.showLoading();
} else {
mView.showError(bean.getMessage());
}
}
});
}
RxBus.getDefault().toObservable(Event.class).subscribe(xxx); 在订阅状态中。
不满足上述条件,肯定是接受不到的
比如:class EventOne{ String str; }
class EventTwo{ String str; }
这样post对应Event就可以区别了,同时也建议post的对象尽量都封装成Event对象
1、如果我有个订阅想获取的是一个ArrayList,toObserverable的参数应该传什么Class呢?
2、如果我读取数据库读取一个ArrayList成功后,我把数据存在一个单例,但是我又想通知多个fragment呢?toObserverable又应该怎么传数据?或者说我数据存在单例,只是发送一个通知告诉他这个内容的数据单例有更新,然后fragment再访问这个数据单例拿数据
1、若我在ViewPager中使用异步的加载数据,就是真正显示给用户的时候再去渲染数据,所以我是想真正显示的时候再去订阅,而不是presenter onAttach的时候订阅,但是这样我就拿不到订阅之前发射的last数据,而且订阅的数据如果没有变化的情况下我也是presenter不需要获取到再次渲染的即是要distinct数据,我知道有这些过滤操作符,但是不知道怎么跟RxBus结合
2、是否能加一下我QQ联系方式 983064150,最喜欢认识大神了
1、建议新创建一个Event类,里面的属性就是你的ArrayList,这样是最规范最安全的;
2、各个Fragment作为订阅者 toObservable()该ArrayList事件即可,通知的话直接post(该ArrayList属的Event)
.compose(this.<OperateEvent>bindToLifecycle())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<OperateEvent>() {
@Override
public void call(OperateEvent operateEvent) {
String operate = operateEvent.getOperate();
LogUtil.d(TAG, "--------rxSub-----operate----" + operate);
}
});
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<RegisterBean>() {
@Override
public void call(RegisterBean bean) {
LogUtils.d("Regis", bean.getMessage());
LogUtils.d("Regis", bean.getStatus() + "");
if (bean.getStatus() == 2000) {
LogUtils.d("Regis2",bean.getData().getUserId()+";;;");
RxBus.getDefaultInstance().post(new UserEvent(bean.getData().getUserId()));
mView.showLoading();
} else {
mView.showError(bean.getMessage());
}
}
});
addSubscribe(subscription);
进行的发布,但无法在另外一个Activity进行接收到数据
如果你想发射完之后,后续可能还需要用到该event,你可以看下RxJava中的BehaviorSubject和ReplaySubject
http://reactivex.io/documentation/subject.html
1. 在RxBus里:
public Observable<Object> toObserverable() {
return _bus;
}
2. 在订阅接收处:
_rxBus.toObserverable()
.subscribe(new Action1<Object>() {
@Override
public void call(Object event) {
if(event instanceof TapEvent) {
_showTapText();
}else if(event instanceof SomeOtherEvent) {
_doSomethingElse();
}
}
});
参考:http://blog.kaush.co/2014/12/24/implementing-an-event-bus-with-rxjava-rxbus/
EventBus/otto 对于多event 也只能多次 @Subscribe :)
http://reactivex.io/documentation/subject.html
因为 app 如果被系统回收的, onDestroy 不会被 call
什么时候用RxBus,什么时候用接口传输数据?
可以直接在V层发送数据吗?
我在M层的订阅怎么样unsubscribe()防止内存泄露,或者不需要unsubscribe(),在M层没有OnDestroy了,所以不知道怎么unsubscribe()?
问题比较多,因为初学,谢谢@YoKey
1、这里你说的接口传输数据是指回调函数吧,比如Fragment与Activity的通信,通过Bus的方式和接口回调最终其实都是达到同样的目的,可以随意选择的,但是Bus的方式明显是比接口回调方便使用的,而且解耦更出色,这对MVP是很重要的一个点,对于这2种方式都要注意:在生命周期结束的地方记得取消订阅/注册!
2、至于能否在V层直接发送数据,不是很理解你的意思,按理来说正常的MVP的V层是通过Presenter和M层交互的。
3、如何取消订阅:在Activity/Fragment的onDestory方法里执行Presenter.onDetach(),在Presenter里的onDetach方法里,model.unsubscribe()即可,把事件通过P传递过去就好啦`
至于公有的构造函数其实是为了 在某些模块化的场景下使用EventBus提供的。 如果你有看greenrobot的EventBus的库里的源码的话,它也提供了公有的构造函数
不同Subscriber当然可以订阅同一个Observable