美文网首页
Springboot 集成EventBus

Springboot 集成EventBus

作者: 雪飘千里 | 来源:发表于2019-11-22 11:56 被阅读0次

    Guava 的事件总线 EventBus 库是发布-订阅模式的一种实现。在领域驱动设计(DDD)中,借助事件的松耦合特性,可以让各个模块和领域边界很好地解耦。下面是一个常用的异步事件示例:

    1、先初始化一个AsyncEventBus

    import com.google.common.eventbus.AsyncEventBus;
    import com.google.common.eventbus.EventBus;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import javax.annotation.PreDestroy;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * Spring configuration that exposes the Guava {@link AsyncEventBus} used for
     * asynchronous event delivery.
     *
     * <p>The executor backing the bus is owned by this class and is shut down when
     * the application context is closed, so its worker threads do not outlive the
     * context.
     */
    @Configuration
    public class EventBusConfig {
    
        /** Number of worker threads that deliver events to subscribers. */
        private static final int EVENT_THREADS = 10;
    
        /**
         * Thread pool handed to the {@link AsyncEventBus}. It is kept in a field
         * (instead of being created inline in the bean method) so that
         * {@link #shutdownEventExecutor()} can release it.
         */
        private final ExecutorService eventExecutor = Executors.newFixedThreadPool(EVENT_THREADS);
    
        // Synchronous alternative: expose a plain EventBus instead of the AsyncEventBus.
    //  @Bean
    //  public EventBus eventBus(){
    //      return new EventBus();
    //  }
    
        /**
         * Creates the event bus that dispatches posted events on {@link #eventExecutor}.
         *
         * @return the asynchronous event bus bean
         */
        @Bean
        public AsyncEventBus asyncEventBus() {
            return new AsyncEventBus(eventExecutor);
        }
    
        /**
         * Shuts the event executor down when the context closes. Already submitted
         * deliveries are still executed; a fixed thread pool keeps its threads alive
         * until it is explicitly shut down.
         */
        @PreDestroy
        public void shutdownEventExecutor() {
            eventExecutor.shutdown();
        }
    
    }
    
    

    注:AsyncEventBus 是异步的(事件由构造时传入的线程池分发给订阅者),EventBus 是同步的(在调用 post 的线程中直接分发)。

    2、事件发布

    import com.google.common.eventbus.AsyncEventBus;
    import com.google.common.eventbus.EventBus;
    import com.xxx.vo.ExpBucketUserVo;
    import com.xxx.vo.ExperimentBucketUserCountVo;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    import javax.annotation.PostConstruct;
    import javax.annotation.PreDestroy;
    
    /**
     * Publishing side of the event bus.
     *
     * <p>Registers the injected {@link EventListener} with the {@link AsyncEventBus}
     * after dependency injection, unregisters it before the bean is destroyed, and
     * offers one {@code eventPost} overload per event type.
     *
     * <p>For synchronous delivery, inject an {@code EventBus} instead and call
     * {@code register}/{@code unregister}/{@code post} on it in the same places.
     */
    @Slf4j
    @Service
    public class EventHandler {
    
        @Autowired
        private AsyncEventBus asyncEventBus;
    
        @Autowired
        private EventListener eventListener;
    
        /** Subscribes the listener once this bean's dependencies have been injected. */
        @PostConstruct
        public void init() {
            this.asyncEventBus.register(this.eventListener);
        }
    
        /** Removes the listener from the bus before this bean is destroyed. */
        @PreDestroy
        public void destroy() {
            this.asyncEventBus.unregister(this.eventListener);
        }
    
        /**
         * Posts a bucket-user event to the bus.
         *
         * @param expBucketUserVo the event payload
         */
        public void eventPost(ExpBucketUserVo expBucketUserVo) {
            publish(expBucketUserVo);
        }
    
        /**
         * Posts a bucket-user-count event to the bus.
         *
         * @param experimentBucketUserCountVo the event payload
         */
        public void eventPost(ExperimentBucketUserCountVo experimentBucketUserCountVo) {
            publish(experimentBucketUserCountVo);
        }
    
        /** Posts the event to the bus and then logs it; shared by both overloads. */
        private void publish(Object event) {
            this.asyncEventBus.post(event);
            log.info("eventBus post event {}", event);
        }
    
    }
    
    

    3、事件订阅

    import com.google.common.eventbus.Subscribe;
    import com.xxx.persistence.dao.AbExpBucketUserCountMapper;
    import com.xxx.persistence.dao.AbExpBucketUserMapper;
    import com.xxx.persistence.entity.ExperimentBucketUser;
    import com.xxx.persistence.entity.ExperimentBucketUserCount;
    import com.xxx.service.convert.ExpBucketUserCountResponseConverter;
    import com.xxx.service.convert.ExpBucketUserResponseConverter;
    import com.xxx.vo.ExpBucketUserVo;
    import com.xxx.vo.ExperimentBucketUserCountVo;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    @Slf4j
    @Service
    public class EventListener {
        @Autowired
        private AbExpBucketUserMapper abExpBucketUserMapper;
    
        @Autowired
        private AbExpBucketUserCountMapper abExpBucketUserCountMapper;
    
        @Subscribe
        public void onMessageEvent(ExpBucketUserVo expBucketUserVo) {
            try {
                //CommonResponse<Integer> response = expBucketUserService.insert(expBucketUserVo);
                ExperimentBucketUser experimentBucketUser = ExpBucketUserResponseConverter.INSTANCE.convertFrom(expBucketUserVo);
                Integer i = abExpBucketUserMapper.insert(experimentBucketUser);
                log.info("sync insert into ExperimentBucketUser,param={},result={}",expBucketUserVo,i);
            }catch (Exception e){
                log.error("sync insert into ExperimentBucketUser error,param={},error={}",expBucketUserVo,e);
            }
        }
    
    
    
        @Subscribe
        public void onMessageEvent(ExperimentBucketUserCountVo expBucketUserCountVo) {
            try {
                //CommonResponse<Integer> response = expBucketUserCountService.incrementCountBy(experimentBucketUserCountVo);
                Integer i=  abExpBucketUserCountMapper.incrementCountBy(expBucketUserCountVo.getExperimentId(),expBucketUserCountVo.getBucketId());
                if(i<1){
                    //说明没有初始化
                    ExperimentBucketUserCount expBucketUserCount = ExpBucketUserCountResponseConverter.INSTANCE
                            .convertFrom(expBucketUserCountVo);
                    i= abExpBucketUserCountMapper.insert(expBucketUserCount);
                }
    
                log.info("sync insert into ExperimentBucketUserCount,param={},result={}",expBucketUserCountVo,i);
            }catch (Exception e){
                log.error("sync insert into ExperimentBucketUserCountCount error,param={},error={}",expBucketUserCountVo,e);
            }
        }
    }
    
    

    4、使用

    
        @Autowired
        private EventHandler eventHandler;
    
         // Asynchronous: the event is posted to the AsyncEventBus and handled by EventListener
         eventHandler.eventPost(expBucketUserVo);
    
    

    相关文章

      网友评论

          本文标题:Springboot 集成EventBus

          本文链接:https://www.haomeiwen.com/subject/azmfwctx.html