美文网首页
JDBC三种读取方式

JDBC三种读取方式

作者: 童伯虎 | 来源:发表于2018-09-13 16:38 被阅读0次
  1. 一次全部(默认):一次获取全部。对应MySQL对象为RowDataStatic
  2. 流式:多次获取,一次一行。对应MySQL对象为RowDataDynamic
  3. 游标:多次获取,一次多行。对应MySQL对象为RowDataCursor

MySQL:获取ResultSet

// com.mysql.jdbc.MysqlIO.java
/**
 * Builds the result set for a query, choosing how rows will be delivered:
 * through a server-side cursor (RowDataCursor), fully read up front
 * (readSingleRowSet), or streamed on demand (RowDataDynamic).
 *
 * @param callingStatement statement that issued the query; may be null, in which case cursor mode is never used
 * @param columnCount number of columns in the result
 * @param maxRows passed through to readSingleRowSet in fetch-all mode
 * @param resultSetType result set type handed to buildResultSetWithRows
 * @param resultSetConcurrency result set concurrency handed to buildResultSetWithRows
 * @param streamResults when true (and cursor mode does not apply) rows are streamed instead of read up front
 * @param catalog catalog handed to buildResultSetWithRows
 * @param isBinaryEncoded whether rows use the binary encoding; required for cursor mode
 * @param metadataFromCache cached field metadata; when non-null it is used instead of the freshly read fields
 * @return the result set backed by the selected row source
 * @throws SQLException declared by the signature; the throwing calls are not all visible in this excerpt
 */
protected ResultSetImpl getResultSet(StatementImpl callingStatement, long columnCount, 
    int maxRows, int resultSetType, int resultSetConcurrency, boolean streamResults, 
    String catalog, boolean isBinaryEncoded, Field[] metadataFromCache) throws SQLException {

    Buffer packet; // The packet from the server
    Field[] fields = null;

    // Read in the column information
    ...
   
    packet = reuseAndReadPacket(this.reusablePacket);
    readServerStatusForResultSets(packet);

    // Cursor mode: needs server >= 5.0.2, useCursorFetch enabled, a binary-encoded result,
    // a non-null statement with a non-zero fetch size, and a forward-only result set
    if (this.connection.versionMeetsMinimum(5, 0, 2) && 
        this.connection.getUseCursorFetch() && 
        isBinaryEncoded && callingStatement != null && 
        callingStatement.getFetchSize() != 0 && 
        callingStatement.getResultSetType() == ResultSet.TYPE_FORWARD_ONLY) {

        // NOTE(review): the cast presumably relies on binary-encoded results only coming
        // from server-side prepared statements -- confirm against callers
        ServerPreparedStatement prepStmt = (com.mysql.jdbc.ServerPreparedStatement) callingStatement;

        boolean usingCursor = true;
        ...

        if (usingCursor) {
            RowData rows = new RowDataCursor(this, prepStmt, fields);
            ResultSetImpl rs = buildResultSetWithRows(callingStatement, catalog, fields, rows, 
                resultSetType, resultSetConcurrency, isBinaryEncoded);
            // Already true here because of the enclosing check
            if (usingCursor) {
                rs.setFetchSize(callingStatement.getFetchSize());
            }
            return rs;
        }
    }

    RowData rowData = null;

    if (!streamResults) { // Fetch-all mode: read the whole row set now
        rowData = readSingleRowSet(columnCount, maxRows, resultSetConcurrency, 
            isBinaryEncoded, (metadataFromCache == null) ? fields : metadataFromCache);

    } else { // Streaming mode: rows are pulled through RowDataDynamic
        rowData = new RowDataDynamic(this, (int) columnCount, 
            (metadataFromCache == null) ? fields : metadataFromCache, isBinaryEncoded);
        // Remember the active streaming row source on this MysqlIO
        this.streamingData = rowData;
    }

    ResultSetImpl rs = buildResultSetWithRows(callingStatement, catalog, 
        (metadataFromCache == null) ? fields : metadataFromCache, 
        rowData, resultSetType, resultSetConcurrency, isBinaryEncoded);
    return rs;
}

流式

  1. JDBC代码
String url = "jdbc:mysql://ip:port/db?user=&password"; 
String sql = "";

Class.forName("com.mysql.jdbc.Driver");

Connection conn = null;
PreparedStatement ps = null;
ResultSet rs = null;
int count = 0;
try {
    conn = DriverManager.getConnection(url);
    ps = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
    ps.setFetchSize(Integer.MIN_VALUE);
    
    rs = ps.executeQuery();
    while (rs.next()) {
        System.out.println(++count);
    }
} finally {
    if (rs != null) rs.close();
    if (ps != null) ps.close();
    if (conn != null) conn.close();
}

必须项:

  • resultSetType: TYPE_FORWARD_ONLY
  • resultSetConcurrency: CONCUR_READ_ONLY
  • fetchSize: Integer.MIN_VALUE
  2. MySQL代码
  • StatementImpl.java
