Exporting Tens of Millions of Rows of Data

Author: 武云霄 | Published 2019-03-28 17:35
import com.sun.istack.internal.Nullable;
import com.suning.framework.dal.client.support.Configuration;
import com.suning.framework.dal.client.support.MappedStatement;
import com.suning.framework.dal.client.support.ShardingDalClient;
import com.suning.framework.dal.client.support.executor.MappedSqlExecutor;
import com.suning.framework.dal.exception.DalException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterUtils;
import org.springframework.jdbc.core.namedparam.ParsedSql;
import org.springframework.jdbc.core.namedparam.SqlParameterSource;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.sql.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

/**
 * Worker thread that exports a very large (tens of millions of rows) query result
 * set to a CSV file, streaming rows instead of holding them in memory.
 *
 * @author 18070948
 * @date 2019/3/6 16:47
 */
public class ExportDataThread extends Thread {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    private boolean isInitialization = false;
    /**
     * Operating mode (OPERATING_MODE_A or OPERATING_MODE_B)
     */
    private int operationMode = 0;

    private volatile int currentExportVolume = 0;

    /**
     * @return the number of rows exported so far
     */
    public int getCurrentExportVolume() {
        return currentExportVolume;
    }

    /**
     * The generated export file
     */
    private File generatedFile;

    public File getGeneratedFile() {
        return generatedFile;
    }

    private Map<String, String> headerConversionMap = new HashMap<String, String>();

    /**
     * Sets the header conversion map (database column name -> CSV header name)
     *
     * @param headerConversionMap
     */
    public void setHeaderConversionMap(Map<String, String> headerConversionMap) {
        this.headerConversionMap = headerConversionMap;
    }

    private int fileSegmentationSize = 0;

    /**
     * Sets the file segmentation size; default 0 (no splitting).
     * Note: splitting the output file is not yet implemented in generateFile().
     *
     * @param fileSegmentationSize
     */
    public void setFileSegmentationSize(int fileSegmentationSize) {
        this.fileSegmentationSize = fileSegmentationSize;
    }


    private Map<String, Object> params;

    private String sqlId;

    private ShardingDalClient shardingDalClient;

    /**
     * Operating mode A: ShardingDalClient + sqlId + parameter map
     */
    private final int OPERATING_MODE_A = 1;

    private String sql;

    private Object[] paramArray;

    private Connection connection;

    /**
     * Operating mode B: JDBC connection + SQL + positional parameters
     */
    private final int OPERATING_MODE_B = 2;

    /**
     * Initialization for mode B: JDBC connection + SQL + positional parameters
     *
     * @param connection
     * @param sql
     * @param paramArray
     */
    public void initialization(@Nullable Connection connection, @Nullable String sql, @Nullable Object[] paramArray) {
        this.connection = connection;
        this.sql = sql;
        this.paramArray = paramArray;
        operationMode = OPERATING_MODE_B;
        currentExportVolume = 0;
        isInitialization = true;
    }
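
    /*
     * The post does not show the mode-A initializer, although run() switches on
     * OPERATING_MODE_A. A minimal sketch, assuming mode A is driven by a
     * ShardingDalClient, a sqlId and a named-parameter map (matching the fields
     * referenced from run() and getConnection()):
     */
    public void initialization(ShardingDalClient shardingDalClient, String sqlId, Map<String, Object> params) {
        this.shardingDalClient = shardingDalClient;
        this.sqlId = sqlId;
        this.params = params;
        operationMode = OPERATING_MODE_A;
        currentExportVolume = 0;
        isInitialization = true;
    }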

    private CountDownLatch countDownLatch;

    public void setCountDownLatch(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        // Not yet initialized; nothing to export
        if (!isInitialization) {
            return;
        }
        try {
            switch (operationMode) {
                case OPERATING_MODE_A:
                    generatedFile = exportFile(params, sqlId);
                    break;
                case OPERATING_MODE_B:
                    generatedFile = exportFile(connection, sql, paramArray);
                    break;
                default:
                    break;
            }
        } catch (SQLException e) {
            logger.error("Data export failed", e);
        } catch (IOException e) {
            logger.error("Data export failed", e);
        } finally {
            // Reset state so the thread must be re-initialized before the next export
            isInitialization = false;
            headerConversionMap = new HashMap<String, String>();
            if (countDownLatch != null) {
                countDownLatch.countDown();
            }
        }
    }


    private File exportFile(Connection connection, String sqlToUse, Object[] paramArray) throws SQLException, IOException {
        // Forward-only, read-only cursor so the driver can stream rows instead of
        // buffering the whole result set in memory. How setFetchSize is honoured is
        // driver-specific (e.g. MySQL Connector/J only streams with
        // fetchSize = Integer.MIN_VALUE or with useCursorFetch=true).
        PreparedStatement ps = connection.prepareStatement(sqlToUse,
                ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        try {
            for (int i = 0; i < paramArray.length; i++) {
                ps.setObject(i + 1, paramArray[i]);
            }
            ps.setFetchSize(1000);
            ResultSet rs = ps.executeQuery();
            try {
                return generateFile(rs, rs.getMetaData());
            } finally {
                rs.close();
            }
        } finally {
            ps.close();
        }
    }

    /**
     * Uses reflection to ask the ShardingDalClient which data source the given
     * sqlId and parameters route to, and returns a raw JDBC connection from it.
     * Exactly one routed data source is expected.
     */
    @SuppressWarnings("unchecked")
    private Connection getConnection(Map<String, Object> params, String sqlId) throws SQLException {
        Connection connection;
        Method lookupMappedSqlExecutorMethod = null;
        try {
            lookupMappedSqlExecutorMethod = ShardingDalClient.class.getDeclaredMethod("lookupMappedSqlExecutor", String.class, Object.class);
        } catch (NoSuchMethodException e) {
            return null;
        }
        lookupMappedSqlExecutorMethod.setAccessible(true);
        List<MappedSqlExecutor> mappedSqlExecutor = null;
        try {
            mappedSqlExecutor = (List<MappedSqlExecutor>) lookupMappedSqlExecutorMethod.invoke(shardingDalClient, sqlId, params);
        } catch (IllegalAccessException e) {
            return null;
        } catch (InvocationTargetException e) {
            return null;
        }
        if (mappedSqlExecutor.size() != 1) {
            throw new DalException(
                    "Export must route to exactly one data source, but matched "
                            + mappedSqlExecutor.size() + "; params: " + params);
        }
        connection = mappedSqlExecutor.get(0).getDataSource().getConnection();
        return connection;
    }
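
    /*
     * The mode-A export overload called from run() is likewise not shown. A minimal
     * sketch: resolve the named-parameter SQL registered under sqlId, convert it to
     * positional form with Spring's NamedParameterUtils, then reuse the mode-B path.
     * The Configuration.getInstance().getMappedStatement(sqlId).getSql() lookup below
     * is an assumption about the Suning DAL API, not a confirmed call.
     */
    private File exportFile(Map<String, Object> params, String sqlId) throws SQLException, IOException {
        // Hypothetical lookup of the named-parameter SQL text registered under sqlId
        MappedStatement mappedStatement = Configuration.getInstance().getMappedStatement(sqlId);
        String namedSql = mappedStatement.getSql();

        // Convert :name placeholders to '?' and build the positional value array
        ParsedSql parsedSql = NamedParameterUtils.parseSqlStatement(namedSql);
        SqlParameterSource paramSource = new MapSqlParameterSource(params);
        String sqlToUse = NamedParameterUtils.substituteNamedParameters(parsedSql, paramSource);
        Object[] paramArray = NamedParameterUtils.buildValueArray(parsedSql, paramSource, null);

        // Route to the single backing data source and stream from there
        Connection conn = getConnection(params, sqlId);
        try {
            return exportFile(conn, sqlToUse, paramArray);
        } finally {
            conn.close();
        }
    }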

    private File generateFile(ResultSet rs, ResultSetMetaData resultSetMetaData) throws IOException, SQLException {
        long startTime = System.currentTimeMillis();
        // TODO: splitting the output into chunks of fileSegmentationSize rows is not implemented yet
        File sqlResult = new File("/sqlTempResults/" + System.currentTimeMillis() + "_QueryResult.csv");
        if (!sqlResult.getParentFile().exists()) {
            sqlResult.getParentFile().mkdirs();
        }
        sqlResult.createNewFile();
        // Buffer the file stream; each row is written as a small byte chunk
        OutputStream sqlResultOutStream = new BufferedOutputStream(new FileOutputStream(sqlResult));
        char separator = ',';
        char lineBreak = '\n';
        StringBuilder stringBuilder = new StringBuilder();
        for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
            String column = resultSetMetaData.getColumnName(i);
            if (headerConversionMap.containsKey(column)) {
                column = headerConversionMap.get(column);
            }
            stringBuilder.append(column).append(separator);
        }
        stringBuilder.setCharAt(stringBuilder.length() - 1, lineBreak);
        sqlResultOutStream.write(stringBuilder.toString().getBytes());
        try {
            while (rs.next() && !isInterrupted()) {
                stringBuilder = new StringBuilder();
                for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
                    Object object = rs.getObject(i);
                    if (object != null) {
                        // Values are written as-is; fields containing the separator, quotes
                        // or line breaks are not CSV-escaped here
                        stringBuilder.append(object);
                    }
                    stringBuilder.append(separator);
                }
                // Replace the trailing separator with a line break
                stringBuilder.setCharAt(stringBuilder.length() - 1, lineBreak);
                synchronized (this) {
                    sqlResultOutStream.write(stringBuilder.toString().getBytes());
                    currentExportVolume++;
                }
            }
        } finally {
            sqlResultOutStream.close();
        }
        logger.info("TimeCost:" + (System.currentTimeMillis() - startTime));
        return sqlResult;
    }

}

// Streaming the generated file back to the browser is handled outside ExportDataThread,
// presumably in the calling web controller (it additionally requires
// javax.servlet.http.HttpServletResponse, java.nio.file.Files, java.io.FileInputStream
// and java.io.InputStream):
    private void downLoadFile(HttpServletResponse response, File downLoadFile) throws IOException {
        response.setContentType(Files.probeContentType(downLoadFile.toPath()));
        response.setCharacterEncoding("UTF-8");
        // Use "attachment" instead of "inline" to force a save-as dialog in the browser
        response.setHeader("Content-Disposition",
                "inline; filename=\"" + downLoadFile.getName() + "\"");
        // setContentLength takes an int and overflows for files over 2 GB;
        // on Servlet 3.1+ prefer setContentLengthLong
        response.setContentLength((int) downLoadFile.length());
        InputStream inputStream = new FileInputStream(downLoadFile);
        try {
            OutputStream outputStream = response.getOutputStream();
            byte[] buffer = new byte[1024];
            int len;
            while ((len = inputStream.read(buffer)) != -1) {
                outputStream.write(buffer, 0, len);
            }
            outputStream.flush();
        } finally {
            inputStream.close();
        }
    }
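
For reference, a minimal usage sketch tying these pieces together might look as follows (the DataSource, SQL text, table name and variable names here are illustrative, not taken from the original code):

    public void exportAndDownload(javax.sql.DataSource dataSource, HttpServletResponse response)
            throws SQLException, IOException, InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        ExportDataThread exportThread = new ExportDataThread();
        exportThread.setCountDownLatch(latch);
        Connection conn = dataSource.getConnection();
        try {
            // Mode B: raw JDBC connection + SQL + positional parameters
            exportThread.initialization(conn, "SELECT * FROM t_order WHERE status = ?", new Object[]{"PAID"});
            exportThread.start();
            // Another thread could poll exportThread.getCurrentExportVolume() for progress
            latch.await();
        } finally {
            conn.close();
        }
        downLoadFile(response, exportThread.getGeneratedFile());
    }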

Original article (cite when reposting): https://www.jianshu.com/p/d4551af0fef1
