美文网首页
JDBC三种读取方式

JDBC三种读取方式

作者: 毛小力 | 来源:发表于2018-09-13 16:38 被阅读0次
    1. 一次全部(默认):一次获取全部。对应MySQL对象为RowDataStatic
    2. 流式:多次获取,一次一行。对应MySQL对象为RowDataDynamic
    3. 游标:多次获取,一次多行。对应MySQL对象为RowDataCursor

    MySQL:获取ResultSet

    // com.mysql.jdbc.MysqlIO.java
    /**
     * Builds the result set for a query, choosing one of three row-retrieval strategies
     * (excerpt: the parts marked {@code ...} are elided from the original driver source).
     *
     * <ul>
     *   <li>Cursor: {@link RowDataCursor}, used when the server version is at least 5.0.2,
     *       cursor fetch is enabled on the connection, the result is binary encoded, and the
     *       calling statement is non-null, forward-only and has a non-zero fetch size.</li>
     *   <li>All at once: rows come from {@code readSingleRowSet(...)} when
     *       {@code streamResults} is false.</li>
     *   <li>Streaming: {@link RowDataDynamic}, used when {@code streamResults} is true; the
     *       instance is also recorded in {@code this.streamingData}.</li>
     * </ul>
     *
     * Wherever field metadata is needed outside the cursor branch, {@code metadataFromCache}
     * is used when it is non-null and the locally read {@code fields} otherwise.
     *
     * @param callingStatement statement that issued the query; may be null (checked in the cursor test)
     * @param columnCount number of columns in the result
     * @param maxRows passed through to {@code readSingleRowSet}
     * @param resultSetType passed through to {@code buildResultSetWithRows}
     * @param resultSetConcurrency passed through to row reading and result set construction
     * @param streamResults true to stream rows instead of reading them all up front
     * @param catalog passed through to {@code buildResultSetWithRows}
     * @param isBinaryEncoded whether rows use the binary encoding; required for the cursor branch
     * @param metadataFromCache cached field metadata, or null to use the fields read here
     * @return the result set backed by the selected row source
     * @throws SQLException declared by the signature; propagated from the calls below
     */
    protected ResultSetImpl getResultSet(StatementImpl callingStatement, long columnCount, 
        int maxRows, int resultSetType, int resultSetConcurrency, boolean streamResults, 
        String catalog, boolean isBinaryEncoded, Field[] metadataFromCache) throws SQLException {
    
        Buffer packet; // The packet from the server
        Field[] fields = null;
    
        // Read in the column information
        ...
       
        packet = reuseAndReadPacket(this.reusablePacket);
        readServerStatusForResultSets(packet);
    
        // Cursor mode
        if (this.connection.versionMeetsMinimum(5, 0, 2) && 
            this.connection.getUseCursorFetch() && 
            isBinaryEncoded && callingStatement != null && 
            callingStatement.getFetchSize() != 0 && 
            callingStatement.getResultSetType() == ResultSet.TYPE_FORWARD_ONLY) {
    
            // The cast assumes a binary-encoded result always comes from a server-side
            // prepared statement -- NOTE(review): not verifiable from this excerpt.
            ServerPreparedStatement prepStmt = (com.mysql.jdbc.ServerPreparedStatement) callingStatement;
    
            // The elided code below may reset this flag; that logic is not shown here.
            boolean usingCursor = true;
            ...
    
            if (usingCursor) {
                RowData rows = new RowDataCursor(this, prepStmt, fields);
                ResultSetImpl rs = buildResultSetWithRows(callingStatement, catalog, fields, rows, 
                    resultSetType, resultSetConcurrency, isBinaryEncoded);
                // Always true at this point: the enclosing branch already tested usingCursor.
                if (usingCursor) {
                    rs.setFetchSize(callingStatement.getFetchSize());
                }
                return rs;
            }
        }
    
        RowData rowData = null;
    
        if (!streamResults) { // Read-all-at-once mode
            rowData = readSingleRowSet(columnCount, maxRows, resultSetConcurrency, 
                isBinaryEncoded, (metadataFromCache == null) ? fields : metadataFromCache);
    
        } else { // Streaming mode
            rowData = new RowDataDynamic(this, (int) columnCount, 
                (metadataFromCache == null) ? fields : metadataFromCache, isBinaryEncoded);
            // Record the streaming row source on this object.
            this.streamingData = rowData;
        }
    
        ResultSetImpl rs = buildResultSetWithRows(callingStatement, catalog, 
            (metadataFromCache == null) ? fields : metadataFromCache, 
            rowData, resultSetType, resultSetConcurrency, isBinaryEncoded);
        return rs;
    }
    

    流式

    1. JDBC代码
    String url = "jdbc:mysql://ip:port/db?user=&password"; 
    String sql = "";
    
    Class.forName("com.mysql.jdbc.Driver");
    
    Connection conn = null;
    PreparedStatement ps = null;
    ResultSet rs = null;
    int count = 0;
    try {
        conn = DriverManager.getConnection(url);
        ps = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        ps.setFetchSize(Integer.MIN_VALUE);
        
        rs = ps.executeQuery();
        while (rs.next()) {
            System.out.println(++count);
        }
    } finally {
        if (rs != null) rs.close();
        if (ps != null) ps.close();
        if (conn != null) conn.close();
    }
    

    必须项:

    • resultSetType: TYPE_FORWARD_ONLY
    • resultSetConcurrency: CONCUR_READ_ONLY
    • fetchSize: Integer.MIN_VALUE
    2. MySQL代码
    • StatementImpl.java
    // com.mysql.jdbc.StatementImpl.java
    /**
     * Tells whether this statement's results should be streamed.
     *
     * Streaming is chosen only when the statement is forward-only, read-only, and its
     * fetch size has been set to {@code Integer.MIN_VALUE}. The check runs while holding
     * the connection mutex obtained through {@code checkClosed()}.
     *
     * @return true when all three conditions hold; false otherwise, including when
     *         {@code checkClosed()} throws
     */
    protected boolean createStreamingResultSet() {
        try {
            synchronized (checkClosed().getConnectionMutex()) {
                if (this.resultSetType != java.sql.ResultSet.TYPE_FORWARD_ONLY) {
                    return false;
                }
                if (this.resultSetConcurrency != java.sql.ResultSet.CONCUR_READ_ONLY) {
                    return false;
                }
                return this.fetchSize == Integer.MIN_VALUE;
            }
        } catch (SQLException closedEx) {
            // The interface cannot be changed to throw, so an error here simply means "do not stream".
            return false;
        }
    }
    
    • RowDataDynamic.java
    // com.mysql.jdbc.RowDataDynamic.java
    /**
     * Reads the next row from the connection into {@code this.nextRow} and updates the
     * end-of-data bookkeeping (excerpt: the exception handlers are elided).
     *
     * While {@code noMoreRows} is false, one row is requested from {@code io.nextRow(...)},
     * unless {@code isInterrupted} is set, in which case the row is treated as null. A null
     * row marks the end of the data. Once {@code noMoreRows} is true, the call only sets
     * {@code isAfterEnd}.
     *
     * @throws SQLException declared by the signature; the handling in the catch blocks is not shown
     */
    private void nextRecord() throws SQLException {
        try {
            if (!this.noMoreRows) {
                this.nextRow = this.isInterrupted ? null : 
                    this.io.nextRow(this.metadata, this.columnCount, this.isBinaryEncoded,
                        java.sql.ResultSet.CONCUR_READ_ONLY, true, 
                        this.useBufferRowExplicit, true, null);
    
                if (this.nextRow == null) {
                    // End of data (or interrupted): flag it and record the result of
                    // tackOnMoreStreamingResults for the owning statement.
                    this.noMoreRows = true;
                    this.isAfterEnd = true;
                    this.moreResultsExisted = this.io.tackOnMoreStreamingResults(this.owner);
                    // index is still -1, so no row was ever delivered: the result was empty.
                    if (this.index == -1) {
                        this.wasEmpty = true;
                    }
                }
            } else {
                this.isAfterEnd = true;
            }
        } catch (SQLException sqlEx) {
            ...
        } catch (Exception ex) {
            ...
        }
    }
    
    • MysqlIO.java
    /**
     * Reads one row packet and wraps it as a {@link ResultSetRow} (excerpt: the handling
     * of binary-encoded rows at the end of the method is elided).
     *
     * For text-encoded rows the method returns a {@code ByteArrayRow} (column values copied
     * into byte arrays) or a {@code BufferRow} (backed by the packet), and returns null
     * once the last data packet has been read.
     *
     * @param fields field metadata handed to {@code BufferRow} and {@code nextRowFast}
     * @param columnCount number of columns to read per row
     * @param isBinaryEncoded whether rows use the binary encoding
     * @param resultSetConcurrency {@code CONCUR_UPDATABLE} forces the {@code ByteArrayRow} form
     * @param useBufferRowIfPossible allows switching to a buffer row when the packet is larger
     *        than {@code useBufferRowSizeThreshold}
     * @param useBufferRowExplicit requests a buffer row regardless of packet size
     * @param canReuseRowPacketForBufferRow when false, {@code this.reusablePacket} is replaced
     *        with a fresh buffer before the packet is handed to a {@code BufferRow}
     * @param existingRowPacket an already-read packet to decode, or null to read a new one
     * @return the decoded row, or null at the end of the rows (text-encoded path)
     * @throws SQLException declared by the signature; propagated from the packet-reading calls
     */
    final ResultSetRow nextRow(Field[] fields, int columnCount, boolean isBinaryEncoded, 
        int resultSetConcurrency, boolean useBufferRowIfPossible,
        boolean useBufferRowExplicit, boolean canReuseRowPacketForBufferRow, 
        Buffer existingRowPacket) throws SQLException {
    
        // Path taken by the read-all-at-once mode
        if (this.useDirectRowUnpack && existingRowPacket == null && 
                !isBinaryEncoded && !useBufferRowIfPossible && !useBufferRowExplicit) {
            return nextRowFast(fields, columnCount, isBinaryEncoded, resultSetConcurrency, 
                useBufferRowIfPossible, useBufferRowExplicit, canReuseRowPacketForBufferRow);
        }
    
        Buffer rowPacket = null;
        if (existingRowPacket == null) {
            rowPacket = checkErrorPacket();
            // A packet above the size threshold is promoted to an explicit buffer row.
            if (!useBufferRowExplicit && useBufferRowIfPossible) {
                if (rowPacket.getBufLength() > this.useBufferRowSizeThreshold) {
                    useBufferRowExplicit = true;
                }
            }
        } else {
            // We attempted to do nextRowFast(), but the packet was a multipacket, so we couldn't unpack it directly
            rowPacket = existingRowPacket;
            checkErrorPacket(existingRowPacket);
        }
    
        if (!isBinaryEncoded) {
            //
            // Didn't read an error, so re-position to beginning of packet in order to read result set data
            //
            rowPacket.setPosition(rowPacket.getPosition() - 1);
    
            if (!rowPacket.isLastDataPacket()) {
                // Updatable result sets, and callers that asked for no buffer row, get the
                // column values copied out into individual byte arrays.
                if (resultSetConcurrency == ResultSet.CONCUR_UPDATABLE || (!useBufferRowIfPossible && !useBufferRowExplicit)) {
    
                    byte[][] rowData = new byte[columnCount][];
    
                    for (int i = 0; i < columnCount; i++) {
                        rowData[i] = rowPacket.readLenByteArray(0);
                    }
    
                    return new ByteArrayRow(rowData, getExceptionInterceptor());
                }
    
                // The BufferRow below keeps rowPacket, so when the packet may not be reused
                // a fresh reusable packet of the same length is installed first.
                if (!canReuseRowPacketForBufferRow) {
                    this.reusablePacket = new Buffer(rowPacket.getBufLength());
                }
    
                return new BufferRow(rowPacket, fields, false, getExceptionInterceptor());
    
            }
    
            // Last data packet: read the server status from it and signal the end of rows.
            readServerStatusForResultSets(rowPacket);
    
            return null;
        }
    
        ...
    }
    

    游标

    1. JDBC代码
    String url = "jdbc:mysql://ip:port/db?user=&password&useCursorFetch=true"; 
    String sql = "";
    
    Class.forName("com.mysql.jdbc.Driver");
    
    Connection conn = null;
    PreparedStatement ps = null;
    ResultSet rs = null;
    int count = 0;
    try {
        conn = DriverManager.getConnection(url);
        ps = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        ps.setFetchSize(50);
        
        rs = ps.executeQuery();
        while (rs.next()) {
            System.out.println(++count + ": " + rs.getLong(1) + ", " + rs.getLong(2));
        }
        
    } finally {
        if (rs != null) rs.close();
        if (ps != null) ps.close();
        if (conn != null) conn.close();
    }
    

    必须项:

    • connection:useCursorFetch=true
    • resultSetType: TYPE_FORWARD_ONLY
    • fetchSize: 不为0(通常取大于0的值);若为Integer.MIN_VALUE,则每次读取的行数按1处理,仍通过游标逐行读取,效果类似流式模式
    2. MySQL代码
    • RowDataCursor.java
    // com.mysql.jdbc.RowDataCursor.java
    /**
     * Replaces {@code fetchedRows} with the next batch of rows read through the
     * server-side cursor.
     *
     * If the last row has already been fetched, an empty list is installed and the method
     * returns without issuing another fetch. Otherwise the batch size is taken from the
     * owning result set, falling back to the prepared statement when that is 0, and
     * {@code Integer.MIN_VALUE} is mapped to one row per fetch. After the fetch, the
     * server status decides whether the cursor is exhausted.
     *
     * @throws SQLException declared by the signature; propagated from the fetch
     */
    private void fetchMoreRows() throws SQLException {
        if (this.lastRowFetched) {
            this.fetchedRows = new ArrayList<ResultSetRow>(0);
            return;
        }

        synchronized (this.owner.connection.getConnectionMutex()) {
            // Remember whether an earlier fetch had happened before marking this one.
            final boolean hadFetchedBefore = this.firstFetchCompleted;
            this.firstFetchCompleted = true;

            int batchSize = this.owner.getFetchSize();
            if (batchSize == 0) {
                batchSize = this.prepStmt.getFetchSize();
            }
            if (batchSize == Integer.MIN_VALUE) {
                // Handle the case where the user used 'old' streaming result sets
                batchSize = 1;
            }

            // fetchRowsViaCursor performs the actual read
            this.fetchedRows = this.mysql.fetchRowsViaCursor(this.fetchedRows,
                this.statementIdOnServer, this.metadata,
                batchSize, this.useBufferRowExplicit);
            this.currentPositionInFetchedRows = BEFORE_START_OF_ROWS;

            final boolean lastRowSent =
                (this.mysql.getServerStatus() & SERVER_STATUS_LAST_ROW_SENT) != 0;
            if (!lastRowSent) {
                return;
            }

            this.lastRowFetched = true;
            // No earlier fetch and nothing in this one: the whole result was empty.
            if (!hadFetchedBefore && this.fetchedRows.isEmpty()) {
                this.wasEmpty = true;
            }
        }
    }
    
    • MysqlIO
    // com.mysql.jdbc.MysqlIO.java
    /**
     * Sends a {@code COM_FETCH} command for the given statement and collects the rows
     * that come back.
     *
     * @param fetchedRows list to reuse for the batch (cleared first), or null to allocate
     *        a new list sized to {@code fetchSize}
     * @param statementId server-side statement id written into the fetch command
     * @param columnTypes field metadata; its length is used as the column count
     * @param fetchSize number of rows requested from the server
     * @param useBufferRowExplicit forwarded to {@code nextRow}
     * @return the list holding the fetched rows; the same instance as {@code fetchedRows}
     *         when one was supplied
     * @throws SQLException declared by the signature; propagated from sending or reading
     */
    protected List<ResultSetRow> fetchRowsViaCursor(List<ResultSetRow> fetchedRows, 
        long statementId, Field[] columnTypes, int fetchSize,
        boolean useBufferRowExplicit) throws SQLException {

        final List<ResultSetRow> batch;
        if (fetchedRows != null) {
            fetchedRows.clear();
            batch = fetchedRows;
        } else {
            batch = new ArrayList<ResultSetRow>(fetchSize);
        }

        // Ask the server for fetchSize rows
        this.sharedSendPacket.clear();
        this.sharedSendPacket.writeByte((byte) MysqlDefs.COM_FETCH);
        this.sharedSendPacket.writeLong(statementId);
        this.sharedSendPacket.writeLong(fetchSize);
        sendCommand(MysqlDefs.COM_FETCH, null, this.sharedSendPacket, true, null, 0);

        // Read rows until nextRow reports the end with null
        while (true) {
            ResultSetRow fetched = nextRow(columnTypes, columnTypes.length, true,
                ResultSet.CONCUR_READ_ONLY, false, useBufferRowExplicit, false, null);
            if (fetched == null) {
                break;
            }
            batch.add(fetched);
        }
        return batch;
    }
    

    相关文章

      网友评论

          本文标题:JDBC三种读取方式

          本文链接:https://www.haomeiwen.com/subject/xdbyeftx.html