Exporting Tens of Millions of Rows

By 武云霄 | Published 2019-03-28 17:35

The class below streams a large JDBC result set to a CSV file on a background thread, using a forward-only, read-only cursor and a bounded fetch size so the full result set never has to fit in memory.
    import com.sun.istack.internal.Nullable;
    import com.suning.framework.dal.client.support.Configuration;
    import com.suning.framework.dal.client.support.MappedStatement;
    import com.suning.framework.dal.client.support.ShardingDalClient;
    import com.suning.framework.dal.client.support.executor.MappedSqlExecutor;
    import com.suning.framework.dal.exception.DalException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
    import org.springframework.jdbc.core.namedparam.NamedParameterUtils;
    import org.springframework.jdbc.core.namedparam.ParsedSql;
    import org.springframework.jdbc.core.namedparam.SqlParameterSource;
    
    import java.io.BufferedOutputStream;
    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.lang.reflect.InvocationTargetException;
    import java.lang.reflect.Method;
    import java.nio.charset.StandardCharsets;
    import java.sql.*;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.CountDownLatch;
    
    /**
     * Streams a large query result to a CSV file on a background thread.
     *
     * @author 18070948
     * @date 2019/3/6 16:47
     */
    public class ExportDataThread extends Thread {
    
        private final Logger logger = LoggerFactory.getLogger(getClass());
    
        private boolean isInitialization = false;
        /**
         * Operating mode
         */
        private int operationMode = 0;
    
        private volatile int currentExportVolume = 0;
    
        /**
         * Number of rows exported so far.
         *
         * @return the current export row count
         */
        public int getCurrentExportVolume() {
            return currentExportVolume;
        }
    
        /**
         * The generated export file
         */
        private File generatedFile;
    
        public File getGeneratedFile() {
            return generatedFile;
        }
    
        private Map<String, String> headerConversionMap = new HashMap<String, String>();
    
        /**
         * Sets the header conversion map (DB column name -> display name).
         *
         * @param headerConversionMap column-name to header-name mapping
         */
        public void setHeaderConversionMap(Map<String, String> headerConversionMap) {
            this.headerConversionMap = headerConversionMap;
        }
    
        private int fileSegmentationSize = 0;
    
        /**
         * Sets the file split size; default 0 (no splitting). Splitting is not
         * actually implemented in this listing.
         *
         * @param fileSegmentationSize split size, 0 to disable
         */
        public void setFileSegmentationSize(int fileSegmentationSize) {
            this.fileSegmentationSize = fileSegmentationSize;
        }
    
    
        private Map<String, Object> params;

        // Referenced by run() and getConnection() but missing from the original
        // listing; declared here so the class compiles.
        private String sqlId;

        private ShardingDalClient shardingDalClient;

        /**
         * Operating mode A: dalClient + sqlId + parameter map
         */
        private final int OPERATING_MODE_A = 1;
    
        private String sql;
    
        private Object[] paramArray;
    
        private Connection connection;
    
        /**
         * Operating mode B: data source connection + SQL + parameters
         */
        private final int OPERATING_MODE_B = 2;

        /**
         * Initializes mode B: connection + SQL + positional parameters.
         *
         * @param connection open JDBC connection
         * @param sql        SQL with '?' placeholders
         * @param paramArray positional parameter values
         */
        public void initialization(@Nullable Connection connection, @Nullable String sql, @Nullable Object[] paramArray) {
            this.connection = connection;
            this.sql = sql;
            this.paramArray = paramArray;
            operationMode = OPERATING_MODE_B;
            currentExportVolume = 0;
            isInitialization = true;
        }
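
        /**
         * Initializes mode A: ShardingDalClient + sqlId + named-parameter map.
         * This initializer is implied by run() but absent from the original
         * listing; a minimal sketch mirroring the mode-B initializer above.
         */
        public void initialization(@Nullable ShardingDalClient shardingDalClient, @Nullable String sqlId, @Nullable Map<String, Object> params) {
            this.shardingDalClient = shardingDalClient;
            this.sqlId = sqlId;
            this.params = params;
            operationMode = OPERATING_MODE_A;
            currentExportVolume = 0;
            isInitialization = true;
        }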
    
        private CountDownLatch countDownLatch;
    
        public void setCountDownLatch(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }
    
        @Override
        public void run() {
            // Not initialized; nothing to do.
            if (!isInitialization) {
                return;
            }
            try {
                switch (operationMode) {
                    case OPERATING_MODE_A:
                        generatedFile = exportFile(params, sqlId);
                        break;
                    case OPERATING_MODE_B:
                        generatedFile = exportFile(connection, sql, paramArray);
                        break;
                    default:
                        break;
                }
            } catch (SQLException e) {
                logger.error("Export query failed", e);
            } catch (IOException e) {
                logger.error("Writing the export file failed", e);
            } finally {
                // Reset state so the instance can be re-initialized, then signal completion.
                isInitialization = false;
                headerConversionMap = new HashMap<String, String>();
                if (countDownLatch != null) {
                    countDownLatch.countDown();
                }
            }
        }
    
    
        private File exportFile(Connection connection, String sqlToUse, Object[] paramArray) throws SQLException, IOException {
            // Forward-only, read-only cursor with a bounded fetch size so the
            // driver can stream rows instead of materializing the full result set.
            PreparedStatement ps = connection.prepareStatement(sqlToUse,
                    ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
            try {
                for (int i = 0; i < paramArray.length; i++) {
                    ps.setObject(i + 1, paramArray[i]);
                }
                ps.setFetchSize(1000);
                ResultSet rs = ps.executeQuery();
                try {
                    return generateFile(rs, rs.getMetaData());
                } finally {
                    rs.close();
                }
            } finally {
                ps.close();
            }
        }
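
        /*
         * Note (not from the original post): setFetchSize(1000) is only a hint.
         * MySQL Connector/J, for example, buffers the entire result set in memory
         * unless the statement is TYPE_FORWARD_ONLY/CONCUR_READ_ONLY with
         * setFetchSize(Integer.MIN_VALUE), or useCursorFetch=true is set on the
         * JDBC URL together with a positive fetch size.
         */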
    
        @SuppressWarnings("unchecked")
        private Connection getConnection(Map<String, Object> params, String sqlId) throws SQLException {
            // ShardingDalClient does not expose its routing, so reflectively invoke
            // the private lookupMappedSqlExecutor to find the data source that this
            // sqlId + params combination routes to.
            Method lookupMappedSqlExecutorMethod;
            try {
                lookupMappedSqlExecutorMethod = ShardingDalClient.class.getDeclaredMethod("lookupMappedSqlExecutor", String.class, Object.class);
            } catch (NoSuchMethodException e) {
                logger.error("lookupMappedSqlExecutor not found", e);
                return null;
            }
            lookupMappedSqlExecutorMethod.setAccessible(true);
            List<MappedSqlExecutor> mappedSqlExecutor;
            try {
                mappedSqlExecutor = (List<MappedSqlExecutor>) lookupMappedSqlExecutorMethod.invoke(shardingDalClient, sqlId, params);
            } catch (IllegalAccessException e) {
                logger.error("Routing lookup failed", e);
                return null;
            } catch (InvocationTargetException e) {
                logger.error("Routing lookup failed", e);
                return null;
            }
            // Exporting must hit exactly one shard; refuse to export across shards.
            if (mappedSqlExecutor.size() != 1) {
                throw new DalException(
                        "Route to multiple data sources when exporting data, params are " + params);
            }
            return mappedSqlExecutor.get(0).getDataSource().getConnection();
        }
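
        /*
         * run() calls exportFile(params, sqlId) for mode A, but the original post
         * omits that overload. Below is a minimal sketch: resolveSqlById() is a
         * hypothetical stand-in for the Suning DAL Configuration/MappedStatement
         * lookup that turns a sqlId into SQL text (the otherwise-unused imports
         * above suggest that is how the author did it); the Spring
         * NamedParameterUtils calls are standard API.
         */
        private File exportFile(Map<String, Object> params, String sqlId) throws SQLException, IOException {
            Connection conn = getConnection(params, sqlId);
            if (conn == null) {
                throw new SQLException("Could not resolve a data source for sqlId " + sqlId);
            }
            ParsedSql parsedSql = NamedParameterUtils.parseSqlStatement(resolveSqlById(sqlId));
            SqlParameterSource paramSource = new MapSqlParameterSource(params);
            String sqlToUse = NamedParameterUtils.substituteNamedParameters(parsedSql, paramSource);
            Object[] args = NamedParameterUtils.buildValueArray(parsedSql, paramSource, null);
            return exportFile(conn, sqlToUse, args);
        }

        // Hypothetical: look up the named-parameter SQL text registered under sqlId.
        private String resolveSqlById(String sqlId) {
            throw new UnsupportedOperationException("wire up the DAL Configuration lookup here");
        }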
    
        private File generateFile(ResultSet rs, ResultSetMetaData resultSetMetaData) throws IOException, SQLException {
            long startTime = System.currentTimeMillis();
            // TODO: splitting the output by fileSegmentationSize is not implemented.
            File sqlResult = new File("/sqlTempResults/" + System.currentTimeMillis() + "_QueryResult.csv");
            if (!sqlResult.getParentFile().exists()) {
                sqlResult.getParentFile().mkdirs();
            }
            sqlResult.createNewFile();
            OutputStream sqlResultOutStream = new BufferedOutputStream(new FileOutputStream(sqlResult));
            char separator = ',';
            char lineBreak = '\n';
            // Header row, applying any configured column-name conversions.
            StringBuilder stringBuilder = new StringBuilder();
            for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
                String column = resultSetMetaData.getColumnName(i);
                if (headerConversionMap.containsKey(column)) {
                    column = headerConversionMap.get(column);
                }
                stringBuilder.append(column).append(separator);
            }
            // Replace the trailing separator with a line break.
            stringBuilder.setCharAt(stringBuilder.length() - 1, lineBreak);
            try {
                sqlResultOutStream.write(stringBuilder.toString().getBytes(StandardCharsets.UTF_8));
                // Stream rows until the result set is exhausted or the thread is interrupted.
                while (rs.next() && !isInterrupted()) {
                    stringBuilder = new StringBuilder();
                    for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
                        Object object = rs.getObject(i);
                        if (object != null) {
                            stringBuilder.append(object);
                        }
                        stringBuilder.append(separator);
                    }
                    stringBuilder.setCharAt(stringBuilder.length() - 1, lineBreak);
                    synchronized (this) {
                        sqlResultOutStream.write(stringBuilder.toString().getBytes(StandardCharsets.UTF_8));
                        currentExportVolume++;
                    }
                }
            } finally {
                sqlResultOutStream.close();
            }
            logger.info("TimeCost: {} ms", System.currentTimeMillis() - startTime);
            return sqlResult;
        }
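
        /*
         * Caveat (not in the original post): values are written verbatim, so fields
         * containing commas, quotes, or newlines will break the CSV structure.
         * Production use would need RFC 4180-style quoting/escaping.
         */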
    
    }
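
The post's second snippet is a web-layer helper that streams the generated file back to the browser; it belongs in a web controller rather than inside ExportDataThread: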
    // Requires javax.servlet.http.HttpServletResponse plus java.io.FileInputStream,
    // java.io.InputStream and java.nio.file.Files imports in the enclosing class.
    private void downLoadFile(HttpServletResponse response, File downLoadFile) throws IOException {
        String contentType = Files.probeContentType(downLoadFile.toPath());
        // probeContentType may return null; fall back to a generic binary type.
        response.setContentType(contentType != null ? contentType : "application/octet-stream");
        response.setCharacterEncoding("UTF-8");
        response.setHeader("Content-Disposition",
                "inline; filename=\"" + downLoadFile.getName() + "\"");
        response.setContentLength((int) downLoadFile.length());
        InputStream inputStream = new FileInputStream(downLoadFile);
        try {
            OutputStream outputStream = response.getOutputStream();
            byte[] buffer = new byte[1024];
            int len;
            while ((len = inputStream.read(buffer)) != -1) {
                outputStream.write(buffer, 0, len);
            }
            outputStream.flush();
        } finally {
            inputStream.close();
        }
    }
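
A minimal sketch of how the pieces fit together (assumed wiring, not from the original post; dataSource, response, and the SQL are illustrative):

    // Run the export on a background thread, wait for completion, then stream
    // the CSV to the client. Exception handling is elided for brevity.
    ExportDataThread exportThread = new ExportDataThread();
    Map<String, String> headers = new HashMap<String, String>();
    headers.put("USER_ID", "UserId"); // illustrative column-to-header mapping
    exportThread.setHeaderConversionMap(headers);
    CountDownLatch latch = new CountDownLatch(1);
    exportThread.setCountDownLatch(latch);
    exportThread.initialization(dataSource.getConnection(),      // assumed DataSource
            "SELECT * FROM orders WHERE status = ?",             // illustrative SQL
            new Object[]{"PAID"});
    exportThread.start();
    latch.await(); // meanwhile getCurrentExportVolume() can report progress
    downLoadFile(response, exportThread.getGeneratedFile());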
    

Original post: https://www.jianshu.com/p/d4551af0fef1