// com.mysql.jdbc.StatementImpl.java
/**
 * Tells whether this statement should produce a streaming result set.
 * Streaming is used only when the result set is forward-only, read-only,
 * and the fetch size has been set to Integer.MIN_VALUE.
 *
 * @return true when all three conditions hold; false otherwise, including
 *         when checkClosed() throws an SQLException
 */
protected boolean createStreamingResultSet() {
    try {
        synchronized (checkClosed().getConnectionMutex()) {
            if (this.resultSetType != java.sql.ResultSet.TYPE_FORWARD_ONLY) {
                return false;
            }
            if (this.resultSetConcurrency != java.sql.ResultSet.CONCUR_READ_ONLY) {
                return false;
            }
            return this.fetchSize == Integer.MIN_VALUE;
        }
    } catch (SQLException closedEx) {
        // The interface cannot be broken, so an error simply means "do not stream"
        return false;
    }
}
  • RowDataDynamic.java
// com.mysql.jdbc.RowDataDynamic.java
/**
 * Advances the streaming row source by one row and stores it in this.nextRow.
 * When no row comes back, the end-of-data flags are set and any further
 * streaming results are attached via tackOnMoreStreamingResults.
 *
 * @throws SQLException declared by the signature; the catch bodies are elided in this excerpt
 */
private void nextRecord() throws SQLException {
    try {
        if (!this.noMoreRows) {
            // An interrupted reader gets null instead of reading another row
            this.nextRow = this.isInterrupted ? null : 
                this.io.nextRow(this.metadata, this.columnCount, this.isBinaryEncoded,
                    java.sql.ResultSet.CONCUR_READ_ONLY, true, 
                    this.useBufferRowExplicit, true, null);

            if (this.nextRow == null) {
                // End of this result: mark exhaustion and look for follow-up results
                this.noMoreRows = true;
                this.isAfterEnd = true;
                this.moreResultsExisted = this.io.tackOnMoreStreamingResults(this.owner);
                // index still at -1 means no row was ever delivered
                if (this.index == -1) {
                    this.wasEmpty = true;
                }
            }
        } else {
            this.isAfterEnd = true;
        }
    } catch (SQLException sqlEx) {
        ...
    } catch (Exception ex) {
        ...
    }
}
  • MysqlIO.java
/**
 * Reads the next row packet and wraps it as a ResultSetRow.
 *
 * @param fields field metadata passed to the row implementation
 * @param columnCount number of columns to read from the packet
 * @param isBinaryEncoded whether the row uses the binary encoding (that branch is elided in this excerpt)
 * @param resultSetConcurrency CONCUR_UPDATABLE forces a ByteArrayRow
 * @param useBufferRowIfPossible allow a BufferRow; large packets then switch to explicit buffer rows
 * @param useBufferRowExplicit always use a BufferRow unless the result set is updatable
 * @param canReuseRowPacketForBufferRow when false, a new reusable packet is allocated before returning a BufferRow
 * @param existingRowPacket packet already read by a failed nextRowFast() attempt, or null to read one here
 * @return the next row, or null once the last data packet has been reached (text-encoded path)
 * @throws SQLException declared by the signature
 */
final ResultSetRow nextRow(Field[] fields, int columnCount, boolean isBinaryEncoded, 
    int resultSetConcurrency, boolean useBufferRowIfPossible,
    boolean useBufferRowExplicit, boolean canReuseRowPacketForBufferRow, 
    Buffer existingRowPacket) throws SQLException {

    // Path taken by the fetch-all mode: direct unpack, text encoding, no buffer rows requested
    if (this.useDirectRowUnpack && existingRowPacket == null && 
            !isBinaryEncoded && !useBufferRowIfPossible && !useBufferRowExplicit) {
        return nextRowFast(fields, columnCount, isBinaryEncoded, resultSetConcurrency, 
            useBufferRowIfPossible, useBufferRowExplicit, canReuseRowPacketForBufferRow);
    }

    Buffer rowPacket = null;
    if (existingRowPacket == null) {
        rowPacket = checkErrorPacket();
        // Packets larger than the threshold are promoted to explicit buffer rows
        if (!useBufferRowExplicit && useBufferRowIfPossible) {
            if (rowPacket.getBufLength() > this.useBufferRowSizeThreshold) {
                useBufferRowExplicit = true;
            }
        }
    } else {
        // We attempted to do nextRowFast(), but the packet was a multipacket, so we couldn't unpack it directly
        rowPacket = existingRowPacket;
        checkErrorPacket(existingRowPacket);
    }

    if (!isBinaryEncoded) {
        //
        // Didn't read an error, so re-position to beginning of packet in order to read result set data
        //
        rowPacket.setPosition(rowPacket.getPosition() - 1);

        if (!rowPacket.isLastDataPacket()) {
            // Updatable result sets, or callers that asked for no buffer row, get a copied byte[][] row
            if (resultSetConcurrency == ResultSet.CONCUR_UPDATABLE || (!useBufferRowIfPossible && !useBufferRowExplicit)) {

                byte[][] rowData = new byte[columnCount][];

                for (int i = 0; i < columnCount; i++) {
                    rowData[i] = rowPacket.readLenByteArray(0);
                }

                return new ByteArrayRow(rowData, getExceptionInterceptor());
            }

            // NOTE(review): presumably the returned BufferRow keeps referencing rowPacket,
            // so a fresh reusable packet is needed for the next read -- confirm
            if (!canReuseRowPacketForBufferRow) {
                this.reusablePacket = new Buffer(rowPacket.getBufLength());
            }

            return new BufferRow(rowPacket, fields, false, getExceptionInterceptor());

        }

        // Last data packet: record the server status and signal end of rows
        readServerStatusForResultSets(rowPacket);

        return null;
    }

    ...
}

