Guava EventBus 是基于内存的(进程内)事件发布/订阅组件
1, 首先使用Spring Initializr (https://github.com/spring-io/initializr/)创建一个新项目,加入Guava依赖
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>28.1-jre</version>
</dependency>
2, 配置event-bus
application.yml配置线程池参数
task:
  pool:
    core-pool-size: 2
    max-pool-size: 12
    keep-alive-seconds: 60
    queue-capacity: 10000
配置AsyncEventBus
import com.google.common.eventbus.AsyncEventBus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
/**
 * Spring configuration that backs a Guava {@link AsyncEventBus} with a dedicated thread pool.
 *
 * <p>Pool sizing is read from the {@code task.pool.*} properties; the value after the colon in
 * each placeholder is the fallback used when the property is not defined.
 */
@Configuration
public class AsynEventBusConfig {

    /** Core pool size handed to the executor. */
    @Value("${task.pool.core-pool-size:2}")
    private Integer corePoolSize;

    /** Maximum pool size handed to the executor. */
    @Value("${task.pool.max-pool-size:8}")
    private Integer maxPoolSize;

    /** Keep-alive time, in seconds, handed to the executor. */
    @Value("${task.pool.keep-alive-seconds:60}")
    private Integer keepAliveSeconds;

    /** Capacity of the executor's task queue. */
    @Value("${task.pool.queue-capacity:10000}")
    private Integer queueCapacity;

    /**
     * Creates the event bus; events are dispatched through the pool built by {@link #executor()}.
     *
     * @return the asynchronous event bus bean
     */
    @Bean
    public AsyncEventBus asyncEventBus() {
        return new AsyncEventBus(executor());
    }

    /**
     * Builds and initializes the thread pool used by the event bus.
     *
     * @return the configured, initialized executor bean
     */
    @Bean
    public ThreadPoolTaskExecutor executor() {
        final ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setThreadNamePrefix("asyncThread-");

        // Sizing, all taken from configuration.
        pool.setCorePoolSize(corePoolSize);
        pool.setMaxPoolSize(maxPoolSize);
        pool.setKeepAliveSeconds(keepAliveSeconds);
        // The queue capacity should sit slightly above the expected peak request volume.
        pool.setQueueCapacity(queueCapacity);

        // When the pool cannot accept a task, the submitting thread runs it instead of dropping it.
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        // On shutdown, let submitted tasks finish, waiting at most 15 minutes for them.
        pool.setWaitForTasksToCompleteOnShutdown(true);
        pool.setAwaitTerminationSeconds(15 * 60);

        pool.initialize();
        return pool;
    }
}
3, 随便创建一个事件对象和Service类
import java.io.Serializable;
import java.time.LocalDateTime;
/**
 * Simple serializable user model that is posted on the event bus as an event payload.
 *
 * <p>{@link #toString()} deliberately masks the password so that logging a {@code User}
 * never writes the credential to the log output.
 *
 * author: zhouliliang
 * Date: 2019/11/2 18:01
 */
public class User implements Serializable {
    private static final long serialVersionUID = 2569882961592695891L;

    /** Placeholder printed by {@link #toString()} instead of a non-null password. */
    private static final String PASSWORD_MASK = "******";

    private Long id;
    private String name;
    private String password;
    private LocalDateTime createTime;

    /** @return the user id, or {@code null} if not set */
    public Long getId() {
        return id;
    }

    /** @param id the user id */
    public void setId(Long id) {
        this.id = id;
    }

    /** @return the user name, or {@code null} if not set */
    public String getName() {
        return name;
    }

    /** @param name the user name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the password exactly as stored, or {@code null} if not set */
    public String getPassword() {
        return password;
    }

    /** @param password the password to store */
    public void setPassword(String password) {
        this.password = password;
    }

    /** @return the creation timestamp, or {@code null} if not set */
    public LocalDateTime getCreateTime() {
        return createTime;
    }

    /** @param createTime the creation timestamp */
    public void setCreateTime(LocalDateTime createTime) {
        this.createTime = createTime;
    }

    /**
     * Returns a debug representation of this user.
     *
     * <p>The layout is unchanged except that a non-null password is replaced by a fixed mask;
     * a {@code null} password is still printed as {@code null}.
     *
     * @return a string of the form {@code User{id=..., name='...', password='...', createTime=...}}
     */
    @Override
    public String toString() {
        // Never print the real credential: this object is passed straight to the logger.
        String shownPassword = (password == null) ? null : PASSWORD_MASK;
        return "User{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", password='" + shownPassword + '\'' +
                ", createTime=" + createTime +
                '}';
    }
}
Service类
import ai.guiji.eventbus.model.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
/**
 * Service that handles {@link User} objects: stamps them with the current time and logs them.
 *
 * author: zhouliliang
 * Date: 2019/11/2 18:09
 */
@Service("userService")
public class UserService {

    private static final Logger LOG = LoggerFactory.getLogger(UserService.class);

    /**
     * Sets the user's creation time to now, then logs the user at INFO level.
     *
     * @param user the user to process; its {@code createTime} is overwritten
     */
    public void process(User user) {
        final LocalDateTime now = LocalDateTime.now();
        user.setCreateTime(now);
        LOG.info("user:{}", user);
    }
}
4, 事件监听类
import ai.guiji.eventbus.model.Borrow;
import ai.guiji.eventbus.model.User;
import ai.guiji.eventbus.service.BorrowService;
import ai.guiji.eventbus.service.UserService;
import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.Subscribe;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
/**
 * Event bus subscriber that forwards {@link User} and {@link Borrow} events to their services.
 *
 * <p>The bean registers itself on the {@link AsyncEventBus} once its dependencies are injected.
 * Target services are looked up by bean name from the application context each time an event
 * arrives. Every handler catches and logs any exception raised while processing.
 */
@Service("eventListener")
public class EventListener {

    private static final Logger logger = LoggerFactory.getLogger(EventListener.class);

    @Autowired
    private ApplicationContext applicationContext;

    @Autowired
    private AsyncEventBus asyncEventBus;

    /** Registers this object's {@code @Subscribe} methods on the event bus. */
    @PostConstruct
    public void register() {
        asyncEventBus.register(this);
    }

    /**
     * Handles a posted {@link User} by delegating to the {@code userService} bean.
     *
     * @param user the event payload
     */
    @Subscribe
    public void userListener(User user) {
        try {
            // Typed lookup: the container checks the bean type, so no unchecked cast is needed.
            UserService userService = applicationContext.getBean("userService", UserService.class);
            userService.process(user);
        } catch (Exception e) {
            logger.error("user fail", e);
        }
    }

    /**
     * Handles a posted {@link Borrow} by delegating to the {@code borrowService} bean.
     *
     * @param borrow the event payload
     */
    @Subscribe
    public void borrowListener(Borrow borrow) {
        try {
            // Typed lookup: the container checks the bean type, so no unchecked cast is needed.
            BorrowService borrowService = applicationContext.getBean("borrowService", BorrowService.class);
            borrowService.process(borrow);
        } catch (Exception e) {
            logger.error("borrow fail", e);
        }
    }
}
5, 创建测试controller
import ai.guiji.eventbus.model.Borrow;
import ai.guiji.eventbus.model.User;
import com.google.common.base.CharMatcher;
import com.google.common.base.Splitter;
import com.google.common.eventbus.AsyncEventBus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.List;
/**
 * Test endpoints that publish events on the {@link AsyncEventBus}.
 */
@RestController
@RequestMapping("event")
public class EventController {

    @Autowired
    private AsyncEventBus asyncEventBus;

    /**
     * Posts the request body as a {@link User} event.
     *
     * @param user the user deserialized from the request body
     */
    @PostMapping(path = "/user")
    public void testEventBus(@RequestBody User user) {
        asyncEventBus.post(user);
    }

    /**
     * Posts one {@link Borrow} event per amount in a hard-coded sample list (500, 1000, 1500).
     *
     * <p>The sample text is stripped of brackets and quotes, split on commas, and each token is
     * converted to a {@code Double}.
     */
    @GetMapping(path = "/borrow")
    public void testEventBus2() {
        List<String> amounts = Splitter.on(',')
                .trimResults()
                .omitEmptyStrings()
                .splitToList(CharMatcher.anyOf("[]\"").removeFrom("[500,1000,1500]"));

        // Plain loop instead of a parallel stream: the bus is already asynchronous, so posting
        // in parallel adds nothing and only makes the posting order nondeterministic.
        for (String amount : amounts) {
            asyncEventBus.post(new Borrow(Double.valueOf(amount)));
        }
    }
}
网友评论