美文网首页
guava的EventBus,可以用来做单体服务的消息处理。

guava的EventBus,可以用来做单体服务的消息处理。

作者: zxbyh | 来源:发表于2023-05-20 07:59 被阅读0次

guava的EventBus,可以用来做单体服务的消息处理。

加入pom依赖

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>31.1-jre</version>
</dependency>

直接上代码

package com.smooth.common.core.event;

import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.Subscribe;

import java.lang.annotation.Annotation;
import java.util.concurrent.Executors;

public class EventBusDemo {

    enum msgType{one,two,three}

    record MsgRecord(String id,msgType type,String info,Long timestamp){}

    interface ISubscriber<T>{
        public void deal(T msg) throws InterruptedException;
    }

    class SubscriberOne implements ISubscriber<MsgRecord> {
        // 标记当前订阅者是线程安全的,支持并发接收消息
        @AllowConcurrentEvents
        @Subscribe
        public void deal(MsgRecord msg) throws InterruptedException {
            if (msg.type.equals(msgType.one)) {
                System.out.println("SubscriberOne >>>>>> 收到消息,"+msg);
                Thread.sleep(600);
                System.out.println("SubscriberOne >>>>>> 完成处理,"+msg.id);
            }
        }

    }

    class SubscriberTwo implements ISubscriber<MsgRecord> {
        // 标记当前订阅者是线程安全的,支持并发接收消息
        @AllowConcurrentEvents
        @Subscribe
        public void deal(MsgRecord msg) throws InterruptedException {
            if (msg.type.equals(msgType.two)) {
                System.out.println("SubscriberTwo >>>>>> 收到消息,"+msg);
                Thread.sleep(900);
                System.out.println("SubscriberTwo >>>>>> 完成处理,"+msg.id);
            }
        }
    }

    public void notifySubscriber() {
        //EventBus eventBus = new EventBus();
        //todo 实际开发中不要用这个,请使用ThreadPoolExecutor创建线程池
        AsyncEventBus eventBus = new AsyncEventBus(Executors.newFixedThreadPool(5));
        /*
        //todo 实际开发中不要用这个,请使用以下方式创建线程池
        final int nThreads = Runtime.getRuntime().availableProcessors()*12; //IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程;
        final int capacity = 10 * nThreads; //设置有界队列的容量
        AsyncEventBus eventBus = new AsyncEventBus(
            new ThreadPoolExecutor(
                nThreads, //核心线程数 = 处理器的核数 * 期望CPU利用率0~1 * (1 + 等待时间/计算时间)
                nThreads+10, //总线程数= 核心线程数+救急线程数 . 如果队列满了,就会创建救急线程.
                60000, //救急线程生存时间.一分钟
                TimeUnit.MILLISECONDS, //救急线程生存时间的单位,毫秒.
                new ResizeableCapacityLinkedBlockingQueue<>(capacity), //设置有界队列的容量
                new ThreadPoolExecutor.AbortPolicy() //拒绝策略, AbortPolicy()当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略:抛出异常,[注意开发人员一定要捕获异常并记录日志!]。
            )
        );
        */
        eventBus.register(new SubscriberOne());
        eventBus.register(new SubscriberTwo());

        eventBus.post(new MsgRecord("msg_01",msgType.one,"今天天气不错",System.currentTimeMillis()));
        eventBus.post(new MsgRecord("msg_02",msgType.two,"今天要下雨",System.currentTimeMillis()));
        eventBus.post(new MsgRecord("msg_03",msgType.two,"今天要打雷",System.currentTimeMillis()));
        eventBus.post(new MsgRecord("msg_04",msgType.three,"今天要刮风",System.currentTimeMillis()));

        System.out.println("message posted! ");

    }



    public static void main(String[] args) {
        new EventBusDemo().notifySubscriber();
    }
    
}

运行结果如下:

message posted!
SubscriberTwo >>>>>> 收到消息,EventMsg\[id=msg\_03, type=two, info=今天要打雷, timestamp=1684626583478]
SubscriberTwo >>>>>> 收到消息,EventMsg\[id=msg\_02, type=two, info=今天要下雨, timestamp=1684626583478]
SubscriberOne >>>>>> 收到消息,EventMsg\[id=msg\_01, type=one, info=今天天气不错, timestamp=1684626583475]
SubscriberOne >>>>>> 完成处理,msg\_01
SubscriberTwo >>>>>> 完成处理,msg\_03
SubscriberTwo >>>>>> 完成处理,msg\_02

然后可以看到 type=three的消息没有被处理。

在实际的项目开发中,可以将 eventBus 设置为static
如果是用springboot的话,可以弄成Bean

@Configuration
public class EventBusConfig {
    @Bean
    public AsyncEventBus asyncEventBus () {
        new AsyncEventBus(
            new ThreadPoolExecutor(
                nThreads, //核心线程数 = 处理器的核数 * 期望CPU利用率0~1 * (1 + 等待时间/计算时间)
                nThreads+10, //总线程数= 核心线程数+救急线程数 . 如果队列满了,就会创建救急线程.
                60000, //救急线程生存时间.一分钟
                TimeUnit.MILLISECONDS, //救急线程生存时间的单位,毫秒.
                new ResizeableCapacityLinkedBlockingQueue<>(capacity), //设置有界队列的容量
                new ThreadPoolExecutor.AbortPolicy() //拒绝策略, AbortPolicy()当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略:抛出异常,[注意开发人员一定要捕获异常并记录日志!]。
            )
        );
    }
}

相关文章

  • Guava EventBus 和事件处理程序中的异常

    Guava EventBus文档说明了这一点 “一般情况下,处理程序不应该抛出.如果这样做,EventBus将捕获...

  • Guava之EventBus消息发布订阅实现

    消息发布订阅实现 guava中的EventBus在项目开发中,可以快速实现发布订阅模型,不需要我们自己去实现.下面...

  • Guava EventBus应用实例

    来个开头 EventBus是guava包中的一个事件通知组件,可以用来在同一个JVM中,实现事件通知机制。异步和同...

  • Google EventBus使用详解

    EventBus是Google.Guava提供的消息发布-订阅类库,它实现了观察者设计模式,消息通知负责人通过Ev...

  • Guava EventBus

    http://www.baeldung.com/guava-eventbus

  • EventBus - (观察者模式) 消息发布订阅类库

    1,简介 1)EventBus实现了观察者模式,是Google.Guava提供的消息发布-订阅类库。2)Multi...

  • Google Guava EventBus(事件总线)

    EventBus是Guava的事件处理机制,是设计模式中的观察者模式(生产/消费者编程模型)的优雅实现。对于...

  • Guava EventBus实现原理

    开篇 EventBus是Guava的事件处理机制,是设计模式中的观察者模式的优雅实现。对于事件监听和发布订阅模式,...

  • Guava EventBus

    我称其为单块架构的利器 前言 在设计模式中, 有一种叫做发布/订阅模式, 即某事件被发布, 订阅该事件的角色将自动...

  • Guava——EventBus

    EventBus是Guava的事件处理机制,是设计模式中的观察者模式(生产/消费者编程模型)的优雅实现。对于事件监...

网友评论

      本文标题:guava的EventBus,可以用来做单体服务的消息处理。

      本文链接:https://www.haomeiwen.com/subject/zhxosdtx.html