游标

  1. JDBC代码
String url = "jdbc:mysql://ip:port/db?user=&password&useCursorFetch=true"; 
String sql = "";

Class.forName("com.mysql.jdbc.Driver");

Connection conn = null;
PreparedStatement ps = null;
ResultSet rs = null;
int count = 0;
try {
    conn = DriverManager.getConnection(url);
    ps = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
    ps.setFetchSize(50);
    
    rs = ps.executeQuery();
    while (rs.next()) {
        System.out.println(++count + ": " + rs.getLong(1) + ", " + rs.getLong(2));
    }
    
} finally {
    if (rs != null) rs.close();
    if (ps != null) ps.close();
    if (conn != null) conn.close();
}

必须项:

  • connection:useCursorFetch=true
  • resultSetType: TYPE_FORWARD_ONLY
  • fetchSize: 大于0,若为Integer.MIN_VALUE则fetchSize转化为1,即流式模式
  2. MySQL代码
  • RowDataCursor.java
// com.mysql.jdbc.RowDataCursor.java
/**
 * Loads the next batch of rows from the server-side cursor into this.fetchedRows.
 * Once the server has reported the last row, later calls just install an empty list.
 *
 * @throws SQLException if fetching rows through the cursor fails
 */
private void fetchMoreRows() throws SQLException {
    // Cursor already drained: hand back an empty batch
    if (this.lastRowFetched) {
        this.fetchedRows = new ArrayList<ResultSetRow>(0);
        return;
    }

    synchronized (this.owner.connection.getConnectionMutex()) {
        final boolean fetchedBefore = this.firstFetchCompleted;
        this.firstFetchCompleted = true;

        // Fetch size comes from the owner first, then from the prepared statement
        int batchSize = this.owner.getFetchSize();
        if (batchSize == 0) {
            batchSize = this.prepStmt.getFetchSize();
        }
        // Handle the case where the user used 'old' streaming result sets
        if (batchSize == Integer.MIN_VALUE) {
            batchSize = 1;
        }

        // fetchRowsViaCursor performs the actual read
        this.fetchedRows = this.mysql.fetchRowsViaCursor(this.fetchedRows,
            this.statementIdOnServer, this.metadata,
            batchSize, this.useBufferRowExplicit);
        this.currentPositionInFetchedRows = BEFORE_START_OF_ROWS;

        final boolean lastRowSent =
            (this.mysql.getServerStatus() & SERVER_STATUS_LAST_ROW_SENT) != 0;
        if (lastRowSent) {
            this.lastRowFetched = true;
            // First fetch ever and nothing came back: the result was empty
            if (!fetchedBefore && this.fetchedRows.isEmpty()) {
                this.wasEmpty = true;
            }
        }
    }
}
  • MysqlIO.java
// com.mysql.jdbc.MysqlIO.java
/**
 * Sends a COM_FETCH for the given server-side statement and collects the
 * returned rows.
 *
 * @param fetchedRows list to reuse (it is cleared first), or null to allocate a new one
 * @param statementId id of the statement on the server
 * @param columnTypes field metadata for the rows; its length is used as the column count
 * @param fetchSize number of rows requested from the server
 * @param useBufferRowExplicit forwarded to nextRow
 * @return the list holding the rows read for this fetch
 * @throws SQLException if sending the command or reading a row fails
 */
protected List<ResultSetRow> fetchRowsViaCursor(List<ResultSetRow> fetchedRows, 
    long statementId, Field[] columnTypes, int fetchSize,
    boolean useBufferRowExplicit) throws SQLException {

    // Reuse the caller's list when there is one
    if (fetchedRows != null) {
        fetchedRows.clear();
    } else {
        fetchedRows = new ArrayList<ResultSetRow>(fetchSize);
    }

    // Ask the server for fetchSize rows
    this.sharedSendPacket.clear();
    this.sharedSendPacket.writeByte((byte) MysqlDefs.COM_FETCH);
    this.sharedSendPacket.writeLong(statementId);
    this.sharedSendPacket.writeLong(fetchSize);
    sendCommand(MysqlDefs.COM_FETCH, null, this.sharedSendPacket, true, null, 0);

    // Read rows until nextRow reports the end with null
    for (;;) {
        ResultSetRow fetched = nextRow(columnTypes, columnTypes.length, true,
            ResultSet.CONCUR_READ_ONLY, false, useBufferRowExplicit, false, null);
        if (fetched == null) {
            break;
        }
        fetchedRows.add(fetched);
    }
    return fetchedRows;
}

相关文章

网友评论

      本文标题:JDBC三种读取方式

      本文链接:https://www.haomeiwen.com/subject/xdbyeftx.html