美文网首页Android DevAndroid android框架学习
用RxJava实现事件总线(Event Bus)

用RxJava实现事件总线(Event Bus)

作者: YoKey | 来源:发表于2015-11-11 17:30 被阅读52183次

目前大多数开发者使用EventBus或者Otto作为事件总线通信库,对于RxJava使用者来说,RxJava也可以轻松实现事件总线,因为它们都依据于观察者模式。

不多说,上代码

/**
* RxBus
* Created by YoKeyword on 2015/6/17.
*/
public class RxBus {
    private static volatile RxBus defaultInstance;

    private final Subject<Object, Object> bus;
    // PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者
    public RxBus() {
      bus = new SerializedSubject<>(PublishSubject.create());
    }
    // 单例RxBus
    public static RxBus getDefault() {
        if (defaultInstance == null) {
            synchronized (RxBus.class) {
                if (defaultInstance == null) {
                    defaultInstance = new RxBus();
                }
            }
        }
        return defaultInstance ;
    }
    // 发送一个新的事件
    public void post (Object o) {
        bus.onNext(o);
    }
    // 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者
    public <T> Observable<T> toObservable (Class<T> eventType) {
        return bus.ofType(eventType);
//        这里感谢小鄧子的提醒: ofType = filter + cast
//        return bus.filter(new Func1<Object, Boolean>() {
//            @Override
//            public Boolean call(Object o) {
//                return eventType.isInstance(o);
//            }
//        }) .cast(eventType);
    }
}

注:
1、Subject同时充当了Observer和Observable的角色,Subject是非线程安全的,要避免该问题,需要将 Subject转换为一个 SerializedSubject
,上述RxBus类中把线程非安全的PublishSubject包装成线程安全的Subject。

2、PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。

3、ofType操作符只发射指定类型的数据,其内部就是filter+cast(这里非常感谢@小鄧子 的提醒)

public final <R> Observable<R> ofType(final Class<R> klass) {
    return filter(new Func1<T, Boolean>() {
        @Override
        public final Boolean call(T t) {
            return klass.isInstance(t);
        }
    }).cast(klass);
}

filter操作符可以使你提供一个指定的测试数据项,只有通过测试的数据才会被“发射”。
cast操作符可以将一个Observable转换成指定类型的Observable。

分析:

RxBus工作流程图

1、首先创建一个可同时充当Observer和Observable的Subject;

2、在需要接收事件的地方,订阅该Subject(此时Subject是作为Observable),在这之后,一旦Subject接收到事件,立即发射给该订阅者;

3、在我们需要发送事件的地方,将事件post至Subject,此时Subject作为Observer接收到事件(onNext),然后会发射给所有订阅该Subject的订阅者。

对于RxBus的使用,就和普通的RxJava订阅事件很相似了。
先看发送事件的代码:

RxBus.getDefault().post(new UserEvent (1, "yoyo"));

userEvent是要发送的事件,如果你用过EventBus, 很容易理解,UserEvent的代码:

public class UserEvent {
    long id;
    String name;
    public UserEvent(long id,String name) {
        this.id= id;
        this.name= name;
    }
    public long getId() {
        return id;
    }
    public String getName() {
        return name;
    }
}

再看接收事件的代码:

// rxSubscription是一个Subscription的全局变量,这段代码可以在onCreate/onStart等生命周期内
rxSubscription = RxBus.getDefault().toObserverable(UserEvent.class)
        .subscribe(new Action1<UserEvent>() {
               @Override
               public void call(UserEvent userEvent) {
                   long id = userEvent.getId();
                   String name = userEvent.getName();
                   ...
               }
           },
        new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                // TODO: 处理异常
            }        
        });

最后,一定要记得在生命周期结束的地方取消订阅事件,防止RxJava可能会引起的内存泄漏问题。

@Override
protected void onDestroy() {
    super.onDestroy();
    if(!rxSubscription.isUnsubscribed()) {
        rxSubscription.unsubscribe();
    }
}

这样,一个简单的Event Bus就实现了!如果你的项目已经开始使用RxJava,也许可以考虑替换掉EventBus或Otto,减小项目体积。

RxBus、EventBus因为解耦太彻底,滥用的话,项目可维护性会越来越低;一些简单场景更推荐用回调、Subject来代替事件总线。

