Exploring How EventBus Works

Author: 零星瓢虫 | Published: 2020-03-22 19:40

Introduction to EventBus
EventBus is a lightweight publish/subscribe event framework for Android, contributed by the greenrobot organization. Its GitHub address is:
https://github.com/greenrobot/EventBus

Here is the introduction (read: sales pitch) from its GitHub page:

EventBus...
simplifies the communication between components
decouples event senders and receivers
performs well with Activities, Fragments, and background threads
avoids complex and error-prone dependencies and life cycle issues
makes your code simpler
is fast
is tiny (~60k jar)
is proven in practice by apps with 1,000,000,000+ installs
has advanced features like delivery threads, subscriber priorities, etc.

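EventBus simplifies communication between the components of an application, especially between Fragments, and avoids much of the inconvenience of using broadcasts for this purpose. It is also fast, the jar is only about 60k, and it is used in apps with more than 1,000,000,000 installs.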

Basic usage of EventBus
1 Add the dependency in Gradle

implementation 'org.greenrobot:eventbus:3.2.0'


2 Add the following rules to your ProGuard configuration file.
-keepattributes *Annotation*
-keepclassmembers class * {
    @org.greenrobot.eventbus.Subscribe <methods>;
}
-keep enum org.greenrobot.eventbus.ThreadMode { *; }

# Only needed if you use the AsyncExecutor class
-keepclassmembers class * extends org.greenrobot.eventbus.util.ThrowableFailureEvent {
    <init>(java.lang.Throwable);
}

3 Start using it:
Define an event:

public  class MessageEvent {
    /* Additional fields if needed */
    public String message;

    public MessageEvent(String message) {
        this.message = message;
    }
 
}

Prepare the subscriber's event-handling method:

    @Subscribe(threadMode = ThreadMode.MAIN)
    public void onMessageEvent(MessageEvent event) {
        Toast.makeText(this,event.message,Toast.LENGTH_LONG).show();
    }

Register and unregister the subscriber:

  @Override
    protected void onStart() {
        super.onStart();
        EventBus.getDefault().register(this);
    }

    @Override
    protected void onStop() {
        super.onStop();
        EventBus.getDefault().unregister(this);
    }

Post an event:

findViewById(R.id.hello_eventBus).setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                EventBus.getDefault().post(new MessageEvent("this is  an event"));
            }
        });

Now, clicking the hello_eventBus view shows a Toast. Let's analyze step by step how EventBus makes this work.
Start with the call EventBus.getDefault().register(this);
Inside the EventBus class:

    // Create the EventBus singleton lazily (double-checked locking)
    public static EventBus getDefault() {
        EventBus instance = defaultInstance;
        if (instance == null) {
            synchronized (EventBus.class) {
                instance = EventBus.defaultInstance;
                if (instance == null) {
                    instance = EventBus.defaultInstance = new EventBus();
                }
            }
        }
        return instance;
    }
    // Builder pattern
    private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();

    public EventBus() {
        this(DEFAULT_BUILDER);
    }

    // Initialize EventBus's internal components
    EventBus(EventBusBuilder builder) {

        logger = builder.getLogger();
        // Maps each event type (key) to the list of Subscriptions for it (value)
        subscriptionsByEventType = new HashMap<>();
        // Maps each subscriber (key) to the event types it subscribes to (value)
        typesBySubscriber = new HashMap<>();
        // Sticky events currently retained
        stickyEvents = new ConcurrentHashMap<>();
        // Main-thread support
        mainThreadSupport = builder.getMainThreadSupport();
        // Analyzed in detail below
        mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
        backgroundPoster = new BackgroundPoster(this);
        asyncPoster = new AsyncPoster(this);
        indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
        // Finder for @Subscribe-annotated methods
        subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
                builder.strictMethodVerification, builder.ignoreGeneratedIndex);
        // Whether to log exceptions thrown by subscribers
        logSubscriberExceptions = builder.logSubscriberExceptions;
        // Whether to log when an event has no subscribers
        logNoSubscriberMessages = builder.logNoSubscriberMessages;
        // Whether to post a SubscriberExceptionEvent when a subscriber throws
        sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
        // Whether to post a NoSubscriberEvent when an event has no subscribers
        sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
        // Whether to rethrow exceptions thrown by subscribers
        throwSubscriberException = builder.throwSubscriberException;
        eventInheritance = builder.eventInheritance;
        executorService = builder.executorService;
    }
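
The lazy initialization in getDefault() is the double-checked locking idiom. The excerpt does not show how defaultInstance is declared; the sketch below assumes a volatile static field, which the idiom needs so that a fully constructed object is published safely. Bus is a hypothetical stand-in, not the real EventBus class.

```java
// Hypothetical stand-in for EventBus; only the lazy-singleton mechanics are shown.
final class Bus {
    // Assumption: volatile, so other threads never observe a half-constructed instance.
    private static volatile Bus defaultInstance;

    private Bus() {
    }

    static Bus getDefault() {
        Bus instance = defaultInstance;          // first check, without the lock
        if (instance == null) {
            synchronized (Bus.class) {
                instance = defaultInstance;      // second check, under the lock
                if (instance == null) {
                    instance = defaultInstance = new Bus();
                }
            }
        }
        return instance;
    }

    public static void main(String[] args) {
        System.out.println(Bus.getDefault() == Bus.getDefault()); // prints "true"
    }
}
```

Reading the field into a local variable first keeps the common path (instance already created) down to a single volatile read.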

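As shown above, EventBus is created as a singleton, and it uses the builder pattern to set up the components it needs. Let's go through the important ones step by step, starting with:
mainThreadSupport = builder.getMainThreadSupport();
As the name suggests, this provides support for main-thread operations.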

// Obtain the main-thread Looper and wrap it in a MainThreadSupport
  MainThreadSupport getMainThreadSupport() {
        if (mainThreadSupport != null) {
            return mainThreadSupport;
        } else if (AndroidLogger.isAndroidLogAvailable()) {
            Object looperOrNull = getAndroidMainLooperOrNull();
            return looperOrNull == null ? null :
                    new MainThreadSupport.AndroidHandlerMainThreadSupport((Looper) looperOrNull);
        } else {
            return null;
        }
    }

   static Object getAndroidMainLooperOrNull() {
        try {
            return Looper.getMainLooper();
        } catch (RuntimeException e) {
            // Not really a functional Android (e.g. "Stub!" maven dependencies)
            return null;
        }
    }

public interface MainThreadSupport {

    boolean isMainThread();

    Poster createPoster(EventBus eventBus);

    class AndroidHandlerMainThreadSupport implements MainThreadSupport {

        private final Looper looper;

        public AndroidHandlerMainThreadSupport(Looper looper) {
            this.looper = looper;
        }

        @Override
        public boolean isMainThread() {
            return looper == Looper.myLooper();
        }

        @Override
        public Poster createPoster(EventBus eventBus) {
            return new HandlerPoster(eventBus, looper, 10);
        }
    }

}

The main job of this flow is to obtain the main thread's Looper. The poster built on top of it is created next:
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;

public class HandlerPoster extends Handler implements Poster {
    // Queue of pending posts (PendingPostQueue)
    private final PendingPostQueue queue;
    private final int maxMillisInsideHandleMessage;
    private final EventBus eventBus;
    private boolean handlerActive;

    protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
        super(looper);
        this.eventBus = eventBus;
        this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            // Append the PendingPost to the tail of the queue
            queue.enqueue(pendingPost);
            if (!handlerActive) {
                handlerActive = true;
                // Send a message to wake up handleMessage()
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }

    @Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            long started = SystemClock.uptimeMillis();
            while (true) {
                // Poll PendingPosts from the head of the queue, one at a time
                PendingPost pendingPost = queue.poll();
                if (pendingPost == null) {
                    synchronized (this) {
                        // Check again, this time in synchronized
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            handlerActive = false;
                            return;
                        }
                    }
                }
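                // Let the subscriber handle the PendingPost
                eventBus.invokeSubscriber(pendingPost);
                long timeInMethod = SystemClock.uptimeMillis() - started;
                // Time budget used up: send another message and return, leaving the rest of the queue for the next pass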
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                    rescheduled = true;
                    return;
                }
            }
        } finally {
            handlerActive = rescheduled;
        }
    }
}
final class PendingPost {
    private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();

    Object event;
    Subscription subscription;
    PendingPost next;

    private PendingPost(Object event, Subscription subscription) {
        this.event = event;
        this.subscription = subscription;
    }

    // Reuse a PendingPost from the pool if one is available; otherwise create a new one
    static PendingPost obtainPendingPost(Subscription subscription, Object event) {
        synchronized (pendingPostPool) {
            int size = pendingPostPool.size();
            if (size > 0) {
                PendingPost pendingPost = pendingPostPool.remove(size - 1);
                pendingPost.event = event;
                pendingPost.subscription = subscription;
                pendingPost.next = null;
                return pendingPost;
            }
        }
        return new PendingPost(event, subscription);
    }
    // Clear the PendingPost and return it to the pool for reuse
    static void releasePendingPost(PendingPost pendingPost) {
        pendingPost.event = null;
        pendingPost.subscription = null;
        pendingPost.next = null;
        synchronized (pendingPostPool) {
            // Don't let the pool grow indefinitely
            if (pendingPostPool.size() < 10000) {
                pendingPostPool.add(pendingPost);
            }
        }
    }

}

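In the HandlerPoster class above, enqueue() obtains a PendingPost object from the PendingPost pool (pendingPostPool), appends it to the queue and, if the handler is not already active, calls sendMessage(obtainMessage()) to wake it up. handleMessage() then polls PendingPost objects from the queue one by one and invokes the corresponding subscriber for each. Once the time spent inside handleMessage() reaches maxMillisInsideHandleMessage (10 ms here), it sends itself another message and returns, so the remaining events are handled in a later pass instead of blocking the main thread. The PendingPostQueue methods are listed below for reference: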

final class PendingPostQueue {
    private PendingPost head;
    private PendingPost tail;
    
    // Append a PendingPost to the tail of the queue
    synchronized void enqueue(PendingPost pendingPost) {
        if (pendingPost == null) {
            throw new NullPointerException("null cannot be enqueued");
        }
        if (tail != null) {
            tail.next = pendingPost;
            tail = pendingPost;
        } else if (head == null) {
            head = tail = pendingPost;
        } else {
            throw new IllegalStateException("Head present, but no tail");
        }
        notifyAll();
    }

    // Remove and return the head PendingPost, advancing head to the next entry
    synchronized PendingPost poll() {
        PendingPost pendingPost = head;
        if (head != null) {
            head = head.next;
            if (head == null) {
                tail = null;
            }
        }
        return pendingPost;
    }

    synchronized PendingPost poll(int maxMillisToWait) throws InterruptedException {
        if (head == null) {
            wait(maxMillisToWait);
        }
        return poll();
    }

}

Taken together, HandlerPoster, PendingPostQueue and PendingPost show that mainThreadPoster maintains a queue of pending events and uses a Handler to process them on the main thread.
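
To make the time-boxed loop in handleMessage() easier to follow, here is a minimal simulation in plain Java with no Android classes. BudgetedDrain, its injected clock and the 4 ms cost per event are all hypothetical; the clock is injected only so that the demo is deterministic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

// Hypothetical simulation of HandlerPoster's time-boxed drain loop.
final class BudgetedDrain {
    private final Queue<String> queue = new ArrayDeque<>();
    private final LongSupplier clock;        // stands in for SystemClock.uptimeMillis()
    private final Consumer<String> handler;  // stands in for eventBus.invokeSubscriber(...)
    private final long maxMillis;

    BudgetedDrain(LongSupplier clock, Consumer<String> handler, long maxMillis) {
        this.clock = clock;
        this.handler = handler;
        this.maxMillis = maxMillis;
    }

    void enqueue(String event) {
        queue.add(event);
    }

    /** Runs one pass. Returns true if the budget ran out and another pass must be scheduled. */
    boolean drainOnce() {
        long started = clock.getAsLong();
        while (true) {
            String event = queue.poll();
            if (event == null) {
                return false;                // queue is empty, nothing to reschedule
            }
            handler.accept(event);
            if (clock.getAsLong() - started >= maxMillis) {
                return true;                 // HandlerPoster sends itself a new message here
            }
        }
    }

    /** Drains five events that each cost 4 ms of fake time, with a 10 ms budget per pass. */
    static List<Integer> demo() {
        long[] now = {0};
        int[] handled = {0};
        BudgetedDrain drain = new BudgetedDrain(
                () -> now[0],
                event -> { now[0] += 4; handled[0]++; },
                10);
        for (String event : new String[] {"a", "b", "c", "d", "e"}) {
            drain.enqueue(event);
        }
        List<Integer> handledPerPass = new ArrayList<>();
        boolean again = true;
        while (again) {
            int before = handled[0];
            again = drain.drainOnce();
            handledPerPass.add(handled[0] - before);
        }
        return handledPerPass;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "[3, 2]"
    }
}
```

No event is dropped when the budget runs out: the first pass stops after the third event (12 ms elapsed), and the second pass picks up the remaining two.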

Next, backgroundPoster = new BackgroundPoster(this) works much like mainThreadPoster:

final class BackgroundPoster implements Runnable, Poster {

    private final PendingPostQueue queue;
    private final EventBus eventBus;

    private volatile boolean executorRunning;

    BackgroundPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }
    // Enqueue an event and start the runner on the executor if it is not already running
    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if (!executorRunning) {
                executorRunning = true;
                eventBus.getExecutorService().execute(this);
            }
        }
    }
    // Take events from the queue and invoke their subscribers
    @Override
    public void run() {
        try {
            try {
                while (true) {
                    PendingPost pendingPost = queue.poll(1000);
                    if (pendingPost == null) {
                        synchronized (this) {
                            // Check again, this time in synchronized
                            pendingPost = queue.poll();
                            if (pendingPost == null) {
                                executorRunning = false;
                                return;
                            }
                        }
                    }
                    eventBus.invokeSubscriber(pendingPost);
                }
            } catch (InterruptedException e) {
                eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
            }
        } finally {
            executorRunning = false;
        }
    }

}

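BackgroundPoster implements Runnable. After an event is enqueued, the runner takes events from the queue one by one and executes each of them through invokeSubscriber(). Because only one runner is active at a time, the events are delivered sequentially.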
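
The "single runner" idea can be sketched in plain Java as follows. SerialPoster is a hypothetical, simplified stand-in: unlike the real BackgroundPoster, it does not wait up to 1000 ms for further events before giving up its thread, and it assumes that tasks do not throw.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified version of the BackgroundPoster pattern.
final class SerialPoster implements Runnable {
    private final Queue<Runnable> queue = new ArrayDeque<>();
    private final ExecutorService executor;
    private boolean running; // guarded by "this"

    SerialPoster(ExecutorService executor) {
        this.executor = executor;
    }

    synchronized void enqueue(Runnable task) {
        queue.add(task);
        if (!running) {              // at most one runner is scheduled at any time
            running = true;
            executor.execute(this);
        }
    }

    @Override
    public void run() {
        while (true) {
            Runnable task;
            synchronized (this) {
                task = queue.poll();
                if (task == null) {  // queue drained: let the next enqueue() start a new runner
                    running = false;
                    return;
                }
            }
            task.run();              // run outside the lock so enqueue() is never blocked by a task
        }
    }

    /** Posts n tasks and returns the order in which they actually ran. */
    static List<Integer> demo(int n) {
        ExecutorService pool = Executors.newCachedThreadPool();
        SerialPoster poster = new SerialPoster(pool);
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(n);
        for (int i = 0; i < n; i++) {
            final int id = i;
            poster.enqueue(() -> { seen.add(id); done.countDown(); });
        }
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        System.out.println(demo(5)); // prints "[0, 1, 2, 3, 4]"
    }
}
```

Even though the executor is a multi-threaded pool, the tasks run in posting order, because the running flag guarantees that a new runner is only started after the previous one has found the queue empty.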

Next, asyncPoster = new AsyncPoster(this) has a similar structure:

class AsyncPoster implements Runnable, Poster {

    private final PendingPostQueue queue;
    private final EventBus eventBus;

    AsyncPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        queue.enqueue(pendingPost);
        eventBus.getExecutorService().execute(this);
    }

    @Override
    public void run() {
        PendingPost pendingPost = queue.poll();
        if(pendingPost == null) {
            throw new IllegalStateException("No pending post available");
        }
        eventBus.invokeSubscriber(pendingPost);
    }

}

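Each run() of AsyncPoster handles exactly one event taken from the queue, and enqueue() submits a separate task to the executor for every event.
That covers most of what EventBus sets up during initialization.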

Next, let's look at the annotated method:

    @Subscribe(threadMode = ThreadMode.MAIN)
    public void onMessageEvent(MessageEvent event) {
        Toast.makeText(this, event.message, Toast.LENGTH_LONG).show();
    }

The @Subscribe annotation used in the view layer is defined as follows:

@Documented
@Retention(RetentionPolicy.RUNTIME) // Retained at runtime
@Target({ElementType.METHOD}) // Applies to methods
public @interface Subscribe {
    // The thread on which the event is handled
    ThreadMode threadMode() default ThreadMode.POSTING;

    /**
     * If true, delivers the most recent sticky event (posted with
     * {@link EventBus#postSticky(Object)}) to this subscriber (if event available).
     */
    // Whether this subscriber receives sticky events
    boolean sticky() default false;

    /** Subscriber priority to influence the order of event delivery.
     * Within the same delivery thread ({@link ThreadMode}), higher priority subscribers will receive events before
     * others with a lower priority. The default priority is 0. Note: the priority does *NOT* affect the order of
     * delivery among subscribers with different {@link ThreadMode}s! */
    int priority() default 0;
}

The available thread modes for handling events:

public enum ThreadMode {
    /**
     * Subscriber will be called directly in the same thread, which is posting the event. This is the default. Event delivery
     * implies the least overhead because it avoids thread switching completely. Thus this is the recommended mode for
     * simple tasks that are known to complete in a very short time without requiring the main thread. Event handlers
     * using this mode must return quickly to avoid blocking the posting thread, which may be the main thread.
     */
    POSTING, // Handled on the thread that posts the event

    /**
     * On Android, subscriber will be called in Android's main thread (UI thread). If the posting thread is
     * the main thread, subscriber methods will be called directly, blocking the posting thread. Otherwise the event
     * is queued for delivery (non-blocking). Subscribers using this mode must return quickly to avoid blocking the main thread.
     * If not on Android, behaves the same as {@link #POSTING}.
     */
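    MAIN, // Handled on the main thread; called directly (blocking) when posted from the main thread, so handlers must return quickly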

    /**
     * On Android, subscriber will be called in Android's main thread (UI thread). Different from {@link #MAIN},
     * the event will always be queued for delivery. This ensures that the post call is non-blocking.
     */
    MAIN_ORDERED,// Handled on the main thread; always queued, so post() never blocks

    /**
     * On Android, subscriber will be called in a background thread. If posting thread is not the main thread, subscriber methods
     * will be called directly in the posting thread. If the posting thread is the main thread, EventBus uses a single
     * background thread, that will deliver all its events sequentially. Subscribers using this mode should try to
     * return quickly to avoid blocking the background thread. If not on Android, always uses a background thread.
     */
    BACKGROUND, // Handled on a background thread

    /**
     * Subscriber will be called in a separate thread. This is always independent from the posting thread and the
     * main thread. Posting events never wait for subscriber methods using this mode. Subscriber methods should
     * use this mode if their execution might take some time, e.g. for network access. Avoid triggering a large number
     * of long running asynchronous subscriber methods at the same time to limit the number of concurrent threads. EventBus
     * uses a thread pool to efficiently reuse threads from completed asynchronous subscriber notifications.
     */
    ASYNC // Handled on a separate thread taken from a thread pool
}

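The annotation mainly defines its retention policy and the thread on which events are handled. The thread modes correspond to the posters analyzed earlier:
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
backgroundPoster = new BackgroundPoster(this);
asyncPoster = new AsyncPoster(this);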

Subscribing with register()

EventBus.getDefault().register(this);
Look at the register() method:

 public void register(Object subscriber) {
        // Get the subscriber's class
        Class<?> subscriberClass = subscriber.getClass();
        // Find the subscriber methods of that class
        List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
        synchronized (this) {
            // Iterate over the methods and subscribe each one
            for (SubscriberMethod subscriberMethod : subscriberMethods) {
                subscribe(subscriber, subscriberMethod);
            }
        }
    }

register() first collects all subscriber methods of the current class:
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
How are they found?

  List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
        // Try the cache first
        List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
        if (subscriberMethods != null) {
            return subscriberMethods;
        }

        if (ignoreGeneratedIndex) { // false by default
            subscriberMethods = findUsingReflection(subscriberClass);
        } else {
            // Look up the subscriber methods
            subscriberMethods = findUsingInfo(subscriberClass);
        }
        if (subscriberMethods.isEmpty()) {
            throw new EventBusException("Subscriber " + subscriberClass
                    + " and its super classes have no public methods with the @Subscribe annotation");
        } else {
            // Store the result in the cache
            METHOD_CACHE.put(subscriberClass, subscriberMethods);
            return subscriberMethods;
        }
    }

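As the flow above shows, the methods are returned from the cache when present. Otherwise, since ignoreGeneratedIndex is false by default, the lookup continues in subscriberMethods = findUsingInfo(subscriberClass);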

 private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
        // Obtain a FindState object
        FindState findState = prepareFindState();
        // Assign the subscriber class
        findState.initForSubscriber(subscriberClass);
        while (findState.clazz != null) {
            // Get the subscriberInfo
            findState.subscriberInfo = getSubscriberInfo(findState);
            if (findState.subscriberInfo != null) {
                // Get the array of subscriber methods
                SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
                for (SubscriberMethod subscriberMethod : array) {
                    // Check whether the method should be added
                    if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
                        // Add the method to findState
                        findState.subscriberMethods.add(subscriberMethod);
                    }
                }
            } else {
                findUsingReflectionInSingleClass(findState);
            }
            findState.moveToSuperclass();
        }
        // Copy the result and release the FindState
        return getMethodsAndRelease(findState);
    }

At this point we can see that the FindState object carries most of the data involved:

   static class FindState {
        // List of subscriber methods found so far
        final List<SubscriberMethod> subscriberMethods = new ArrayList<>();
        // Map from event type to the method handling it
        final Map<Class, Object> anyMethodByEventType = new HashMap<>();
        // Map from method key to the class that declares the method
        final Map<String, Class> subscriberClassByMethodKey = new HashMap<>();
        final StringBuilder methodKeyBuilder = new StringBuilder(128);

        Class<?> subscriberClass;
        Class<?> clazz;
        boolean skipSuperClasses;
        SubscriberInfo subscriberInfo;

FindState defines several collections as fields.
Next, look at FindState findState = prepareFindState();

    private FindState prepareFindState() {
        synchronized (FIND_STATE_POOL) {
            for (int i = 0; i < POOL_SIZE; i++) {
                FindState state = FIND_STATE_POOL[i];
                if (state != null) {
                    FIND_STATE_POOL[i] = null;
                    return state;
                }
            }
        }
        return new FindState();
    }

This takes a FindState object from the FindState pool, or creates a new one if the pool is empty.
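
The same obtain-or-create pattern, together with the matching "put it back" step, can be sketched in isolation. StatePool, State and the pool size of 4 are hypothetical names and values chosen for illustration; only the shape of the loops follows prepareFindState().

```java
// Hypothetical fixed-size object pool in the style of FIND_STATE_POOL.
final class StatePool {
    static final class State {
        final StringBuilder scratch = new StringBuilder();

        void recycle() {
            scratch.setLength(0); // clear per-use data before the object is reused
        }
    }

    private static final int POOL_SIZE = 4;
    private static final State[] POOL = new State[POOL_SIZE];

    /** Takes a pooled object if any slot is filled, otherwise allocates a new one. */
    static State obtain() {
        synchronized (POOL) {
            for (int i = 0; i < POOL_SIZE; i++) {
                State state = POOL[i];
                if (state != null) {
                    POOL[i] = null;
                    return state;
                }
            }
        }
        return new State();
    }

    /** Resets the object and stores it in the first free slot; it is dropped if the pool is full. */
    static void release(State state) {
        state.recycle();
        synchronized (POOL) {
            for (int i = 0; i < POOL_SIZE; i++) {
                if (POOL[i] == null) {
                    POOL[i] = state;
                    break;
                }
            }
        }
    }

    public static void main(String[] args) {
        State first = obtain();
        release(first);
        System.out.println(obtain() == first); // prints "true": the object was reused
    }
}
```

A small array guarded by one lock is enough here, because only a handful of lookups are ever in flight at the same time.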

Initializing findState:
findState.initForSubscriber(subscriberClass);

 void initForSubscriber(Class<?> subscriberClass) {
            // Assign the class; clazz is the cursor that later moves up to the superclasses
            this.subscriberClass = clazz = subscriberClass;
            skipSuperClasses = false;
            subscriberInfo = null;
        }

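Next, consider the case where findState.subscriberInfo is null. findUsingReflectionInSingleClass(findState) is executed, and after each pass findState.moveToSuperclass() moves on to check the superclass's methods. The first snippet below is a sample subscriber method that this reflection would pick up; the second is the finder itself: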

  @Subscribe(threadMode = ThreadMode.MAIN_ORDERED)
    public  void onHuhaiqiangEvent(HuhaiqiangEvent event){
    }

    private void findUsingReflectionInSingleClass(SubscriberMethodFinder.FindState findState) {
        Method[] methods;
        try {
            // This is faster than getMethods, especially when subscribers are fat classes like Activities
            // Get all methods declared by this class
            methods = findState.clazz.getDeclaredMethods();
        } catch (Throwable th) {
            // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
            try {
                // Fall back to all public methods, including inherited ones
                methods = findState.clazz.getMethods();
            } catch (LinkageError error) { // super class of NoClassDefFoundError to be a bit more broad...
                String msg = "Could not inspect methods of " + findState.clazz.getName();
                if (ignoreGeneratedIndex) {
                    msg += ". Please consider using EventBus annotation processor to avoid reflection.";
                } else {
                    msg += ". Please make this class visible to EventBus annotation processor to avoid reflection.";
                }
                throw new EventBusException(msg, error);
            }
            findState.skipSuperClasses = true;
        }
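        // Iterate over the methods
        for (Method method : methods) {
            int modifiers = method.getModifiers();
            // Keep only public methods that are not excluded by MODIFIERS_IGNORE (such as abstract or static ones)
            if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
                Class<?>[] parameterTypes = method.getParameterTypes();
                // Keep only methods with exactly one parameter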
                if (parameterTypes.length == 1) {
                    Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                    if (subscribeAnnotation != null) {
                        Class<?> eventType = parameterTypes[0];
                        // Check whether this method should be added for the event type
                        if (findState.checkAdd(method, eventType)) {
                            ThreadMode threadMode = subscribeAnnotation.threadMode();
                            // Add it to findState
                            findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                    subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                        }
                    }
                } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                    String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                    throw new EventBusException("@Subscribe method " + methodName +
                            "must have exactly 1 parameter but has " + parameterTypes.length);
                }
            } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                throw new EventBusException(methodName +
                        " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
            }
        }
    }

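As shown, after obtaining all methods of the class, EventBus filters them by modifiers, parameter count and the @Subscribe annotation. Each method that passes is wrapped in a SubscriberMethod object and added to findState: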

  public SubscriberMethod(Method method, Class<?> eventType, ThreadMode threadMode, int priority, boolean sticky) {
        this.method = method; // the method
        this.threadMode = threadMode; // the thread mode
        this.eventType = eventType; // the event type
        this.priority = priority; // the priority
        this.sticky = sticky; // whether it receives sticky events
    }
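
The filtering done by findUsingReflectionInSingleClass() can be tried out without EventBus. The sketch below uses a hypothetical @Handles annotation and Sample class; it applies the same three conditions (public and not static or abstract, exactly one parameter, annotation present) and returns the matching methods sorted by name, since getDeclaredMethods() makes no ordering guarantee.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical demo of annotation-based method discovery via reflection.
final class ReflectionScan {
    @Retention(RetentionPolicy.RUNTIME) // must be RUNTIME, or reflection cannot see it
    @Target(ElementType.METHOD)
    @interface Handles {
    }

    static class Sample {
        @Handles public void onText(String event) { }
        @Handles public void onNumber(Integer event) { }
        @Handles void notPublic(String event) { }                 // rejected: not public
        @Handles public static void isStatic(String event) { }    // rejected: static
        @Handles public void twoParams(String a, String b) { }    // rejected: two parameters
        public void notAnnotated(String event) { }                // rejected: no annotation
    }

    static List<String> findHandlers(Class<?> clazz) {
        List<String> found = new ArrayList<>();
        for (Method method : clazz.getDeclaredMethods()) {
            int modifiers = method.getModifiers();
            boolean eligible = Modifier.isPublic(modifiers)
                    && !Modifier.isStatic(modifiers)
                    && !Modifier.isAbstract(modifiers)
                    && !method.isBridge()
                    && !method.isSynthetic();
            if (eligible
                    && method.getParameterCount() == 1
                    && method.isAnnotationPresent(Handles.class)) {
                Class<?> eventType = method.getParameterTypes()[0];
                found.add(method.getName() + "(" + eventType.getSimpleName() + ")");
            }
        }
        Collections.sort(found);
        return found;
    }

    public static void main(String[] args) {
        System.out.println(findHandlers(Sample.class)); // prints "[onNumber(Integer), onText(String)]"
    }
}
```

The parameter type of each accepted method plays the role of the event type, which is why a subscriber method must take exactly one argument.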

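The above covers the case where findState.subscriberInfo is null; the findState.checkAdd(method, eventType) call in it has not been analyzed yet. What about when findState.subscriberInfo is not null? There too, after the methods are retrieved, findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType) filters them before they are added to findState: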

      boolean checkAdd(Method method, Class<?> eventType) {
            // 2 level check: 1st level with event type only (fast), 2nd level with complete signature when required.
            // Usually a subscriber doesn't have methods listening to the same event type.
            Object existing = anyMethodByEventType.put(eventType, method);
            if (existing == null) {
                return true;
            } else {
                if (existing instanceof Method) {
                    if (!checkAddWithMethodSignature((Method) existing, eventType)) {
                        // Paranoia check
                        throw new IllegalStateException();
                    }
                    // Put any non-Method object to "consume" the existing Method
                    anyMethodByEventType.put(eventType, this);
                }
                return checkAddWithMethodSignature(method, eventType);
            }
        }

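As mentioned above, the search also walks up through the superclasses. If a method for the same event type has already been recorded (for example when both a subclass and its superclass subscribe to that event), checkAddWithMethodSignature((Method) existing, eventType) is called to decide: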

     private boolean checkAddWithMethodSignature(Method method, Class<?> eventType) {
            methodKeyBuilder.setLength(0);
            methodKeyBuilder.append(method.getName());
            methodKeyBuilder.append('>').append(eventType.getName());

            String methodKey = methodKeyBuilder.toString();
            Class<?> methodClass = method.getDeclaringClass();
            Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass);
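            // Return true if no method with this key was recorded yet, or if the recorded class is the same as or a superclass of this method's class
            if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) {
                // Only add if not already found in a sub class
                return true;
            } else {
                // Otherwise keep the previously recorded class and return false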
                // Revert the put, old class is further down the class hierarchy
                subscriberClassByMethodKey.put(methodKey, methodClassOld);
                return false;
            }
        }

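So subscriberClassByMethodKey keeps the class that was recorded first, which is the subclass, because the search starts at the subscriber class and moves upward.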
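
A small experiment shows the effect of the signature check when a subclass overrides a subscriber method. The sketch covers only the second-level check (the method key map), not the anyMethodByEventType fast path; OverrideCheck, Parent and Child are hypothetical names.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical demo of the "name>eventType" key check across a class hierarchy.
final class OverrideCheck {
    static class Parent {
        public void onEvent(String event) { }
        public void onOther(String event) { }
    }

    static class Child extends Parent {
        @Override
        public void onEvent(String event) { }
    }

    private final Map<String, Class<?>> classByMethodKey = new HashMap<>();

    boolean checkAdd(Method method, Class<?> eventType) {
        String methodKey = method.getName() + '>' + eventType.getName();
        Class<?> methodClass = method.getDeclaringClass();
        Class<?> methodClassOld = classByMethodKey.put(methodKey, methodClass);
        if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) {
            return true;                                   // first time this signature is seen
        }
        classByMethodKey.put(methodKey, methodClassOld);   // revert: the subclass entry wins
        return false;
    }

    /** Walks from the given class up to (but excluding) Object, like moveToSuperclass(). */
    static List<String> collect(Class<?> start) {
        OverrideCheck check = new OverrideCheck();
        List<String> kept = new ArrayList<>();
        for (Class<?> c = start; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Method method : c.getDeclaredMethods()) {
                if (method.isSynthetic() || method.getParameterCount() != 1) {
                    continue;
                }
                if (check.checkAdd(method, method.getParameterTypes()[0])) {
                    kept.add(c.getSimpleName() + "." + method.getName());
                }
            }
        }
        Collections.sort(kept);
        return kept;
    }

    public static void main(String[] args) {
        System.out.println(collect(Child.class)); // prints "[Child.onEvent, Parent.onOther]"
    }
}
```

Parent.onEvent is rejected because Child.onEvent with the same key was recorded first, so an overridden subscriber method is registered only once.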
That mostly completes the analysis of findUsingInfo(). What remains is the final return getMethodsAndRelease(findState); which returns the collected data and releases resources:

    private List<SubscriberMethod> getMethodsAndRelease(FindState findState) {
        List<SubscriberMethod> subscriberMethods = new ArrayList<>(findState.subscriberMethods);
        // Reset the FindState
        findState.recycle();
        // Return the FindState to the pool for reuse
        synchronized (FIND_STATE_POOL) {
            for (int i = 0; i < POOL_SIZE; i++) {
                if (FIND_STATE_POOL[i] == null) {
                    FIND_STATE_POOL[i] = findState;
                    break;
                }
            }
        }
        return subscriberMethods;
    }


  void recycle() {
            subscriberMethods.clear();
            anyMethodByEventType.clear();
            subscriberClassByMethodKey.clear();
            methodKeyBuilder.setLength(0);
            subscriberClass = null;
            clazz = null;
            skipSuperClasses = false;
            subscriberInfo = null;
        }

At this point EventBus.getDefault().register(this); has found the methods that listen for events. Next in register():
        synchronized (this) {
            for (SubscriberMethod subscriberMethod : subscriberMethods) {
                subscribe(subscriber, subscriberMethod);
            }
        }

subscribe(subscriber, subscriberMethod) binds each method that was found:

   // Must be called in synchronized block
    private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
        // Get the event type, e.g. MessageEvent
        Class<?> eventType = subscriberMethod.eventType;
        // Build a Subscription from the subscriber (e.g. an Activity) and the subscriberMethod (e.g. onMessageEvent)
        Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
        // Look up the subscriptions for this event type (key: eventType, value: subscriptions)
        CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        if (subscriptions == null) { // None yet: create the list and store it
            subscriptions = new CopyOnWriteArrayList<>();
            subscriptionsByEventType.put(eventType, subscriptions);
        } else {
            if (subscriptions.contains(newSubscription)) { // Already registered: throw
                throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                        + eventType);
            }
        }

        // Insert into subscriptions at the position determined by priority
        int size = subscriptions.size();
        for (int i = 0; i <= size; i++) {
            if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
                subscriptions.add(i, newSubscription);
                break;
            }
        }

        // Record the event type for this subscriber (key: subscriber, e.g. an Activity; value: event types, e.g. MessageEvent)
        List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
        if (subscribedEvents == null) {
            subscribedEvents = new ArrayList<>();
            typesBySubscriber.put(subscriber, subscribedEvents);
        }
        subscribedEvents.add(eventType);


        // For a sticky subscriber method, deliver any retained sticky event right away
        if (subscriberMethod.sticky) {
            if (eventInheritance) {
                // Existing sticky events of all subclasses of eventType have to be considered.
                // Note: Iterating over all events may be inefficient with lots of sticky events,
                // thus data structure should be changed to allow a more efficient lookup
                // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
                Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
                for (Map.Entry<Class<?>, Object> entry : entries) {
                    Class<?> candidateEventType = entry.getKey();
                    if (eventType.isAssignableFrom(candidateEventType)) {
                        Object stickyEvent = entry.getValue();
                        checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                    }
                }
            } else {
                Object stickyEvent = stickyEvents.get(eventType);
                checkPostStickyEventToSubscription(newSubscription, stickyEvent);
            }
        }
    }
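
The priority loop in subscribe() is easy to misread, so here it is in isolation. Sub is a hypothetical stand-in for Subscription that carries only a name and a priority; the insertion loop itself has the same shape as the one above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical demo of priority-ordered insertion.
final class PriorityInsert {
    static final class Sub {
        final String name;
        final int priority;

        Sub(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }
    }

    /** Inserts before the first entry with a strictly lower priority, or at the end. */
    static void insert(List<Sub> subscriptions, Sub newSub) {
        int size = subscriptions.size();
        for (int i = 0; i <= size; i++) {
            if (i == size || newSub.priority > subscriptions.get(i).priority) {
                subscriptions.add(i, newSub);
                break;
            }
        }
    }

    static List<String> demo() {
        List<Sub> subscriptions = new CopyOnWriteArrayList<>();
        insert(subscriptions, new Sub("a", 0));
        insert(subscriptions, new Sub("b", 5));
        insert(subscriptions, new Sub("c", 0));
        insert(subscriptions, new Sub("d", 10));
        List<String> names = new ArrayList<>();
        for (Sub sub : subscriptions) {
            names.add(sub.name);
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "[d, b, a, c]"
    }
}
```

Because the comparison is strict, subscriptions with equal priority keep their registration order ("a" stays ahead of "c"), and higher priorities always end up nearer the front of the list.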

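subscribe(subscriber, subscriberMethod) takes each annotated method that was found, records it in the relevant maps, and places it according to priority. For sticky subscriber methods it also calls checkPostStickyEventToSubscription(newSubscription, stickyEvent); which triggers the annotated method for a retained sticky event: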

 private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
        if (stickyEvent != null) {
            // If the subscriber is trying to abort the event, it will fail (event is not tracked in posting state)
            // --> Strange corner case, which we don't take care of here.
            postToSubscription(newSubscription, stickyEvent, isMainThread());
        }
    }

  private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        switch (subscription.subscriberMethod.threadMode) {
            case POSTING:
                invokeSubscriber(subscription, event);
                break;
            case MAIN:
                if (isMainThread) {
                    invokeSubscriber(subscription, event);
                } else {
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case MAIN_ORDERED:
                if (mainThreadPoster != null) {
                    mainThreadPoster.enqueue(subscription, event);
                } else {
                    // temporary: technically not correct as poster not decoupled from subscriber
                    invokeSubscriber(subscription, event);
                }
                break;
            case BACKGROUND:
                if (isMainThread) {
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case ASYNC:
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }

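postToSubscription compares the thread mode declared in the annotation with the thread that is currently posting. invokeSubscriber(subscription, event) calls the subscriber method immediately on the current thread, while each enqueue() call places the pending delivery in the corresponding poster's queue, to be invoked later on that poster's thread.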
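
The switch above boils down to a small decision table. Here is a sketch of that table as a pure function, assuming a main-thread poster is always available (so the MAIN_ORDERED fallback branch is left out). DispatchSketch, Mode and Action are names invented for this illustration and are not part of EventBus.

```java
class DispatchSketch {
    // Mirrors the constants of ThreadMode; declared locally so the sketch is self-contained.
    enum Mode { POSTING, MAIN, MAIN_ORDERED, BACKGROUND, ASYNC }

    // What postToSubscription ends up doing for a given mode and posting thread.
    enum Action { INVOKE_DIRECTLY, ENQUEUE_MAIN, ENQUEUE_BACKGROUND, ENQUEUE_ASYNC }

    static Action decide(Mode mode, boolean isMainThread) {
        switch (mode) {
            case POSTING:
                // Always runs on whatever thread called post().
                return Action.INVOKE_DIRECTLY;
            case MAIN:
                return isMainThread ? Action.INVOKE_DIRECTLY : Action.ENQUEUE_MAIN;
            case MAIN_ORDERED:
                // Always queued, even when already on the main thread.
                return Action.ENQUEUE_MAIN;
            case BACKGROUND:
                return isMainThread ? Action.ENQUEUE_BACKGROUND : Action.INVOKE_DIRECTLY;
            case ASYNC:
                // Always handed to a separate thread.
                return Action.ENQUEUE_ASYNC;
            default:
                throw new IllegalStateException("Unknown thread mode: " + mode);
        }
    }
}
```

Only POSTING, MAIN and BACKGROUND can run the subscriber inline; MAIN_ORDERED and ASYNC always go through a queue.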

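At this point the logic behind EventBus.getDefault().register(this); is complete. To summarize what it does:
1 Initializes the relevant data: mainThreadPoster, backgroundPoster, asyncPoster, typesBySubscriber, subscriptionsByEventType, and so on (this happens when the EventBus instance is created in getDefault());
2 Finds the annotated event-handling methods and stores them in those data structures;
3 For sticky subscriber methods, delivers any stored sticky event immediately.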

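As you can see, registration itself is simple: it collects the data from our annotations so that it can be used later. It also relies heavily on reflection, which is at the core of how EventBus's annotations work.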
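
To make the reflection step concrete, here is a minimal sketch of annotation-based discovery in plain Java. It is a simplification under stated assumptions: @Handles is a hypothetical annotation defined only for this example (it is not org.greenrobot.eventbus.Subscribe), and nothing is cached, so every delivery scans the class again.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

class ReflectionSketch {
    // Hypothetical marker annotation; RUNTIME retention makes it visible to reflection.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Handles {}

    // Example subscriber with one annotated and one plain method.
    static class Demo {
        final List<String> received = new ArrayList<>();

        @Handles
        public void onText(String text) {
            received.add(text);
        }

        public void notAHandler(String text) {
            received.add("wrong:" + text);
        }
    }

    // Collects public, single-parameter methods that carry the annotation.
    static List<Method> findHandlers(Class<?> type) {
        List<Method> found = new ArrayList<>();
        for (Method m : type.getDeclaredMethods()) {
            if (m.isAnnotationPresent(Handles.class)
                    && Modifier.isPublic(m.getModifiers())
                    && m.getParameterTypes().length == 1) {
                found.add(m);
            }
        }
        return found;
    }

    // Invokes every handler whose parameter type accepts the event.
    static void deliver(Object subscriber, Object event) {
        for (Method m : findHandlers(subscriber.getClass())) {
            if (m.getParameterTypes()[0].isInstance(event)) {
                try {
                    m.invoke(subscriber, event);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
    }
}
```

Delivering a String to a Demo instance reaches onText only, and delivering an Integer reaches nothing, because the match is made on the method's parameter type.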

Next comes the place where events are actually sent:
EventBus.getDefault().post(new MessageEvent("this is an event"));

    private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
        @Override
        protected PostingThreadState initialValue() {
            return new PostingThreadState();
        }
    };


    /** Posts the given event to the event bus. */
    public void post(Object event) {
        PostingThreadState postingState = currentPostingThreadState.get();
        List<Object> eventQueue = postingState.eventQueue;
        eventQueue.add(event);

        if (!postingState.isPosting) { // no post loop is running on this thread yet
            postingState.isMainThread = isMainThread();
            postingState.isPosting = true;
            if (postingState.canceled) {
                throw new EventBusException("Internal error. Abort state was not reset");
            }
            try {
                while (!eventQueue.isEmpty()) { // take events from the queue and send them in order
                    postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally { // reset the state
                postingState.isPosting = false;
                postingState.isMainThread = false;
            }
        }
    }

   /** For ThreadLocal, much faster to set (and get multiple values). */
    final static class PostingThreadState {
        final List<Object> eventQueue = new ArrayList<>();
        boolean isPosting; // true while a post loop is running on this thread
        boolean isMainThread; // whether the post was made on the main thread
        Subscription subscription;
        Object event;
        boolean canceled;
    }

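post() obtains a PostingThreadState from a ThreadLocal, so every thread has its own posting state. PostingThreadState holds the information about the post in progress and maintains that thread's event queue; post() removes events from this queue one at a time and sends each one with postSingleEvent(eventQueue.remove(0), postingState):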
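
One consequence of the isPosting flag is that a post() made from inside a subscriber is only queued, and is delivered after the current event has finished. The sketch below reproduces just that behaviour with a per-thread queue. PostingSketch and its Consumer-based handlers are assumptions made for this example, not EventBus types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class PostingSketch {
    // Per-thread state, playing the role of PostingThreadState.
    static final class State {
        final List<Object> queue = new ArrayList<>();
        boolean isPosting;
    }

    private final ThreadLocal<State> state = ThreadLocal.withInitial(State::new);
    private final List<Consumer<Object>> handlers = new ArrayList<>();
    final List<String> log = new ArrayList<>();

    void subscribe(Consumer<Object> handler) {
        handlers.add(handler);
    }

    void post(Object event) {
        State s = state.get();
        s.queue.add(event);
        if (!s.isPosting) { // only the outermost post() drains the queue
            s.isPosting = true;
            try {
                while (!s.queue.isEmpty()) {
                    Object next = s.queue.remove(0);
                    for (Consumer<Object> h : handlers) {
                        h.accept(next);
                    }
                }
            } finally {
                s.isPosting = false;
            }
        }
    }

    // Posts "first"; its handler posts "second" while "first" is still being handled.
    static List<String> demo() {
        PostingSketch bus = new PostingSketch();
        bus.subscribe(e -> {
            bus.log.add("start " + e);
            if (e.equals("first")) {
                bus.post("second"); // nested post: queued, not delivered yet
            }
            bus.log.add("end " + e);
        });
        bus.post("first");
        return bus.log;
    }
}
```

In demo() the log reads start first, end first, start second, end second: the nested event never interrupts the one being handled.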

 private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
        Class<?> eventClass = event.getClass();
        boolean subscriptionFound = false;
        if (eventInheritance) { // also deliver to subscribers of the event's superclasses and interfaces
            List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
            int countTypes = eventTypes.size();
            for (int h = 0; h < countTypes; h++) {
                Class<?> clazz = eventTypes.get(h);
                subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
            }
        } else {
            subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
        }
        if (!subscriptionFound) { // no subscriber handled the event: log it and post a NoSubscriberEvent
            if (logNoSubscriberMessages) {
                logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
            }
            if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                    eventClass != SubscriberExceptionEvent.class) {
                post(new NoSubscriberEvent(this, event));
            }
        }
    }
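
The body of lookupAllEventTypes is not shown in this article. As a rough sketch of the idea, the helper below walks up the superclass chain and collects interfaces along the way. It leaves out any caching, and HierarchySketch with its nested event types is hypothetical, defined only so the example is self-contained.

```java
import java.util.ArrayList;
import java.util.List;

class HierarchySketch {
    // Hypothetical event hierarchy used only by this example.
    interface Tagged {}
    static class BaseEvent implements Tagged {}
    static class ChildEvent extends BaseEvent {}

    // Returns the class itself, then each superclass, with interfaces added as they are met.
    static List<Class<?>> allEventTypes(Class<?> eventClass) {
        List<Class<?>> types = new ArrayList<>();
        Class<?> clazz = eventClass;
        while (clazz != null) {
            types.add(clazz);
            addInterfaces(types, clazz.getInterfaces());
            clazz = clazz.getSuperclass();
        }
        return types;
    }

    // Adds each interface once, then recurses into the interfaces it extends.
    static void addInterfaces(List<Class<?>> types, Class<?>[] interfaces) {
        for (Class<?> iface : interfaces) {
            if (!types.contains(iface)) {
                types.add(iface);
                addInterfaces(types, iface.getInterfaces());
            }
        }
    }
}
```

With these types, posting a ChildEvent would also reach subscribers registered for BaseEvent, Tagged and Object.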

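postSingleEvent uses the event's class to look up every type the event should be delivered as (the class itself plus its superclasses and interfaces), and calls postSingleEventForEventType(event, postingState, clazz) for each of them: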

   private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
        CopyOnWriteArrayList<Subscription> subscriptions;
        synchronized (this) { // look up the subscriptions for this event type (subscriber object plus method, stored during registration)
            subscriptions = subscriptionsByEventType.get(eventClass);
        }
        if (subscriptions != null && !subscriptions.isEmpty()) {
            for (Subscription subscription : subscriptions) {
                postingState.event = event;
                postingState.subscription = subscription;
                boolean aborted;
                try {
                    postToSubscription(subscription, event, postingState.isMainThread);
                    aborted = postingState.canceled;
                } finally { // reset the posting state
                    postingState.event = null;
                    postingState.subscription = null;
                    postingState.canceled = false;
                }
                if (aborted) {
                    break;
                }
            }
            return true;
        }
        return false;
    }

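postSingleEventForEventType looks up the subscriptions bound to the event type (each one holding the subscriber object and its method), then calls postToSubscription(subscription, event, postingState.isMainThread) to deliver the event to each of them: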

    private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        switch (subscription.subscriberMethod.threadMode) {
            case POSTING:
                invokeSubscriber(subscription, event);
                break;
            case MAIN:
                if (isMainThread) {
                    invokeSubscriber(subscription, event);
                } else {
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case MAIN_ORDERED:
                if (mainThreadPoster != null) {
                    mainThreadPoster.enqueue(subscription, event);
                } else {
                    // temporary: technically not correct as poster not decoupled from subscriber
                    invokeSubscriber(subscription, event);
                }
                break;
            case BACKGROUND:
                if (isMainThread) {
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case ASYNC:
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }

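postToSubscription already came up during registration, where it was used for sticky events; it dispatches the event according to the thread mode bound to the subscriber method. As an example, consider the main-thread case: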

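When a subscriber method is declared to run on the main thread and post() is itself called on the main thread, the method is invoked directly; otherwise the delivery is handed to mainThreadPoster, whose Looper is the main thread's Looper.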
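
Android's Looper and Handler are not available in plain Java, so the sketch below imitates the MAIN branch with a single-threaded executor standing in for the main thread. Everything in it is an assumption made for illustration: MainPosterSketch, the "fake-main" thread name and the use of Future are not how EventBus implements mainThreadPoster.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class MainPosterSketch {
    // Stand-in for the main thread: one named daemon thread draining a task queue.
    static final ExecutorService fakeMain = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "fake-main");
        t.setDaemon(true);
        return t;
    });

    static boolean isFakeMainThread() {
        return Thread.currentThread().getName().equals("fake-main");
    }

    // Mirrors the MAIN branch: run inline when already on the target thread, otherwise enqueue.
    static Future<String> postToMain(Callable<String> handler) throws Exception {
        if (isFakeMainThread()) {
            return CompletableFuture.completedFuture(handler.call());
        }
        return fakeMain.submit(handler);
    }

    // Posts from the calling thread and reports which thread ran the handler.
    static String handlerThreadName() {
        try {
            return postToMain(() -> Thread.currentThread().getName()).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    // Posts from inside the fake main thread; the handler must already be done on return.
    static boolean runsInlineWhenAlreadyOnMain() {
        try {
            Callable<Boolean> task = () -> postToMain(() -> "done").isDone();
            return fakeMain.submit(task).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The second helper shows why the inline branch matters: queuing onto the same single thread from within it would leave the handler waiting behind the task that posted it.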

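That completes the basic flow of EventBus, from registration through event handling. To recap the key steps and what each one does:
1 EventBus.getDefault(): initializes the data used for dispatch, including the posters for the main thread and background threads;
2 EventBus.getDefault().register(): finds the annotated methods of the subscriber being registered (event type, method, thread mode, owning class, and so on) and stores them in the bus's in-memory maps; sticky events are delivered immediately;
3 EventBus.getDefault().post(new MessageEvent("this is an event")): sends the event, matching it against the data collected in step 2 and dispatching it, via the posters set up in step 1, according to the thread mode declared in the annotation.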

The main methods of the classes involved are shown in the figure below:


火星瓢虫_003.png

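That covers the main flow and methods of EventBus. As we have seen, EventBus is built mainly on Java reflection and Android's Handler mechanism, which we will not examine further here.
If Handler is unclear, see the article 探究Handler的运行机制 (Exploring How Handler Works).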

