Guava的事件总线EventBus库是事件发布/订阅模式的一种实现,让我们能在领域驱动设计(DDD)中,借助事件弱引用(松耦合)的特性,很好地对模块和领域边界进行解耦。下面是一个常用的异步事件的例子:
1、先初始化一个AsyncEventBus
import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.PreDestroy;
/**
 * Spring configuration that exposes the application's Guava {@link AsyncEventBus}.
 *
 * <p>The bus dispatches events on a fixed-size thread pool owned by this configuration.
 * The pool is shut down when the configuration bean is destroyed, so its worker threads
 * do not outlive the application context.
 *
 * <p>If synchronous dispatch is wanted instead, expose a plain
 * {@code com.google.common.eventbus.EventBus} bean ({@code new EventBus()}) in place of
 * the asynchronous one.
 */
@Configuration
public class EventBusConfig {

    /** Number of worker threads used to dispatch events to subscribers. */
    private static final int EVENT_THREAD_COUNT = 10;

    /** Executor backing the event bus; kept in a field so it can be shut down on destroy. */
    private final ExecutorService eventExecutor = Executors.newFixedThreadPool(EVENT_THREAD_COUNT);

    /**
     * Creates the asynchronous event bus.
     *
     * @return an {@link AsyncEventBus} that dispatches events on {@link #eventExecutor}
     */
    @Bean
    public AsyncEventBus asyncEventBus() {
        return new AsyncEventBus(eventExecutor);
    }

    /**
     * Stops the dispatch thread pool when the context closes. {@code shutdown()} lets
     * already-submitted events finish but accepts no new ones.
     */
    @PreDestroy
    public void shutdownEventExecutor() {
        eventExecutor.shutdown();
    }
}
注:AsyncEventBus会把事件交给构造时传入的线程池(Executor)异步分发;EventBus则在调用post的线程中同步分发。
2、事件发布
import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
import com.xxx.vo.ExpBucketUserVo;
import com.xxx.vo.ExperimentBucketUserCountVo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
/**
 * Publishes experiment events on the injected {@link AsyncEventBus} and keeps the
 * {@link EventListener} registered as a subscriber for as long as this bean is alive.
 *
 * <p>To publish synchronously instead, inject a plain
 * {@code com.google.common.eventbus.EventBus} and register/unregister/post on it in the
 * same way.
 */
@Slf4j
@Service
public class EventHandler {

    @Autowired
    private AsyncEventBus asyncEventBus;

    @Autowired
    private EventListener eventListener;

    /** Registers the listener on the bus once dependencies have been injected. */
    @PostConstruct
    public void init() {
        asyncEventBus.register(eventListener);
    }

    /** Unregisters the listener from the bus before this bean is destroyed. */
    @PreDestroy
    public void destroy() {
        asyncEventBus.unregister(eventListener);
    }

    /**
     * Posts a bucket-user event to the bus.
     *
     * @param expBucketUserVo the event payload to publish
     */
    public void eventPost(ExpBucketUserVo expBucketUserVo) {
        publish(expBucketUserVo);
    }

    /**
     * Posts a bucket-user-count event to the bus.
     *
     * @param experimentBucketUserCountVo the event payload to publish
     */
    public void eventPost(ExperimentBucketUserCountVo experimentBucketUserCountVo) {
        publish(experimentBucketUserCountVo);
    }

    /** Posts the event and then logs what was posted. */
    private void publish(Object event) {
        asyncEventBus.post(event);
        log.info("eventBus post event {}", event);
    }
}
3、事件订阅
import com.google.common.eventbus.Subscribe;
import com.xxx.persistence.dao.AbExpBucketUserCountMapper;
import com.xxx.persistence.dao.AbExpBucketUserMapper;
import com.xxx.persistence.entity.ExperimentBucketUser;
import com.xxx.persistence.entity.ExperimentBucketUserCount;
import com.xxx.service.convert.ExpBucketUserCountResponseConverter;
import com.xxx.service.convert.ExpBucketUserResponseConverter;
import com.xxx.vo.ExpBucketUserVo;
import com.xxx.vo.ExperimentBucketUserCountVo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class EventListener {
@Autowired
private AbExpBucketUserMapper abExpBucketUserMapper;
@Autowired
private AbExpBucketUserCountMapper abExpBucketUserCountMapper;
@Subscribe
public void onMessageEvent(ExpBucketUserVo expBucketUserVo) {
try {
//CommonResponse<Integer> response = expBucketUserService.insert(expBucketUserVo);
ExperimentBucketUser experimentBucketUser = ExpBucketUserResponseConverter.INSTANCE.convertFrom(expBucketUserVo);
Integer i = abExpBucketUserMapper.insert(experimentBucketUser);
log.info("sync insert into ExperimentBucketUser,param={},result={}",expBucketUserVo,i);
}catch (Exception e){
log.error("sync insert into ExperimentBucketUser error,param={},error={}",expBucketUserVo,e);
}
}
@Subscribe
public void onMessageEvent(ExperimentBucketUserCountVo expBucketUserCountVo) {
try {
//CommonResponse<Integer> response = expBucketUserCountService.incrementCountBy(experimentBucketUserCountVo);
Integer i= abExpBucketUserCountMapper.incrementCountBy(expBucketUserCountVo.getExperimentId(),expBucketUserCountVo.getBucketId());
if(i<1){
//说明没有初始化
ExperimentBucketUserCount expBucketUserCount = ExpBucketUserCountResponseConverter.INSTANCE
.convertFrom(expBucketUserCountVo);
i= abExpBucketUserCountMapper.insert(expBucketUserCount);
}
log.info("sync insert into ExperimentBucketUserCount,param={},result={}",expBucketUserCountVo,i);
}catch (Exception e){
log.error("sync insert into ExperimentBucketUserCountCount error,param={},error={}",expBucketUserCountVo,e);
}
}
}
4、使用
// Usage fragment: inject the publisher into the calling Spring bean.
@Autowired
private EventHandler eventHandler;
// Asynchronous: the event is posted to the AsyncEventBus and handled by the registered EventListener.
eventHandler.eventPost(expBucketUserVo);
网友评论