感兴趣的可以阅读我另外2篇深入RxBus的文章:
[深入RxBus:[支持Sticky事件]](http://www.jianshu.com/p/71ab00a2677b)
深入RxBus:[异常处理]

参考:
http://nerds.weddingpartyapp.com/tech/2014/12/24/implementing-an-event-bus-with-rxjava-rxbus/

相关文章

网友评论

  • WeroNG:之前没使用过eventbus,看了好多文章,终于看懂了,感谢:blush:
  • 杨晓是大V:哈哈,支持支持
  • GIndoc:博主,我遇到了一个问题:当我点击recyclerView中item的button时,post了一次,但是call方法却接收到了6次!!!而我用behaviorSubject代替publishSubject,就只接收一次!!这是为什么呢?
    GIndoc:找到问题所在了,其实还是多次subscribe的问题,我是在fragment中subscribe的,当实例化多个相同fragment时就subscribe多次了,真是悲剧:sweat: 还是要感谢作者,然后,赶紧改代码去。。
  • 性冷淡_:rxbus的线程处理,我个人感觉就是高阶接口玩法
  • 偶贱贱滴:那这个是不是类似于广播,和Activity直接的数据传递了?
  • WangGavin:可以更新为最新的rxjava2吗?
  • Poseidon_Wang:还有就是你现在是以消息体的类别来筛选,eventBus里面是以tag来筛选,我觉得后者的方式更可取,我是不是应该封装Event,加tag属性
  • Poseidon_Wang:现在设计模式来讲,我是不是只能将我的List<Observable> 维护在我的BaseActivity内,在Ondestroy的时候全部释放,作者您在项目中是怎么处理的,我先在的处理是在BaseActivity内封装一个接消息的方法,每次接的消息体不一样,就往List里面加一个Observable,但是我觉得这样就有点违反单一原则
  • 73ece15c815f:在碎片中什么时候unsubscribe(); 我已经完成加载登录后的界面了
  • 炸鸡叔:使用之后 一直内存泄漏, 是我使用姿势不对?
    YoKey:@炸鸡叔 销毁时要取消订阅~
  • 4d02f9585b9b:问个问题,有没有碰到过执行在同一个链子里。
    不断执行onNext,然后在几千次后变得很卡的情况。
    我查过是因为publishSubject中的subjectObserver在每次onNext都会变多,导致每次执行那个循环时间都增加了,但是自己想不出怎么解决。
  • 妙法莲花1234:订阅事件要先运行了吧, 这样发送事件post后才能订阅到。不像eventbus,订阅方法是孤立的,他自己去查找。
  • 5c3ad025c12e:我用LeakCanary检测一直内存泄漏,才发现我写的
    if(mSubscription.isUnsubscribed()){
    mSubscription.unsubscribe();
    }
    把isUnsubscribed理解成未取消订阅了
  • Greathfs:不错
  • bdd63c599a2c:你好 ,如果是在post(objcet o);这个方法里面加入延迟delay。需要怎么封装?谢谢
    bdd63c599a2c:@YoKey :blush: 哦这样哦。
    YoKey:@bdd63c599a2c 建议在订阅方使用delay操作符,按需自由delay
  • 青蛙要fly:我知道了。你是UserEvent里面多了个空格,变成了User Event。好吧
    YoKey:@青蛙要fly :joy: 是的 手抖了~
  • 青蛙要fly:那个UserEvent的类里面那个构造函数是不是写错了。
    public User Event(long id,String name) {
    this.id= id;
    this.name= name;
    }
    你是写成这样的。返回是User,而且还没return,所以我理解你这个是要写构造函数,那也不应该是Event,应该是UserEvent,是不是这样
    public UserEvent(long id,String name){
    this.id = id;
    this.name = name;
    }
  • leilifengxingmw:我想问一下 //发送一个事件
    public void post(Object object) {
    bus.onNext(object);
    }
    这个方法所在的线程是什么个情况,在那个线程都可以使用吗?

    YoKey:@humanheima 取决你post时所在的线程
  • cd17d98f379f:呃,突然想明白了。谢谢。我把顺序搞反了,是先有订阅动作,再有发送动作,然后才能调起订阅者的执行方法来着。按代码顺序看想当然的想成先发送,再订阅。。给绕糊涂了
  • cd17d98f379f:为什么发送消息用的是onNext()方法呢,这明明是观察者的方法啊。绕了一圈源码,还是没明白,望解答!谢谢
    YoKey:@八爪c Subject.onNext() 此时的Subject是作为Observable的 (Subject同时充当Observer和Observable的角色)
  • KingJA:发送延迟事件的时候,怎么防止其他普通事件的观察者收到延迟事件。
  • 柴泽建_Jack:看不懂这里:
    public static RxBus getDefault() {
    if (defaultInstance == null) {
    synchronized (RxBus.class) {
    if (defaultInstance == null) {
    defaultInstance = new RxBus();
    }
    }
    }
    return rxBus;
    }
    return 的rxBus哪里来的?
    YoKey:@柴泽建_Jack 手误啦~ 已经修正 :joy:
  • 李福来:我是在Presenter中 @Override
    public void getRegister(String name, String pass) {
    NetWork.getApi().getSignUp(name,pass)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<RegisterBean>() {
    @Override
    public void call(RegisterBean bean) {
    LogUtils.d("Regis", bean.getMessage());
    LogUtils.d("Regis", bean.getStatus() + "");
    if (bean.getStatus() == 2000) {
    RxBus.getDefaultInstance().post(new UserEvent(bean.getData().getUserId()));
    mView.showLoading();
    } else {
    mView.showError(bean.getMessage());
    }
    }
    });


    }
    73ece15c815f:@TellH 哈哈,学习之余 遇到搞笑的咯 哈哈 在这里感谢YoKey哦,学到很多东西
    68768b474bfc:@李福来 这位叫Override的老兄估计要被@好多次 :joy:
    YoKey:@李福来 你需要确保 接收的Activity在订阅状态中,即:已经创建,并且通过
    RxBus.getDefault().toObservable(Event.class).subscribe(xxx); 在订阅状态中。

    不满足上述条件,肯定是接受不到的 :blush:
  • 李福来:我使用的RxBus无法进行传递数据
  • CY_Frank:多谢
  • 然小七:mark 谢大神分享~
  • 小蜗牛snamon:我想问下:如果我在一个Activity里面注册有二个相同的类型 比如String,这个时候我post一个string 那不是二个都可以收到 。而其实我只想要一个Obserable收到。
    小蜗牛snamon:@YoKey 还有一个问题 如果是这样不是很多这样的event?而我只想要传递一个string 。。 求解答 :fist::fist:
    小蜗牛snamon:@YoKey 也就是说post的对象 在注册的时候要保证是唯一的吗?是不是你代码都是这样封装 我现在是在rxbus里面加了一个map key是我定义的类型,value是subject对象。 eventbus是把要发送的数据都封装成了一个event吗 我没研究过
    YoKey:@小蜗牛snamon 你把这个属性封装成Event对象:
    比如:class EventOne{ String str; }
    class EventTwo{ String str; }
    这样post对应Event就可以区别了,同时也建议post的对象尽量都封装成Event对象
  • 47287ba8b36e:我想请问一下
    1、如果我有个订阅想获取的是一个ArrayList,toObserverable的参数应该传什么Class呢?
    2、如果我读取数据库读取一个ArrayList成功后,我把数据存在一个单例,但是我又想通知多个fragment呢?toObserverable又应该怎么传数据?或者说我数据存在单例,只是发送一个通知告诉他这个内容的数据单例有更新,然后fragment再访问这个数据单例拿数据
    47287ba8b36e:ReplaySubject、AsyncSubject好像有类似的功能,不知道能否用上
    47287ba8b36e:感谢您的指导,又有了新的问题,再请教一下您
    1、若我在ViewPager中使用异步的加载数据,就是真正显示给用户的时候再去渲染数据,所以我是想真正显示的时候再去订阅,而不是presenter onAttach的时候订阅,但是这样我就拿不到订阅之前发射的last数据,而且订阅的数据如果没有变化的情况下我也是presenter不需要获取到再次渲染的即是要distinct数据,我知道有这些过滤操作符,但是不知道怎么跟RxBus结合
    2、是否能加一下我QQ联系方式 983064150,最喜欢认识大神了
    YoKey:@AmosZhong
    1、建议新创建一个Event类,里面的属性就是你的ArrayList,这样是最规范最安全的;
    2、各个Fragment作为订阅者 toObservable()该ArrayList事件即可,通知的话直接post(该ArrayList属的Event) :smile:
  • cb6d8daf3ca1:我还不是不明白一个问题。rxbus是一个全局的。整个项目中都用它去发送消息。但是怎么区分那个发送是那个界面去接收呢?EventBus是根据post的对象来区分到底是那个东西接收。难道rxbus 要自己传入tag 自己标记?
    YoKey:@zouxianbincc 原理其实是一样的,都是基于观察者模式,RxBus通过post发射一个EventType类型的数据,订阅者方面通过传入的EventType.class来过滤接收该数据,如果有另一个订阅者B是传入XXType.class(不符合post的类型),那么bus. toObserverable(Class<T> eventType)就会过滤掉该事件,这样订阅者B就不会接收到任何数据 :)
  • 693105fbc9cb:我要点赞! :grin:
  • woniu0936:RxBus.getDefault().toObserverable(OperateEvent.class)
    .compose(this.<OperateEvent>bindToLifecycle())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<OperateEvent>() {
    @Override
    public void call(OperateEvent operateEvent) {
    String operate = operateEvent.getOperate();
    LogUtil.d(TAG, "--------rxSub-----operate----" + operate);
    }
    });
    woniu0936:@YoKey 你说的对,检查了一下,发现好像就是RxLifecycle的问题,只要加上.compose(this.<OperateEvent>bindToLifecycle())就不起作用了,去掉一切正常,看来是和RxLifecycle的兼容有问题
    YoKey:@woniu0936 好像很神奇~ .observeOn(AndroidSchedulers.mainThread()) 这个是观察者所在的线程,在你的这个场景下肯定是无关紧要的;你检查下你用的RxLifecycle的使用
    woniu0936:这是我的订阅的写法,这样写log打印不出来,但是注释掉.compose(this.<OperateEvent>bindToLifecycle())和.observeOn(AndroidSchedulers.mainThread())之后log就能打印出来了,这是为什么呢?
  • woniu0936:有点疑问请教一下:有两个页面a和b,a用来发送event,b用来接收,我想问的是,当a页面post一个event之后,b接收到,然后处理完,这个event还存在于PublishSubject中吗?
    李福来:@woniu0936 我在 Subscription subscription=NetWork.getApi().getSignUp(name,pass)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<RegisterBean>() {
    @Override
    public void call(RegisterBean bean) {
    LogUtils.d("Regis", bean.getMessage());
    LogUtils.d("Regis", bean.getStatus() + "");
    if (bean.getStatus() == 2000) {
    LogUtils.d("Regis2",bean.getData().getUserId()+";;;");
    RxBus.getDefaultInstance().post(new UserEvent(bean.getData().getUserId()));
    mView.showLoading();
    } else {
    mView.showError(bean.getMessage());
    }
    }
    });
    addSubscribe(subscription);
    进行的发布,但无法在另外一个Activity进行接收到数据
    woniu0936:@YoKey 哦,谢谢解惑,我之所以这么问就是怕出现这种情况:a,b,c三个页面,a页post事件,b接收,c是一般页面,b跳转a,a页post事件之后跳转b,b处理此事件,之后跳转c,当c再次跳转b的时候,我怕a页post的那个事件如果还留在PublishSubject中时,c跳转b会再次执行b处理a页post的那个事件。
    YoKey:@woniu0936 PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。 文章中有个图,在你说的例子中,其实b接收到处理完,这个event就没有意义了

    如果你想发射完之后,后续可能还需要用到该event,你可以看下RxJava中的BehaviorSubject和ReplaySubject
    http://reactivex.io/documentation/subject.html :smile:
  • 谢三弟:用 ofType(eventType); 如果是有多个 event 怎么办呢?
    Genovia:多种event的话有另一种方式,不需要多次subscribe:
    1. 在RxBus里:
    public Observable<Object> toObserverable() {
    return _bus;
    }

    2. 在订阅接收处:
    _rxBus.toObserverable()
    .subscribe(new Action1<Object>() {
    @Override
    public void call(Object event) {

    if(event instanceof TapEvent) {
    _showTapText();

    }else if(event instanceof SomeOtherEvent) {
    _doSomethingElse();
    }
    }
    });

    参考:http://blog.kaush.co/2014/12/24/implementing-an-event-bus-with-rxjava-rxbus/
    谢三弟:@YoKey 谢谢 Yo哥
    YoKey:@谢三哥 那就分别.toObserverable(Event.class) 多个Event啊
    EventBus/otto 对于多event 也只能多次 @Subscribe :)
  • 牛晓伟:多谢帮助
  • __Berial___:EventBus 里有一个 StickyEvent 的概念,用 RxBus 如何实现..
    __Berial___:@YoKey 谢谢,都试了一下,还是ReplaySubject比较靠谱
    YoKey:@__Berial___ 你可以看下RxJava中的BehaviorSubject和ReplaySubject
    http://reactivex.io/documentation/subject.html :smile:
  • 1fe8a580f69f:请问 rxSubscription.unsubscribe 写在 onDestroy 是否合适?
    因为 app 如果被系统回收的, onDestroy 不会被 call
    YoKey:@JasonNi 没问题的 被回收的情况下 app的进程都被杀死了 就谈不上内存泄漏啦
  • c7f46fcf25a3:为什么订阅者在subscribe的call方法里收到了两次事件通知呢?打印了两次,好奇怪。
    woniu0936:@赎回光阴198 如果你订阅了两次,就出出现这种情况,仔细检查你的代码
    小蜗牛snamon:@赎回光阴198 估计你是string类型。然后发送了二次
    YoKey:@赎回光阴198 :) 额 应该不会吧,你是不是发送了同样类型的数据2次 =。=
  • Bigmercu:你好,我用的是MVP开发,有一点疑惑的地方。
    什么时候用RxBus,什么时候用接口传输数据?
    可以直接在V层发送数据吗?
    我在M层的订阅怎么样unsubscribe()防止内存泄露,或者不需要unsubscribe(),在M层没有OnDestroy了,所以不知道怎么unsubscribe()?
    问题比较多,因为初学,谢谢@YoKey
    爱吃鱼的外星人:我在onDestory中取消订阅,因为用的是单例,到下一个页面时候,发现下个页面也被取消订阅了,A startB->B onCreate-->A onDestory ,是不是应该在B中onResume中订阅?
    Bigmercu:@YoKey 谢谢解惑,我觉得我对BUS的用处应该仔细研究一下,刚刚问的问题有点白痴,BUS不应该用在MVP中V和M的通信吧,V和M的通信还是应该通过Presenter,第三点给了我方法,让我知道怎么取消订阅。
    YoKey:@Bigmercu :smiley:
    1、这里你说的接口传输数据是指回调函数吧,比如Fragment与Activity的通信,通过Bus的方式和接口回调最终其实都是达到同样的目的,可以随意选择的,但是Bus的方式明显是比接口回调方便使用的,而且解耦更出色,这对MVP是很重要的一个点,对于这2种方式都要注意:在生命周期结束的地方记得取消订阅/注册!
    2、至于能否在V层直接发送数据,不是很理解你的意思,按理来说正常的MVP的V层是通过Presenter和M层交互的。
    3、如何取消订阅:在Activity/Fragment的onDestory方法里执行Presenter.onDetach(),在Presenter里的onDetach方法里,model.unsubscribe()即可,把事件通过P传递过去就好啦`
  • 4afd8897e730:这个能用于activity之间传递数据吗
    75a2c378070f:我也尝试了下, 不行啊, @YoKey
    556456d1f836:我试着在两个activity之间传递对象,不行!
    YoKey:@x炳泽啧啧啧啧啧 完全可以用于Activity之间 数据的通知通信,但是如果只是类似A启动B时传递数据的话,强烈推荐用系统的intent ! 请不要轻易尝试用EventBus取代intent,因为在app因资源不足被强杀时,系统会帮你保存intent里的数据!
  • EvilsoulM:公有的构造函数也不是单例吧。。。
    YoKey:@EvilsoulM 当然不能算我们正常说的“单例”啦,这里单例是指获取单例的方式;
    至于公有的构造函数其实是为了 在某些模块化的场景下使用EventBus提供的。 如果你有看greenrobot的EventBus的库里的源码的话,它也提供了公有的构造函数 :smile:
  • ailurus:前两天看的时候还没有改成 ofType ,刚才一看发现了改了,果真是每次看都会有新收获啊。
    小鄧子:@ailurus 涛哥 :kissing_heart:
    ailurus:@YoKeyword 我鄧威武!
    YoKey:@ailurus 哈哈,@小鄧子 给的提示,多多分享,多多交流!
  • RidingWind2023:刚开始用简书 关注博主中 有收获 谢博主
  • 牛晓伟:请问 您是否已经应用到自己的项目中,是否遇到啥问题?我也正想用RxBUs
    YoKey:@牛晓伟 在上一个项目中用了,暂时没发现问题。 现在项目是改版老项目,用的EventBus :joy:
  • 皮球二二:你好,请问下一个subject可以有多个订阅者同时存在吗?这里如果频繁的调用toObservable方法,会造成覆盖吗?
    YoKey:@ac2403e3a6d6 嗯嗯 是的 除了Subject还有ConnectableObservable :smile:
    tanghuailong:@YoKey :smile: 补充一下,普通的Observable 是不支持多播的。一般用这俩ConnectableObservable 和 Subject才支持多播
    YoKey:@r17171709 可以有多个订阅者
    不同Subscriber当然可以订阅同一个Observable
  • 小鄧子:可是我们还是需要手动处理Scheduler和异常。而且这样还是需要RxJava基础。
  • Lollo:Yo神棒棒的。。。 :sunglasses:
    YoKey:@Lollo :joy: :joy:

本文标题:用RxJava实现事件总线(Event Bus)

本文链接:https://www.haomeiwen.com/subject/otphhttx.html