美文网首页Vue
手把手Java多线程实战(1)

手把手Java多线程实战(1)

作者: 孤山之王 | 来源:发表于2020-12-30 14:38 被阅读0次

    1. 背景

    这些天得空,将为孩子们拍的图片顺带整理下, 发现我 iCloud 自动下载以及 Onedrive 自动备份还有本身随机拷贝的文件散落个人电脑上的每个文件夹下都有录,也怪我平时手太懒这下好啦,面对这些无序的图片,心中泛起一阵凉凉,让人头大。

    可能是由于我图片管理软件不熟悉,只能自己想办法,谁让我们是 Coding ,我第一时间想到就是利用 JAVA 遍历,主要是其他软件我用的没有 JAVA 熟悉,在脑海中整理下思路,等到代码写完发现执行效率太慢,不敢想象,怎么这么慢,这是我写的代码吗?严重鄙视它,好好静下心逐行分析发现好些地方完全可以拆分出来异步执行,充分利用当前操作系统多核的优势,逐步缓解效率慢的问题。

    本章节知识点主要有:

    • 线程池 ThreadPoolExecutor
    • 阻塞队列 LinkedBlockingQueue
    • 非阻塞 CompletableFuture

    2. 实现思路

    在指定文件夹下,循环递归文件夹以及子文件夹,遍历文件夹下每个文件,将文件名、大小、文件路径、文件后缀名、文件校验 MD5 等记录关系型数据库中,后续只需要靠这个 MD5 判断是否是同一个文件即可!

    3. 功能实现

    一切准备妥当,我准备在开发环境弄, JDKIDEAMySQLGradle 均不在本次说明之内。

    3.1. 数据表设计

    表的结构很简单,数据库主键生成策略我就用 自增长 模式,没必要太复杂。

    
    CREATE TABLE `tb_file_info` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT,
      `file_name` varchar(300) DEFAULT NULL,
      `file_path` varchar(300) DEFAULT NULL,
      `file_size` bigint(20) DEFAULT NULL,
      `suffix_name` varchar(30) DEFAULT NULL,
      `md5` varchar(40) DEFAULT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=115 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
    
    

    3.2. 实体类三部曲

    持久层我选择 Mybatis 来集成,也没太多必要弄分页,需求分析下来主要瓶颈在遍历读取文件再数据库上。

    作为 JAVA 面向对象的基础,此处用到的 io.github.rothschil ,那是我基础的封装,中央仓库/阿里云上,都可以下载到,再有就是这里批量写入的方式,其他内容真的没什么可以说的。

    3.2.1. 实体类

    这里需要注意下,这里的 BasePo 是我基类。

    
    @EqualsAndHashCode(callSuper=false)
    @Builder(toBuilder=true)
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public class FileInfo extends BasePo<Long> {
        private Long id;
        private String fileName;
        private String filePath;
        private Long fileSize;
        private String suffixName;
        private String md5;
    }
    
    

    3.2.2. Mapper

    这里需要注意下,这里的 BaseMapper 是我基类。

    
    
    public interface FileInfoMapper extends BaseMapper<FileInfo,Long> {
    
        void batchInsert(List<FileInfo> lists);
    
    }
    
    

    3.2.3. XML文件

    
    <?xml version="1.0" encoding="UTF-8" ?>
    <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
    <mapper namespace="io.github.rothschil.domain.mapper.FileInfoMapper" >
      <resultMap id="BaseResultMap" type="io.github.rothschil.domain.entity.FileInfo" >
        <id column="id" property="id" jdbcType="BIGINT" />
        <result column="file_name" property="fileName" jdbcType="VARCHAR" />
        <result column="file_path" property="filePath" jdbcType="VARCHAR" />
        <result column="file_size" property="fileSize" jdbcType="BIGINT" />
        <result column="suffix_name" property="suffixName" jdbcType="VARCHAR" />
        <result column="md5" property="md5" jdbcType="VARCHAR" />
      </resultMap>
      <sql id="Base_Column_List" >
        id, file_name, file_path, file_size, suffix_name, md5
      </sql>
    
      <insert id="batchInsert" parameterType="java.util.List">
        insert into tb_file_info (id, file_name, file_path,
        file_size, suffix_name, md5
        ) values
        <foreach collection="list" item="item" index="index" separator=",">
          (#{item.id,jdbcType=BIGINT}, #{item.fileName,jdbcType=VARCHAR}, #{item.filePath,jdbcType=VARCHAR},
          #{item.fileSize,jdbcType=BIGINT}, #{item.suffixName,jdbcType=VARCHAR}, #{item.md5,jdbcType=VARCHAR}
          )
        </foreach>
      </insert>
    
      <select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.Long" >
        select 
        <include refid="Base_Column_List" />
        from tb_file_info
        where id = #{id,jdbcType=BIGINT}
      </select>
      <delete id="deleteByPrimaryKey" parameterType="java.lang.Long" >
        delete from tb_file_info
        where id = #{id,jdbcType=BIGINT}
      </delete>
      <insert id="insert" parameterType="io.github.rothschil.domain.entity.FileInfo" >
        insert into tb_file_info (id, file_name, file_path, 
          file_size, suffix_name, md5
          )
        values (#{id,jdbcType=BIGINT}, #{fileName,jdbcType=VARCHAR}, #{filePath,jdbcType=VARCHAR}, 
          #{fileSize,jdbcType=BIGINT}, #{suffixName,jdbcType=VARCHAR}, #{md5,jdbcType=VARCHAR}
          )
      </insert>
      <insert id="insertSelective" parameterType="io.github.rothschil.domain.entity.FileInfo" >
        insert into tb_file_info
        <trim prefix="(" suffix=")" suffixOverrides="," >
          <if test="id != null" >
            id,
          </if>
          <if test="fileName != null" >
            file_name,
          </if>
          <if test="filePath != null" >
            file_path,
          </if>
          <if test="fileSize != null" >
            file_size,
          </if>
          <if test="suffixName != null" >
            suffix_name,
          </if>
          <if test="md5 != null" >
            md5,
          </if>
        </trim>
        <trim prefix="values (" suffix=")" suffixOverrides="," >
          <if test="id != null" >
            #{id,jdbcType=BIGINT},
          </if>
          <if test="fileName != null" >
            #{fileName,jdbcType=VARCHAR},
          </if>
          <if test="filePath != null" >
            #{filePath,jdbcType=VARCHAR},
          </if>
          <if test="fileSize != null" >
            #{fileSize,jdbcType=BIGINT},
          </if>
          <if test="suffixName != null" >
            #{suffixName,jdbcType=VARCHAR},
          </if>
          <if test="md5 != null" >
            #{md5,jdbcType=VARCHAR},
          </if>
        </trim>
      </insert>
      <update id="updateByPrimaryKeySelective" parameterType="io.github.rothschil.domain.entity.FileInfo" >
        update tb_file_info
        <set >
          <if test="fileName != null" >
            file_name = #{fileName,jdbcType=VARCHAR},
          </if>
          <if test="filePath != null" >
            file_path = #{filePath,jdbcType=VARCHAR},
          </if>
          <if test="fileSize != null" >
            file_size = #{fileSize,jdbcType=BIGINT},
          </if>
          <if test="suffixName != null" >
            suffix_name = #{suffixName,jdbcType=VARCHAR},
          </if>
          <if test="md5 != null" >
            md5 = #{md5,jdbcType=VARCHAR},
          </if>
        </set>
        where id = #{id,jdbcType=BIGINT}
      </update>
      <update id="updateByPrimaryKey" parameterType="io.github.rothschil.domain.entity.FileInfo" >
        update tb_file_info
        set file_name = #{fileName,jdbcType=VARCHAR},
          file_path = #{filePath,jdbcType=VARCHAR},
          file_size = #{fileSize,jdbcType=BIGINT},
          suffix_name = #{suffixName,jdbcType=VARCHAR},
          md5 = #{md5,jdbcType=VARCHAR}
        where id = #{id,jdbcType=BIGINT}
      </update>
    </mapper>
    
    

    3.2.4. Service

    这里需要注意下,这里的 BaseService 是我基类。

    
    @Slf4j
    @Service(value="fileInfoService")
    @Transactional(readOnly = true)
    public class FileInfoService extends BaseService<FileInfo, Long> {
    
        @Autowired
        private FileInfoMapper fileInfoMapper;
    
        @Override
        protected BaseMapper<FileInfo, Long> getMapper() {
            return fileInfoMapper;
        }
    
        @Transactional(readOnly = false)
        public void insert(List<FileInfo> lists){
            fileInfoMapper.batchInsert(lists);
        }
    
    }
    
    

    3.3. 核心类

    核心处理类,这是一个 bean,结合 Junit做测试,所以标明为 @Component

    • 判断是否是文件夹

    • 判断文件后缀名字是图片格式

    • 批量写入数据库中

    20201230140531
    
    
    @Component
    @Slf4j
    public class RunFileTask {
      
    
     @Autowired
     public FileInfoService fileInfoService;  
    
     public void run(String path) {
     File file = new File(path);
     if (!file.isDirectory()) {
     return;
     }
    
     listFiles(file);
    
     }
    
     public void listFiles(File file) {
    
     File[] files = file.listFiles();
    
     List<FileInfo> lists = new ArrayList<FileInfo>();
    
     for (File fl : files) {
    
     // 1、文件夹就递归
    
     if (fl.isDirectory()) {
    
     listFiles(fl);
    
     continue;
    
     }
    
     String suffixName = FileUtil.getSuffix(fl);
    
     // 2、只要文件后缀名字是图片的
    
     if (!ImageConst.LIST_SUFFIX.contains(suffixName.toUpperCase())) {
    
     continue;
    
     }
    
     String fileName = FileUtil.getName(fl);
    
     float size = fl.length();
    
     String filePath = FileUtil.getAbsolutePath(fl);
    
     FileInfo imageInfo = FileInfo.builder().fileName(fileName).filePath(filePath).fileSize(size).suffixName(suffixName)
    
     .md5(Md5Utils.getMd5(fl)).build();
    
     lists.add(imageInfo);
    
     }
    
     // 3、批量写入数据库中
    
     if (!lists.isEmpty()) {
    
     fileInfoService.insert(lists);
     }
     }
      
    
    

    4. 问题

    最终执行起来,好家伙,差不多 4000 多文件,发现执行耗时超过 15 分钟,时间有点长,一时间我还很难以接受,但是总能把事情解决,具体解决的过程完美与否暂时没考虑。

    20201230140612

    .......

    .......

    这一夜,注定是漫长的,让人无法忍受,一点也不完美。

    怎么能执行这么久,不科学,能不能想个办法优化执行时间,一定有。

    5. 问题分析

    文件信息在写库、以及获取 MD5 消耗资源过多,所以较其他逻辑要耗时些,文件单次处理大概只要在 100 毫秒内,但是写库以及生成文件校验值差不多又要 120 毫秒,两个业务步骤加起来整体耗时差不多就要 220 多毫秒。

    20201230140426

    6. 优化思路

    根据直觉将任务的实现改为异步并行处理,主要体现在以下两个地方:

    • 将最后批量写库,这个地方改为异步队列方式

    • 针对文件生成 MD5 这个地方本来比较耗费资源,能否也改为异步

    6.1. 异步批量写库

    弄个队列,让它处理需要批量入库的集合,并且这个队列要是阻塞队列。

    6.1.1. 定义实际操作接口

    
    public interface IntfFileInfoHandler {
        void processData();
    }
    
    

    FileInfoHandler 实现 IntfFileInfoHandler 接口,并且有一个成员变量,用于接收需要批量入库的集合。

    
    @Data
    @Slf4j
    @Component("fileInfoHandler")
    public class FileInfoHandler implements IntfFileInfoHandler {
    
        private List<FileInfo> lists;
    
        @Autowired
        private FileInfoService fileInfoService;
    
        public void processData(){
            fileInfoService.insert(lists);
        }
    }
    
    

    6.1.2. 定义队列

    
    @Slf4j
    @Component
    public class FileInfoQueue {
    
        private final LinkedBlockingQueue<IntfFileInfoHandler> queue = new LinkedBlockingQueue<>(500);
    
        /**
         * 线程池
         */
        private ExecutorService service = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
        /**
         * 检查服务是否运行
         */
        private volatile boolean running = true;
    
        /**
         * 线程状态
         */
        private Future<?> threadStatus = null;
    
        @PostConstruct
        public void init(){
            threadStatus = service.submit(
                    () -> {
                        while(running){
                            try {
                                // 队列中不存在元素 则不处理
                                if(!queue.isEmpty()){
                                    IntfFileInfoHandler taskHandler = queue.take();
                                    taskHandler.processData();
                                }
                            } catch (InterruptedException e) {
                                log.error("服务停止,退出", e);
                                running = false;
                            }
                        }
                    });
        }
    
        @PreDestroy
        public void destory() {
            running = false;
            service.shutdownNow();
        }
    
        public void activeService() {
            running = true;
            if (service.isShutdown()) {
                service = Executors.newSingleThreadExecutor();
                init();
                log.info("线程池关闭,重新初始化线程池及任务");
            }
            if (threadStatus.isDone()) {
                init();
                log.info("线程池任务结束,重新初始化任务");
            }
        }
    
        public boolean addQueue(IntfFileInfoHandler taskHandler){
            if(!running){
                log.warn("service is stop");
                return false;
            }
            boolean isFull = queue.offer(taskHandler);
            if(!isFull){
                log.warn("添加任务到队列失败");
            }
            return isFull;
        }
        public boolean empty(){
            return queue.isEmpty();
        }
    
    }
    
    

    6.1.3. 核心方法改造

    
    public static final String THREAD_NAME ="RUN_FILE_NAME";
    
    @Autowired
    public FileInfoService fileInfoService;
    
    @Autowired
    private FileInfoQueue fileInfoQueue;
    
    @Autowired
    private FileInfoHandler fileInfoHandler;
    
    public void run(String path){
        File file = new File(path);
        if(!file.isDirectory()){
            return;
        }
        listFiles(file);
    }
    
    public void listFiles(File file){
        File[] files = file.listFiles();
        List<FileInfo> lists = new ArrayList<FileInfo>();
        for (File fl : files) {
            if(fl.isDirectory()){
                listFiles(fl);
                continue;
            }
            String suffixName = FileUtil.getSuffix(fl);
            if(!ImageConst.LIST_SUFFIX.contains(suffixName.toUpperCase())){
                continue;
            }
            long size = fl.length();
            String filePath = FileUtil.getAbsolutePath(fl);
            try {
                FileInfo fileInfo = FileInfo.builder().fileName(fileName).filePath(filePath).fileSize(size).suffixName(suffixName)
                        .md5(DigestUtils.md5Hex(new FileInputStream(fl))).build();
                lists.add(fileInfo);
            } catch (IOException e){
                e.printStackTrace();
            }
        }
        if(!lists.isEmpty()){
            fileInfoHandler.setLists(lists);
            fileInfoQueue.addQueue(fileInfoHandler);
        }
    }
    
    

    通过这次改造,执行耗时差不多在 580 秒,比之前的 15 分钟 减少了一大截,终于露出一丝微笑。

    6.2. 异步获取MD5

    思路通过一个有返回值的线程处理 MD5,我想到了 Callable

    FileSizeThread 实现 Callable 接口,并且通过构造函数来指明需要处理的文件。

    6.2.1. 异步线程定义

    
    public class FileSizeThread implements Callable<String> {
    
        private File file;
    
        public FileSizeThread(){}
    
        public FileSizeThread(File file){
            this.file = file;
        }
    
        @Override
        public String call() {
            try {
                return DigestUtils.md5Hex(new FileInputStream(file));
            } catch (IOException e) {
                return StringUtils.EMPTY;
            }
        }
    }  
    
    

    6.2.2. 核心方法改造

    
        public static final String THREAD_NAME ="RUN_FILE_NAME";
    
        private ThreadPoolExecutor executor = ThreadPoolsUtil.doCreate(3,5,THREAD_NAME);
    
        public void run(String path){
            File file = new File(path);
            if(!file.isDirectory()){
                return;
            }
            listFiles(file);
            //关闭线程池
            executor.shutdown();
        }
    
        public void listFiles(File file){
            File[] files = file.listFiles();
            List<FileInfo> lists = new ArrayList<>();
            if(files.length==0){
                return;
            }
            for (File fl : files) {
                if(fl.isDirectory()){
                    listFiles(fl);
                    continue;
                }
                String suffixName = FileUtil.getSuffix(fl);
                if(!ImageConst.LIST_SUFFIX.contains(suffixName.toUpperCase())){
                    continue;
                }
                Future<String> result = executor.submit(new FileSizeThread(fl));
                String fileName = FileUtil.getName(fl);
                long size = fl.length();
                String filePath = FileUtil.getAbsolutePath(fl);
                try {
                    FileInfo fileInfo = FileInfo.builder().fileName(fileName).filePath(filePath).fileSize(size).suffixName(suffixName)
                            .md5(result.get()).build();
                    lists.add(fileInfo);
                } catch (InterruptedException | ExecutionException e){
                    e.printStackTrace();
                }
            }
            if(!lists.isEmpty()){
                fileInfoHandler.setLists(lists);
                fileInfoQueue.addQueue(fileInfoHandler);
            }
        }
    
    

    通过这次改造,执行耗时差不多在 314 秒,比之前的 580 秒又减少了,这样的结果是在线程池开启一个线程处理结果,将核心线程数调制到 3,最大线程数调整到 5 ,我这里执行结果是 294 秒,线程池的设置根据实际物理机器还有机器的 CPU 负荷来设置。

    7. 小结

    我这里将所有处理耗时统计了下做成一张表格,方便大家对比。

    序号 实现 耗时(秒)
    1 普通方法基本实现,无任何异步操作,全是阻塞模式 1758
    2 将写库的操作改为以队列方式,进行异步批量处理 508
    3 在 2 上将获取 MD5 改为异步,核心线程池设置为 1 314
    4 在 3 上将核心线程数设置为 3,最大线程数设置为 5 294

    可以看到简单的几步可以将性能提升的好几倍,这就是多线程带来的收获。

    8. 源码地址,如果觉得对你有帮助,请Star

    觉得对你有帮助,请Star

    Github源码地址

    Gitee源码地址

    相关文章

      网友评论

        本文标题:手把手Java多线程实战(1)

        本文链接:https://www.haomeiwen.com/subject/gujhoktx.html