美文网首页技术干货儿安卓之美Rxjava
[深入RxBus]:支持Sticky事件

[深入RxBus]:支持Sticky事件

作者: YoKey | 来源:发表于2016-07-21 12:45 被阅读9679次

RxBus、EventBus因为解耦太彻底,滥用的话,项目可维护性会越来越低;一些简单场景更推荐用回调、Subject来代替事件总线。

实际使用场景,如果RxBus,EventBus二选一,我更倾向于使用EventBus, RxJava专注工作流,EventBus专注事件总线,职责更清晰

有段时间没更了,几个月前,我写过一篇实现简单的RxBus文章: 用RxJava实现事件总线

在实际环境中,你会发现RxBus还是有一些问题的。

  • 你需要RxBus支持Sticky功能。
  • 你会发现在你订阅了某个事件后,在后续接收到该事件时,处理的过程中发生了异常,你可能会发现后续的事件都接收不到了!

我将分2篇文章分别给出其方案,这篇介绍如何实现Sticky,另外一篇介绍RxBus中的异常处理方案:
深入RxBus:[异常处理]


什么是Sticky事件?

在Android开发中,Sticky事件只指事件消费者在事件发布之后才注册的也能接收到该事件的特殊类型。Android中就有这样的实例,也就是Sticky Broadcast,即粘性广播。正常情况下如果发送者发送了某个广播,而接收者在这个广播发送后才注册自己的Receiver,这时接收者便无法接收到刚才的广播,为此Android引入了StickyBroadcast,在广播发送结束后会保存刚刚发送的广播(Intent),这样当接收者注册完Receiver后就可以接收到刚才已经发布的广播。这就使得我们可以预先处理一些事件,让有消费者时再把这些事件投递给消费者。

Subject

我们在实现简单的RxBus时使用了PublishSubject,其实RxJava提供给开发者4种Subject
PublishSubject,BehaviorSubject ,BehaviorSubject,AsyncSubject。

  1. PublishSubject只会给在订阅者订阅的时间点之后的数据发送给观察者。
  • BehaviorSubject在订阅者订阅时,会发送其最近发送的数据(如果此时还没有收到任何数据,它会发送一个默认值)。

  • ReplaySubject在订阅者订阅时,会发送所有的数据给订阅者,无论它们是何时订阅的。

  • AsyncSubject只在原Observable事件序列完成后,发送最后一个数据,后续如果还有订阅者继续订阅该Subject, 则可以直接接收到最后一个值。

Subject

从上图来看,似乎BehaviorSubjectReplaySubject具备Sticky的特性。

BehaviorSubject方案

BehaviorSubject似乎完全符合Sticky的定义,但是你发现了它只能保存最近的那个事件。

有这样的场景:如果订阅者A订阅了Event1,订阅者B订阅了Event2,而此时BehaviorSubject事件队列里是[..., Event2, Event1],当订阅者订阅时,因为保存的是最近的事件:即Event1,所以订阅者B是接收不到Event2的。

解决办法就是:
每个Event类型都各自创建一个对应的BehaviorSubject,这样的话资源开销比较大,并且该Sticky事件总线和普通的RxBus事件总线不能共享,即:普通事件和Sticky事件是独立的,因为普通事件是基于PublishSubject, 暂时放弃该方案!

ReplaySubject方案

ReplaySubject可以保存发送过的所有数据事件。

因为保存了所有的数据事件,所以不管什么类型的Event,我们只要过滤该类型,并让其发送最近的那个Event即可满足Sticky事件了。但是获取最近的对应事件是个难点,因为最符合需求的操作符takeLast()仅在订阅事件结束时(即:onCompleted())才会发送最后的那个数据事件,而我们的RxBus正常情况下应该是尽量避免其订阅事件结束的。(我没能找到合适的操作符,如果你知道,请告知我)

所以BehaviorSubject也是比较难实现Sticky特性的。

并且,不管是BehaviorSubject还是ReplaySubject,它们还有一个棘手的问题:它们实现的EventBus和普通的RxBus(基于PublishSubject)之间的数据是相互独立的!

