- 一次全部(默认):一次获取全部。对应MySQL对象为RowDataStatic。
- 流式:多次获取,一次一行。对应MySQL对象为RowDataDynamic。
- 游标:多次获取,一次多行。对应MySQL对象为RowDataCursor。
MySQL:获取ResultSet
// com.mysql.jdbc.MysqlIO.java
/**
 * Builds the {@link ResultSetImpl} for a query response and decides how its rows are
 * obtained. Three row sources are visible in this excerpt:
 * <ul>
 * <li>cursor mode: a {@code RowDataCursor} bound to the server-side prepared statement;</li>
 * <li>fetch-all mode: the rows returned by {@code readSingleRowSet(...)};</li>
 * <li>streaming mode: a {@code RowDataDynamic}, also stored in {@code this.streamingData}.</li>
 * </ul>
 * Lines shown as {@code ...} are elided from the original driver source.
 *
 * @param callingStatement statement that issued the query; the cursor branch is only
 *        considered when it is non-null, and it is then cast to {@code ServerPreparedStatement}
 * @param columnCount number of columns, passed on to the fetch-all and streaming row sources
 * @param maxRows passed on to {@code readSingleRowSet(...)} in fetch-all mode
 * @param resultSetType passed on to {@code buildResultSetWithRows(...)}
 * @param resultSetConcurrency passed on to the row readers and to {@code buildResultSetWithRows(...)}
 * @param streamResults when {@code true} (and the cursor branch did not return) rows are streamed
 * @param catalog passed on to {@code buildResultSetWithRows(...)}
 * @param isBinaryEncoded must be {@code true} for the cursor branch; also passed on to the row readers
 * @param metadataFromCache when non-null, used in place of the locally read {@code fields}
 * @return the result set wrapping the selected row source
 * @throws SQLException declared by the original signature
 */
protected ResultSetImpl getResultSet(StatementImpl callingStatement, long columnCount,
int maxRows, int resultSetType, int resultSetConcurrency, boolean streamResults,
String catalog, boolean isBinaryEncoded, Field[] metadataFromCache) throws SQLException {
Buffer packet; // The packet from the server
Field[] fields = null;
// Read in the column information
...
packet = reuseAndReadPacket(this.reusablePacket);
readServerStatusForResultSets(packet);
// Cursor mode: needs server version >= 5.0.2, useCursorFetch enabled on the connection,
// a binary-encoded result, a non-null statement with a non-zero fetch size,
// and a forward-only result set type.
if (this.connection.versionMeetsMinimum(5, 0, 2) &&
this.connection.getUseCursorFetch() &&
isBinaryEncoded && callingStatement != null &&
callingStatement.getFetchSize() != 0 &&
callingStatement.getResultSetType() == ResultSet.TYPE_FORWARD_ONLY) {
ServerPreparedStatement prepStmt = (com.mysql.jdbc.ServerPreparedStatement) callingStatement;
// Starts out true; NOTE(review): the elided lines below presumably may reset it - confirm against the driver source.
boolean usingCursor = true;
...
if (usingCursor) {
RowData rows = new RowDataCursor(this, prepStmt, fields);
ResultSetImpl rs = buildResultSetWithRows(callingStatement, catalog, fields, rows,
resultSetType, resultSetConcurrency, isBinaryEncoded);
// NOTE(review): this inner check is redundant here, the enclosing block already requires usingCursor.
if (usingCursor) {
rs.setFetchSize(callingStatement.getFetchSize());
}
return rs;
}
}
RowData rowData = null;
if (!streamResults) { // Fetch-all mode: rows come from readSingleRowSet(...)
rowData = readSingleRowSet(columnCount, maxRows, resultSetConcurrency,
isBinaryEncoded, (metadataFromCache == null) ? fields : metadataFromCache);
} else { // Streaming mode: rows are exposed through a RowDataDynamic
rowData = new RowDataDynamic(this, (int) columnCount,
(metadataFromCache == null) ? fields : metadataFromCache, isBinaryEncoded);
// Keep a reference to the streaming row source on this MysqlIO instance.
this.streamingData = rowData;
}
ResultSetImpl rs = buildResultSetWithRows(callingStatement, catalog,
(metadataFromCache == null) ? fields : metadataFromCache,
rowData, resultSetType, resultSetConcurrency, isBinaryEncoded);
return rs;
}
流式
- JDBC代码
String url = "jdbc:mysql://ip:port/db?user=&password";
String sql = "";
Class.forName("com.mysql.jdbc.Driver");
Connection conn = null;
PreparedStatement ps = null;
ResultSet rs = null;
int count = 0;
try {
conn = DriverManager.getConnection(url);
ps = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
ps.setFetchSize(Integer.MIN_VALUE);
rs = ps.executeQuery();
while (rs.next()) {
System.out.println(++count);
}
} finally {
if (rs != null) rs.close();
if (ps != null) ps.close();
if (conn != null) conn.close();
}
必须项:
- resultSetType: TYPE_FORWARD_ONLY
- resultSetConcurrency: CONCUR_READ_ONLY
- fetchSize: Integer.MIN_VALUE
- MySQL代码
- StatementImpl.java
// com.mysql.jdbc.StatementImpl.java
/**
 * Tells whether this statement's results should be streamed.
 * Streaming is chosen only when the result set is forward-only, read-only, and the
 * fetch size has been set to {@code Integer.MIN_VALUE}.
 *
 * @return {@code true} when all three conditions hold; {@code false} otherwise, and also
 *         when the closed-check raises an {@link SQLException}
 */
