美文网首页
guava的EventBus,可以用来做单体服务的消息处理。

guava的EventBus,可以用来做单体服务的消息处理。

作者: zxbyh | 来源:发表于2023-05-20 07:59 被阅读0次

    guava的EventBus,可以用来做单体服务的消息处理。

    加入pom依赖

    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>31.1-jre</version>
    </dependency>
    

    直接上代码

    package com.smooth.common.core.event;
    
    import com.google.common.eventbus.AllowConcurrentEvents;
    import com.google.common.eventbus.AsyncEventBus;
    import com.google.common.eventbus.Subscribe;
    
    import java.lang.annotation.Annotation;
    import java.util.concurrent.Executors;
    
    public class EventBusDemo {
    
        enum msgType{one,two,three}
    
        record MsgRecord(String id,msgType type,String info,Long timestamp){}
    
        interface ISubscriber<T>{
            public void deal(T msg) throws InterruptedException;
        }
    
        class SubscriberOne implements ISubscriber<MsgRecord> {
            // 标记当前订阅者是线程安全的,支持并发接收消息
            @AllowConcurrentEvents
            @Subscribe
            public void deal(MsgRecord msg) throws InterruptedException {
                if (msg.type.equals(msgType.one)) {
                    System.out.println("SubscriberOne >>>>>> 收到消息,"+msg);
                    Thread.sleep(600);
                    System.out.println("SubscriberOne >>>>>> 完成处理,"+msg.id);
                }
            }
    
        }
    
        class SubscriberTwo implements ISubscriber<MsgRecord> {
            // 标记当前订阅者是线程安全的,支持并发接收消息
            @AllowConcurrentEvents
            @Subscribe
            public void deal(MsgRecord msg) throws InterruptedException {
                if (msg.type.equals(msgType.two)) {
                    System.out.println("SubscriberTwo >>>>>> 收到消息,"+msg);
                    Thread.sleep(900);
                    System.out.println("SubscriberTwo >>>>>> 完成处理,"+msg.id);
                }
            }
        }
    
        public void notifySubscriber() {
            //EventBus eventBus = new EventBus();
            //todo 实际开发中不要用这个,请使用ThreadPoolExecutor创建线程池
            AsyncEventBus eventBus = new AsyncEventBus(Executors.newFixedThreadPool(5));
            /*
            //todo 实际开发中不要用这个,请使用以下方式创建线程池
            final int nThreads = Runtime.getRuntime().availableProcessors()*12; //IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程;
            final int capacity = 10 * nThreads; //设置有界队列的容量
            AsyncEventBus eventBus = new AsyncEventBus(
                new ThreadPoolExecutor(
                    nThreads, //核心线程数 = 处理器的核数 * 期望CPU利用率0~1 * (1 + 等待时间/计算时间)
                    nThreads+10, //总线程数= 核心线程数+救急线程数 . 如果队列满了,就会创建救急线程.
                    60000, //救急线程生存时间.一分钟
                    TimeUnit.MILLISECONDS, //救急线程生存时间的单位,毫秒.
                    new ResizeableCapacityLinkedBlockingQueue<>(capacity), //设置有界队列的容量
                    new ThreadPoolExecutor.AbortPolicy() //拒绝策略, AbortPolicy()当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略:抛出异常,[注意开发人员一定要捕获异常并记录日志!]。
                )
            );
            */
            eventBus.register(new SubscriberOne());
            eventBus.register(new SubscriberTwo());
    
            eventBus.post(new MsgRecord("msg_01",msgType.one,"今天天气不错",System.currentTimeMillis()));
            eventBus.post(new MsgRecord("msg_02",msgType.two,"今天要下雨",System.currentTimeMillis()));
            eventBus.post(new MsgRecord("msg_03",msgType.two,"今天要打雷",System.currentTimeMillis()));
            eventBus.post(new MsgRecord("msg_04",msgType.three,"今天要刮风",System.currentTimeMillis()));
    
            System.out.println("message posted! ");
    
        }
    
    
    
        public static void main(String[] args) {
            new EventBusDemo().notifySubscriber();
        }
        
    }
    

    运行结果如下:

    message posted!
    SubscriberTwo >>>>>> 收到消息,EventMsg\[id=msg\_03, type=two, info=今天要打雷, timestamp=1684626583478]
    SubscriberTwo >>>>>> 收到消息,EventMsg\[id=msg\_02, type=two, info=今天要下雨, timestamp=1684626583478]
    SubscriberOne >>>>>> 收到消息,EventMsg\[id=msg\_01, type=one, info=今天天气不错, timestamp=1684626583475]
    SubscriberOne >>>>>> 完成处理,msg\_01
    SubscriberTwo >>>>>> 完成处理,msg\_03
    SubscriberTwo >>>>>> 完成处理,msg\_02
    

    然后可以看到 type=three的消息没有被处理。

    在实际的项目开发中,可以将 eventBus 设置为static
    如果是用springboot的话,可以弄成Bean

    @Configuration
    public class EventBusConfig {
        @Bean
        public AsyncEventBus asyncEventBus () {
            new AsyncEventBus(
                new ThreadPoolExecutor(
                    nThreads, //核心线程数 = 处理器的核数 * 期望CPU利用率0~1 * (1 + 等待时间/计算时间)
                    nThreads+10, //总线程数= 核心线程数+救急线程数 . 如果队列满了,就会创建救急线程.
                    60000, //救急线程生存时间.一分钟
                    TimeUnit.MILLISECONDS, //救急线程生存时间的单位,毫秒.
                    new ResizeableCapacityLinkedBlockingQueue<>(capacity), //设置有界队列的容量
                    new ThreadPoolExecutor.AbortPolicy() //拒绝策略, AbortPolicy()当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略:抛出异常,[注意开发人员一定要捕获异常并记录日志!]。
                )
            );
        }
    }
    

    相关文章

      网友评论

          本文标题:guava的EventBus,可以用来做单体服务的消息处理。

          本文链接:https://www.haomeiwen.com/subject/zhxosdtx.html