美文网首页
消息总线那些事儿

消息总线那些事儿

作者: fdacc6a1e764 | 来源:发表于2017-01-10 00:40 被阅读0次

    项目到了一定阶段会出现一种甜蜜的负担:业务的不断发展与人员的流动性越来越大,代码维护与测试回归流程越来越繁琐。这个时候需要对项目进行两方面的重构:

    1.分层操作,方便复用

    2.模块解耦,减少影响

    小英团队经过多次调研之后有如下成果:

    分层操作

    下图是项目的分层模型:

    ProgectUI:界面展示层,包括各个Activity,Fragment页面,以及相对复杂的一些UI组件等

    Bussiness:实际业务层,比如:用户点击登录按钮,去执行登录的操作

    BussinessService:业务服务层,对外提供数据服务。比如:用户信息模块对外提供用户相关的所有信息

    Interface层:网络请求与数据缓存层;将用户的网络接口单独作为一层,并根据实际需要设置是否进行缓存。

    DbCache层:数据库与数据模型转换层;所有数据库操作都使用DbCache;

    CoreService层:功能同FrameWork层,但是较重,故拆出;CoreService与FrameWork层具备业务无关性、通用性;主要有:分发器,Hybrid,热修复以及埋点等

    FrameWork层:基础的技术组件(网络库,图片库等)、三方服务封装以及通用UI等;

    以用户信息模块为例,介绍一下具体的实现过程:

    ProgectUI对应NewLoginActivity:界面展示层,登录页面,通过ChrLoginView与业务逻辑解耦

    Bussiness对应QuickLoginPresenter:实际业务层,用户注册、登录、忘记密码等操作;依赖View与Module,处理实际的业务逻辑

    BussinessService对应UserInstance:业务服务层.存储账号信息与用户信息。通过实现接口UserInfoInterface,向外提供用户信息服务。

    Interface对应ApiService:登录相关的后台接口。

    FrameWork对应ApiUtils:封装网络库Retrofit,代理网络请求

    模块解耦

    总体采用依赖注入的方式将服务的实现与使用分离:

    UserInstance维护用户模块所有的信息,并通过UserInfoInterface与其他模块进行隔离

    ServiceManager负责维护各个模块服务的注册并提供访问的接口

    UserBean实体bean作为数据通信的格式,负责统计用户模块所有的信息

    EventBus消息总线负责向外提供本模块的方法调用

    总体思想:依赖注入负责对外暴露数据;EventBus负责对外暴露回调方法。

    接下来就来介绍本文的主体:EventBus

    简单介绍

    EventBus-Android端事件发布/订阅框架,特点如下:

    简化组件之间的通信。Android常用的通信方式:Broadcast、Listener、静态变量以及通过Handler进行线程之间的通信等。都可以统一使用EventBus。

    简化代码。不同于使用Listener通信方式,层层传递,模块之间耦合严重。EventBus使用非常简单并且无耦合

    高性能。框架中采用缓存、池化技术、细粒度锁、索引加速等方式使开发者无需关注安全性能方面的问题

    依赖包不足50k

    高级属性:线程模型、优先级以及是否接收粘性事件等

    总线的工作机制如图:

    订阅者通过EventBus订阅相关事件,并准备好回调方法

    发布者将事件发送给post出去,EventBus负责通知订阅者

    极简使用

    分为五步:导入依赖、初始化总线、定义事件、注册订阅者、发送事件

    导入依赖

    项目中Module的build.gradle中添加依赖:

    compile 'org.greenrobot:eventbus:3.0.0’

    如果不需要索引加速,直接跳到第二步。

    索引加速使用到编译时注解,所以需要在项目gradle中添加apt编译插件:

    classpath 'com.neenbedankt.gradle.plugins:android-apt:1.8’

    在Module中申请apt插件生成索引的完整类名

    apply plugin: 'com.neenbedankt.android-apt'

    apt {

    arguments {

    eventBusIndex “com.chinahrMyEventBusIndex"

    }

    }

    Module中引入EventBusAnnotationProcessor:

    apt 'org.greenrobot:eventbus-annotation-processor:3.0.1’

    编译项目之后,就可以在\app\build\generate\source\apt下看到生成的索引类

    初始化总线

    EventBus可以通过getDefault()方法获取单例

    EventBus.getDefault().register(this);

    也可以通过EventBus.builder()去构造自定义的EventBus。另外还可以通过Bulder.installDefaultEventBus()修改默认的单例

    EventBus.builder().eventInheritance(true).installDefaultEventBus();

    如果需要索引加速,将编译时生成的Index通过Builder添加进去

    EventBus.getDefault().builder().addIndex(new MyEventBusIndex());

    定义事件

    所有可以被实例化成object的类都可以作为事件

    public class MyEvent {

    }

    注册订阅者

    订阅事件的类中执行register()

    EventBus.getDefault().register(this);

    并在监听事件的回调方法上添加注解@SubScribe,可配置属性:方法执行的线程模型,分发的优先级,是否接收粘性事件

    @Subscribe(threadMode = ThreadMode.MAIN,priority = 0,sticky = true)

    public  void onEventLogin(MyEvent myEvent){

    Toast.makeText(MainActivity.this,"登录成功",Toast.LENGTH_LONG).show();

    }

    为了防止内存泄露,在总线中对订阅者进行注销,比如在Activity的OnDestroy()中:

    EventBus.getDefault().unregister(this);

    发送事件

    调用post(myEvent)或postSticky(myEvent)

    EventBus.getDefault().post(new MyEvent());

    以上完成了消息订阅/发布的整个流程。

    接下来将会说明框架内部的结构:

    整体框架

    四部分组成,如图:

    数据元素

    框架主要涉及这些数据元素:订阅者Subscriber,方法主体Method,事件event,事件类型eventType,线程模型ThreadMode,方法优先级priority,是否接收粘性事件sticky

    SubscriberMethod与订阅者subscriber组合成Subscription,即订阅方法。

    一个方法的执行需要方法主体(Method),调用方法的对象(Subscriber),参数(事件event)

    post(event)之后结合Subscription就可以完成Method的调用。

    EventBus

    框架的门面,维护三个Map,并负责分发事件与执行订阅方法。

    Map, CopyOnWriteArrayList>subscriptionsByEventType.事件类型与订阅方法列表的对应关系。注册事件,发送事件都是在操作此map.

    Map>>typesBySubscriber.订阅者与订阅事件类型的对应关系。为了方便注销订阅者。

    Map, Object>stickyEvents.粘性事件列表

    调度器

    回调方法通过四种方式进行分发,即线程模型ThreadMode:

    POSTING:默认模式,直接在当前线程执行

    MAIN:如果当前是主线程,直接执行;如果不是主线程,通过handler发送到主线程执行。

    BACKGROUND:如果是主线程,交给backgroundPoster去调度;如果不是主线程就直接执行。

    ASYN:交给asyncPoster调度。asyncPoster会直接在线程池当中开启一个线程执行。

    索引加速

    编译时,apt插件通过EventBusAnnotationProcessor提取注解@Subscribe生成索引MyEventBusIndex.索引内部维护一个Map,辅助EventBus查找方法信息

    Map, SubscriberInfo>SUBSCRIBER_INDEX.订阅者的类型与回调方法列表对应关系

    整个流程一句话总结:在订阅者准备好处理事件的回调方法之后,EventBus根据订阅者对象经过反射或者索引加速获取回调方法的信息,接收发布的事件event,按照指定的线程模型执行回调方法。

    EventBus内部设计十分精致,对于编程技能的提高有非常大的帮助。接下来介绍:

    设计思想

    作为一个框架主要有四方面的设计:门面、调度器、线程安全、性能调优等

    门面

    1.EventBus类,门面模式中的门面类对外提供集中化和简化的沟通管道。总线的所有操作(注册订阅者,发送事件,调度执行,注销订阅者等)被封装到EventBus中,有效地屏蔽实现细节,使用和维护起来非常方便。

    public classEventBus {

    /**

    * Registers the given subscriber to receive events. Subscribers must call {@link#unregister(Object)} once they

    * are no longer interested in receiving events.

    *

    * Subscribers have event handling methods that must be annotated by {@linkSubscribe}.

    * The {@linkSubscribe} annotation also allows configuration like {@link

    * ThreadMode} and priority.

    */

    public void register(Object subscriber) {Class subscriberClass = subscriber.getClass();

    List subscriberMethods =subscriberMethodFinder.findSubscriberMethods(subscriberClass);

    synchronized(this) {

    for(SubscriberMethod subscriberMethod : subscriberMethods) {

    subscribe(subscriber, subscriberMethod);

    }

    }

    }

    /** Posts the given event to the event bus. */

    public voidpost(Object event) {

    PostingThreadState postingState =currentPostingThreadState.get();

    List eventQueue = postingState.eventQueue;

    eventQueue.add(event);

    if(!postingState.isPosting) {

    postingState.isMainThread= Looper.getMainLooper() == Looper.myLooper();

    postingState.isPosting=true;

    if(postingState.canceled) {

    throw newEventBusException("Internal error. Abort state was not reset");

    }

    try{

    while(!eventQueue.isEmpty()) {

    postSingleEvent(eventQueue.remove(0), postingState);

    }

    }finally{

    postingState.isPosting=false;

    postingState.isMainThread=false;

    }

    }

    }

    voidinvokeSubscriber(Subscription subscription, Object event) {

    try{

    subscription.subscriberMethod.method.invoke(subscription.subscriber, event);

    }catch(InvocationTargetException e) {

    handleSubscriberException(subscription, event, e.getCause());

    }catch(IllegalAccessException e) {

    throw newIllegalStateException("Unexpected exception", e);

    }

    }

    /** Unregisters the given subscriber from all event classes. */

    public synchronized void unregister(Object subscriber) {List> subscribedTypes =typesBySubscriber.get(subscriber);

    if(subscribedTypes !=null) {

    for(Class eventType : subscribedTypes) {

    unsubscribeByEventType(subscriber, eventType);

    }

    typesBySubscriber.remove(subscriber);

    }else{

    Log.w(TAG,"Subscriber to unregister was not registered before: "+ subscriber.getClass());

    }

    }

    }

    2.EventBus对象的构建,采用volatile实例与双重检查加锁方式保证线程安全。

    static volatileEventBusdefaultInstance;

    /** Convenience singleton for apps using a process-wide EventBus instance. */

    public staticEventBus getDefault() {

    if(defaultInstance==null) {

    synchronized(EventBus.class) {

    if(defaultInstance==null) {

    defaultInstance=newEventBus();

    }

    }

    }

    returndefaultInstance;

    }

    3.采用Builder模式辅助构建实例,避免因构造参数多带来的构造器繁多,即避免重复重叠构造器模式;也避免了采用javaBean封装参数带来的修改一致性问题。

    public classEventBus {

    private static finalEventBusBuilderDEFAULT_BUILDER=newEventBusBuilder();

    EventBus(EventBusBuilder builder) {

    subscriptionsByEventType=newHashMap<>();

    typesBySubscriber=newHashMap<>();

    stickyEvents=newConcurrentHashMap<>();

    mainThreadPoster=newHandlerPoster(this, Looper.getMainLooper(),10);

    backgroundPoster=newBackgroundPoster(this);

    asyncPoster=newAsyncPoster(this);

    indexCount= builder.subscriberInfoIndexes!=null? builder.subscriberInfoIndexes.size() :0;

    subscriberMethodFinder=newSubscriberMethodFinder(builder.subscriberInfoIndexes,

    builder.strictMethodVerification, builder.ignoreGeneratedIndex);

    logSubscriberExceptions= builder.logSubscriberExceptions;

    logNoSubscriberMessages= builder.logNoSubscriberMessages;

    sendSubscriberExceptionEvent= builder.sendSubscriberExceptionEvent;

    sendNoSubscriberEvent= builder.sendNoSubscriberEvent;

    throwSubscriberException= builder.throwSubscriberException;

    eventInheritance= builder.eventInheritance;

    executorService= builder.executorService;

    }

    public staticEventBusBuilder builder() {

    return newEventBusBuilder();

    }

    }

    public classEventBusBuilder {

    /** Default: true */

    publicEventBusBuilder logSubscriberExceptions(booleanlogSubscriberExceptions) {

    this.logSubscriberExceptions= logSubscriberExceptions;

    return this;

    }

    /** Builds an EventBus based on the current configuration. */

    publicEventBus build() {

    return newEventBus(this);

    }

    }

    4.构造方法采用public,可以构建在项目中对不同的子模块创建消息总线。Builder中提供修改默认单例的方法installDefaultEventBus(),是全局单例更加灵活。

    public classEventBus {

    /**

    * Creates a new EventBus instance; each instance is a separate scope in which events are delivered. To use a

    * central bus, consider {@link#getDefault()}.

    */

    publicEventBus() {

    this(DEFAULT_BUILDER);

    }

    }

    public classEventBusBuilder {

    * Installs the default EventBus returned by {@linkEventBus#getDefault()} using this builders' values. Must be

    * done only once before the first usage of the default EventBus.

    *

    *@throwsEventBusException if there's already a default EventBus instance in place

    */

    publicEventBus installDefaultEventBus() {

    synchronized(EventBus.class) {

    if(EventBus.defaultInstance!=null) {

    throw newEventBusException("Default instance already exists."+

    " It may be only set once before it's used the first time to ensure consistent behavior.");

    }

    EventBus.defaultInstance= build();returnEventBus.defaultInstance;

    }

    }

    }

    总结一句话:EvntBus作为框架的门面,采用双检锁,builder模式,构造参数public,封装流程细节,使得总线可以灵活配置与统一管理。

    数据结构Map化

    利用对象的class属性作为Map的key值可以使代码更加简洁。这也是只需要传入一个对象就可以驱动整个流程运行起来的关键。

    private finalMap, CopyOnWriteArrayList>subscriptionsByEventType;

    private finalMap>>typesBySubscriber;

    private finalMap, Object>stickyEvents;

    // Must be called in synchronized block

    private voidsubscribe(Object subscriber, SubscriberMethod subscriberMethod) {

    …………

    …………

    …………

    if(subscriberMethod.sticky) {

    if(eventInheritance) {

    // Existing sticky events of all subclasses of eventType have to be considered.

    // Note: Iterating over all events may be inefficient with lots of sticky events,

    // thus data structure should be changed to allow a more efficient lookup

    // (e.g. an additional map storing sub classes of super classes: Class -> List).

    Set, Object>> entries =stickyEvents.entrySet();

    for(Map.Entry, Object> entry : entries) {

    Class candidateEventType = entry.getKey();

    if(eventType.isAssignableFrom(candidateEventType)) {

    Object stickyEvent = entry.getValue();

    checkPostStickyEventToSubscription(newSubscription, stickyEvent);

    }

    }

    }else{

    Object stickyEvent =stickyEvents.get(eventType);

    checkPostStickyEventToSubscription(newSubscription, stickyEvent);

    }

    }

    }

    public voidpostSticky(Object event) {

    synchronized(stickyEvents) {

    stickyEvents.put(event.getClass(), event);

    }

    // Should be posted after it is putted, in case the subscriber wants to remove immediately

    post(event);

    }

    这个可以借鉴到组件化设计当中的服务管理器ServiceManager,维护一个以服务接口类为key,实现类作为value的Map。简化注册调用方法。

    public classServiceManager {

    Map serviceMap=new HashMap<>();static volatileServiceManagerdefaultInstance;

    public staticServiceManager  getInstance(){

    if(defaultInstance==null){

    synchronized(ServiceManager.class){

    if(defaultInstance==null){

    defaultInstance=newServiceManager();

    }

    }

    }

    returndefaultInstance;

    }

    public voidregister(Object serviceInstance){

    serviceMap.put(serviceInstance.getClass().getInterfaces()[0],serviceInstance);

    }

    publicTgetService(Class serviceType){

    return(T)serviceMap.get(serviceType);

    }

    }

    public interfaceUserinfoInterface {

    publicUserBean getUserInfo();

    }

    注册服务

    public classUserInstanceimplementsUserinfoInterface {

    UserInstance(){

    ServiceManager.getInstance().register(this);

    }

    @Override

    publicUserBean getUserInfo() {

    return newUserBean();

    }

    }

    其他模块调用用户信息:

    public classResumeInstance {

    voiddoSomeThing(){

    UserinfoInterface userinfoInterface=ServiceManager.getInstance().getService(UserinfoInterface.class);

    if(userinfoInterface!=null)

    userinfoInterface.getUserInfo();

    }

    }

    调度器

    Android的线程有个特点:主线程不能被阻塞,UI的更新位于主线程,耗时操作如网络处理在后台线程

    框架采用下面的方式进行调度:

    每个调度器都维护一个待处理方法队列PendingPostQueue。值得一提的是:poll(intmaxMillisToWait)出队时如果当前队列为空,会释放当前对象的锁,等待队列填充。这个功能将在下面性能分析时解释

    final classPendingPostQueue {

    privatePendingPosthead;

    privatePendingPosttail;

    synchronized voidenqueue(PendingPost pendingPost) {

    if(pendingPost ==null) {

    throw newNullPointerException("null cannot be enqueued");

    }

    if(tail!=null) {

    tail.next= pendingPost;

    tail= pendingPost;

    }else if(head==null) {

    head=tail= pendingPost;

    }else{

    throw newIllegalStateException("Head present, but no tail");

    }

    notifyAll();

    }

    synchronizedPendingPost poll() {

    PendingPost pendingPost =head;

    if(head!=null) {

    head=head.next;

    if(head==null) {

    tail=null;

    }

    }

    returnpendingPost;

    }

    synchronizedPendingPost poll(intmaxMillisToWait)throwsInterruptedException {

    if(head==null) {

    wait(maxMillisToWait);

    }

    returnpoll();

    }

    }

    HandlerPoster负责在主线程中处理事件,显然它是Handler的子类

    final classHandlerPosterextendsHandler {

    private finalPendingPostQueuequeue;

    private final intmaxMillisInsideHandleMessage;

    private finalEventBuseventBus;

    private booleanhandlerActive;

    HandlerPoster(EventBus eventBus, Looper looper,intmaxMillisInsideHandleMessage) {

    super(looper);

    this.eventBus= eventBus;

    this.maxMillisInsideHandleMessage= maxMillisInsideHandleMessage;

    queue=newPendingPostQueue();

    }

    voidenqueue(Subscription subscription, Object event) {

    PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);

    synchronized(this) {

    queue.enqueue(pendingPost);

    if(!handlerActive) {

    handlerActive=true;

    if(!sendMessage(obtainMessage())) {

    throw newEventBusException("Could not send handler message");

    }

    }

    }

    }

    @Override

    public voidhandleMessage(Message msg) {

    booleanrescheduled =false;

    try{

    longstarted = SystemClock.uptimeMillis();

    while(true) {

    PendingPost pendingPost =queue.poll();

    if(pendingPost ==null) {

    synchronized(this) {

    // Check again, this time in synchronized

    pendingPost =queue.poll();

    if(pendingPost ==null) {

    handlerActive=false;

    return;

    }

    }

    }

    eventBus.invokeSubscriber(pendingPost);

    longtimeInMethod = SystemClock.uptimeMillis() - started;

    if(timeInMethod >=maxMillisInsideHandleMessage) {

    if(!sendMessage(obtainMessage())) {

    throw newEventBusException("Could not send handler message");

    }

    rescheduled =true;

    return;

    }

    }

    }finally{

    handlerActive= rescheduled;

    }

    }

    }

    BackgroundPoster继承自Runnable,某一时段内待处理都会在BackgroundPoster的run方法中排队处理。BackgroundPoster正在被线程池执行时,executorRunning==true,此时发布的事件只会进队列,不会再次调用线程池的execute方法。事件全部处理后才退出死循环,设置executorRunning=fasle,此后再发布事件才会在线程池中开辟一个新线程。

    上文提到(PendingPostQueue.poll(int))在队列为空的时候等待队列中添加元素。以及executorRunning标示位的使用,都是为了重用当前对象。

    final classBackgroundPosterimplementsRunnable {

    private finalPendingPostQueuequeue;

    private finalEventBuseventBus;

    private volatile booleanexecutorRunning;

    BackgroundPoster(EventBus eventBus) {

    this.eventBus= eventBus;

    queue=newPendingPostQueue();

    }

    public voidenqueue(Subscription subscription, Object event) {

    PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);

    synchronized(this) {

    queue.enqueue(pendingPost);

    if (!executorRunning) {

    executorRunning = true;

    eventBus.getExecutorService().execute(this);

    }

    }

    }

    @Override

    public voidrun() {

    try{

    try{

    while(true) {

    PendingPost pendingPost = queue.poll(1000);

    if(pendingPost ==null) {

    synchronized(this) {

    // Check again, this time in synchronized

    pendingPost =queue.poll();

    if(pendingPost ==null) {

    executorRunning=false;

    return;

    }

    }

    }

    eventBus.invokeSubscriber(pendingPost);

    }

    }catch(InterruptedException e) {

    Log.w("Event", Thread.currentThread().getName() +" was interruppted", e);

    }

    }finally{

    executorRunning=false;

    }

    }

    }

    AyncPoster很简单,来了事件就在线程池中开辟线程执行。

    classAsyncPosterimplementsRunnable {

    private finalPendingPostQueuequeue;

    private finalEventBuseventBus;

    AsyncPoster(EventBus eventBus) {

    this.eventBus= eventBus;

    queue=newPendingPostQueue();

    }

    public voidenqueue(Subscription subscription, Object event) {

    PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);

    queue.enqueue(pendingPost);

    eventBus.getExecutorService().execute(this);

    }

    @Override

    public voidrun() {

    PendingPost pendingPost =queue.poll();

    if(pendingPost ==null) {

    throw newIllegalStateException("No pending post available");

    }

    eventBus.invokeSubscriber(pendingPost);

    }

    }

    总体一句话:三种方式的线程模型与一个方法链表组成一个非常标准Android版线程调度器。

    线程安全

    框架在应对多线程方面做了很多设计

    1.粒度锁

    public classEventBus {

    private finalMap, CopyOnWriteArrayList>subscriptionsByEventType;

    private finalMap>>typesBySubscriber;

    private finalMap, Object>stickyEvents;

    public voidregister(Object subscriber) {

    Class subscriberClass = subscriber.getClass();

    List subscriberMethods =subscriberMethodFinder.findSubscriberMethods(subscriberClass);

    synchronized (this) {

    for (SubscriberMethod subscriberMethod : subscriberMethods) {

    subscribe(subscriber, subscriberMethod);

    }

    }

    }

    private booleanpostSingleEventForEventType(Object event, PostingThreadState postingState, Class eventClass) {

    CopyOnWriteArrayList subscriptions;

    synchronized (this) {

    subscriptions = subscriptionsByEventType.get(eventClass);

    }

    …………

    }

    public booleanhasSubscriberForEvent(Class eventClass) {

    List> eventTypes =lookupAllEventTypes(eventClass);

    if(eventTypes !=null) {

    intcountTypes = eventTypes.size();

    for(inth =0; h < countTypes; h++) {

    Class clazz = eventTypes.get(h);

    CopyOnWriteArrayList subscriptions;

    synchronized (this) {

    subscriptions = subscriptionsByEventType.get(clazz);

    }

    if(subscriptions !=null&& !subscriptions.isEmpty()) {

    return true;

    }

    }

    }

    return false;

    }

    }

    只在使用到需要同步的数据结构时,才去synchronized(this),获取当前对象锁。另外,对于在部分流程才会使用的数据结构并不会占用当前对象锁,比如:stickyEvents,但问题就出在这里,stickyEvents是ConcurrentHashMap线程安全的,并且粒度比当前对象小。

    public classEventBus {

    private finalMap, Object>stickyEvents;

    public voidpostSticky(Object event) {

    synchronized (stickyEvents) {

    stickyEvents.put(event.getClass(), event);

    }

    // Should be posted after it is putted, in case the subscriber wants to remove immediately

    post(event);

    }

    publicTgetStickyEvent(Class eventType) {

    synchronized (stickyEvents) {

    return eventType.cast(stickyEvents.get(eventType));

    }

    }

    EventBus(EventBusBuilder builder) {

    subscriptionsByEventType=newHashMap<>();

    typesBySubscriber=newHashMap<>();

    stickyEvents = new ConcurrentHashMap<>();

    mainThreadPoster=newHandlerPoster(this, Looper.getMainLooper(),10);

    backgroundPoster=newBackgroundPoster(this);

    asyncPoster=newAsyncPoster(this);

    indexCount= builder.subscriberInfoIndexes!=null? builder.subscriberInfoIndexes.size() :0;

    subscriberMethodFinder=newSubscriberMethodFinder(builder.subscriberInfoIndexes,

    builder.strictMethodVerification, builder.ignoreGeneratedIndex);

    logSubscriberExceptions= builder.logSubscriberExceptions;

    logNoSubscriberMessages= builder.logNoSubscriberMessages;

    sendSubscriberExceptionEvent= builder.sendSubscriberExceptionEvent;

    sendNoSubscriberEvent= builder.sendNoSubscriberEvent;

    throwSubscriberException= builder.throwSubscriberException;

    eventInheritance= builder.eventInheritance;

    executorService= builder.executorService;

    }

    2.数据结构的合理使用。

    方法列表采用CopyOnWriteArrayList,CopyOnWrite容器即写时复制的容器。通俗的理解是当往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。这样做的好处是可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。缺点就是会造成内存占用双份,以及数据一致性问题(CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。)考虑到方法只会在注册的时候进行添加,因此采用偏缓存性质的数据结构更适合于当前的应用场景。

    线程状态信息采用ThreadLocal类型进行保存,ThreadLocal会为每一个线程提供一个独立的变量副本,从而隔离了多个线程对数据的访问冲突。因为每一个线程都拥有自己的变量副本,从而也就没有必要对该变量进行同步了。

    public classEventBus {

    private finalMap,CopyOnWriteArrayList>subscriptionsByEventType;

    private finalThreadLocalcurrentPostingThreadState=newThreadLocal() {

    @Override

    protectedPostingThreadState initialValue() {

    return newPostingThreadState();

    }

    };

    总结一句话:通过synchronized与ThreadLocal的数据类型,构建出线程安全以及简洁优美的代码。

    高性能

    1.索引加速

    总线注册时只传入通过注解标记回调方法的订阅者对象。EventBus把获取订阅方法的过程放在编译时,避免运行时反射带来的性能问题

    @SupportedAnnotationTypes("org.greenrobot.eventbus.Subscribe")

    @SupportedOptions(value = {"eventBusIndex", "verbose"})

    public class EventBusAnnotationProcessor extends AbstractProcessor {

    /** Found subscriber methods for a class (without superclasses). 被注解表示的方法信息 */

    private final ListMap methodsByClass = new ListMap<>();

    private final Set classesToSkip = new HashSet<>(); // checkHasErrors检查出来的异常方法

    @Override

    public boolean process(Set annotations, RoundEnvironment env) {

    Messager messager = processingEnv.getMessager();

    try {

    String index = processingEnv.getOptions().get(OPTION_EVENT_BUS_INDEX);

    if (index == null) { // 如果没有在gradle中配置apt的argument,编译就会在这里报错

    messager.printMessage(Diagnostic.Kind.ERROR, "No option " + OPTION_EVENT_BUS_INDEX +

    " passed to annotation processor");

    return false;

    }

    /** ... */collectSubscribers(annotations, env, messager);// 根据注解拿到所有订阅者的回调方法信息checkForSubscribersToSkip(messager, indexPackage);// 筛掉不符合规则的订阅者

    if (!methodsByClass.isEmpty()) {createInfoIndexFile(index);// 生成索引类

    }

    /** 打印错误 */

    }

    /** 下面这些方法就不再贴出具体实现了,我们了解它们的功能就行 */

    private void collectSubscribers // 遍历annotations,找出所有被注解标识的方法,以初始化methodsByClass

    private boolean checkHasNoErrors // 过滤掉static,非public和参数大于1的方法

    private void checkForSubscribersToSkip // 检查methodsByClass中的各个类,是否存在非public的父类和方法参数

    /** 下面这三个方法会把methodsByClass中的信息写到相应的类中 */

    private void writeCreateSubscriberMethods

    private void createInfoIndexFile

    private void writeIndexLines

    }

    获取所有订阅者的回调方法信息之后,生成Index。运行时就可直接使用Map去查找回调方法。

    packagecom.example.wangbinlong.myapplication;

    importorg.greenrobot.eventbus.meta.SimpleSubscriberInfo;

    importorg.greenrobot.eventbus.meta.SubscriberMethodInfo;

    importorg.greenrobot.eventbus.meta.SubscriberInfo;

    importorg.greenrobot.eventbus.meta.SubscriberInfoIndex;

    importorg.greenrobot.eventbus.ThreadMode;

    importjava.util.HashMap;

    importjava.util.Map;

    /** This class is generated by EventBus, do not edit. */

    public classMyEventBusIndeximplementsSubscriberInfoIndex {

    private static final Map, SubscriberInfo>SUBSCRIBER_INDEX;

    static {

    SUBSCRIBER_INDEX= new HashMap, SubscriberInfo>();

    putIndex(new SimpleSubscriberInfo(MainActivity.class, true, new SubscriberMethodInfo[] {

    new SubscriberMethodInfo("onLogin", EventBusManager.LoginEvent.class, ThreadMode.MAIN),

    }));

    putIndex(new SimpleSubscriberInfo(SecondActivity.Inner.class, true, new SubscriberMethodInfo[] {

    new SubscriberMethodInfo("onStickyEvent", EventBusManager.StickEvent.class, ThreadMode.MAIN, 0, true),

    }));

    }

    private static voidputIndex(SubscriberInfo info) {

    SUBSCRIBER_INDEX.put(info.getSubscriberClass(), info);

    }

    @Override

    publicSubscriberInfo getSubscriberInfo(Class subscriberClass) {

    SubscriberInfo info =SUBSCRIBER_INDEX.get(subscriberClass);

    if(info !=null) {

    returninfo;

    }else{

    return null;

    }

    }

    }

    总结一句话:如果项目中通过反射可以获取的对应关系,可以通过获取编译时注解的方式进行索引加速

    2.池化技术

    EventBus的设计非常精致,但是有一个明显的缺陷:产生很多中间对象。为了最大限度地减少影响,项目中多处使用缓存,对象池。

    METHOD_CACHE缓存订阅者类型与回调方法的列表,避免重复查找

    classSubscriberMethodFinder {

    List findSubscriberMethods(Class subscriberClass) {

    List subscriberMethods =METHOD_CACHE.get(subscriberClass);

    if(subscriberMethods !=null) {

    returnsubscriberMethods;

    }

    if(ignoreGeneratedIndex) {

    subscriberMethods = findUsingReflection(subscriberClass);

    }else{

    subscriberMethods = findUsingInfo(subscriberClass);

    }

    if(subscriberMethods.isEmpty()) {

    throw newEventBusException("Subscriber "+ subscriberClass

    +" and its super classes have no public methods with the @Subscribe annotation");

    }else{

    METHOD_CACHE.put(subscriberClass, subscriberMethods);

    returnsubscriberMethods;

    }

    }

    }

    private static finalMap, List>METHOD_CACHE=newConcurrentHashMap<>();

    eventTypesCache缓存事件类型与(事件类型父类和接口)的关系

    private static finalMap, List>>eventTypesCache=newHashMap<>();

    /** Looks up all Class objects including super classes and interfaces. Should also work for interfaces. */

    private staticList> lookupAllEventTypes(Class eventClass) {

    synchronized(eventTypesCache) {

    List> eventTypes =eventTypesCache.get(eventClass);

    if(eventTypes ==null) {

    eventTypes =newArrayList<>();

    Class clazz = eventClass;

    while(clazz !=null) {

    eventTypes.add(clazz);

    addInterfaces(eventTypes, clazz.getInterfaces());

    clazz = clazz.getSuperclass();

    }

    eventTypesCache.put(eventClass, eventTypes);

    }

    returneventTypes;

    }

    }

    通过反射查找订阅方法时,将所有中间对象封装成FindState.全局维护一个FindState的对象池。使用完对象之后通过recycle()擦除痕迹。

    classSubscriberMethodFinder {

    privateList findUsingInfo(Class subscriberClass) {

    FindState findState = prepareFindState();

    findState.initForSubscriber(subscriberClass);

    while(findState.clazz!=null) {

    findState.subscriberInfo= getSubscriberInfo(findState);

    if(findState.subscriberInfo!=null) {

    SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();

    for(SubscriberMethod subscriberMethod : array) {

    if(findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {

    findState.subscriberMethods.add(subscriberMethod);

    }

    }

    }else{

    findUsingReflectionInSingleClass(findState);

    }

    findState.moveToSuperclass();

    }

    returngetMethodsAndRelease(findState);

    }

    private static finalFindState[]FIND_STATE_POOL=newFindState[POOL_SIZE];

    privateList getMethodsAndRelease(FindState findState) {

    List subscriberMethods =newArrayList<>(findState.subscriberMethods);

    findState.recycle();

    synchronized(FIND_STATE_POOL) {

    for(inti =0; i

    if(FIND_STATE_POOL[i] ==null) {

    FIND_STATE_POOL[i] = findState;

    break;

    }

    }

    }

    returnsubscriberMethods;

    }

    privateFindState prepareFindState() {

    synchronized(FIND_STATE_POOL) {

    for(inti = 0; i

    FindState state =FIND_STATE_POOL[i];

    if(state !=null) {

    FIND_STATE_POOL[i] =null;

    returnstate;

    }

    }

    }

    return newFindState();

    }

    static classFindState{

    final List subscriberMethods = new ArrayList<>();

    finalMapanyMethodByEventType=newHashMap<>();

    finalMapsubscriberClassByMethodKey=newHashMap<>();

    finalStringBuildermethodKeyBuilder=newStringBuilder(128);

    ClasssubscriberClass;

    Classclazz;

    booleanskipSuperClasses;

    SubscriberInfosubscriberInfo;

    voidinitForSubscriber(Class subscriberClass) {

    this.subscriberClass=clazz= subscriberClass;

    skipSuperClasses=false;

    subscriberInfo=null;

    }

    void recycle() {

    subscriberMethods.clear();

    anyMethodByEventType.clear();

    subscriberClassByMethodKey.clear();

    methodKeyBuilder.setLength(0);

    subscriberClass = null;

    clazz = null;

    skipSuperClasses = false;

    subscriberInfo = null;

    }

    }

    回调方法队列PendPostQueue中的元素PendingPost本身就是对象池。构造器为私有,从而只能通过obtainPendingPost从对象池获取对象,对象池中有保存的对象则获取对象(ArrayList尾部获取),没有就通过new创建。releasePendingPost则将使用后的对象归还给对象池,归还的时候要将对象的使用痕迹擦除,同时要限制对象池大小为10000,防止对象池无限增大。

    final classPendingPost {

    private final static ListpendingPostPool= new ArrayList();

    Objectevent;

    Subscriptionsubscription;

    PendingPostnext;

    privatePendingPost(Object event, Subscription subscription) {

    this.event= event;

    this.subscription= subscription;

    }

    staticPendingPost obtainPendingPost(Subscription subscription, Object event) {

    synchronized(pendingPostPool) {

    intsize =pendingPostPool.size();

    if(size >0) {

    PendingPost pendingPost =pendingPostPool.remove(size -1);

    pendingPost.event= event;

    pendingPost.subscription= subscription;

    pendingPost.next=null;

    returnpendingPost;

    }

    }

    return newPendingPost(event, subscription);

    }

    static void releasePendingPost(PendingPost pendingPost) {

    pendingPost.event = null;

    pendingPost.subscription = null;

    pendingPost.next = null;

    synchronized (pendingPostPool) {

    // Don't let the pool grow indefinitely

    if (pendingPostPool.size() < 10000) {

    pendingPostPool.add(pendingPost);

    }

    }

    }

    }

    总结一句话:通过编译时注解解决反射问题,缓存与对象池解决内存开销问题。

    EventBus是通过可配置的门面来维护事件,订阅者与回调方法之间的关系。然后按照指定线程模型调度执行。的使用简洁,线程安全,高性能的消息总线。最重要的是,它带给大家一些编程上面的思考。

    相关文章

      网友评论

          本文标题:消息总线那些事儿

          本文链接:https://www.haomeiwen.com/subject/nsmhbttx.html