美文网首页
Guava AsyncEventBus使用

Guava AsyncEventBus使用

作者: 好好先生90 | 来源:发表于2019-11-02 18:37 被阅读0次

    Guava 的 EventBus 是基于内存的进程内事件总线;AsyncEventBus 则通过线程池异步分发事件。
    1, 首先使用Spring Initializr (https://github.com/spring-io/initializr/)创建一个新项目,加入Guava依赖

            <!-- Guava supplies com.google.common.eventbus.AsyncEventBus / @Subscribe used by the classes below. -->
            <!-- NOTE(review): 28.1-jre is an old release; consider upgrading to a current Guava version. -->
            <dependency>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
                <version>28.1-jre</version>
            </dependency>
    

    2, 配置event-bus
    application.yml配置线程池参数

    # Thread-pool settings read by AsynEventBusConfig through @Value("${task.pool.*}") placeholders.
    task:
      pool:
        # Passed to ThreadPoolTaskExecutor.setCorePoolSize (placeholder default: 2).
        core-pool-size: 2
        # Passed to setMaxPoolSize; overrides the placeholder default of 8.
        max-pool-size: 12
        # Passed to setKeepAliveSeconds (placeholder default: 60).
        keep-alive-seconds: 60
        # Passed to setQueueCapacity (placeholder default: 10000).
        queue-capacity: 10000
    

    配置AsyncEventBus

    import com.google.common.eventbus.AsyncEventBus;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    
    import java.util.concurrent.ThreadPoolExecutor;
    
    /**
     * Spring configuration that exposes a Guava {@link AsyncEventBus} backed by a
     * dedicated {@link ThreadPoolTaskExecutor}.
     *
     * <p>Pool sizing comes from the {@code task.pool.*} properties; the value after the
     * colon in each placeholder is the fallback used when the property is not set.
     */
    @Configuration
    public class AsynEventBusConfig {

        /** Core pool size ({@code task.pool.core-pool-size}, default 2). */
        @Value(value = "${task.pool.core-pool-size:2}")
        private Integer corePoolSize;

        /** Maximum pool size ({@code task.pool.max-pool-size}, default 8). */
        @Value(value = "${task.pool.max-pool-size:8}")
        private Integer maxPoolSize;

        /** Keep-alive time in seconds ({@code task.pool.keep-alive-seconds}, default 60). */
        @Value(value = "${task.pool.keep-alive-seconds:60}")
        private Integer keepAliveSeconds;

        /** Task queue capacity ({@code task.pool.queue-capacity}, default 10000). */
        @Value(value = "${task.pool.queue-capacity:10000}")
        private Integer queueCapacity;

        /**
         * Creates the asynchronous event bus that dispatches events on the pool
         * returned by {@link #executor()}.
         *
         * @return the event bus bean
         */
        @Bean
        public AsyncEventBus asyncEventBus() {
            // Inside a @Configuration class this call is routed through the container,
            // so the bus receives the fully initialized executor bean.
            ThreadPoolTaskExecutor executor = executor();
            return new AsyncEventBus(executor);
        }

        /**
         * Builds the thread pool used for event dispatch.
         *
         * @return the configured executor bean
         */
        @Bean
        public ThreadPoolTaskExecutor executor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(corePoolSize); // core pool size
            executor.setMaxPoolSize(maxPoolSize); // maximum pool size
            executor.setQueueCapacity(queueCapacity); // should be somewhat larger than the expected peak of pending events
            executor.setKeepAliveSeconds(keepAliveSeconds);
            // On shutdown, let queued and running tasks finish instead of interrupting them ...
            executor.setWaitForTasksToCompleteOnShutdown(true);
            // ... but wait at most 15 minutes (the default of 0 does not wait at all).
            executor.setAwaitTerminationSeconds(60 * 15);
            executor.setThreadNamePrefix("asyncThread-"); // thread name prefix
            // When pool and queue are both full, run the task on the submitting thread
            // rather than rejecting it.
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            // No explicit initialize() here: Spring calls it through afterPropertiesSet()
            // once the bean is returned; calling it manually as well would build the
            // underlying pool twice.
            return executor;
        }
    }
    

    3, 随便创建一个事件对象和Service类

    import java.io.Serializable;
    import java.time.LocalDateTime;
    
    /**
     * Event payload describing a user; posted on the event bus and handled by the
     * user subscriber.
     *
     * <p>{@link #toString()} deliberately masks the password so that logging a
     * {@code User} never writes the credential to the log.
     *
     * author: zhouliliang
     * Date: 2019/11/2 18:01
     */
    public class User implements Serializable {
        private static final long serialVersionUID = 2569882961592695891L;

        /** Placeholder printed by {@link #toString()} instead of the real password. */
        private static final String MASKED_PASSWORD = "'******'";

        private Long id;
        private String name;
        private String password;
        private LocalDateTime createTime;

        /** @return the user id, possibly {@code null} */
        public Long getId() {
            return id;
        }

        /** @param id the user id */
        public void setId(Long id) {
            this.id = id;
        }

        /** @return the user name, possibly {@code null} */
        public String getName() {
            return name;
        }

        /** @param name the user name */
        public void setName(String name) {
            this.name = name;
        }

        /** @return the password as stored, possibly {@code null} */
        public String getPassword() {
            return password;
        }

        /** @param password the password */
        public void setPassword(String password) {
            this.password = password;
        }

        /** @return the creation timestamp, possibly {@code null} */
        public LocalDateTime getCreateTime() {
            return createTime;
        }

        /** @param createTime the creation timestamp */
        public void setCreateTime(LocalDateTime createTime) {
            this.createTime = createTime;
        }

        /**
         * Returns a log-safe description: every field is printed except the password,
         * which is shown as a fixed mask (or {@code null} when unset).
         */
        @Override
        public String toString() {
            return "User{" +
                    "id=" + id +
                    ", name='" + name + '\'' +
                    ", password=" + (password == null ? "null" : MASKED_PASSWORD) +
                    ", createTime=" + createTime +
                    '}';
        }
    }
    

    Service类

    import ai.guiji.eventbus.model.User;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Service;
    
    import java.time.LocalDateTime;
    
    /**
     * Service that handles {@link User} events.
     *
     * author: zhouliliang
     * Date: 2019/11/2 18:09
     */
    @Service("userService")
    public class UserService {
        private static final Logger LOG = LoggerFactory.getLogger(UserService.class);

        /**
         * Stamps the given user with the current local time and logs it at INFO level.
         *
         * @param user the user to process; mutated in place (its create time is overwritten)
         */
        public void process(User user) {
            LocalDateTime now = LocalDateTime.now();
            user.setCreateTime(now);
            LOG.info("user:{}", user);
        }
    }
    

    4, 事件监听类

    import ai.guiji.eventbus.model.Borrow;
    import ai.guiji.eventbus.model.User;
    import ai.guiji.eventbus.service.BorrowService;
    import ai.guiji.eventbus.service.UserService;
    import com.google.common.eventbus.AsyncEventBus;
    import com.google.common.eventbus.Subscribe;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.ApplicationContext;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.PostConstruct;
    
    /**
     * Subscriber for events posted on the {@link AsyncEventBus}.
     *
     * <p>Each {@code @Subscribe} method receives events of its parameter type and
     * delegates to the matching service bean, which is looked up from the
     * application context when the event arrives.
     */
    @Service("eventListener")
    public class EventListener {
        private static final Logger logger = LoggerFactory.getLogger(EventListener.class);

        @Autowired
        private ApplicationContext applicationContext;

        @Autowired
        private AsyncEventBus asyncEventBus;

        /** Registers this bean with the event bus once its dependencies are injected. */
        @PostConstruct
        public void register() {
            asyncEventBus.register(this);
        }

        /**
         * Handles {@link User} events by delegating to the {@code userService} bean.
         * Any failure is logged and swallowed.
         *
         * @param user the posted user event
         */
        @Subscribe
        public void userListener(User user) {
            try {
                // Typed lookup: no unchecked cast, and a bean of the wrong type is
                // reported by the container with a descriptive exception.
                UserService userService = applicationContext.getBean("userService", UserService.class);
                userService.process(user);
            } catch (Exception e) {
                logger.error("user fail", e);
            }
        }

        /**
         * Handles {@link Borrow} events by delegating to the {@code borrowService} bean.
         * Any failure is logged and swallowed.
         *
         * @param borrow the posted borrow event
         */
        @Subscribe
        public void borrowListener(Borrow borrow) {
            try {
                BorrowService borrowService = applicationContext.getBean("borrowService", BorrowService.class);
                borrowService.process(borrow);
            } catch (Exception e) {
                logger.error("borrow fail", e);
            }
        }
    }
    

    5, 创建测试controller

    import ai.guiji.eventbus.model.Borrow;
    import ai.guiji.eventbus.model.User;
    import com.google.common.base.CharMatcher;
    import com.google.common.base.Splitter;
    import com.google.common.eventbus.AsyncEventBus;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.*;
    
    import java.util.List;
    
    /**
     * Test controller that posts events onto the {@link AsyncEventBus}.
     */
    @RestController
    @RequestMapping("event")
    public class EventController {

        /** Hard-coded sample payload used by {@link #testEventBus2()}. */
        private static final String SAMPLE_AMOUNTS = "[500,1000,1500]";

        @Autowired
        private AsyncEventBus asyncEventBus;

        /**
         * Posts the request body as a {@link User} event.
         *
         * @param user the user deserialized from the request body
         */
        @PostMapping(path = "/user")
        public void testEventBus(@RequestBody User user) {
            asyncEventBus.post(user);
        }

        /**
         * Parses the sample amounts and posts one {@link Borrow} event per amount.
         */
        @GetMapping(path = "/borrow")
        public void testEventBus2() {
            // Strip the brackets/quotes, then split on commas into trimmed, non-empty tokens.
            List<String> amounts = Splitter.on(',')
                    .trimResults()
                    .omitEmptyStrings()
                    .splitToList(CharMatcher.anyOf("[]\"").removeFrom(SAMPLE_AMOUNTS));
            // Sequential on purpose: post() only hands the event to the bus's executor,
            // so a parallel stream adds no throughput and would tie up the shared
            // ForkJoinPool (whose threads could end up running handlers themselves
            // under a caller-runs rejection policy).
            amounts.forEach(n -> asyncEventBus.post(new Borrow(Double.valueOf(n))));
        }
    }
    

    相关文章

      网友评论

          本文标题:Guava AsyncEventBus使用

          本文链接:https://www.haomeiwen.com/subject/jtrhbctx.html