美文网首页
使用SXSSFWorkbook并发导出大批量数据Excel

使用SXSSFWorkbook并发导出大批量数据Excel

作者: 脱缰的小马 | 来源:发表于2020-08-05 19:17 被阅读0次

主体逻辑实现类:

import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.xssf.streaming.SXSSFWorkbook;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author 
 * 高性能并发,低内存导出大批量数据Excel工具类
 * 解决大量数据导出可能引起的OOM和速度慢问题
 * 若数据量少,可以不用本封装类
 * @param <T>
 */
public class TransferDataForExcel<T>{

    private final static Logger LOGGER = LoggerFactory.getLogger(TransferDataForExcel.class);

    /** 查询导出数据服务 **/
    private QueryService<T> queryService;

    /** 导出文件名称 **/
    private String fileName;

    /** 导出数据过程中,最大缓存行数 **/
    public final static Integer ROW_ACCESS_WINDOW_SIZE = 5000;

    /** 每个sheet页最多存放行数,最大65536条 **/
    public final static Integer MAX_ROW_SHEET = 10000;

    /**
     * 设置缓存队列容量
     * 因此,最大载入内存行数 = QUEUE_CAPACITY * MAX_ROW_SHEET = 10 * 10000 = 100000
     **/
    public final static Integer QUEUE_CAPACITY = 10;

    /** 设置线程池核心线程数 **/
    public final static Integer THREAD_NUM = 10;

    public TransferDataForExcel(QueryService<T> queryService,String fileName){
        this.queryService = queryService;
        this.fileName = fileName;
    }


    public void doExport() throws Exception {
        Integer totalNum = queryService.getTotalNum();
        LOGGER.info(String.format("当前导出Excel总行数=%s",totalNum));

        //引入SXSSFWorkbook,利用其高效的Excel数据处理特性
        SXSSFWorkbook sxssfWorkbook = new SXSSFWorkbook(ROW_ACCESS_WINDOW_SIZE);
        Integer sheetNum = (totalNum / TransferDataForExcel.MAX_ROW_SHEET) + 1;
        LOGGER.info(String.format("当前导出Excel总Sheet页数=%s",sheetNum));

        List<Sheet> sheetList = new ArrayList<>();
        for(int i=0;i<sheetNum;i++){
            Sheet sheet = sxssfWorkbook.createSheet();
            sheetList.add(sheet);
        }

        //把任务分片后交给线程池执行,每个分页查询的结果保存进一个sheet页
        ArrayBlockingQueue<List<T>> queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
        ExecutorService producerExecutorService = Executors.newFixedThreadPool(THREAD_NUM);
        for (int i = 1; i <= sheetNum; i++){
            Producer task = new Producer(queryService,i,queue);
            producerExecutorService.submit(task);
        }
        producerExecutorService.shutdown();

        //消费者开始消费阻塞队列中数据
        ExecutorService consumerExecutorService = Executors.newFixedThreadPool(THREAD_NUM);
        CountDownLatch countDownLatch = new CountDownLatch(sheetList.size());
        AtomicInteger count = new AtomicInteger(0);
        for(Sheet sheet : sheetList) {
            Consumer consumer = new Consumer(queryService,queue,countDownLatch,sheet,totalNum,count);
            consumerExecutorService.submit(consumer);
        }
        countDownLatch.await();
        consumerExecutorService.shutdown();
        LOGGER.info("消费者处理完成,线程池关闭");

        Http.Response response = Http.Response.current();
        response.setHeader("Content-Disposition", "attachment;filename=" + URLEncoder.encode(fileName, "UTF-8"));
        response.setHeader("Content-Type", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet");
        try{
            sxssfWorkbook.write(response.out);
        }catch (Exception e){
            LOGGER.error(String.format("sxssfWorkbook写入输出流错误,原因=%s",e.getMessage()),e);
        }
    }

}

抽象接口,使用时只要实现这个接口的方法,new出TransferDataForExcel实例,把实现类当参数传入即可。

import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.xssf.streaming.SXSSFSheet;
import org.apache.poi.xssf.streaming.SXSSFWorkbook;

import java.util.List;

public interface QueryService<T> {

    /**
     * 获取导出总数
     * @return
     */
    public Integer getTotalNum();

    /**
     * 分页查询导出的数据
     * @param page 起始页
     * @param pageSize 每页数量
     * @return
     */
    public List<T> queryData(Integer page,Integer pageSize);

