主体逻辑实现类:
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.xssf.streaming.SXSSFWorkbook;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Exports a large data set to an .xlsx workbook using a producer/consumer pipeline.
 *
 * <p>The data is split into pages of {@link #MAX_ROW_SHEET} rows. One {@code Producer} per page
 * queries the data and places it on a bounded queue; one {@code Consumer} per sheet takes a page
 * from the queue and writes it into its sheet. The finished workbook is written to the current
 * HTTP response.
 *
 * <p>Intended for exports that are too large to build in memory at once; for small exports this
 * wrapper is unnecessary.
 *
 * @param <T> type of the DTO that represents one exported row
 */
public class TransferDataForExcel<T> {

    private static final Logger LOGGER = LoggerFactory.getLogger(TransferDataForExcel.class);

    /** Row access window handed to {@link SXSSFWorkbook}: rows kept in memory per sheet. */
    public final static Integer ROW_ACCESS_WINDOW_SIZE = 5000;

    /**
     * Number of data rows stored per sheet (also the page size of each query).
     * SXSSF writes the .xlsx format, whose limit is 1,048,576 rows per sheet; the 65,536 limit
     * applies only to the legacy .xls format. One extra row per sheet is used for the header.
     */
    public final static Integer MAX_ROW_SHEET = 10000;

    /**
     * Capacity of the hand-over queue, counted in pages.
     * Rows waiting in the queue are therefore bounded by
     * QUEUE_CAPACITY * MAX_ROW_SHEET = 10 * 10000 = 100000; pages currently being queried by a
     * producer or written by a consumer are held in addition to that.
     */
    public final static Integer QUEUE_CAPACITY = 10;

    /** Number of threads in each of the two (producer and consumer) thread pools. */
    public final static Integer THREAD_NUM = 10;

    /** Service that supplies the total row count, the pages and the cell mapping. */
    private final QueryService<T> queryService;

    /** File name sent to the client in the Content-Disposition header. */
    private final String fileName;

    /**
     * @param queryService data source and row mapper for the export
     * @param fileName     download file name (URL-encoded before it is sent)
     */
    public TransferDataForExcel(QueryService<T> queryService, String fileName) {
        this.queryService = queryService;
        this.fileName = fileName;
    }

    /**
     * Runs the export and writes the resulting workbook to the current HTTP response.
     *
     * <p>A failure while writing the workbook to the response is logged and not rethrown, as in
     * the original implementation. The workbook's temporary files are always released.
     *
     * @throws Exception if the calling thread is interrupted while waiting for the consumers, or
     *                   if preparing the export or the response headers fails
     */
    public void doExport() throws Exception {
        Integer totalNum = queryService.getTotalNum();
        LOGGER.info("当前导出Excel总行数={}", totalNum);

        // SXSSF streams rows to temporary files instead of keeping the whole workbook in memory.
        SXSSFWorkbook sxssfWorkbook = new SXSSFWorkbook(ROW_ACCESS_WINDOW_SIZE);
        try {
            int sheetNum = computeSheetCount(totalNum);
            LOGGER.info("当前导出Excel总Sheet页数={}", sheetNum);

            List<Sheet> sheetList = new ArrayList<>(sheetNum);
            for (int i = 0; i < sheetNum; i++) {
                sheetList.add(sxssfWorkbook.createSheet());
            }

            fillSheets(sheetList, totalNum);
            LOGGER.info("消费者处理完成,线程池关闭");

            Http.Response response = Http.Response.current();
            response.setHeader("Content-Disposition",
                    "attachment;filename=" + URLEncoder.encode(fileName, "UTF-8"));
            response.setHeader("Content-Type",
                    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet");
            try {
                sxssfWorkbook.write(response.out);
            } catch (Exception e) {
                LOGGER.error("sxssfWorkbook写入输出流错误,原因={}", e.getMessage(), e);
            }
        } finally {
            // Deletes the temporary files SXSSF created for the flushed rows.
            sxssfWorkbook.dispose();
        }
    }

    /**
     * Returns the number of sheets needed for {@code totalNum} rows: the row count divided by
     * {@link #MAX_ROW_SHEET}, rounded up, and never less than one so that the workbook always
     * contains a sheet.
     */
    private static int computeSheetCount(int totalNum) {
        int rowsPerSheet = MAX_ROW_SHEET;
        int sheets = totalNum / rowsPerSheet;
        if (totalNum % rowsPerSheet != 0) {
            sheets++;
        }
        return Math.max(1, sheets);
    }

    /**
     * Starts one producer per page and one consumer per sheet and blocks until every consumer
     * has counted down the latch. Both pools are shut down on every exit path; if the wait does
     * not complete normally the still-running tasks are interrupted.
     */
    private void fillSheets(List<Sheet> sheetList, Integer totalNum) throws InterruptedException {
        ArrayBlockingQueue<List<T>> queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
        ExecutorService producerExecutorService = Executors.newFixedThreadPool(THREAD_NUM);
        ExecutorService consumerExecutorService = Executors.newFixedThreadPool(THREAD_NUM);
        boolean completed = false;
        try {
            // Page numbers are 1-based; each page's result ends up in one sheet.
            for (int page = 1; page <= sheetList.size(); page++) {
                Producer<T> task = new Producer<>(queryService, page, queue);
                producerExecutorService.submit(task);
            }
            producerExecutorService.shutdown();

            CountDownLatch countDownLatch = new CountDownLatch(sheetList.size());
            AtomicInteger count = new AtomicInteger(0);
            for (Sheet sheet : sheetList) {
                Consumer<T> consumer =
                        new Consumer<>(queryService, queue, countDownLatch, sheet, totalNum, count);
                consumerExecutorService.submit(consumer);
            }
            countDownLatch.await();
            completed = true;
        } finally {
            if (completed) {
                consumerExecutorService.shutdown();
            } else {
                producerExecutorService.shutdownNow();
                consumerExecutorService.shutdownNow();
            }
        }
    }
}
抽象接口,使用时只要实现这个接口的方法,new出TransferDataForExcel实例,把实现类当参数传入即可。
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.xssf.streaming.SXSSFSheet;
import org.apache.poi.xssf.streaming.SXSSFWorkbook;
import java.util.List;
/**
 * Callback contract supplied by the caller of the Excel export: it provides the data to export
 * and decides how a header row and each data row are rendered.
 *
 * @param <T> type of the DTO that represents one exported row
 */
public interface QueryService<T> {

    /**
     * Returns the total number of rows that will be exported.
     *
     * @return total row count
     */
    Integer getTotalNum();

    /**
     * Loads one page of the data to export.
     *
     * <p>NOTE(review): the {@code Producer} in this code base passes a zero-based row offset
     * ({@code (sheetIndex - 1) * MAX_ROW_SHEET}) as {@code page}, not a page number — confirm
     * that implementations interpret the argument accordingly.
     *
     * @param page     start position of the page
     * @param pageSize number of rows per page
     * @return the rows of the requested page
     */
    List<T> queryData(Integer page, Integer pageSize);

    /**
     * Fills the header row of a sheet.
     *
     * @param row the first row of the sheet, used as the table header
     */
    void setHeaderForExcel(Row row);

    /**
     * Fills one data row of a sheet from its DTO.
     *
     * @param row           the row currently being written
     * @param statisticsDTO the DTO whose values go into {@code row}
     */
    void setCellForExcel(Row row, T statisticsDTO);
}
导出Excel的数据源生产者:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
/**
 * Export producer: queries one page of data and hands it to the consumers through a bounded
 * queue.
 *
 * <p>Each producer enqueues exactly one list unless it is interrupted. When the query fails or
 * returns {@code null}, an empty list is enqueued instead, so that the consumer waiting for this
 * page is released rather than blocking forever.
 *
 * @param <T> type of the DTO that represents one exported row
 */
public class Producer<T> implements Callable<Boolean> {

    private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);

    /** Pause between two checks of a full queue, in milliseconds. */
    private static final long QUEUE_FULL_WAIT_MILLIS = 300L;

    private final QueryService<T> queryService;

    /** Zero-based row offset of this producer's page, passed as the first query argument. */
    private final Integer page;

    private final ArrayBlockingQueue<List<T>> queue;

    /**
     * Creates the task for one page.
     *
     * @param queryService service used to query the page
     * @param page         1-based page number; converted to the row offset
     *                     {@code (page - 1) * MAX_ROW_SHEET}
     * @param queue        queue the queried page is put on
     */
    public Producer(QueryService<T> queryService, Integer page, ArrayBlockingQueue<List<T>> queue) {
        this.queryService = queryService;
        this.page = (page - 1) * TransferDataForExcel.MAX_ROW_SHEET;
        this.queue = queue;
    }

    /**
     * Waits until the queue has free capacity, queries the page and enqueues it.
     *
     * @return {@code true} if the page was queried and enqueued; {@code false} if the query
     *         failed (an empty list is enqueued in that case) or the thread was interrupted
     *         (nothing is enqueued)
     */
    @Override
    public Boolean call() {
        try {
            // Only query while the queue has room, to limit how many pages are loaded at once
            // and avoid running out of memory.
            while (queue.remainingCapacity() <= 0) {
                Thread.sleep(QUEUE_FULL_WAIT_MILLIS);
                LOGGER.info("导出excel队列空间={},生产者put队列已满,阻塞等待中", queue.size());
            }

            List<T> orderList = null;
            boolean querySucceeded = false;
            try {
                orderList = queryService.queryData(page, TransferDataForExcel.MAX_ROW_SHEET);
                querySucceeded = true;
            } catch (RuntimeException e) {
                LOGGER.error("导出excel生产者查询数据错误,原因={}", e.getMessage(), e);
            }
            if (orderList == null) {
                // The queue rejects null elements; an empty page keeps the pipeline moving.
                orderList = new java.util.ArrayList<>();
            }
            queue.put(orderList);
            return querySucceeded;
        } catch (InterruptedException e) {
            // Restore the flag so the executor and callers can observe the interruption.
            Thread.currentThread().interrupt();
            LOGGER.error("导出excel生产者入队错误,原因={}", e.getMessage(), e);
            return false;
        }
    }
}
消费数据导入Excel的消费者:
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.xssf.streaming.SXSSFSheet;
import org.apache.poi.xssf.streaming.SXSSFWorkbook;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Export consumer: takes one page of data from the queue and writes it into one sheet.
 *
 * <p>The latch passed to the constructor is counted down exactly once per {@link #call()},
 * whether the page was written successfully or not.
 *
 * @param <T> type of the DTO that represents one exported row
 */
public class Consumer<T> implements Callable<Boolean> {

    private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);

    private final QueryService<T> queryService;
    private final ArrayBlockingQueue<List<T>> queue;
    private final CountDownLatch countDownLatch;
    /** Number of rows written so far, shared by all consumers of one export. */
    private final AtomicInteger count;
    /** Total number of rows of the export; used for progress logging only. */
    private final Integer total;
    private final Sheet sheet;

    /**
     * @param queryService   service that renders the header row and the data rows
     * @param queue          queue the pages are taken from
     * @param countDownLatch latch counted down once when this consumer finishes
     * @param sheet          sheet this consumer writes into
     * @param total          total number of rows of the export
     * @param count          shared counter of rows written so far
     */
    public Consumer(QueryService<T> queryService, ArrayBlockingQueue<List<T>> queue,
                    CountDownLatch countDownLatch, Sheet sheet, Integer total, AtomicInteger count) {
        this.queryService = queryService;
        this.queue = queue;
        this.countDownLatch = countDownLatch;
        this.total = total;
        this.count = count;
        this.sheet = sheet;
    }

    /**
     * Takes one page from the queue (blocking until one is available) and writes it to the sheet.
     *
     * @return {@code true} if the page was written, {@code false} if an error occurred or the
     *         thread was interrupted
     */
    @Override
    public Boolean call() {
        try {
            if (queue.isEmpty()) {
                LOGGER.info("队列空间={},消费者take队列为空,阻塞等待中", queue.size());
            }
            List<T> orderList = queue.take();
            int batchSize = orderList.size();
            writeExcel(orderList, sheet);
            // Add after writing so the logged value is the number of rows actually written.
            int writtenSoFar = count.addAndGet(batchSize);
            LOGGER.info("需写入excel总行数={},已写入行数={},当前操作sheet页={}",
                    total, writtenSoFar, sheet.getSheetName());
            return true;
        } catch (InterruptedException e) {
            // Restore the flag so the executor and callers can observe the interruption.
            Thread.currentThread().interrupt();
            LOGGER.error("导出excel消费者处理错误={}", e.getMessage(), e);
            return false;
        } catch (Exception e) {
            LOGGER.error("导出excel消费者处理错误={}", e.getMessage(), e);
            return false;
        } finally {
            countDownLatch.countDown();
            // getCount() is the number of consumers that have not finished yet.
            LOGGER.info("消费者剩余未处理任务数={}", countDownLatch.getCount());
        }
    }

    /**
     * Writes the header into row 0 and one row per DTO starting at row 1. An empty or
     * {@code null} page leaves the sheet untouched. The passed list is not modified.
     */
    private void writeExcel(List<T> orderList, Sheet sheet) {
        if (CollectionUtils.isEmpty(orderList)) {
            return;
        }
        Row headRow = sheet.createRow(0);
        queryService.setHeaderForExcel(headRow);
        int rowNum = 1;
        for (T statisticsDTO : orderList) {
            Row bodyRow = sheet.createRow(rowNum++);
            queryService.setCellForExcel(bodyRow, statisticsDTO);
        }
    }
}
网友评论