美文网首页Android DevAndroid android框架学习
用RxJava实现事件总线(Event Bus)

用RxJava实现事件总线(Event Bus)

作者: YoKey | 来源:发表于2015-11-11 17:30 被阅读52183次

    目前大多数开发者使用EventBus或者Otto作为事件总线通信库,对于RxJava使用者来说,RxJava也可以轻松实现事件总线,因为它们都依据于观察者模式。

    不多说,上代码

    /**
    * RxBus
    * Created by YoKeyword on 2015/6/17.
    */
    public class RxBus {
        private static volatile RxBus defaultInstance;
    
        private final Subject<Object, Object> bus;
        // PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者
        public RxBus() {
          bus = new SerializedSubject<>(PublishSubject.create());
        }
        // 单例RxBus
        public static RxBus getDefault() {
            if (defaultInstance == null) {
                synchronized (RxBus.class) {
                    if (defaultInstance == null) {
                        defaultInstance = new RxBus();
                    }
                }
            }
            return defaultInstance ;
        }
        // 发送一个新的事件
        public void post (Object o) {
            bus.onNext(o);
        }
        // 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者
        public <T> Observable<T> toObservable (Class<T> eventType) {
            return bus.ofType(eventType);
    //        这里感谢小鄧子的提醒: ofType = filter + cast
    //        return bus.filter(new Func1<Object, Boolean>() {
    //            @Override
    //            public Boolean call(Object o) {
    //                return eventType.isInstance(o);
    //            }
    //        }) .cast(eventType);
        }
    }
    

    注:
    1、Subject同时充当了Observer和Observable的角色,Subject是非线程安全的,要避免该问题,需要将 Subject转换为一个 SerializedSubject
    ,上述RxBus类中把线程非安全的PublishSubject包装成线程安全的Subject。

    2、PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。

    3、ofType操作符只发射指定类型的数据,其内部就是filter+cast(这里非常感谢@小鄧子 的提醒)

    public final <R> Observable<R> ofType(final Class<R> klass) {
        return filter(new Func1<T, Boolean>() {
            @Override
            public final Boolean call(T t) {
                return klass.isInstance(t);
            }
        }).cast(klass);
    }
    

    filter操作符可以使你提供一个指定的测试数据项,只有通过测试的数据才会被“发射”。
    cast操作符可以将一个Observable转换成指定类型的Observable。

    分析:

    RxBus工作流程图

    1、首先创建一个可同时充当Observer和Observable的Subject;

    2、在需要接收事件的地方,订阅该Subject(此时Subject是作为Observable),在这之后,一旦Subject接收到事件,立即发射给该订阅者;

    3、在我们需要发送事件的地方,将事件post至Subject,此时Subject作为Observer接收到事件(onNext),然后会发射给所有订阅该Subject的订阅者。

    对于RxBus的使用,就和普通的RxJava订阅事件很相似了。
    先看发送事件的代码:

    RxBus.getDefault().post(new UserEvent (1, "yoyo"));
    

    userEvent是要发送的事件,如果你用过EventBus, 很容易理解,UserEvent的代码:

    public class UserEvent {
        long id;
        String name;
        public UserEvent(long id,String name) {
            this.id= id;
            this.name= name;
        }
        public long getId() {
            return id;
        }
        public String getName() {
            return name;
        }
    }
    

    再看接收事件的代码:

    // rxSubscription是一个Subscription的全局变量,这段代码可以在onCreate/onStart等生命周期内
    rxSubscription = RxBus.getDefault().toObserverable(UserEvent.class)
            .subscribe(new Action1<UserEvent>() {
                   @Override
                   public void call(UserEvent userEvent) {
                       long id = userEvent.getId();
                       String name = userEvent.getName();
                       ...
                   }
               },
            new Action1<Throwable>() {
                @Override
                public void call(Throwable throwable) {
                    // TODO: 处理异常
                }        
            });
    

    最后,一定要记得在生命周期结束的地方取消订阅事件,防止RxJava可能会引起的内存泄漏问题。

    @Override
    protected void onDestroy() {
        super.onDestroy();
        if(!rxSubscription.isUnsubscribed()) {
            rxSubscription.unsubscribe();
        }
    }
    

    这样,一个简单的Event Bus就实现了!如果你的项目已经开始使用RxJava,也许可以考虑替换掉EventBus或Otto,减小项目体积。

    RxBus、EventBus因为解耦太彻底,滥用的话,项目可维护性会越来越低;一些简单场景更推荐用回调、Subject来代替事件总线。

    感兴趣的可以阅读我另外2篇深入RxBus的文章:
    [深入RxBus:[支持Sticky事件]](http://www.jianshu.com/p/71ab00a2677b)
    深入RxBus:[异常处理]

    参考:
    http://nerds.weddingpartyapp.com/tech/2014/12/24/implementing-an-event-bus-with-rxjava-rxbus/

    相关文章

      网友评论

      • WeroNG:之前没使用过eventbus,看了好多文章,终于看懂了,感谢:blush:
      • 杨晓是大V:哈哈,支持支持
      • GIndoc:博主,我遇到了一个问题:当我点击recyclerView中item的button时,post了一次,但是call方法却接收到了6次!!!而我用behaviorSubject代替publishSubject,就只接收一次!!这是为什么呢?
        GIndoc:找到问题所在了,其实还是多次subscribe的问题,我是在fragment中subscribe的,当实例化多个相同fragment时就subscribe多次了,真是悲剧:sweat: 还是要感谢作者,然后,赶紧改代码去。。
      • 性冷淡_:rxbus的线程处理,我个人感觉就是高阶接口玩法
      • 偶贱贱滴:那这个是不是类似于广播,和Activity直接的数据传递了?
      • WangGavin:可以更新为最新的rxjava2吗?
      • Poseidon_Wang:还有就是你现在是以消息体的类别来筛选,eventBus里面是以tag来筛选,我觉得后者的方式更可取,我是不是应该封装Event,加tag属性
      • Poseidon_Wang:现在设计模式来讲,我是不是只能将我的List<Observable> 维护在我的BaseActivity内,在Ondestroy的时候全部释放,作者您在项目中是怎么处理的,我先在的处理是在BaseActivity内封装一个接消息的方法,每次接的消息体不一样,就往List里面加一个Observable,但是我觉得这样就有点违反单一原则
      • 73ece15c815f:在碎片中什么时候unsubscribe(); 我已经完成加载登录后的界面了
      • 炸鸡叔:使用之后 一直内存泄漏, 是我使用姿势不对?
        YoKey:@炸鸡叔 销毁时要取消订阅~
      • 4d02f9585b9b:问个问题,有没有碰到过执行在同一个链子里。
        不断执行onNext,然后在几千次后变得很卡的情况。
        我查过是因为publishSubject中的subjectObserver在每次onNext都会变多,导致每次执行那个循环时间都增加了,但是自己想不出怎么解决。
      • 妙法莲花1234:订阅事件要先运行了吧, 这样发送事件post后才能订阅到。不像eventbus,订阅方法是孤立的,他自己去查找。
      • 5c3ad025c12e:我用LeakCanary检测一直内存泄漏,才发现我写的
        if(mSubscription.isUnsubscribed()){
        mSubscription.unsubscribe();
        }
        把isUnsubscribed理解成未取消订阅了
      • Greathfs:不错
      • bdd63c599a2c:你好 ,如果是在post(objcet o);这个方法里面加入延迟delay。需要怎么封装?谢谢
        bdd63c599a2c:@YoKey :blush: 哦这样哦。
        YoKey:@bdd63c599a2c 建议在订阅方使用delay操作符,按需自由delay
      • 青蛙要fly:我知道了。你是UserEvent里面多了个空格,变成了User Event。好吧
        YoKey:@青蛙要fly :joy: 是的 手抖了~
      • 青蛙要fly:那个UserEvent的类里面那个构造函数是不是写错了。
        public User Event(long id,String name) {
        this.id= id;
        this.name= name;
        }
        你是写成这样的。返回是User,而且还没return,所以我理解你这个是要写构造函数,那也不应该是Event,应该是UserEvent,是不是这样
        public UserEvent(long id,String name){
        this.id = id;
        this.name = name;
        }
      • leilifengxingmw:我想问一下 //发送一个事件
        public void post(Object object) {
        bus.onNext(object);
        }
        这个方法所在的线程是什么个情况,在那个线程都可以使用吗?

        YoKey:@humanheima 取决你post时所在的线程
      • cd17d98f379f:呃,突然想明白了。谢谢。我把顺序搞反了,是先有订阅动作,再有发送动作,然后才能调起订阅者的执行方法来着。按代码顺序看想当然的想成先发送,再订阅。。给绕糊涂了
      • cd17d98f379f:为什么发送消息用的是onNext()方法呢,这明明是观察者的方法啊。绕了一圈源码,还是没明白,望解答!谢谢
        YoKey:@八爪c Subject.onNext() 此时的Subject是作为Observable的 (Subject同时充当Observer和Observable的角色)
      • KingJA:发送延迟事件的时候,怎么防止其他普通事件的观察者收到延迟事件。
      • 柴泽建_Jack:看不懂这里:
        public static RxBus getDefault() {
        if (defaultInstance == null) {
        synchronized (RxBus.class) {
        if (defaultInstance == null) {
        defaultInstance = new RxBus();
        }
        }
        }
        return rxBus;
        }
        return 的rxBus哪里来的?
        YoKey:@柴泽建_Jack 手误啦~ 已经修正 :joy:
      • 李福来:我是在Presenter中 @Override
        public void getRegister(String name, String pass) {
        NetWork.getApi().getSignUp(name,pass)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Action1<RegisterBean>() {
        @Override
        public void call(RegisterBean bean) {
        LogUtils.d("Regis", bean.getMessage());
        LogUtils.d("Regis", bean.getStatus() + "");
        if (bean.getStatus() == 2000) {
        RxBus.getDefaultInstance().post(new UserEvent(bean.getData().getUserId()));
        mView.showLoading();
        } else {
        mView.showError(bean.getMessage());
        }
        }
        });


        }
        73ece15c815f:@TellH 哈哈,学习之余 遇到搞笑的咯 哈哈 在这里感谢YoKey哦,学到很多东西
        68768b474bfc:@李福来 这位叫Override的老兄估计要被@好多次 :joy:
        YoKey:@李福来 你需要确保 接收的Activity在订阅状态中,即:已经创建,并且通过
        RxBus.getDefault().toObservable(Event.class).subscribe(xxx); 在订阅状态中。

        不满足上述条件,肯定是接受不到的 :blush:
      • 李福来:我使用的RxBus无法进行传递数据
      • CY_Frank:多谢
      • 然小七:mark 谢大神分享~
      • 小蜗牛snamon:我想问下:如果我在一个Activity里面注册有二个相同的类型 比如String,这个时候我post一个string 那不是二个都可以收到 。而其实我只想要一个Obserable收到。
        小蜗牛snamon:@YoKey 还有一个问题 如果是这样不是很多这样的event?而我只想要传递一个string 。。 求解答 :fist::fist:
        小蜗牛snamon:@YoKey 也就是说post的对象 在注册的时候要保证是唯一的吗?是不是你代码都是这样封装 我现在是在rxbus里面加了一个map key是我定义的类型,value是subject对象。 eventbus是把要发送的数据都封装成了一个event吗 我没研究过
        YoKey:@小蜗牛snamon 你把这个属性封装成Event对象:
        比如:class EventOne{ String str; }
        class EventTwo{ String str; }
        这样post对应Event就可以区别了,同时也建议post的对象尽量都封装成Event对象
      • 47287ba8b36e:我想请问一下
        1、如果我有个订阅想获取的是一个ArrayList,toObserverable的参数应该传什么Class呢?
        2、如果我读取数据库读取一个ArrayList成功后,我把数据存在一个单例,但是我又想通知多个fragment呢?toObserverable又应该怎么传数据?或者说我数据存在单例,只是发送一个通知告诉他这个内容的数据单例有更新,然后fragment再访问这个数据单例拿数据
        47287ba8b36e:ReplaySubject、AsyncSubject好像有类似的功能,不知道能否用上
        47287ba8b36e:感谢您的指导,又有了新的问题,再请教一下您
        1、若我在ViewPager中使用异步的加载数据,就是真正显示给用户的时候再去渲染数据,所以我是想真正显示的时候再去订阅,而不是presenter onAttach的时候订阅,但是这样我就拿不到订阅之前发射的last数据,而且订阅的数据如果没有变化的情况下我也是presenter不需要获取到再次渲染的即是要distinct数据,我知道有这些过滤操作符,但是不知道怎么跟RxBus结合
        2、是否能加一下我QQ联系方式 983064150,最喜欢认识大神了
        YoKey:@AmosZhong
        1、建议新创建一个Event类,里面的属性就是你的ArrayList,这样是最规范最安全的;
        2、各个Fragment作为订阅者 toObservable()该ArrayList事件即可,通知的话直接post(该ArrayList属的Event) :smile:
      • cb6d8daf3ca1:我还不是不明白一个问题。rxbus是一个全局的。整个项目中都用它去发送消息。但是怎么区分那个发送是那个界面去接收呢?EventBus是根据post的对象来区分到底是那个东西接收。难道rxbus 要自己传入tag 自己标记?
        YoKey:@zouxianbincc 原理其实是一样的,都是基于观察者模式,RxBus通过post发射一个EventType类型的数据,订阅者方面通过传入的EventType.class来过滤接收该数据,如果有另一个订阅者B是传入XXType.class(不符合post的类型),那么bus. toObserverable(Class<T> eventType)就会过滤掉该事件,这样订阅者B就不会接收到任何数据 :)
      • 693105fbc9cb:我要点赞! :grin:
      • woniu0936:RxBus.getDefault().toObserverable(OperateEvent.class)
        .compose(this.<OperateEvent>bindToLifecycle())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Action1<OperateEvent>() {
        @Override
        public void call(OperateEvent operateEvent) {
        String operate = operateEvent.getOperate();
        LogUtil.d(TAG, "--------rxSub-----operate----" + operate);
        }
        });
        woniu0936:@YoKey 你说的对,检查了一下,发现好像就是RxLifecycle的问题,只要加上.compose(this.<OperateEvent>bindToLifecycle())就不起作用了,去掉一切正常,看来是和RxLifecycle的兼容有问题
        YoKey:@woniu0936 好像很神奇~ .observeOn(AndroidSchedulers.mainThread()) 这个是观察者所在的线程,在你的这个场景下肯定是无关紧要的;你检查下你用的RxLifecycle的使用
        woniu0936:这是我的订阅的写法,这样写log打印不出来,但是注释掉.compose(this.<OperateEvent>bindToLifecycle())和.observeOn(AndroidSchedulers.mainThread())之后log就能打印出来了,这是为什么呢?
      • woniu0936:有点疑问请教一下:有两个页面a和b,a用来发送event,b用来接收,我想问的是,当a页面post一个event之后,b接收到,然后处理完,这个event还存在于PublishSubject中吗?
        李福来:@woniu0936 我在 Subscription subscription=NetWork.getApi().getSignUp(name,pass)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Action1<RegisterBean>() {
        @Override
        public void call(RegisterBean bean) {
        LogUtils.d("Regis", bean.getMessage());
        LogUtils.d("Regis", bean.getStatus() + "");
        if (bean.getStatus() == 2000) {
        LogUtils.d("Regis2",bean.getData().getUserId()+";;;");
        RxBus.getDefaultInstance().post(new UserEvent(bean.getData().getUserId()));
        mView.showLoading();
        } else {
        mView.showError(bean.getMessage());
        }
        }
        });
        addSubscribe(subscription);
        进行的发布,但无法在另外一个Activity进行接收到数据
        woniu0936:@YoKey 哦,谢谢解惑,我之所以这么问就是怕出现这种情况:a,b,c三个页面,a页post事件,b接收,c是一般页面,b跳转a,a页post事件之后跳转b,b处理此事件,之后跳转c,当c再次跳转b的时候,我怕a页post的那个事件如果还留在PublishSubject中时,c跳转b会再次执行b处理a页post的那个事件。
        YoKey:@woniu0936 PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。 文章中有个图,在你说的例子中,其实b接收到处理完,这个event就没有意义了

        如果你想发射完之后,后续可能还需要用到该event,你可以看下RxJava中的BehaviorSubject和ReplaySubject
        http://reactivex.io/documentation/subject.html :smile:
      • 谢三弟:用 ofType(eventType); 如果是有多个 event 怎么办呢?
        Genovia:多种event的话有另一种方式,不需要多次subscribe:
        1. 在RxBus里:
        public Observable<Object> toObserverable() {
        return _bus;
        }

        2. 在订阅接收处:
        _rxBus.toObserverable()
        .subscribe(new Action1<Object>() {
        @Override
        public void call(Object event) {

        if(event instanceof TapEvent) {
        _showTapText();

        }else if(event instanceof SomeOtherEvent) {
        _doSomethingElse();
        }
        }
        });

        参考:http://blog.kaush.co/2014/12/24/implementing-an-event-bus-with-rxjava-rxbus/
        谢三弟:@YoKey 谢谢 Yo哥
        YoKey:@谢三哥 那就分别.toObserverable(Event.class) 多个Event啊
        EventBus/otto 对于多event 也只能多次 @Subscribe :)
      • 牛晓伟:多谢帮助
      • __Berial___:EventBus 里有一个 StickyEvent 的概念,用 RxBus 如何实现..
        __Berial___:@YoKey 谢谢,都试了一下,还是ReplaySubject比较靠谱
        YoKey:@__Berial___ 你可以看下RxJava中的BehaviorSubject和ReplaySubject
        http://reactivex.io/documentation/subject.html :smile:
      • 1fe8a580f69f:请问 rxSubscription.unsubscribe 写在 onDestroy 是否合适?
        因为 app 如果被系统回收的, onDestroy 不会被 call
        YoKey:@JasonNi 没问题的 被回收的情况下 app的进程都被杀死了 就谈不上内存泄漏啦
      • c7f46fcf25a3:为什么订阅者在subscribe的call方法里收到了两次事件通知呢?打印了两次,好奇怪。
        woniu0936:@赎回光阴198 如果你订阅了两次,就出出现这种情况,仔细检查你的代码
        小蜗牛snamon:@赎回光阴198 估计你是string类型。然后发送了二次
        YoKey:@赎回光阴198 :) 额 应该不会吧,你是不是发送了同样类型的数据2次 =。=
      • Bigmercu:你好,我用的是MVP开发,有一点疑惑的地方。
        什么时候用RxBus,什么时候用接口传输数据?
        可以直接在V层发送数据吗?
        我在M层的订阅怎么样unsubscribe()防止内存泄露,或者不需要unsubscribe(),在M层没有OnDestroy了,所以不知道怎么unsubscribe()?
        问题比较多,因为初学,谢谢@YoKey
        爱吃鱼的外星人:我在onDestory中取消订阅,因为用的是单例,到下一个页面时候,发现下个页面也被取消订阅了,A startB->B onCreate-->A onDestory ,是不是应该在B中onResume中订阅?
        Bigmercu:@YoKey 谢谢解惑,我觉得我对BUS的用处应该仔细研究一下,刚刚问的问题有点白痴,BUS不应该用在MVP中V和M的通信吧,V和M的通信还是应该通过Presenter,第三点给了我方法,让我知道怎么取消订阅。
        YoKey:@Bigmercu :smiley:
        1、这里你说的接口传输数据是指回调函数吧,比如Fragment与Activity的通信,通过Bus的方式和接口回调最终其实都是达到同样的目的,可以随意选择的,但是Bus的方式明显是比接口回调方便使用的,而且解耦更出色,这对MVP是很重要的一个点,对于这2种方式都要注意:在生命周期结束的地方记得取消订阅/注册!
        2、至于能否在V层直接发送数据,不是很理解你的意思,按理来说正常的MVP的V层是通过Presenter和M层交互的。
        3、如何取消订阅:在Activity/Fragment的onDestory方法里执行Presenter.onDetach(),在Presenter里的onDetach方法里,model.unsubscribe()即可,把事件通过P传递过去就好啦`
      • 4afd8897e730:这个能用于activity之间传递数据吗
        75a2c378070f:我也尝试了下, 不行啊, @YoKey
        556456d1f836:我试着在两个activity之间传递对象,不行!
        YoKey:@x炳泽啧啧啧啧啧 完全可以用于Activity之间 数据的通知通信,但是如果只是类似A启动B时传递数据的话,强烈推荐用系统的intent ! 请不要轻易尝试用EventBus取代intent,因为在app因资源不足被强杀时,系统会帮你保存intent里的数据!
      • EvilsoulM:公有的构造函数也不是单例吧。。。
        YoKey:@EvilsoulM 当然不能算我们正常说的“单例”啦,这里单例是指获取单例的方式;
        至于公有的构造函数其实是为了 在某些模块化的场景下使用EventBus提供的。 如果你有看greenrobot的EventBus的库里的源码的话,它也提供了公有的构造函数 :smile:
      • ailurus:前两天看的时候还没有改成 ofType ,刚才一看发现了改了,果真是每次看都会有新收获啊。
        小鄧子:@ailurus 涛哥 :kissing_heart:
        ailurus:@YoKeyword 我鄧威武!
        YoKey:@ailurus 哈哈,@小鄧子 给的提示,多多分享,多多交流!
      • RidingWind2023:刚开始用简书 关注博主中 有收获 谢博主
      • 牛晓伟:请问 您是否已经应用到自己的项目中,是否遇到啥问题?我也正想用RxBUs
        YoKey:@牛晓伟 在上一个项目中用了,暂时没发现问题。 现在项目是改版老项目,用的EventBus :joy:
      • 皮球二二:你好,请问下一个subject可以有多个订阅者同时存在吗?这里如果频繁的调用toObservable方法,会造成覆盖吗?
        YoKey:@ac2403e3a6d6 嗯嗯 是的 除了Subject还有ConnectableObservable :smile:
        tanghuailong:@YoKey :smile: 补充一下,普通的Observable 是不支持多播的。一般用这俩ConnectableObservable 和 Subject才支持多播
        YoKey:@r17171709 可以有多个订阅者
        不同Subscriber当然可以订阅同一个Observable
      • 小鄧子:可是我们还是需要手动处理Scheduler和异常。而且这样还是需要RxJava基础。
      • Lollo:Yo神棒棒的。。。 :sunglasses:
        YoKey:@Lollo :joy: :joy:

      本文标题:用RxJava实现事件总线(Event Bus)

      本文链接:https://www.haomeiwen.com/subject/otphhttx.html