protected boolean createStreamingResultSet() {
    try {
        synchronized (checkClosed().getConnectionMutex()) {
            if (this.resultSetType != java.sql.ResultSet.TYPE_FORWARD_ONLY) {
                return false;
            }
            if (this.resultSetConcurrency != java.sql.ResultSet.CONCUR_READ_ONLY) {
                return false;
            }
            return this.fetchSize == Integer.MIN_VALUE;
        }
    } catch (SQLException e) {
        // The interface cannot be changed to throw, so an error simply means "do not stream".
        return false;
    }
}
- RowDataDynamic.java
// com.mysql.jdbc.RowDataDynamic.java
/**
 * Loads the next streamed row into {@code this.nextRow}.
 * When rows may still be pending, one row is read through {@code this.io.nextRow(...)}
 * (or {@code null} is used directly if {@code isInterrupted} is set). A {@code null} row
 * marks the end of the data: the end-of-rows flags are set and
 * {@code tackOnMoreStreamingResults(...)} is asked whether further results follow.
 * The bodies of both catch clauses are elided ({@code ...}) in this excerpt.
 *
 * @throws SQLException declared by the original signature
 */
private void nextRecord() throws SQLException {
try {
if (!this.noMoreRows) {
// Read one row as CONCUR_READ_ONLY, with useBufferRowIfPossible = true,
// canReuseRowPacketForBufferRow = true and no pre-read packet (null).
this.nextRow = this.isInterrupted ? null :
this.io.nextRow(this.metadata, this.columnCount, this.isBinaryEncoded,
java.sql.ResultSet.CONCUR_READ_ONLY, true,
this.useBufferRowExplicit, true, null);
if (this.nextRow == null) {
// No row came back: mark the stream as exhausted and positioned after the end.
this.noMoreRows = true;
this.isAfterEnd = true;
this.moreResultsExisted = this.io.tackOnMoreStreamingResults(this.owner);
// index is still -1 here, i.e. no row was ever delivered, so the result was empty.
if (this.index == -1) {
this.wasEmpty = true;
}
}
} else {
this.isAfterEnd = true;
}
} catch (SQLException sqlEx) {
...
} catch (Exception ex) {
...
}
}
- MysqlIO.java
/**
 * Reads one row from the server and wraps it as a {@code ResultSetRow}.
 * For text-protocol rows (the only branch shown in full here) the row becomes either a
 * {@code ByteArrayRow} or a {@code BufferRow}; {@code null} is returned when the packet
 * is the last data packet. The code after the text-protocol branch is elided
 * ({@code ...}) in this excerpt.
 *
 * @param fields column metadata, handed to {@code BufferRow}
 * @param columnCount number of column values read for a {@code ByteArrayRow}
 * @param isBinaryEncoded whether the row uses the binary protocol; the branch shown handles {@code false}
 * @param resultSetConcurrency {@code ResultSet.CONCUR_UPDATABLE} forces a {@code ByteArrayRow}
 * @param useBufferRowIfPossible allows switching to a buffer row when the packet exceeds
 *        {@code useBufferRowSizeThreshold}
 * @param useBufferRowExplicit requests a buffer row regardless of packet size
 * @param canReuseRowPacketForBufferRow when {@code false}, {@code this.reusablePacket} is
 *        replaced by a new buffer before the {@code BufferRow} is created
 * @param existingRowPacket a packet that was already read, or {@code null} to read one now
 * @return the row, or {@code null} once the last data packet has been reached
 * @throws SQLException declared by the original signature
 */