总结:BehaviorSubjectBehaviorSubject都不能天然适合Sticky事件......

使用Map实现Sticky

该方法思路是在原来PublishSubject实现的RxBus基础上,使用ConcurrentHashMap<事件类型,事件>保存每个事件的最近事件,不仅能实现Sticky特性,最重要的是可以和普通RxBus的事件数据共享,不独立

因为我们的RxBus是基于PublishSubject的,而RxJava又有4种Subject,而且其中的BehaviorSubjectReplaySubject看起来又符合Sticky特性,所以我们可能会钻这个牛角尖,理所当然的认为实现Sticky需要通过其他类型的Subject.... (好吧,我钻进去了...)

这个方案的思路我是根据EventBus的实现想到的,下面是大致流程:

  1. Map的初始化:
   private final Map<Class<?>, Object> mStickyEventMap;

    public RxBus() {
        mBus = new SerializedSubject<>(PublishSubject.create());
        mStickyEventMap = new ConcurrentHashMap<>();
    }

ConcurrentHashMap是一个线程安全的HashMap, 采用stripping lock(分离锁),效率比HashTable高很多。

  • 在我们postSticky(Event)时,存入Map中:
public void postSticky(Object event) {
     synchronized (mStickyEventMap) {
         mStickyEventMap.put(event.getClass(), event);
     } 
     post(event); 
}
  • 订阅时toObservableSticky(Class<T> eventType),先从Map中寻找是否包含该类型的事件,如果没有,则说明没有Sticky事件要发送,直接订阅Subject(此时作为被观察者Observable);如果有,则说明有Sticky事件需要发送,订阅merge(Subject 和 Sticky事件)
  public <T> Observable<T> toObservableSticky(final Class<T> eventType) {
        synchronized (mStickyEventMap) {
            Observable<T> observable = mBus.ofType(eventType);
            final Object event = mStickyEventMap.get(eventType);

            if (event != null) {
                return observable.mergeWith(Observable.create(new Observable.OnSubscribe<T>() {
                    @Override
                    public void call(Subscriber<? super T> subscriber) {
                        subscriber.onNext(eventType.cast(event));
                    }
                }));
            } else {
                return observable;
            }
        }
    }

merge操作符:可以将多个Observables合并,就好像它们是单个的Observable一样。

这样,Sticky的核心功能就完成了,使用上和普通RxBus一样,通过postSticky()发送事件,toObservableSticky()订阅事件。

除此之外,我还提供了getStickyEvent(Class<T> eventType),removeStickyEvent(Class<T> eventType),removeAllStickyEvents()方法,供查找、移除对应事件类型的事件、移除全部Sticky事件。

重要的事

在使用Sticky特性时,在不需要某Sticky事件时, 通过removeStickyEvent(Class<T> eventType)移除它,最保险的做法是:在主Activity的onDestroyremoveAllStickyEvents()
因为我们的RxBus是个单例静态对象,再正常退出app时,该对象依然会存在于JVM,除非进程被杀死,这样的话导致StickyMap里的数据依然存在,为了避免该问题,需要在app退出时,清理StickyMap。

// 主Activity(一般是栈底Activity)
@Override
protected void onDestroy() {
    super.onDestroy();
    // 移除所有Sticky事件
    RxBus.getDefault().removeAllStickyEvents();
}

完整代码

下面是支持Sticky的完整RxBus代码:

/**
 * PublishSubject: 只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者
 * <p>
 * Created by YoKeyword on 2015/6/17.
 */
public class RxBus {
    private static volatile RxBus mDefaultInstance;
    private final Subject<Object, Object> mBus;

    private final Map<Class<?>, Object> mStickyEventMap;

    public RxBus() {
        mBus = new SerializedSubject<>(PublishSubject.create());
        mStickyEventMap = new ConcurrentHashMap<>();
    }

    public static RxBus getDefault() {
        if (mDefaultInstance == null) {
            synchronized (RxBus.class) {
                if (mDefaultInstance == null) {
                    mDefaultInstance = new RxBus();
                }
            }
        }
        return mDefaultInstance;
    }

