guava的EventBus,可以用来做单体服务的消息处理。
加入pom依赖
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.1-jre</version>
</dependency>
直接上代码
package com.smooth.common.core.event;
import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.Subscribe;
import java.lang.annotation.Annotation;
import java.util.concurrent.Executors;
public class EventBusDemo {
enum msgType{one,two,three}
record MsgRecord(String id,msgType type,String info,Long timestamp){}
interface ISubscriber<T>{
public void deal(T msg) throws InterruptedException;
}
class SubscriberOne implements ISubscriber<MsgRecord> {
// 标记当前订阅者是线程安全的,支持并发接收消息
@AllowConcurrentEvents
@Subscribe
public void deal(MsgRecord msg) throws InterruptedException {
if (msg.type.equals(msgType.one)) {
System.out.println("SubscriberOne >>>>>> 收到消息,"+msg);
Thread.sleep(600);
System.out.println("SubscriberOne >>>>>> 完成处理,"+msg.id);
}
}
}
class SubscriberTwo implements ISubscriber<MsgRecord> {
// 标记当前订阅者是线程安全的,支持并发接收消息
@AllowConcurrentEvents
@Subscribe
public void deal(MsgRecord msg) throws InterruptedException {
if (msg.type.equals(msgType.two)) {
System.out.println("SubscriberTwo >>>>>> 收到消息,"+msg);
Thread.sleep(900);
System.out.println("SubscriberTwo >>>>>> 完成处理,"+msg.id);
}
}
}
public void notifySubscriber() {
//EventBus eventBus = new EventBus();
//todo 实际开发中不要用这个,请使用ThreadPoolExecutor创建线程池
AsyncEventBus eventBus = new AsyncEventBus(Executors.newFixedThreadPool(5));
/*
//todo 实际开发中不要用这个,请使用以下方式创建线程池
final int nThreads = Runtime.getRuntime().availableProcessors()*12; //IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程;
final int capacity = 10 * nThreads; //设置有界队列的容量
AsyncEventBus eventBus = new AsyncEventBus(
new ThreadPoolExecutor(
nThreads, //核心线程数 = 处理器的核数 * 期望CPU利用率0~1 * (1 + 等待时间/计算时间)
nThreads+10, //总线程数= 核心线程数+救急线程数 . 如果队列满了,就会创建救急线程.
60000, //救急线程生存时间.一分钟
TimeUnit.MILLISECONDS, //救急线程生存时间的单位,毫秒.
new ResizeableCapacityLinkedBlockingQueue<>(capacity), //设置有界队列的容量
new ThreadPoolExecutor.AbortPolicy() //拒绝策略, AbortPolicy()当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略:抛出异常,[注意开发人员一定要捕获异常并记录日志!]。
)
);
*/
eventBus.register(new SubscriberOne());
eventBus.register(new SubscriberTwo());
eventBus.post(new MsgRecord("msg_01",msgType.one,"今天天气不错",System.currentTimeMillis()));
eventBus.post(new MsgRecord("msg_02",msgType.two,"今天要下雨",System.currentTimeMillis()));
eventBus.post(new MsgRecord("msg_03",msgType.two,"今天要打雷",System.currentTimeMillis()));
eventBus.post(new MsgRecord("msg_04",msgType.three,"今天要刮风",System.currentTimeMillis()));
System.out.println("message posted! ");
}
public static void main(String[] args) {
new EventBusDemo().notifySubscriber();
}
}
运行结果如下:
message posted!
SubscriberTwo >>>>>> 收到消息,EventMsg\[id=msg\_03, type=two, info=今天要打雷, timestamp=1684626583478]
SubscriberTwo >>>>>> 收到消息,EventMsg\[id=msg\_02, type=two, info=今天要下雨, timestamp=1684626583478]
SubscriberOne >>>>>> 收到消息,EventMsg\[id=msg\_01, type=one, info=今天天气不错, timestamp=1684626583475]
SubscriberOne >>>>>> 完成处理,msg\_01
SubscriberTwo >>>>>> 完成处理,msg\_03
SubscriberTwo >>>>>> 完成处理,msg\_02
然后可以看到 type=three的消息没有被处理。
在实际的项目开发中,可以将 eventBus 设置为static
如果是用springboot的话,可以弄成Bean
@Configuration
public class EventBusConfig {
@Bean
public AsyncEventBus asyncEventBus () {
new AsyncEventBus(
new ThreadPoolExecutor(
nThreads, //核心线程数 = 处理器的核数 * 期望CPU利用率0~1 * (1 + 等待时间/计算时间)
nThreads+10, //总线程数= 核心线程数+救急线程数 . 如果队列满了,就会创建救急线程.
60000, //救急线程生存时间.一分钟
TimeUnit.MILLISECONDS, //救急线程生存时间的单位,毫秒.
new ResizeableCapacityLinkedBlockingQueue<>(capacity), //设置有界队列的容量
new ThreadPoolExecutor.AbortPolicy() //拒绝策略, AbortPolicy()当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略:抛出异常,[注意开发人员一定要捕获异常并记录日志!]。
)
);
}
}
网友评论