final ResultSetRow nextRow(Field[] fields, int columnCount, boolean isBinaryEncoded,
int resultSetConcurrency, boolean useBufferRowIfPossible,
boolean useBufferRowExplicit, boolean canReuseRowPacketForBufferRow,
Buffer existingRowPacket) throws SQLException {
// Fast path (fetch-all mode): taken only for text-protocol rows when direct unpacking is
// enabled, no packet was handed in, and no buffer row was requested.
if (this.useDirectRowUnpack && existingRowPacket == null &&
!isBinaryEncoded && !useBufferRowIfPossible && !useBufferRowExplicit) {
return nextRowFast(fields, columnCount, isBinaryEncoded, resultSetConcurrency,
useBufferRowIfPossible, useBufferRowExplicit, canReuseRowPacketForBufferRow);
}
Buffer rowPacket = null;
if (existingRowPacket == null) {
rowPacket = checkErrorPacket();
// A buffer row was allowed but not forced: switch to one when the packet is larger than the threshold.
if (!useBufferRowExplicit && useBufferRowIfPossible) {
if (rowPacket.getBufLength() > this.useBufferRowSizeThreshold) {
useBufferRowExplicit = true;
}
}
} else {
// We attempted to do nextRowFast(), but the packet was a multipacket, so we couldn't unpack it directly
rowPacket = existingRowPacket;
checkErrorPacket(existingRowPacket);
}
if (!isBinaryEncoded) {
//
// Didn't read an error, so re-position to beginning of packet in order to read result set data
//
rowPacket.setPosition(rowPacket.getPosition() - 1);
if (!rowPacket.isLastDataPacket()) {
// Updatable result sets, or callers that did not ask for a buffer row, get every column copied out as a byte[].
if (resultSetConcurrency == ResultSet.CONCUR_UPDATABLE || (!useBufferRowIfPossible && !useBufferRowExplicit)) {
byte[][] rowData = new byte[columnCount][];
for (int i = 0; i < columnCount; i++) {
rowData[i] = rowPacket.readLenByteArray(0);
}
return new ByteArrayRow(rowData, getExceptionInterceptor());
}
// NOTE(review): presumably the BufferRow keeps rowPacket, so a fresh reusable packet of the same length is allocated - confirm.
if (!canReuseRowPacketForBufferRow) {
this.reusablePacket = new Buffer(rowPacket.getBufLength());
}
return new BufferRow(rowPacket, fields, false, getExceptionInterceptor());
}
// Last data packet: update the server status from it and signal "no more rows".
readServerStatusForResultSets(rowPacket);
return null;
}
...
}
游标
- JDBC代码
String url = "jdbc:mysql://ip:port/db?user=&password&useCursorFetch=true";
String sql = "";
Class.forName("com.mysql.jdbc.Driver");
Connection conn = null;
PreparedStatement ps = null;
ResultSet rs = null;
int count = 0;
try {
conn = DriverManager.getConnection(url);
ps = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
ps.setFetchSize(50);
rs = ps.executeQuery();
while (rs.next()) {
System.out.println(++count + ": " + rs.getLong(1) + ", " + rs.getLong(2));
}
} finally {
if (rs != null) rs.close();
if (ps != null) ps.close();
if (conn != null) conn.close();
}
必须项:
- connection:useCursorFetch=true
- resultSetType: TYPE_FORWARD_ONLY
- fetchSize: 不为0(通常大于0);若为Integer.MIN_VALUE,则RowDataCursor会把每次获取的行数转化为1,仍走游标模式,效果上类似流式(一次一行)
- MySQL代码
- RowDataCursor.java
// com.mysql.jdbc.RowDataCursor.java
/**
 * Replaces {@code fetchedRows} with the next batch of rows read through the cursor.
 * If the last row was already fetched, an empty list is installed and nothing is read.
 * Otherwise the batch size comes from the owning result set, falls back to the prepared
 * statement when that is 0, and becomes 1 when it is {@code Integer.MIN_VALUE}.
 *
 * @throws SQLException declared by the original signature
 */