    /**
     * 自定义构建导出结果
     * @param row 第一行,表头
     * @throws Exception
     */
    public void setHeaderForExcel(Row row);

    /**
     * 自定义构建导出结果
     * @param row 当前遍历到的正在操作的行对象
     * @param statisticsDTO 当前行对应的DTO对象
     */
    public void setCellForExcel(Row row, T statisticsDTO);
}

导出Excel的数据源生产者:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;

public class Producer<T> implements Callable {

    private final static Logger LOGGER = LoggerFactory.getLogger(Producer.class);

    private QueryService queryService;
    private Integer page;
    private ArrayBlockingQueue<List<T>> queue;

    /**
     * 初始化任务类
     * @param page
     */
    public Producer(QueryService queryService, Integer page, ArrayBlockingQueue<List<T>> queue) {
        //对数据进行分页查询
        this.queryService = queryService;
        this.page = (page - 1) * TransferDataForExcel.MAX_ROW_SHEET;
        this.queue = queue;
    }

    @Override
    public Boolean call(){
        try {
            while(true){
                if(queue.remainingCapacity() > 0){
                    //最多同时查询页数控制等于队列容量,避免同时查询过多结果载入内存引发OOM
                    List<T> orderList = queryService.queryData(page,TransferDataForExcel.MAX_ROW_SHEET);
                    queue.put(orderList);
                    break;
                }else{
                    Thread.sleep(300);
                    LOGGER.info(String.format("导出excel队列空间=%s,生产者put队列已满,阻塞等待中",queue.size()));
                    continue;
                }
            }
            return true;
        } catch (InterruptedException e) {
            LOGGER.error(String.format("导出excel生产者入队错误,原因=%s",e.getMessage()));
            return false;
        }
    }
}

消费数据导入Excel的消费者:

import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.xssf.streaming.SXSSFSheet;
import org.apache.poi.xssf.streaming.SXSSFWorkbook;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 导出excel队列消费者
 * @param <T>
 */
public class Consumer<T> implements Callable{

    private final static Logger LOGGER = LoggerFactory.getLogger(Consumer.class);

    private QueryService<T> queryService;
    private ArrayBlockingQueue<List<T>> queue;
    private CountDownLatch countDownLatch;
    private AtomicInteger count; //已导入行数
    private Integer total;
    private Sheet sheet;

    public Consumer(QueryService<T> queryService, ArrayBlockingQueue<List<T>> queue,
                    CountDownLatch countDownLatch,Sheet sheet, Integer total, AtomicInteger count) {
       this.queryService = queryService;
       this.queue = queue;
       this.countDownLatch = countDownLatch;
       this.total = total;
       this.count = count;
       this.sheet = sheet;
    }

    @Override
    public Boolean call() {
        try {
            if(queue.remainingCapacity() == queue.size()){
                LOGGER.info(String.format("队列空间=%s,消费者take队列为空,阻塞等待中",queue.size()));
            }
            List<T> orderList = queue.take();
            //根据总条数确定消费者本次写入数据时的sheet和开始行数
            int currentCount = count.intValue();
            count.addAndGet(orderList.size());

            writeExcel(orderList,sheet);
            LOGGER.info(String.format("需写入excel总行数=%s,已写入行数=%s,当前操作sheet页=%s",
                    total,currentCount,sheet.getSheetName()));
            countDownLatch.countDown();
            LOGGER.info(String.format("消费者已处理任务数=%s",countDownLatch.getCount()));
            return true;
        } catch (Exception e) {
            countDownLatch.countDown();
            LOGGER.error(String.format("导出excel消费者处理错误=%s",e.getMessage()),e);
            LOGGER.info(String.format("消费者已处理任务数=%s",countDownLatch.getCount()));
            return false;
        }
    }


    private void writeExcel(List<T> orderList,Sheet sheet){
        if (CollectionUtils.isEmpty(orderList)){
            return;
        }
        Row headRow = sheet.createRow(0);
        queryService.setHeaderForExcel(headRow);
        for (int i = 0; i < orderList.size(); i++) {
            int beginRowNum = i+1;
            Row bodyRow = sheet.createRow(beginRowNum);
            T statisticsDTO = orderList.get(i);
            queryService.setCellForExcel(bodyRow,statisticsDTO);
        }
        orderList.clear();
    }
}

相关文章

网友评论

      本文标题:使用SXSSFWorkbook并发导出大批量数据Excel

      本文链接:https://www.haomeiwen.com/subject/ecborktx.html