    /**
     * 发送事件
     */
    public void post(Object event) {
        mBus.onNext(event);
    }

    /**
     * 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者
     */
    public <T> Observable<T> toObservable(Class<T> eventType) {
        return mBus.ofType(eventType);
    }

    /**
     * 判断是否有订阅者
     */
    public boolean hasObservers() {
        return mBus.hasObservers();
    }

    public void reset() {
        mDefaultInstance = null;
    }

    /**
     * Stciky 相关
     */

    /**
     * 发送一个新Sticky事件
     */
    public void postSticky(Object event) {
        synchronized (mStickyEventMap) {
            mStickyEventMap.put(event.getClass(), event);
        }
        post(event);
    }

    /**
     * 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者
     */
    public <T> Observable<T> toObservableSticky(final Class<T> eventType) {
        synchronized (mStickyEventMap) {
            Observable<T> observable = mBus.ofType(eventType);
            final Object event = mStickyEventMap.get(eventType);

            if (event != null) {
                 return observable.mergeWith(Observable.create(new Observable.OnSubscribe<T>() {
                    @Override
                    public void call(Subscriber<? super T> subscriber) {
                        subscriber.onNext(eventType.cast(event));
                    }
                }));
            } else {
                return observable;
            }
        }
    }

    /**
     * 根据eventType获取Sticky事件
     */
    public <T> T getStickyEvent(Class<T> eventType) {
        synchronized (mStickyEventMap) {
            return eventType.cast(mStickyEventMap.get(eventType));
        }
    }

    /**
     * 移除指定eventType的Sticky事件
     */
    public <T> T removeStickyEvent(Class<T> eventType) {
        synchronized (mStickyEventMap) {
            return eventType.cast(mStickyEventMap.remove(eventType));
        }
    }

    /**
     * 移除所有的Sticky事件
     */
    public void removeAllStickyEvents() {
        synchronized (mStickyEventMap) {
            mStickyEventMap.clear();
        }
    }
}

虽然使用了线程安全的ConCurrentHashMap,但是依然大量使用了synchronized,可以发现锁住的是mStickyEventMap对象,这是为了保证对于读、写、查、删同步,即读时不能写,写时不能读...

最后

附上一个Demo:

  • 提供了使用Sticky特性的示例
  • 异常处理的示例:让其在发生异常后,仍能正确接收到后续的Event。

参考源码,传送门

Demo

参考:

ReactiveX: http://reactivex.io/
EventBus: https://github.com/greenrobot/EventBus

相关文章

