美文网首页Java 杂谈读书Java进阶之路
Spring 多线程、异步和redis队来解决非等待性方法

Spring 多线程、异步和redis队来解决非等待性方法

作者: elijah777 | 来源:发表于2019-05-27 23:11 被阅读3次

    在处理后台程序时如果执行比较久,而不需要用户等待的话,可以考虑使用多线程,线程异步或者redis队的方法来实现

    Spring通过任务执行器(TaskExecutor)来实现多线程和并发编程。使用TheadPoolTaskExecutor可实现一个基于线程池的TaskExecutor。而实际开发任务一般是非阻碍的,即异步的,所以要开启异步任务的支持(@EnableAsync),并通过实际的执行bean中的方法使用@Async注释来生命其是一个异步任务

    代码示例

    线程数的配置类

    import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
    import org.springframework.context.annotation.ComponentScan;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.AsyncConfigurer;
    import org.springframework.scheduling.annotation.EnableAsync;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    
    import java.util.concurrent.Executor;
    
    /**
     * @description: 多线程 配置类
     * @author: Shenshuaihu
     * @version: 1.0
     * @data: 2019-05-25 11:41
     */
    @Configuration
    @ComponentScan("com.ch3.taskexecutor")
    @EnableAsync    // 开启异步任务支持
    public class TaskExecutorConfig implements AsyncConfigurer {
        @Override
        public Executor getAsyncExecutor() {
            /**
             *  创建线程池
             *      核心线程数
             *      最大线程数
             *      队列最大长度
             */
            ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
            taskExecutor.setCorePoolSize(5);
            taskExecutor.setMaxPoolSize(10);
            taskExecutor.setQueueCapacity(25);
            taskExecutor.initialize();
            return taskExecutor;
        }
    
        @Override
        public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
            return null;
        }
    }
    

    需要开启的异步方法

    import org.springframework.scheduling.annotation.Async;
    import org.springframework.stereotype.Service;
    
    /**
     * @description: 任务执行类
     * @author: Shenshuaihu
     * @version: 1.0
     * @data: 2019-05-25 10:39
     */
    @Service
    public class AsyncTaskService {
    
        /**
         *  Async 异步方法
         * @param i
         */
        @Async
        public void executeAsyncTask(Integer i) {
            int a = (int)(1+Math.random()*(800-1+1));
    
            try {
                Thread.sleep(a);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("执行异步任务:" + i);
        }
    
        @Async
        public void executeAsyncTaskPlus(Integer i) {
            System.out.println("执行异步任务+1:" + i);
        }
    }
    

    程序入口类

    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    
    /**
     * @description: 线程调用入口
     * @author: Shenshuaihu
     * @version: 1.0
     * @data: 2019-05-25 13:13
     */
    public class TaskMain {
        public static void main(String[] args) {
            AnnotationConfigApplicationContext context =
                    new AnnotationConfigApplicationContext(TaskExecutorConfig.class);
            AsyncTaskService taskService = context.getBean(AsyncTaskService.class);
            System.out.println(taskService);
            for (int i = 0; i < 100; i++) {
                taskService.executeAsyncTask(i);
                taskService.executeAsyncTaskPlus(i);
            }
            context.close();
        }
    }
    

    简单代码说明

    如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。

    如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。

    如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maxPoolSize,建新的线程来处理被添加的任务。

    如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maxPoolSize,那么通过handler所指定的策略来处理此任务。也就是:处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程 maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。

    当线程池中的线程数量大于corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。

    知识拓展

    如果不想使用线程池的话,用redis队也是不错的选择。redis队先进先出也可以满足,也是需要用线程来开启出发方法

    入队方法,即需要将执行的内容push进来

    public void pushTaskQueue(Long resultDataId,  String[] command){
    
        // 将参数放进redis队列中   resultDataId 与 cmd
    
        String data = String.valueOf(resultDataId) + "&&" + Arrays.toString(command);
    
        redisTemplate.opsForList().leftPush("task-queue",data);
    
    }
    
    

    执行等待时间比较久的任务,出队

    import jodd.util.StringUtil;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.stereotype.Component;
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.text.SimpleDateFormat;
    import java.util.Arrays;
    
    /**
     * @Date: 2018/12/15 17:28
     * @Description: 任务队列消费者   执行Python 更新数据库
     */
    
    @Component
    @Slf4j
    public class TaskConsumer implements Runnable {
    
        @Autowired
        private IResultDataService service;
    
        @Autowired
        private RedisTemplate<String, Object> redisTemplate;
    
        @Override
        public void run() {
    
            try {
                String data = redisTemplate.opsForList().leftPop("task-queue").toString();
                String resultData[] = data.split("&&");
                dataId = resultData[0];
                String cmdData  = resultData[1].replace(",","#").replace("csv\"#","csv\",");
                cmdData  = cmdData.substring(1,cmdData.length()-1).replace(" ","");
                String[] command  = cmdData.split("#");
                log.info(" 队列中数据{} " , data );
               
                // 执行py文件 等待时间比较久,需要异步操作
                    Process proc = Runtime.getRuntime().exec(command);
                    in.close();
                    proc.waitFor();
            }catch (Exception e){
            }
        }
     }
    

    2019/05/27晚于成都

    相关文章

      网友评论

        本文标题:Spring 多线程、异步和redis队来解决非等待性方法

        本文链接:https://www.haomeiwen.com/subject/lglltctx.html