美文网首页程序员
浅析MySQL的批量提交的优化

浅析MySQL的批量提交的优化

作者: ElseF | 来源:发表于2018-12-22 15:12 被阅读3次

    公众号请关注ElseF,ElseF(Else Figure),是一个专注于「分享」国内外高质量的科技类文章和新闻的自媒体。维护者主要来自国内外大型互联网公司和创业公司,主要专注于与互联网相关的精彩内容。


    微信公众号

    Batch提交

    Batch提交是用一个Connection来对若干个SQL进行一次性、批量提交给MySQL处理。但有一个问题是,如果某个SQL 挂了而其他的并不会受影响,这样就无法保证原子性,只能再封装一层Transaction来实现。 同时,Batch处理并不是每个Server都支持的,需要看具体的产品和版本号。

    代码

    我们通过观察具体的客户端实现代码来了解批量提交是如何实现的 spring-jdbc-5.1.3.RELEASE中的实现如下:

    @Override
        public int[] batchUpdate(String sql, SqlParameterSource[] batchArgs) {
            if (batchArgs.length == 0) {
                return new int[0];
            }
    
            ParsedSql parsedSql = getParsedSql(sql);
            PreparedStatementCreatorFactory pscf = getPreparedStatementCreatorFactory(parsedSql, batchArgs[0]);
            // 调用另外一个batchUpdate方法
            return getJdbcOperations().batchUpdate(
                    pscf.getSql(),
                    new BatchPreparedStatementSetter() {
                        @Override
                        public void setValues(PreparedStatement ps, int i) throws SQLException {
                            Object[] values = NamedParameterUtils.buildValueArray(parsedSql, batchArgs[i], null);
                            pscf.newPreparedStatementSetter(values).setValues(ps);
                        }
                        @Override
                        public int getBatchSize() {
                            return batchArgs.length;
                        }
                    });
        }
    
    

    第二个batchUpdate方法如下:

    @Override
        public int[] batchUpdate(String sql, final BatchPreparedStatementSetter pss) throws DataAccessException {
            if (logger.isDebugEnabled()) {
                logger.debug("Executing SQL batch update [" + sql + "]");
            }
    
            // 调用execute方法,传入sql和回调接口
            int[] result = execute(sql, (PreparedStatementCallback<int[]>) ps -> {
                try {
                    int batchSize = pss.getBatchSize();
                    InterruptibleBatchPreparedStatementSetter ipss =
                            (pss instanceof InterruptibleBatchPreparedStatementSetter ?
                            (InterruptibleBatchPreparedStatementSetter) pss : null);
                    if (JdbcUtils.supportsBatchUpdates(ps.getConnection())) {
                        // 如果支持批处理操作则依次将每个SQL的参数进行填充
                        for (int i = 0; i < batchSize; i++) {
                            pss.setValues(ps, i);
                            if (ipss != null && ipss.isBatchExhausted(i)) {
                                break;
                            }
                            ps.addBatch();
                        }
                        // 执行批处理函数(底层交给jdbc driver的实现类去执行,如mysql-connector-java)
                        return ps.executeBatch();
                    }
                    else {
                        List<Integer> rowsAffected = new ArrayList<>();
                        for (int i = 0; i < batchSize; i++) {
                            pss.setValues(ps, i);
                            if (ipss != null && ipss.isBatchExhausted(i)) {
                                break;
                            }
                            rowsAffected.add(ps.executeUpdate());
                        }
                        int[] rowsAffectedArray = new int[rowsAffected.size()];
                        for (int i = 0; i < rowsAffectedArray.length; i++) {
                            rowsAffectedArray[i] = rowsAffected.get(i);
                        }
                        return rowsAffectedArray;
                    }
                }
                finally {
                    if (pss instanceof ParameterDisposer) {
                        ((ParameterDisposer) pss).cleanupParameters();
                    }
                }
            });
    
            Assert.state(result != null, "No result array");
            return result;
        }
    
    

    调用的execute方法如下:

    public <T> T execute(PreparedStatementCreator psc, PreparedStatementCallback<T> action)
                throws DataAccessException {
    
            Assert.notNull(psc, "PreparedStatementCreator must not be null");
            Assert.notNull(action, "Callback object must not be null");
            if (logger.isDebugEnabled()) {
                String sql = getSql(psc);
                logger.debug("Executing prepared SQL statement" + (sql != null ? " [" + sql + "]" : ""));
            }
    
            Connection con = DataSourceUtils.getConnection(obtainDataSource());
            PreparedStatement ps = null;
            try {
                ps = psc.createPreparedStatement(con);
                applyStatementSettings(ps);
                T result = action.doInPreparedStatement(ps);
                handleWarnings(ps);
                return result;
            }
            catch (SQLException ex) {
                // Release Connection early, to avoid potential connection pool deadlock
                // in the case when the exception translator hasn't been initialized yet.
                if (psc instanceof ParameterDisposer) {
                    ((ParameterDisposer) psc).cleanupParameters();
                }
                String sql = getSql(psc);
                JdbcUtils.closeStatement(ps);
                ps = null;
                DataSourceUtils.releaseConnection(con, getDataSource());
                con = null;
                throw translateException("PreparedStatementCallback", sql, ex);
            }
            finally {
                if (psc instanceof ParameterDisposer) {
                    ((ParameterDisposer) psc).cleanupParameters();
                }
                JdbcUtils.closeStatement(ps);
                DataSourceUtils.releaseConnection(con, getDataSource());
            }
        }
    
    

    上面是spring-jdbc的batchUpdate逻辑的实现,但是具体的业务执行是由Server来执行的,所以不同的Server的实现机制不同可能对性能有较大影响。 以MySQL为例,在mysql-connector-java-5.1.43的实现如下:

        public boolean canRewriteAsMultiValueInsertAtSqlLevel() throws SQLException {
            return this.parseInfo.canRewriteAsMultiValueInsert;
        }
    
        @Override
            protected long[] executeBatchInternal() throws SQLException {
                synchronized (checkClosed().getConnectionMutex()) {
    
                    if (this.connection.isReadOnly()) {
                        throw new SQLException(Messages.getString("PreparedStatement.25") + Messages.getString("PreparedStatement.26"),
                                SQLError.SQL_STATE_ILLEGAL_ARGUMENT);
                    }
    
                    if (this.batchedArgs == null || this.batchedArgs.size() == 0) {
                        return new long[0];
                    }
    
                    // we timeout the entire batch, not individual statements
                    int batchTimeout = this.timeoutInMillis;
                    this.timeoutInMillis = 0;
    
                    resetCancelledState();
    
                    try {
                        statementBegins();
    
                        clearWarnings();
                        // 注意这里的条件,batchHasPlainStatements默认初始化就是false,而rewriteBatchedStatements
                        // 需要在jdbc url里制定,否则不会走合并SQL的逻辑,如jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&rewriteBatchedStatements=true
                        if (!this.batchHasPlainStatements && this.connection.getRewriteBatchedStatements()) {
    
                            if (canRewriteAsMultiValueInsertAtSqlLevel()) {
                                return executeBatchedInserts(batchTimeout);
                            }
    
                            if (this.connection.versionMeetsMinimum(4, 1, 0) && !this.batchHasPlainStatements && this.batchedArgs != null
                                    && this.batchedArgs.size() > 3 /* cost of option setting rt-wise */) {
                                return executePreparedBatchAsMultiStatement(batchTimeout);
                            }
                        }
    
                        return executeBatchSerially(batchTimeout);
                    } finally {
                        this.statementExecuting.set(false);
    
                        clearBatch();
                    }
                }
            }
    
        /** * Rewrites the already prepared statement into a multi-value insert * statement of 'statementsPerBatch' values and executes the entire batch * using this new statement. * * @return update counts in the same fashion as executeBatch() * * @throws SQLException */
        protected long[] executeBatchedInserts(int batchTimeout) throws SQLException {
            ......
        }
    
    

    如何整合

    根据DML的类型是insert、update还是delete,会走不同方法:

    • 如果是insert语句,满成条件情况下,会整合成:”insert into xxx_table values (xx),(yy),(zz)…”这样的语句。
    • 如果是update\delete语句,满成条件情况下,会整合成:”update t set … where id = 1; update t set … where id = 2; update t set … where id = 3 …“这样的语句。

    总结

    如果想要达到MySQL真正batchUpdate效果,需要有以下几个条件:

    • 需要在jdbcUrl后添加参数rewriteBatchedStatements=true
    • this.batchHasPlainStatements为false
    • 如果是update\delete 语句,还需要mysql版本>=4.1.0,并且batch的数量>3

    References

    本文首次发布于 ElseF’s Blog, 作者 @stuartlau ,转载请保留原文链接.

    相关文章

      网友评论

        本文标题:浅析MySQL的批量提交的优化

        本文链接:https://www.haomeiwen.com/subject/jzbikqtx.html