网友评论

  • dhhDev:mergewith 换成startwith 了解一下
  • 杨晓是大V:老哥,明天来UC上班
  • iot_xc:老哥 有个问题,使用你的RxBus粘性事件,当注册的activity销毁时Event中的数据却还是存在的,每次进来时数据不是新的,而是一直在叠加
  • boboyuwu:public class RxBus <T>{


    private static RxBus mRxBus;
    private FlowableProcessor<T> mFlowableProcessor;
    private ConcurrentHashMap<Class,T> mConcurrentHashMap;
    private RxBus(){
    mFlowableProcessor= (FlowableProcessor<T>) PublishProcessor.create().toSerialized();
    mConcurrentHashMap=new ConcurrentHashMap();
    }

    public static RxBus getInstance(){
    if(mRxBus==null){
    synchronized (RxBus.class){
    if(mRxBus==null){
    mRxBus=new RxBus();
    }
    }
    }
    return mRxBus;
    }

    //register
    public <U>Flowable<U> registerEvent(Class<U> clazz){
    return mFlowableProcessor.ofType(clazz);
    }

    public void post(T event){
    mFlowableProcessor.onNext(event);
    }

    public Flowable registerStickEvent(final Class<T> clazz){
    final T t = mConcurrentHashMap.get(clazz);
    if(t!=null){
    return mFlowableProcessor.mergeWith(new Publisher<T>() {
    @Override
    public void subscribe(Subscriber<? super T> s) {
    s.onNext(t);
    }
    });
    }

    return mFlowableProcessor;

    }

    public void postStick(T event){
    T t = mConcurrentHashMap.get(event.getClass());
    if(t==null){
    mConcurrentHashMap.put(event.getClass(),event);
    }
    post(event);
    }

    }
  • boboyuwu:




    activityb

    public class SecondActivity extends BaseActivity{

    @Override
    protected void onCreate(@Nullable Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.second_activity);
    Flowable <Boolean>flowable = RxBus.getInstance().registerStickEvent(Boolean.class);
    addDispose( flowable.subscribe(new Consumer<Boolean>() {
    @Override
    public void accept(Boolean aBoolean) throws Exception {
    Log.e("wwwwwwwwww",aBoolean+"");
    }
    }));

    }
    }
    为啥发送一次收到这么多06-03 10:49:34.594 27705-27705/? E/wwwwwwwwww: false
    06-03 10:49:40.504 27705-27705/? E/wwwwwwwwww: false
    06-03 10:49:40.504 27705-27705/? E/wwwwwwwwww: false
    06-03 10:49:40.505 27705-27705/? E/wwwwwwwwww: false
    06-03 10:49:40.535 27705-27705/? E/wwwwwwwwww: false
    大神求救
  • W_BinaryTree:Subject可以换成Relay。Relay就是没有onComplete和onError的Subject
  • a770b0f00951:博主,你好,看了很多篇rxbus文章,就你写得比较清晰,通俗易懂哈哈,辛苦了,我想把你这代码改为rxjava2版本的,其中遇到一个问题就是rxjava1中,你源代码是这样定义的【 private final Subject<Object, Object> mBus = new SerializedSubject<>(PublishSubject.create());】而我在rxjava2中是这样定义的【private final FlowableProcessor<Object> _bus = PublishProcessor.create().toSerialized();】请问rxjava2哪个是对应rxjava1中的new SerializedSubject<>呢?,望答复,谢谢。
    W_BinaryTree:PublishProcessor和PublishSubject区别大概是Processor是继承于Flowable的,Subject是Observable。
    W_BinaryTree: PublishSubject.create().toSerialized();
  • 73ece15c815f:今天刚遇到这样的问题,就看到博主写的了,哈哈
  • lshilll:我问个问题,我有在项目里使用一开始那个很简单的rxbus,今天发现如果后面有switchmap之类的操作,切换到另一个observable如果会调用onComplete的话,就会使整个事件停掉,是不是只能在onComplete里面重新订阅了。
    YoKey: @15112675b137 onComplete onError都会导致整个订阅的结束
    YoKey: @15112675b137 是的 onComplete() onError()
  • 谷歌清洁工:其实我可以认为toObservableSticky中的merge方法是把一冷一热的两个Observable合并起来了吗,在subscribe的时候冷的发送数据并且结束而热的可以保持持续的输出直到完毕.
    YoKey:@宋昊中 嗯 可以这么理解的~
  • linheimx:好棒,谢谢楼主分享
  • 3Zero:很好
  • 叨码:“并且,不管是BehaviorSubject还是BehaviorSubject”有笔误,应该是BehaviorSubject和ReplaySubject吧。
    YoKey:@izen :joy: 感谢提醒
  • 4672b0b08300:activity间传递数据进行预加载,用intent还是用这个或者其它?
    4672b0b08300:@YoKey 传递List<Object>进行数据显示,现在用的是intent,正在考虑要不要换成数据总线,刚开始订阅者Activity还没打开,sticky是需要的
    YoKey: @SamBoLuong 建议简单的场景用intent
    复杂场景考虑事件总线eventbus rxbus,对于特殊场景sticky是需要的
  • 明朗__:我可以这样理解吗:1.根据map的特性 当key一样时会覆盖Value的值,那么如果发生2个相保存件会返回最近发生的事件。
    2.根据mergeWith操作符会将两个Observabl合并,那么若发生了新的事件,并且Map里面也保存有该事件,通过合得到一个新的observable;
    这时是新旧数据合并,若以JavaBean来讲 就是返回2个java对象。我实在想不到这样的应用场景。3.如果曾经发生过该事件,且没有被订阅,这时是保存到map里面的,这样再去订阅就能获取到已经发生过的事件,这样好理解,但是同key的事件在map里面是最新的,也就是说无论该事件发生多少次,但map里面只有一份最后的事件。
    不知道这样理解对吗.

    那么问题来了:在
    YoKey:@明朗__ 是的 就是这样
    Sticky本身就是保留最近事件的一种特性
  • linheimx:谢谢分享
  • OneBelowZero:楼主你好 我使用的时候有个困惑 你的demo我运行过 如果不是先订阅的话 发送就接收不到 但是eventbus确实 可以post 然后 注册的时候可以获取到。 RxBus 如果在这种场景中使用呢? 比如 A 获取到了数据 send B 只要 订阅立马接收到 而不是只能接收最后一次。谢谢
    YoKey: @OneBelowZero demo中的Sticky可以啊 和EventBus的sticky一样的 你再看下demo
  • 9bf29c4b2118:赞,我还在想几个问题:
    RxBus有必要用onBackpressureBuffer吗?
    onError事件怎么透传给observer?
    使用ConcurrentHashMap就不需要synchronize了
    YoKey:@皙华 synchronize问题 我回头好好研究下,当时看EventBus源码是加锁的,也研究了下,得到上述结论,可能未必正确,回头再好好看下。

    “onError事件怎么透传给observer?” try catch我那篇文章确实未必会透给observer,取决于try catch外返回的什么;如果你想必透给obsever,那就不catch...自然会传递到observer.... :sweat:
    9bf29c4b2118:@YoKey 事实上,你只是直接用到了ConcurrentHashMap提供的put/get,并没有进行这些操作的组合,所以synchronize实例没有必要的;
    我看了下RxBus异常处理那篇,对异常try,这个还是没法将onError通知通过RxBus透给observer,『If the Observable calls onError method, it will not thereafter call Observer.onNext(T) or Observer.onCompleted().』 这个还是困扰了我
    YoKey:@皙华
    onBackpressureBuffer看你具体需求,一般没有,可以在订阅的地方做onBackpressureBuffer相关的处理

    “onError事件怎么透传给observer?” 可以看下我之前的关于Rx的文章~

    使用ConcurrentHashMap还使用synchronize , 我是看到EventBus的是这样处理的,原因的话,我查了下应该是:因为锁住的是mStickyEventMap对象,这保证对于读、写、查、删同步,即读时不能写,写时不能读...
  • lsxiao:还有一个建议,Observable.merge(Observable,Observable)方法可以改用Observable1.mergeWith(Observable2)形式,这种链式的调用可读性更好点。 :smile:
    OneBelowZero:楼主你好 我使用的时候有个困惑 你的demo我运行过 如果不是先订阅的话 发送就接收不到 但是eventbus确实 可以post 然后 注册的时候可以获取到。 RxBus 如果在这种场景中使用呢? 比如 A 获取到了数据 send B 只要 订阅立马接收到 而不是只能接收最后一次。谢谢
    YoKey: @lsxiao Nice
  • lsxiao:Awesome!
  • cc8367cdc9fa:楼主你好,有个问题不明白,当调用toObservableSticky方法时,当event不为null的时候,为什么不单独发只有这个event的observable,而要用merge合并Observable<T> observable = mBus.ofType(eventType);这个observable
    YoKey:@放慢心跳111 不管是普通事件还是Sticky类型的事件都需要通过mBus.ofType(eventType)来订阅,而Sticky类型的话在订阅时还需要保证有该event的情况下立即可以接收到,
    综合起来就是用merge来合并这个observable了~ 如果只单独发那个event,而不订阅mBus.ofType(),那后续就收不到RxBus发送的事件啦 :joy:

本文标题:[深入RxBus]:支持Sticky事件

本文链接:https://www.haomeiwen.com/subject/qwsbjttx.html