private void fetchMoreRows() throws SQLException {
    if (this.lastRowFetched) {
        this.fetchedRows = new ArrayList<ResultSetRow>(0);
        return;
    }

    synchronized (this.owner.connection.getConnectionMutex()) {
        // Remember whether a fetch had already happened before marking this one.
        final boolean hadFetchedBefore = this.firstFetchCompleted;
        this.firstFetchCompleted = true;

        int batchSize = this.owner.getFetchSize();
        if (batchSize == 0) {
            batchSize = this.prepStmt.getFetchSize();
        }
        if (batchSize == Integer.MIN_VALUE) {
            // Handle the case where the user used 'old' streaming result sets
            batchSize = 1;
        }

        // fetchRowsViaCursor performs the actual read of the batch.
        this.fetchedRows = this.mysql.fetchRowsViaCursor(this.fetchedRows, this.statementIdOnServer,
                this.metadata, batchSize, this.useBufferRowExplicit);
        this.currentPositionInFetchedRows = BEFORE_START_OF_ROWS;

        final boolean lastRowSent = (this.mysql.getServerStatus() & SERVER_STATUS_LAST_ROW_SENT) != 0;
        if (lastRowSent) {
            this.lastRowFetched = true;
            // The very first fetch came back with no rows: the result set was empty.
            if (!hadFetchedBefore && this.fetchedRows.isEmpty()) {
                this.wasEmpty = true;
            }
        }
    }
}
- MysqlIO.java
// com.mysql.jdbc.MysqlIO.java
/**
 * Sends a {@code COM_FETCH} command for the given statement and collects the rows that
 * come back.
 *
 * @param fetchedRows list to reuse (it is cleared first); when {@code null} a new list
 *        with initial capacity {@code fetchSize} is created
 * @param statementId statement id written into the command packet
 * @param columnTypes column metadata; its length is used as the column count
 * @param fetchSize number of rows requested, written into the command packet
 * @param useBufferRowExplicit passed on to {@code nextRow(...)}
 * @return the list holding the rows that were read
 * @throws SQLException declared by the original signature
 */
protected List<ResultSetRow> fetchRowsViaCursor(List<ResultSetRow> fetchedRows,
        long statementId, Field[] columnTypes, int fetchSize,
        boolean useBufferRowExplicit) throws SQLException {
    if (fetchedRows != null) {
        fetchedRows.clear();
    } else {
        fetchedRows = new ArrayList<ResultSetRow>(fetchSize);
    }

    // Ask the server for fetchSize rows.
    this.sharedSendPacket.clear();
    this.sharedSendPacket.writeByte((byte) MysqlDefs.COM_FETCH);
    this.sharedSendPacket.writeLong(statementId);
    this.sharedSendPacket.writeLong(fetchSize);
    sendCommand(MysqlDefs.COM_FETCH, null, this.sharedSendPacket, true, null, 0);

    // Read rows (binary-encoded, read-only, no pre-read packet) until nextRow reports none left.
    while (true) {
        ResultSetRow current = nextRow(columnTypes, columnTypes.length, true,
                ResultSet.CONCUR_READ_ONLY, false, useBufferRowExplicit, false, null);
        if (current == null) {
            break;
        }
        fetchedRows.add(current);
    }
    return fetchedRows;
}
网友评论