美文网首页
Android实现消息总线的几种方式,你都会吗?

Android实现消息总线的几种方式,你都会吗?

作者: 蜗牛是不是牛 | 来源:发表于2022-06-14 10:46 被阅读0次

    Android中消息总线的几种实现方式

    前言

    消息总线又叫事件总线,为什么我们需要一个消息总线呢?是因为随着项目变大,页面变多,我们可能出现跨页面、跨组件、跨线程、跨进程传递消息与数据,为了更方便的直接通知到指定的页面实现具体的逻辑,我们需要消息总线来实现。

    从最基本的 BroadcastReceiver 到 EventBus 再到RxBus ,后来官方出了AndroidX jetpack 我们开始使用LiveDataBus,最后到Kotlin的流行出来了FlowBus。我们看看他们是怎么一步一步演变的。

    一、BroadcastReceiver 广播

    我们再初入 Android 的时候都应该学过广播接收者,分为静态广播和动态注册广播,在高版本的 Android 中限制了我们一些静态广播的使用,不过我们还是能通过动态注册的方式获取一些系统的状态改变。像常用的电量变化、网络状态变化、短信发送接收的状态等等。

    比如网络变化的监听:

        IntentFilter intentFilter = new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION);
        application.getApplicationContext().registerReceiver(InstanceHolder.INSTANCE, intentFilter);
    
    

    在消息中线中,我们可以使用本地广播来实现 LocalBroadcastManager 消息的通知。

        LocalBroadcastManager mLocalBroadcastManager = LocalBroadcastManager.getInstance(mContext);
        
        BroadcastReceiver  mLoginReceiver = new LoginSuccessReceiver();
        mLocalBroadcastManager.registerReceiver(mLoginReceiver, new IntentFilter(Constants.ACTION_LOGIN_SUCCESS));
    
        private class LoginSuccessReceiver extends BroadcastReceiver {
            @Override
            public void onReceive(Context context, Intent intent) {
                //刷新Home界面
                refreshHomePage();
            
                //刷新未读信息
                requestUnreadNum();
            }
        }
    
        //记得要解绑对应的接收器
        mLocalBroadcastManager.unregisterReceiver(mLoginReceiver);
    
    

    这样就可以实现一个消息通知了。相比 EventBus 它的性能和空间的消耗都是较大的,并且只能固定在主线程运行。

    二、EventBus

    EventBus最大的特点就是简洁、解耦,可以直接传递我们自定义的消息Message。EventBus简化了应用程序内各组件间、组件与后台线程间的通信。记得2015年左右是非常火爆的。

    EventBus的调度灵活,不依赖于 Context,使用时无需像广播一样关注 Context 的注入与传递。可继承、优先级、粘滞,是 EventBus 比之于广播的优势。几乎可以满足我们全部的需求。

    最初的EventBus其实就是一个方法的集合与查找,核心是通过register方法把带有@Subscrib注解的方法和参数之类的东西全部放入一个List集合,然后通过post方法去这个list循环查找到符合条件的方法去执行。

    如何使用EventBus,一共分5步:

      @Override
        protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_event_bus);
    
            EventBus.getDefault().register(MainActivity.this);  //1.注册广播
        }
    
    
      @Override
        protected void onDestroy() {
            super.onDestroy();
            EventBus.getDefault().unregister(MainActivity.this); //2.解注册广播
        }
    
    
    /**
     * 3.传递什么类型的。定义一个消息类
     */
    public class MessageEvent {
        public String name;
    
        public MessageEvent(String name) {
            this.name = name;
        }
    }
    
    
        @OnClick({R.id.bt_eventbus_send_main, R.id.bt_eventbus_send_sticky})
        public void onClick(View view) {
            switch (view.getId()) {
                case R.id.bt_eventbus_send_main:
                    //4.发送消息
                    EventBus.getDefault().post(new MessageEvent("我是主页面发送过来的消息"));
                    finish();
                    break;
            }
        }
    
    
       /**
         * 5.接受到消息。需要注解
         *
         * @param event
         */
        @Subscribe(threadMode = ThreadMode.MAIN)   //主线程执行
        public void MessageEventBus(MessageEvent event) {
            //5。显示接受到的消息
            mTvEventbusResult.setText(event.name);
        }
    
    

    EventBus的性能开销其实不大,EventBus2.4.0 版是利用反射来实现的,后来改成 APT 实现之后会好很多。主要问题是需要定义很多的消息对象,消息太多之后就感觉管理起来很麻烦。当消息太多之后容器内部的查找会出现性能瓶颈。

    就算如此 EventBus 也是值得大家使用的。

    三、RxBus

    RxBus是基于RxJava实现的,强大是强大,但是学习成本比较高,需要额外导入RxJava RxAndroid等库,这些库体积还是较大的。可以实现异步的消息等。

    本身的实现是很简单的:

    public class RxBus {
        private volatile static RxBus mDefaultInstance;
        private final Subject<Object> mBus;
    
        private RxBus() {
            mBus = PublishSubject.create().toSerialized();
        }
    
        public static RxBus getInstance() {
            if (mDefaultInstance == null) {
                synchronized (RxBus.class) {
                    if (mDefaultInstance == null) {
                        mDefaultInstance = new RxBus();
                    }
                }
            }
            return mDefaultInstance;
        }
    
        /**
         * 发送事件
         */
        public void post(Object event) {
            mBus.onNext(event);
        }
    
        /**
         * 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者
         */
        public <T> Observable<T> toObservable(final Class<T> eventType) {
            return mBus.ofType(eventType);
        }
    
        /**
         * 判断是否有订阅者
         */
        public boolean hasObservers() {
            return mBus.hasObservers();
        }
    
        public void reset() {
            mDefaultInstance = null;
        }
    
    }
    
    
    

    定义消息对象:

    public class MsgEvent {
        private String msg;
    
        public MsgEvent(String msg) {
            this.msg = msg;
        }
    
        public String getMsg() {
            return msg;
        }
    
        public void setMsg(String msg) {
            this.msg = msg;
        }
    }
    
    

    发送与接收:

    RxBus.getInstance().toObservable(MsgEvent.class).subscribe(new Observer<MsgEvent>() {
                @Override
                public void onSubscribe(Disposable d) {
                    
                }
    
                @Override
                public void onNext(MsgEvent msgEvent) {
                    //处理事件
                }
    
                @Override
                public void onError(Throwable e) {
                      
                }
    
                @Override
                public void onComplete() {
    
                }
            });
    
            
    RxBus.getInstance().post(new MsgEvent("Java"));
    
    
    

    缺点是容易内存泄露,我们需要使用rxlifecycle 或者使用CompositeDisposable 自己对生命周期进行处理解绑。

    四、LiveDataBus

    官方出了AndroidX jetpack 内部包含LiveData,它可以感知并遵循Activity、Fragment或Service等组件的生命周期。

    为什么要使用LiveDataBus,正是基于LiveData对组件生命周期可感知的特点,因此可以做到仅在组件处于生命周期的激活状态时才更新UI数据。

    一个简单的LiveDataBus的实现:

    public final class LiveDataBus {
     
        private final Map<String, BusMutableLiveData<Object>> bus;
     
        private LiveDataBus() {
            bus = new HashMap<>();
        }
     
        private static class SingletonHolder {
            private static final LiveDataBus DEFAULT_BUS = new LiveDataBus();
        }
     
        public static LiveDataBus get() {
            return SingletonHolder.DEFAULT_BUS;
        }
     
        public <T> MutableLiveData<T> with(String key, Class<T> type) {
            if (!bus.containsKey(key)) {
                bus.put(key, new BusMutableLiveData<>());
            }
            return (MutableLiveData<T>) bus.get(key);
        }
     
        public MutableLiveData<Object> with(String key) {
            return with(key, Object.class);
        }
     
        private static class ObserverWrapper<T> implements Observer<T> {
     
            private Observer<T> observer;
     
            public ObserverWrapper(Observer<T> observer) {
                this.observer = observer;
            }
     
            @Override
            public void onChanged(@Nullable T t) {
                if (observer != null) {
                    if (isCallOnObserve()) {
                        return;
                    }
                    observer.onChanged(t);
                }
            }
     
            private boolean isCallOnObserve() {
                StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
                if (stackTrace != null && stackTrace.length > 0) {
                    for (StackTraceElement element : stackTrace) {
                        if ("android.arch.lifecycle.LiveData".equals(element.getClassName()) &&
                                "observeForever".equals(element.getMethodName())) {
                            return true;
                        }
                    }
                }
                return false;
            }
        }
     
        private static class BusMutableLiveData<T> extends MutableLiveData<T> {
     
            private Map<Observer, Observer> observerMap = new HashMap<>();
     
            @Override
            public void observe(@NonNull LifecycleOwner owner, @NonNull Observer<T> observer) {
                super.observe(owner, observer);
                try {
                    hook(observer);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
     
            @Override
            public void observeForever(@NonNull Observer<T> observer) {
                if (!observerMap.containsKey(observer)) {
                    observerMap.put(observer, new ObserverWrapper(observer));
                }
                super.observeForever(observerMap.get(observer));
            }
     
            @Override
            public void removeObserver(@NonNull Observer<T> observer) {
                Observer realObserver = null;
                if (observerMap.containsKey(observer)) {
                    realObserver = observerMap.remove(observer);
                } else {
                    realObserver = observer;
                }
                super.removeObserver(realObserver);
            }
     
            private void hook(@NonNull Observer<T> observer) throws Exception {
                //get wrapper's version
                Class<LiveData> classLiveData = LiveData.class;
                Field fieldObservers = classLiveData.getDeclaredField("mObservers");
                fieldObservers.setAccessible(true);
                Object objectObservers = fieldObservers.get(this);
                Class<?> classObservers = objectObservers.getClass();
                Method methodGet = classObservers.getDeclaredMethod("get", Object.class);
                methodGet.setAccessible(true);
                Object objectWrapperEntry = methodGet.invoke(objectObservers, observer);
                Object objectWrapper = null;
                if (objectWrapperEntry instanceof Map.Entry) {
                    objectWrapper = ((Map.Entry) objectWrapperEntry).getValue();
                }
                if (objectWrapper == null) {
                    throw new NullPointerException("Wrapper can not be bull!");
                }
                Class<?> classObserverWrapper = objectWrapper.getClass().getSuperclass();
                Field fieldLastVersion = classObserverWrapper.getDeclaredField("mLastVersion");
                fieldLastVersion.setAccessible(true);
                //get livedata's version
                Field fieldVersion = classLiveData.getDeclaredField("mVersion");
                fieldVersion.setAccessible(true);
                Object objectVersion = fieldVersion.get(this);
                //set wrapper's version
                fieldLastVersion.set(objectWrapper, objectVersion);
            }
        }
    }
    
    

    注册与发送:

    LiveDataBus.get()
            .with("key_test", String.class)
            .observe(this, new Observer<String>() {
                @Override
                public void onChanged(@Nullable String s) {
                }
            });
    
    LiveDataBus.get().with("key_test").setValue(s);
    
    

    LiveDataBus已经算是很好用的,自动注册解绑,根据Key传递泛型T对象,容易查找对应的接收者,也可以实现可见的触发和直接触发,可以实现跨进程,

    LiveData有几点不足,只能在主线程更新数据,操作符无法转换数据,基于 Android Api 实现的,换一个平台无法适应,基于这几点又开发出了FlowBus。

    五、FlowBus

    很多人都说Flow 的出现导致 LiveData 没那么重要了,就是因为 LiveData 的场景 都可以使用 Flow 平替了,还能更为的强大和灵活。

    StateFlow 可以 替代ViewModel中传递数据,SharedFlow 可以实现事件总线。(这两者的异同如果大家有兴趣,我可以单独开一篇讲下)。

    SharedFlow 就是一种热流,可以实现一对多的关系,其构造方法支持天然支持普通的消息发送与粘性的消息发送。一般我们FlowBus都是基于 SharedFlow 来实现:

    object FlowBus {
        private val busMap = mutableMapOf<String, EventBus<*>>()
        private val busStickMap = mutableMapOf<String, StickEventBus<*>>()
    
        @Synchronized
        fun <T> with(key: String): EventBus<T> {
            var eventBus = busMap[key]
            if (eventBus == null) {
                eventBus = EventBus<T>(key)
                busMap[key] = eventBus
            }
            return eventBus as EventBus<T>
        }
    
        @Synchronized
        fun <T> withStick(key: String): StickEventBus<T> {
            var eventBus = busStickMap[key]
            if (eventBus == null) {
                eventBus = StickEventBus<T>(key)
                busStickMap[key] = eventBus
            }
            return eventBus as StickEventBus<T>
        }
    
        //真正实现类
        open class EventBus<T>(private val key: String) : LifecycleObserver {
    
            //私有对象用于发送消息
            private val _events: MutableSharedFlow<T> by lazy {
                obtainEvent()
            }
    
            //暴露的公有对象用于接收消息
            val events = _events.asSharedFlow()
    
            open fun obtainEvent(): MutableSharedFlow<T> = MutableSharedFlow(0, 1, BufferOverflow.DROP_OLDEST)
    
            //主线程接收数据
            fun register(lifecycleOwner: LifecycleOwner, action: (t: T) -> Unit) {
                lifecycleOwner.lifecycle.addObserver(this)
                lifecycleOwner.lifecycleScope.launch {
                    events.collect {
                        try {
                            action(it)
                        } catch (e: Exception) {
                            e.printStackTrace()
                            YYLogUtils.e("FlowBus - Error:$e")
                        }
                    }
                }
            }
    
            //协程中发送数据
            suspend fun post(event: T) {
                _events.emit(event)
            }
    
            //主线程发送数据
            fun post(scope: CoroutineScope, event: T) {
                scope.launch {
                    _events.emit(event)
                }
            }
    
            //自动销毁
            @OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
            fun onDestroy() {
                YYLogUtils.w("FlowBus - 自动onDestroy")
                val subscriptCount = _events.subscriptionCount.value
                if (subscriptCount <= 0)
                    busMap.remove(key)
            }
        }
    
        class StickEventBus<T>(key: String) : EventBus<T>(key) {
            override fun obtainEvent(): MutableSharedFlow<T> = MutableSharedFlow(1, 1, BufferOverflow.DROP_OLDEST)
        }
    
    }
    
    

    发送与接收消息

        // 主线程-发送消息
        FlowBus.with<String>("test-key-01").post(this@Demo11OneFragment2.lifecycleScope, "Test Flow Bus Message")
    
    
        // 接收消息
        FlowBus.with<String>("test-key-01").register(this) {
                LogUtils.w("收到FlowBus消息 - " + it)
            }
    
    

    发送粘性消息

     FlowBus.withStick<String>("test-key-02").post(lifecycleScope, "Test Stick Message")
    
    
       FlowBus.withStick<String>("test-key-02").register(this){
                LogUtils.w("收到粘性消息:$it")
            }
    
    

    Log如下:

    总结

    其实这么多消息总线框架,我更推荐EventBus LiveDataBus FlowBus这三种。

    总的来说,我们尽量不依赖第三方的框架来实现,那么 FlowBus 是语言层级的,基于Kotlin的特性实现,比较推荐了,我本人是喜欢用LiveDataBus,基于 Android 开发场景来看,几乎能满足全部要求了。

    如果大家有源码方面的需求可以看看这里,上面的源码也都贴出来了,不过更推荐大家根据不同的类型自行去 Github 上面找对应的实现封装,功能会更多,健壮性更好。

    好了,关于消息总线就说到这了,如果觉得不错还请点赞支持哦!

    完结!

    相关文章

      网友评论

          本文标题:Android实现消息总线的几种方式,你都会吗?

          本文链接:https://www.haomeiwen.com/subject/tgxzmrtx.html