美文网首页
spring boot异步调用批处理

spring boot异步调用批处理

作者: 周六不算加班 | 来源:发表于2018-06-24 16:14 被阅读189次

    在spring boot项目中实现一个批量调用外部接口功能,由于每次调用接口都会比较浪费时间,
    所以打算用异步来实现这个批量的调用的功能。
    具体的思路是:
    1、建立一个全局的队列来存储数据。
    2、在批量调用外部接口的时候,只是往队列中添加相关的数据,添加成功以后就返回。
    3、创建aop,在@After中也就是在队列添加完数据以后,调用异步方法。
    4、创建异步方法,在异步方法中是真正调用外部接口实现。
    在异步方法中注入别的bean方法容易报错,一般是多个线程同时注入同一个bean造成的,
    加上@Lazy注解就能解决了

    功能实现:
    1、建立全局队列
    @Component
    public class QueueUtils {

    private static QueueUtils instance;
    
    static final int FILE_QUEUE_SIZE = 10000;// 阻塞队列大小
    
    private static BlockingQueue<Map<String,Object>> queue = new ArrayBlockingQueue<Map<String,Object>>(FILE_QUEUE_SIZE);
    
    /**
     * 构造方法,private不让外界生成队列工具类
     */
    private QueueUtils(){
    
    }
    
    
    
    /**
     * 添加队列元素
     * @param map
     * @throws InterruptedException
     */
    public static void put(Map<String,Object>  map) throws InterruptedException {
        queue.put(map);
    }
    
    /**
     * 获取队列
     * @return
     */
    public static BlockingQueue<Map<String,Object>> getQueue(){
        return queue;
    }
    

    }
    2、添加队列值
    @Controller
    public class TestController {

    /**
     * aop测试
     */
    @RequestMapping(value = "/aopTest")
    @ResponseBody
    public Object aopTest() throws InterruptedException {
        for (int i = 0; i<11;i++){
            Map<String,Object> map = new HashMap<String, Object>();
            map.put("userId",i);
            map.put("amount",5);
            QueueUtils.put(map);
        }
        String temp =  "aopTest";
        return temp;
    }
    

    }

    3、aop实现
    @Aspect
    @Component
    public class testIntercept {

    @Autowired
    private MyAsyncTask myAsyncTask;
    
    private final static Logger logger = LoggerFactory.getLogger(testIntercept.class);
    
    @Pointcut("execution(public * com.caody.muyi.TestController.aopTest())")
    public void testAop(){};
    
    
    @After("testAop()")
    public void after(){
    
        String aaa = "执行完主体方法以后才会执行的方法";
        myAsyncTask.refreshMyDbAsync();
        logger.info(aaa);
    }
    

    }
    4、异步方法实现
    @Component
    public class MyAsyncTask {

    private Logger logger = LoggerFactory.getLogger(getClass());
    
    @Async
    public void refreshMyDbAsync()  {
        BlockingQueue<Map<String,Object>> queue = QueueUtils.getQueue();
        //队列方式遍历,元素逐个被移除
        while (queue.peek() != null) {
            Map<String,Object> map = queue.poll();
    
            logger.info("userId:"+map.get("userId")+" amount:"+map.get("amount"));
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    

    }
    5、主方法上面开始异步
    @EnableCaching //开启缓存
    @EnableAsync(proxyTargetClass = true)//开启异步处理
    @SpringBootApplication
    public class MuYiApplication {

    public static void main(String[] args) {
        SpringApplication.run(MuYiApplication.class, args);
    }
    

    }

    相关文章

      网友评论

          本文标题:spring boot异步调用批处理

          本文链接:https://www.haomeiwen.com/subject/nafsyftx.html