mysql jdbc驱动源码分析(Statement的executeQuery 和executeUpdate方法)

时间:2022-09-19 11:56:22

在前面的章节中我们获取了Statement对象,下面我们来看看Statement的executeQuery() 和executeUpdate() 方法来执行相关操作。

首先来看看StatementImpl对象的executeQuery() 方法,源码如下:

 /**
* Executes a SQL statement that returns a single ResultSet.
*
* @param sql
* typically a static SQL SELECT statement
*
* @return a ResultSet that contains the data produced by the query
*
* @exception SQLException
* if a database access error occurs
*/
// Query entry point: runs the given SQL and returns the result set stored in this.results.
public java.sql.ResultSet executeQuery(String sql) throws SQLException {
synchronized (checkClosed().getConnectionMutex()) {
// Work on a local reference to the connection this statement belongs to.
MySQLConnection locallyScopedConn = this.connection;

this.retrieveGeneratedKeys = false;

resetCancelledState();
// Validate the SQL text first (helper defined elsewhere; presumably rejects null or empty input - confirm).
checkNullOrEmptyQuery(sql);
// NOTE(review): the name suggests this configures a timeout for streaming results, not the query timeout - confirm.
setupStreamingTimeout(locallyScopedConn);

if (this.doEscapeProcessing) {
// Run the SQL through the escape processor and continue with the processed text; nothing is executed here.
Object escapedSqlResult = EscapeProcessor.escapeSQL(sql, locallyScopedConn.serverSupportsConvertFn(), this.connection);

if (escapedSqlResult instanceof String) {
sql = (String) escapedSqlResult;
} else {
sql = ((EscapeProcessorResult) escapedSqlResult).escapedSql;
}
}

char firstStatementChar = StringUtils.firstAlphaCharUc(sql, findStartOfStatement(sql));

// A statement starting with PING_MARKER is answered via doPingInstead() instead of being executed.
if (sql.charAt(0) == '/') {
if (sql.startsWith(PING_MARKER)) {
doPingInstead();

return this.results;
}
}

// NOTE(review): receives the statement's first alphabetic character; presumably rejects non-query statements - confirm.
checkForDml(sql, firstStatementChar);

implicitlyCloseAllOpenResults();

CachedResultSetMetaData cachedMetaData = null;

// Server-side fetch path: the results come from a dedicated helper and are returned immediately.
if (useServerFetch()) {
this.results = createResultSetUsingServerFetch(sql);

return this.results;
}

CancelTask timeoutTask = null;

String oldCatalog = null;

try {
// Schedule a cancel task only when query timeouts are enabled, a timeout is set and the server is at least 5.0.0.
if (locallyScopedConn.getEnableQueryTimeouts() && this.timeoutInMillis != 0 && locallyScopedConn.versionMeetsMinimum(5, 0, 0)) {
timeoutTask = new CancelTask(this);
locallyScopedConn.getCancelTimer().schedule(timeoutTask, this.timeoutInMillis);
}

// Temporarily switch the connection to this statement's catalog; the previous one is restored in finally.
if (!locallyScopedConn.getCatalog().equals(this.currentCatalog)) {
oldCatalog = locallyScopedConn.getCatalog();
locallyScopedConn.setCatalog(this.currentCatalog);
}

//
// Check if we have cached metadata for this query...
//
// Only result-set metadata (the Field[] array) is looked up here, not the query result itself.
Field[] cachedFields = null;

if (locallyScopedConn.getCacheResultSetMetadata()) {
cachedMetaData = locallyScopedConn.getCachedMetaData(sql);

if (cachedMetaData != null) {
cachedFields = cachedMetaData.fields;
}
}

locallyScopedConn.setSessionMaxRows(this.maxRows);

statementBegins();
// Delegate execution to the connection's execSQL() and keep the returned result set.
// No pre-built packet is supplied (the packet argument is null), so the SQL text itself is passed on.
this.results = locallyScopedConn.execSQL(this, sql, this.maxRows, null, this.resultSetType, this.resultSetConcurrency,
createStreamingResultSet(), this.currentCatalog, cachedFields);

if (timeoutTask != null) {
// Surface any exception the cancel task recorded while cancelling.
if (timeoutTask.caughtWhileCancelling != null) {
throw timeoutTask.caughtWhileCancelling;
}

timeoutTask.cancel();

locallyScopedConn.getCancelTimer().purge();

timeoutTask = null;
}

// Report a cancellation (by timeout or otherwise) as an exception and clear the cancelled state.
synchronized (this.cancelTimeoutMutex) {
if (this.wasCancelled) {
SQLException cause = null;

if (this.wasCancelledByTimeout) {
cause = new MySQLTimeoutException();
} else {
cause = new MySQLStatementCancelledException();
}

resetCancelledState();

throw cause;
}
}
} finally {
// Always clear the executing flag, stop a still-pending timeout task and restore the original catalog.
this.statementExecuting.set(false);

if (timeoutTask != null) {
timeoutTask.cancel();

locallyScopedConn.getCancelTimer().purge();
}

if (oldCatalog != null) {
locallyScopedConn.setCatalog(oldCatalog);
}
}

this.lastInsertId = this.results.getUpdateID();

// Attach the cached metadata to the results; when caching is enabled but nothing was cached,
// pass null so the connection creates the cache entry.
if (cachedMetaData != null) {
locallyScopedConn.initializeResultsMetadataFromCache(sql, cachedMetaData, this.results);
} else {
if (this.connection.getCacheResultSetMetadata()) {
locallyScopedConn.initializeResultsMetadataFromCache(sql, null /* will be created */, this.results);
}
}

return this.results;
}
}

在上面我们看到,具体的实现是在ConnectionImpl这个类中。下面来看看这个类中的execSQL()方法,源码如下:

  /**
* Send a query to the server. Returns one of the ResultSet objects. This
* overload only delegates to the ten-argument execSQL() with isBatch set to
* false; that overload synchronizes on the connection mutex.
*
* @param callingStatement
* the statement issuing the query
* @param sql
* the SQL statement to be executed
* @param maxRows
* row limit, passed through unchanged
* @param packet
* a pre-built query packet, or null to send the SQL text
* @param resultSetType
* passed through unchanged
* @param resultSetConcurrency
* passed through unchanged
* @param streamResults
* passed through unchanged
* @param catalog
* passed through unchanged
* @param cachedMetadata
* previously cached field metadata, or null
* @return a ResultSet holding the results
* @exception SQLException
* if a database error occurs
*/

// Legacy overloads, left commented out in the original source:
// ResultSet execSQL(Statement callingStatement, String sql,
// int maxRowsToRetreive, String catalog) throws SQLException {
// return execSQL(callingStatement, sql, maxRowsToRetreive, null,
// java.sql.ResultSet.TYPE_FORWARD_ONLY,
// DEFAULT_RESULT_SET_CONCURRENCY, catalog);
// }
// ResultSet execSQL(Statement callingStatement, String sql, int maxRows,
// int resultSetType, int resultSetConcurrency, boolean streamResults,
// boolean queryIsSelectOnly, String catalog, boolean unpackFields) throws
// SQLException {
// return execSQL(callingStatement, sql, maxRows, null, resultSetType,
// resultSetConcurrency, streamResults, queryIsSelectOnly, catalog,
// unpackFields);
// }
// Entry point used by the statement classes.
// Arguments in order: calling statement, SQL text, max rows, optional pre-built packet, result set type,
// result set concurrency, stream-results flag, catalog, cached field metadata.
public ResultSetInternalMethods execSQL(StatementImpl callingStatement, String sql, int maxRows, Buffer packet, int resultSetType,
int resultSetConcurrency, boolean streamResults, String catalog, Field[] cachedMetadata) throws SQLException {
return execSQL(callingStatement, sql, maxRows, packet, resultSetType, resultSetConcurrency, streamResults, catalog, cachedMetadata, false);
}

通过注释和参数我们也能大体了解到这个方法所实现的功能:就是向服务器发送查询语句,并获取一个ResultSet对象。

而通过上面的方法我们看到,具体的实现是调用了下面的方法,即方法重载,源码如下:

 // Overload that does the actual work: sends either the SQL text or a pre-built packet through this.io
 // and converts failures into SQLExceptions. isBatch only affects the high-availability ping below.
public ResultSetInternalMethods execSQL(StatementImpl callingStatement, String sql, int maxRows, Buffer packet, int resultSetType,
int resultSetConcurrency, boolean streamResults, String catalog, Field[] cachedMetadata, boolean isBatch) throws SQLException {
// Everything runs while holding the connection mutex.
synchronized (getConnectionMutex()) {
//
// Fall-back if the master is back online if we've issued queriesBeforeRetryMaster queries since we failed over
//
// NOTE(review): the comment above is inherited from the original source; no fall-back logic is visible in this method.
// Start time of the query; only set when performance metrics are gathered.
long queryStartTime = 0;
// Position inside the supplied packet at entry; used when extracting the SQL for error messages.
int endOfQueryPacketPosition = 0;
// packet is null when the caller passes SQL text instead of a pre-built packet.
if (packet != null) {
// Remember the packet's current position before anything else touches it.
endOfQueryPacketPosition = packet.getPosition();
}

if (getGatherPerformanceMetrics()) {
// Record the current wall-clock time as the query start.
queryStartTime = System.currentTimeMillis();
}

this.lastQueryFinishedTime = 0; // we're busy!

// In high-availability mode, ping before executing when a previous failure set needsPing;
// if the ping fails, a new IO channel is created instead of propagating the error.
if ((getHighAvailability()) && (this.autoCommit || getAutoReconnectForPools()) && this.needsPing && !isBatch) {
try {
pingInternal(false, 0);

this.needsPing = false;
} catch (Exception Ex) {
createNewIO(true);
}
}

try {
// packet is the query packet to send (not a result packet); null means "build it from the SQL text".
if (packet == null) {
String encoding = null;
// Only pass an encoding when Unicode handling is enabled.
if (getUseUnicode()) {
// Use the connection's configured encoding.
encoding = getEncoding();
}
// Send the SQL text directly.
return this.io.sqlQueryDirect(callingStatement, sql, encoding, null, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog,
cachedMetadata);
}
// Send the caller's pre-built packet; no SQL text or encoding is passed in this case.
return this.io.sqlQueryDirect(callingStatement, null, null, packet, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog,
cachedMetadata);
} catch (java.sql.SQLException sqlE) {
// don't clobber SQL exceptions

// Optionally append the query text to the exception message.
if (getDumpQueriesOnException()) {
String extractedSql = extractSqlFromPacket(sql, packet, endOfQueryPacketPosition);
StringBuilder messageBuf = new StringBuilder(extractedSql.length() + 32);
messageBuf.append("\n\nQuery being executed when exception was thrown:\n");
messageBuf.append(extractedSql);
messageBuf.append("\n\n");

sqlE = appendMessageToException(sqlE, messageBuf.toString(), getExceptionInterceptor());
}

// High availability: request a ping before the next query. Otherwise clean up on a communication link failure.
if ((getHighAvailability())) {
this.needsPing = true;
} else {
String sqlState = sqlE.getSQLState();

if ((sqlState != null) && sqlState.equals(SQLError.SQL_STATE_COMMUNICATION_LINK_FAILURE)) {
cleanup(sqlE);
}
}

throw sqlE;
} catch (Exception ex) {
// Any other exception is wrapped in a general-error SQLException with the original as its cause.
if (getHighAvailability()) {
this.needsPing = true;
} else if (ex instanceof IOException) {
cleanup(ex);
}

SQLException sqlEx = SQLError.createSQLException(Messages.getString("Connection.UnexpectedException"), SQLError.SQL_STATE_GENERAL_ERROR,
getExceptionInterceptor());
sqlEx.initCause(ex);

throw sqlEx;
} finally {
// Bookkeeping that runs on success and failure alike.
if (getMaintainTimeStats()) {
this.lastQueryFinishedTime = System.currentTimeMillis();
}

if (getGatherPerformanceMetrics()) {
long queryTime = System.currentTimeMillis() - queryStartTime;

registerQueryExecutionTime(queryTime);
}
}
}
}

在上面的方法中我们看到,它调用了 this.io.sqlQueryDirect() 方法进行查询,而这个io对象是ConnectionImpl 类的一个成员变量,类型是MysqlIO,具体代码如下:

    /** The I/O abstraction interface (network conn to MySQL server) */
private transient MysqlIO io = null;
而从中我们看到具体的查询方法是在MysqlIO 这个类中实现的。调用方法的源码如下:

/**
* Send a query stored in a packet directly to the server.
*
* @param callingStatement
* the statement issuing the query; may be null (it is null-checked before use below)
* @param query
* the SQL text; when non-null it is written into this.sendPacket, which then replaces queryPacket
* @param characterEncoding
* encoding used to write the query text and the statement comment; may be null
* @param queryPacket
* a pre-built query packet, used as-is when query is null
* @param maxRows
* row limit, passed through to readAllResults
* @param resultSetType
* result set type, passed through to readAllResults
* @param resultSetConcurrency
* result set concurrency, passed through to readAllResults
* @param streamResults
* streaming flag, passed through to readAllResults
* @param catalog
* catalog name, passed through to readAllResults and used in profiler events
* @param cachedMetadata
* previously cached field metadata, passed through to readAllResults; may be null
*
* @return the result set read by readAllResults, or the replacement returned by a statement interceptor
*
* @throws Exception
*/
// Declared final, so subclasses cannot override it.
final ResultSetInternalMethods sqlQueryDirect(StatementImpl callingStatement, String query, String characterEncoding, Buffer queryPacket, int maxRows,
int resultSetType, int resultSetConcurrency, boolean streamResults, String catalog, Field[] cachedMetadata) throws Exception {
// Depth counter; decremented again in the finally block.
this.statementExecutionDepth++;

try {
// Pre-execution interceptors may supply a result set, in which case nothing is sent to the server.
if (this.statementInterceptors != null) {
ResultSetInternalMethods interceptedResults = invokeStatementInterceptorsPre(query, callingStatement, false);

if (interceptedResults != null) {
return interceptedResults;
}
}

long queryStartTime = 0;
long queryEndTime = 0;

String statementComment = this.connection.getStatementComment();

if (this.connection.getIncludeThreadNamesAsStatementComment()) {
statementComment = (statementComment != null ? statementComment + ", " : "") + "java thread: " + Thread.currentThread().getName();
}

if (query != null) {
// We don't know exactly how many bytes we're going to get from the query, so over-allocate:
// 3 bytes per char of the query plus space for the header, the command byte and 2 spare bytes.
int packLength = HEADER_LENGTH + 1 + (query.length() * 3) + 2;

byte[] commentAsBytes = null;

if (statementComment != null) {
commentAsBytes = StringUtils.getBytes(statementComment, null, characterEncoding, this.connection.getServerCharset(),
this.connection.parserKnowsUnicode(), getExceptionInterceptor());

packLength += commentAsBytes.length;
packLength += 6; // for /*[space] [space]*/
}

// Reuse the send packet when one exists; packLength is only used for the first allocation.
if (this.sendPacket == null) {
this.sendPacket = new Buffer(packLength);
} else {
this.sendPacket.clear();
}

this.sendPacket.writeByte((byte) MysqlDefs.QUERY);

// The statement comment, if any, is written in front of the query text.
if (commentAsBytes != null) {
this.sendPacket.writeBytesNoNull(Constants.SLASH_STAR_SPACE_AS_BYTES);
this.sendPacket.writeBytesNoNull(commentAsBytes);
this.sendPacket.writeBytesNoNull(Constants.SPACE_STAR_SLASH_SPACE_AS_BYTES);
}

if (characterEncoding != null) {
if (this.platformDbCharsetMatches) {
this.sendPacket.writeStringNoNull(query, characterEncoding, this.connection.getServerCharset(), this.connection.parserKnowsUnicode(),
this.connection);
} else {
// Statements starting with "LOAD DATA" are written via StringUtils.getBytes(query) without the explicit encoding.
if (StringUtils.startsWithIgnoreCaseAndWs(query, "LOAD DATA")) {
this.sendPacket.writeBytesNoNull(StringUtils.getBytes(query));
} else {
this.sendPacket.writeStringNoNull(query, characterEncoding, this.connection.getServerCharset(),
this.connection.parserKnowsUnicode(), this.connection);
}
}
} else {
this.sendPacket.writeStringNoNull(query);
}

// From here on the freshly built packet is the one that gets sent.
queryPacket = this.sendPacket;
}

byte[] queryBuf = null;
int oldPacketPosition = 0;

// NOTE(review): queryBuf, oldPacketPosition and queryStartTime are only set when needToGrabQueryFromPacket is true;
// the test-case, profiling and slow-query code below reads them, so it presumably relies on that flag - confirm.
if (this.needToGrabQueryFromPacket) {
queryBuf = queryPacket.getByteBuffer();

// save the packet position
oldPacketPosition = queryPacket.getPosition();

queryStartTime = getCurrentTimeNanosOrMillis();
}

if (this.autoGenerateTestcaseScript) {
String testcaseQuery = null;

if (query != null) {
if (statementComment != null) {
testcaseQuery = "/* " + statementComment + " */ " + query;
} else {
testcaseQuery = query;
}
} else {
// No SQL text available: recover it from the packet bytes, starting at offset 5.
testcaseQuery = StringUtils.toString(queryBuf, 5, (oldPacketPosition - 5));
}

StringBuilder debugBuf = new StringBuilder(testcaseQuery.length() + 32);
this.connection.generateConnectionCommentBlock(debugBuf);
debugBuf.append(testcaseQuery);
debugBuf.append(';');
this.connection.dumpTestcaseQuery(debugBuf.toString());
}

// Send query command and sql query string
// The packet returned here is the server's reply and is handed to readAllResults below.
Buffer resultPacket = sendCommand(MysqlDefs.QUERY, null, queryPacket, false, null, 0);

long fetchBeginTime = 0;
long fetchEndTime = 0;

String profileQueryToLog = null;

boolean queryWasSlow = false;

if (this.profileSql || this.logSlowQueries) {
queryEndTime = getCurrentTimeNanosOrMillis();

boolean shouldExtractQuery = false;

// Profiling always extracts the query text; slow-query logging only does so for slow queries.
if (this.profileSql) {
shouldExtractQuery = true;
} else if (this.logSlowQueries) {
long queryTime = queryEndTime - queryStartTime;

boolean logSlow = false;

if (!this.useAutoSlowLog) {
logSlow = queryTime > this.connection.getSlowQueryThresholdMillis();
} else {
logSlow = this.connection.isAbonormallyLongQuery(queryTime);

this.connection.reportQueryTime(queryTime);
}

if (logSlow) {
shouldExtractQuery = true;
queryWasSlow = true;
}
}

if (shouldExtractQuery) {
// Extract the actual query from the network packet
boolean truncated = false;

int extractPosition = oldPacketPosition;

// Cap the logged text at getMaxQuerySizeToLog() and mark it as truncated.
if (oldPacketPosition > this.connection.getMaxQuerySizeToLog()) {
extractPosition = this.connection.getMaxQuerySizeToLog() + 5;
truncated = true;
}

profileQueryToLog = StringUtils.toString(queryBuf, 5, (extractPosition - 5));

if (truncated) {
profileQueryToLog += Messages.getString("MysqlIO.25");
}
}

fetchBeginTime = queryEndTime;
}
// Read the result set(s) from the server's reply.
ResultSetInternalMethods rs = readAllResults(callingStatement, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog, resultPacket,
false, -1L, cachedMetadata);

// Emit a slow-query event for queries flagged as slow by the client-side timing above.
if (queryWasSlow && !this.serverQueryWasSlow /* don't log slow queries twice */) {
StringBuilder mesgBuf = new StringBuilder(48 + profileQueryToLog.length());

mesgBuf.append(Messages.getString(
"MysqlIO.SlowQuery",
new Object[] { String.valueOf(this.useAutoSlowLog ? " 95% of all queries " : this.slowQueryThreshold), this.queryTimingUnits,
Long.valueOf(queryEndTime - queryStartTime) }));
mesgBuf.append(profileQueryToLog);

ProfilerEventHandler eventSink = ProfilerEventHandlerFactory.getInstance(this.connection);

eventSink.consumeEvent(new ProfilerEvent(ProfilerEvent.TYPE_SLOW_QUERY, "", catalog, this.connection.getId(),
(callingStatement != null) ? callingStatement.getId() : 999, ((ResultSetImpl) rs).resultId, System.currentTimeMillis(),
(int) (queryEndTime - queryStartTime), this.queryTimingUnits, null, LogUtils.findCallingClassAndMethod(new Throwable()), mesgBuf
.toString()));

// Optionally explain the slow query, unless the packet exceeds MAX_QUERY_SIZE_TO_EXPLAIN.
if (this.connection.getExplainSlowQueries()) {
if (oldPacketPosition < MAX_QUERY_SIZE_TO_EXPLAIN) {
explainSlowQuery(queryPacket.getBytes(5, (oldPacketPosition - 5)), profileQueryToLog);
} else {
this.connection.getLog().logWarn(Messages.getString("MysqlIO.28") + MAX_QUERY_SIZE_TO_EXPLAIN + Messages.getString("MysqlIO.29"));
}
}
}

// Events for the queryBadIndexUsed / queryNoIndexUsed / serverQueryWasSlow flags; each also requires profileSql.
if (this.logSlowQueries) {

ProfilerEventHandler eventSink = ProfilerEventHandlerFactory.getInstance(this.connection);

if (this.queryBadIndexUsed && this.profileSql) {
eventSink.consumeEvent(new ProfilerEvent(ProfilerEvent.TYPE_SLOW_QUERY, "", catalog, this.connection.getId(),
(callingStatement != null) ? callingStatement.getId() : 999, ((ResultSetImpl) rs).resultId, System.currentTimeMillis(),
(queryEndTime - queryStartTime), this.queryTimingUnits, null, LogUtils.findCallingClassAndMethod(new Throwable()), Messages
.getString("MysqlIO.33") + profileQueryToLog));
}

if (this.queryNoIndexUsed && this.profileSql) {
eventSink.consumeEvent(new ProfilerEvent(ProfilerEvent.TYPE_SLOW_QUERY, "", catalog, this.connection.getId(),
(callingStatement != null) ? callingStatement.getId() : 999, ((ResultSetImpl) rs).resultId, System.currentTimeMillis(),
(queryEndTime - queryStartTime), this.queryTimingUnits, null, LogUtils.findCallingClassAndMethod(new Throwable()), Messages
.getString("MysqlIO.35") + profileQueryToLog));
}

if (this.serverQueryWasSlow && this.profileSql) {
eventSink.consumeEvent(new ProfilerEvent(ProfilerEvent.TYPE_SLOW_QUERY, "", catalog, this.connection.getId(),
(callingStatement != null) ? callingStatement.getId() : 999, ((ResultSetImpl) rs).resultId, System.currentTimeMillis(),
(queryEndTime - queryStartTime), this.queryTimingUnits, null, LogUtils.findCallingClassAndMethod(new Throwable()), Messages
.getString("MysqlIO.ServerSlowQuery") + profileQueryToLog));
}
}

// Profiling: one event for the query phase and one for the fetch phase.
if (this.profileSql) {
fetchEndTime = getCurrentTimeNanosOrMillis();

ProfilerEventHandler eventSink = ProfilerEventHandlerFactory.getInstance(this.connection);

eventSink.consumeEvent(new ProfilerEvent(ProfilerEvent.TYPE_QUERY, "", catalog, this.connection.getId(),
(callingStatement != null) ? callingStatement.getId() : 999, ((ResultSetImpl) rs).resultId, System.currentTimeMillis(),
(queryEndTime - queryStartTime), this.queryTimingUnits, null, LogUtils.findCallingClassAndMethod(new Throwable()), profileQueryToLog));

eventSink.consumeEvent(new ProfilerEvent(ProfilerEvent.TYPE_FETCH, "", catalog, this.connection.getId(),
(callingStatement != null) ? callingStatement.getId() : 999, ((ResultSetImpl) rs).resultId, System.currentTimeMillis(),
(fetchEndTime - fetchBeginTime), this.queryTimingUnits, null, LogUtils.findCallingClassAndMethod(new Throwable()), null));
}

if (this.hadWarnings) {
scanForAndThrowDataTruncation();
}

// Post-execution interceptors may replace the result set.
if (this.statementInterceptors != null) {
ResultSetInternalMethods interceptedResults = invokeStatementInterceptorsPost(query, callingStatement, rs, false, null);

if (interceptedResults != null) {
rs = interceptedResults;
}
}

return rs;
} catch (SQLException sqlEx) {
if (this.statementInterceptors != null) {
invokeStatementInterceptorsPost(query, callingStatement, null, false, sqlEx); // we don't do anything with the result set in this case
}

// If the statement was cancelled, throw a timeout or cancelled exception in place of the original one.
if (callingStatement != null) {
synchronized (callingStatement.cancelTimeoutMutex) {
if (callingStatement.wasCancelled) {
SQLException cause = null;

if (callingStatement.wasCancelledByTimeout) {
cause = new MySQLTimeoutException();
} else {
cause = new MySQLStatementCancelledException();
}

callingStatement.resetCancelledState();

throw cause;
}
}
}

throw sqlEx;
} finally {
this.statementExecutionDepth--;
}
}


在方法中我们看到了,调用readAllResults() 这个方法来获取ResultSet 对象,readAllResults() 方法源码如下:

// Reads the first result from resultPacket and links any follow-up results the server reports behind it.
// Always returns the first (top-level) result set.
ResultSetImpl readAllResults(StatementImpl callingStatement, int maxRows, int resultSetType, int resultSetConcurrency, boolean streamResults,
        String catalog, Buffer resultPacket, boolean isBinaryEncoded, long preSentColumnCount, Field[] metadataFromCache) throws SQLException {
    // Step the packet position back by one before the first result is parsed.
    resultPacket.setPosition(resultPacket.getPosition() - 1);

    final ResultSetImpl firstResult = readResultsForQueryOrUpdate(callingStatement, maxRows, resultSetType, resultSetConcurrency, streamResults,
            catalog, resultPacket, isBinaryEncoded, preSentColumnCount, metadataFromCache);

    // Both flags are sampled after the first result has been read.
    final boolean clientAcceptsMultiResults = (this.clientParam & CLIENT_MULTI_RESULTS) != 0;
    final boolean serverReportsMore = (this.serverStatus & SERVER_MORE_RESULTS_EXISTS) != 0;

    //
    // TODO: We need to support streaming of multiple result sets
    //
    if (streamResults && serverReportsMore) {
        // Streaming with more results pending: only tack on further results when the first one carries an update count.
        if (firstResult.getUpdateCount() != -1) {
            tackOnMoreStreamingResults(firstResult);
        }

        reclaimLargeReusablePacket();

        return firstResult;
    }

    // Chain every further result behind the previous one for as long as the server status reports more.
    ResultSetImpl tail = firstResult;

    for (boolean pending = clientAcceptsMultiResults && serverReportsMore; pending;
            pending = (this.serverStatus & SERVER_MORE_RESULTS_EXISTS) != 0) {
        Buffer nextPacket = checkErrorPacket();
        nextPacket.setPosition(0);

        ResultSetImpl nextResult = readResultsForQueryOrUpdate(callingStatement, maxRows, resultSetType, resultSetConcurrency, streamResults,
                catalog, nextPacket, isBinaryEncoded, preSentColumnCount, metadataFromCache);

        tail.setNextResultSet(nextResult);
        tail = nextResult;
    }

    if (!streamResults) {
        clearInputStream();
    }

    reclaimLargeReusablePacket();

    return firstResult;
}

在这方法中我们看到了,是调用了readResultsForQueryOrUpdate() 方法进行查询获取ResultSet 对象的。下面我们看看这个方法源码:


  /**
   * Reads one result set off of the wire; if the result is actually an
   * update count, creates an update-count only result set.
   *
   * The first length-encoded field of the packet decides the path: 0 builds
   * an update-count result, Buffer.NULL_LENGTH reads a file name from the
   * packet and hands it to sendFileToServer(), and any other value is
   * treated as the column count of a row-bearing result set.
   *
   * @param callingStatement
   *            the statement that issued the query
   * @param maxRows
   *            the maximum rows to return in the result set.
   * @param resultSetType
   *            scrollability
   * @param resultSetConcurrency
   *            updatability
   * @param streamResults
   *            should the driver leave the results on the wire,
   *            and read them only when needed?
   * @param catalog
   *            the catalog in use
   * @param resultPacket
   *            the first packet of information in the result set
   * @param isBinaryEncoded
   *            is this result set from a prepared statement?
   * @param preSentColumnCount
   *            do we already know the number of columns? (not read by this method)
   * @param metadataFromCache
   *            cached field metadata, passed on to getResultSet(); may be null
   *
   * @return a result set that either represents the rows, or an update count
   *
   * @throws SQLException
   *             if an error occurs while reading the rows
   */
  protected final ResultSetImpl readResultsForQueryOrUpdate(StatementImpl callingStatement, int maxRows, int resultSetType, int resultSetConcurrency,
          boolean streamResults, String catalog, Buffer resultPacket, boolean isBinaryEncoded, long preSentColumnCount, Field[] metadataFromCache)
          throws SQLException {
      // This value is a column count, not a row count.
      final long columnCount = resultPacket.readFieldLength();

      // No columns: the reply describes an update rather than rows.
      if (columnCount == 0) {
          return buildResultSetWithUpdates(callingStatement, resultPacket);
      }

      // Ordinary result set with rows.
      if (columnCount != Buffer.NULL_LENGTH) {
          return getResultSet(callingStatement, columnCount, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog, isBinaryEncoded,
                  metadataFromCache);
      }

      // NULL_LENGTH marker: the packet carries a file name that is passed to sendFileToServer().
      final String charEncoding = this.connection.getUseUnicode() ? this.connection.getEncoding() : null;

      final String fileName;

      // The explicit encoding is only used when the platform and database charsets match.
      if (this.platformDbCharsetMatches && charEncoding != null) {
          fileName = resultPacket.readString(charEncoding, getExceptionInterceptor());
      } else {
          fileName = resultPacket.readString();
      }

      return sendFileToServer(callingStatement, fileName);
  }


这个方法中我们看到了 调用getResultSet() 方法获取ResultSet实例,源码如下:

  /**
   * Build a result set. Delegates to buildResultSetWithRows() to build a
   * JDBC-version-specific ResultSet, given rows as byte data, and field
   * information.
   *
   * @param callingStatement
   *            the statement that issued the query; may be null
   * @param columnCount
   *            the number of columns in the result set
   * @param maxRows
   *            the maximum number of rows to read (-1 means all rows)
   * @param resultSetType
   *            (TYPE_FORWARD_ONLY, TYPE_SCROLL_????)
   * @param resultSetConcurrency
   *            the type of result set (CONCUR_UPDATABLE or
   *            READ_ONLY)
   * @param streamResults
   *            should the result set be read all at once, or
   *            streamed?
   * @param catalog
   *            the database name in use when the result set was created
   * @param isBinaryEncoded
   *            is this result set in native encoding?
   * @param metadataFromCache
   *            cached field metadata; when non-null the field packets sent
   *            by the server are skipped instead of unpacked
   *
   * @return a result set
   *
   * @throws SQLException
   *             if a database access error occurs
   */
  protected ResultSetImpl getResultSet(StatementImpl callingStatement, long columnCount, int maxRows, int resultSetType, int resultSetConcurrency,
          boolean streamResults, String catalog, boolean isBinaryEncoded, Field[] metadataFromCache) throws SQLException {
      final boolean metadataComesFromServer = (metadataFromCache == null);

      // Read in the column information, or skip over it when cached metadata is supplied.
      Field[] fields = null;

      if (metadataComesFromServer) {
          fields = new Field[(int) columnCount];

          for (int col = 0; col < columnCount; col++) {
              fields[col] = unpackField(readPacket(), false);
          }
      } else {
          for (int skipped = 0; skipped < columnCount; skipped++) {
              skipPacket();
          }
      }

      // Metadata used on the non-cursor paths below.
      final Field[] effectiveFields = metadataComesFromServer ? fields : metadataFromCache;

      // The packet from the server that follows the field packets; it carries the server status.
      Buffer packet = reuseAndReadPacket(this.reusablePacket);

      readServerStatusForResultSets(packet);

      //
      // Handle cursor-based fetch first
      //
      final boolean cursorFetchRequested = this.connection.versionMeetsMinimum(5, 0, 2) && this.connection.getUseCursorFetch() && isBinaryEncoded
              && callingStatement != null && callingStatement.getFetchSize() != 0
              && callingStatement.getResultSetType() == ResultSet.TYPE_FORWARD_ONLY;

      if (cursorFetchRequested) {
          ServerPreparedStatement prepStmt = (com.mysql.jdbc.ServerPreparedStatement) callingStatement;

          //
          // Server versions 5.0.5 or newer will only open a cursor and set this flag if they can, otherwise they punt and go back to
          // mysql_store_results() behavior
          //
          final boolean cursorOpened = !this.connection.versionMeetsMinimum(5, 0, 5) || (this.serverStatus & SERVER_STATUS_CURSOR_EXISTS) != 0;

          if (cursorOpened) {
              // NOTE(review): this path passes 'fields', which is null when cached metadata was supplied - confirm that is intended.
              RowData cursorRows = new RowDataCursor(this, prepStmt, fields);

              ResultSetImpl cursorResult = buildResultSetWithRows(callingStatement, catalog, fields, cursorRows, resultSetType, resultSetConcurrency,
                      isBinaryEncoded);

              cursorResult.setFetchSize(callingStatement.getFetchSize());

              return cursorResult;
          }
      }

      // Either stream the rows or read the whole row set up front.
      final RowData rowData;

      if (streamResults) {
          rowData = new RowDataDynamic(this, (int) columnCount, effectiveFields, isBinaryEncoded);
          this.streamingData = rowData;
      } else {
          rowData = readSingleRowSet(columnCount, maxRows, resultSetConcurrency, isBinaryEncoded, effectiveFields);
      }

      return buildResultSetWithRows(callingStatement, catalog, effectiveFields, rowData, resultSetType, resultSetConcurrency, isBinaryEncoded);
  }

我们又看到 他是调用buildResultSetWithRows 这个方法来获取 ResultSetImpl 实例的。具体源码如下:

// Creates the ResultSetImpl for the given rows and metadata, choosing the read-only or updatable
// variant from resultSetConcurrency.
private com.mysql.jdbc.ResultSetImpl buildResultSetWithRows(StatementImpl callingStatement, String catalog, com.mysql.jdbc.Field[] fields, RowData rows,
        int resultSetType, int resultSetConcurrency, boolean isBinaryEncoded) throws SQLException {
    final boolean readOnly = (resultSetConcurrency == java.sql.ResultSet.CONCUR_READ_ONLY);
    final boolean updatable = (resultSetConcurrency == java.sql.ResultSet.CONCUR_UPDATABLE);

    // Unknown concurrency value: return a non-updatable instance as-is.
    // NOTE(review): this path sets neither the result set type nor the concurrency on the instance - confirm that is intended.
    if (!readOnly && !updatable) {
        return com.mysql.jdbc.ResultSetImpl.getInstance(catalog, fields, rows, this.connection, callingStatement, false);
    }

    ResultSetImpl resultSet = com.mysql.jdbc.ResultSetImpl.getInstance(catalog, fields, rows, this.connection, callingStatement, updatable);

    // Only the read-only variant is flagged as binary encoded.
    if (readOnly && isBinaryEncoded) {
        resultSet.setBinaryEncoded();
    }

    resultSet.setResultSetType(resultSetType);
    resultSet.setResultSetConcurrency(resultSetConcurrency);

    return resultSet;
}

我们看到这里直接调用到了com.mysql.jdbc.ResultSetImpl.getInstance() 方法来获取实例。具体源码如下:

 /**
  * Factory for result set instances. When Util.isJdbc4() is false the
  * classes are instantiated directly; otherwise the instance is created
  * through Util.handleNewInstance() with the matching five-argument
  * constructor constant.
  *
  * @param isUpdatable
  *            true to create the updatable variant
  */
 protected static ResultSetImpl getInstance(String catalog, Field[] fields, RowData tuples, MySQLConnection conn, StatementImpl creatorStmt,
         boolean isUpdatable) throws SQLException {
     if (Util.isJdbc4()) {
         final Object[] ctorArgs = new Object[] { catalog, fields, tuples, conn, creatorStmt };

         return (ResultSetImpl) Util.handleNewInstance(isUpdatable ? JDBC_4_UPD_RS_5_ARG_CTOR : JDBC_4_RS_5_ARG_CTOR, ctorArgs,
                 conn.getExceptionInterceptor());
     }

     return isUpdatable ? new UpdatableResultSet(catalog, fields, tuples, conn, creatorStmt)
             : new ResultSetImpl(catalog, fields, tuples, conn, creatorStmt);
 }

ResultSetImpl 类的构造函数如下:

 /**
  * Creates a new ResultSet object.
  *
  * @param catalog
  *            the database in use when we were created
  * @param fields
  *            an array of Field objects (basically, the ResultSet MetaData)
  * @param tuples
  *            actual row data
  * @param conn
  *            the Connection that created us.
  * @param creatorStmt
  *            the statement that owns this result set
  *
  * @throws SQLException
  *             if an error occurs
  */
 public ResultSetImpl(String catalog, Field[] fields, RowData tuples, MySQLConnection conn, StatementImpl creatorStmt) throws SQLException {
     this.connection = conn;
     this.owningStatement = creatorStmt;
     this.catalog = catalog;
     this.fields = fields;

     this.retainOwningStatement = false;

     // Copy the connection-level settings this result set consults later.
     if (conn != null) {
         this.exceptionInterceptor = conn.getExceptionInterceptor();
         this.useStrictFloatingPoint = conn.getStrictFloatingPoint();
         this.connectionId = conn.getId();
         this.useFastDateParsing = conn.getUseFastDateParsing();
         this.profileSql = conn.getProfileSql();
         this.retainOwningStatement = conn.getRetainStatementAfterResultSetClose();
         this.jdbcCompliantTruncationForReads = conn.getJdbcCompliantTruncationForReads();
         this.useFastIntParsing = conn.getUseFastIntParsing();
         this.serverTimeZoneTz = conn.getServerTimezoneTZ();
         this.padCharsWithSpace = conn.getPadCharsWithSpace();
     }

     this.rowData = tuples;
     this.updateCount = tuples.size();

     if (NonRegisteringDriver.DEBUG) {
         System.out.println(Messages.getString("ResultSet.Retrieved__1") + this.updateCount + " rows");
     }

     this.reallyResult = true;

     // Check for no results
     if (tuples.size() <= 0) {
         this.thisRow = null;
     } else if (this.updateCount == 1 && this.thisRow == null) {
         tuples.close(); // empty result set
         this.updateCount = -1;
     }

     tuples.setOwner(this);

     if (this.fields != null) {
         initializeWithMetadata();
     } // else called by Connection.initializeResultsMetadataFromCache() when cached

     // NOTE(review): unlike the guarded block above, these two reads dereference the connection without a null check.
     this.useLegacyDatetimeCode = this.connection.getUseLegacyDatetimeCode();
     this.useColumnNamesInFindColumn = this.connection.getUseColumnNamesInFindColumn();

     setRowPositionValidity();
 }

OK,到这里我们也知道了,最终其实就是调用ResultSetImpl的构造函数创建了一个对象。


简单总结:

我们从整个过程中看到,所有操作的基础都是Connection,即客户端和服务端的链接,也就是和服务器之间的Socket链接;如果Socket断了,就不能进行通信了。有了Socket之后,我们可以执行查询方法或更新方法来获取结果集对象。但是在获取结果集对象之前要先设置结果集的类型,因为结果集类型不同,对获得的ResultSet对象能做的操作也不同:有的可以向结果集前端移动,有的可以向结果集后端移动等等,这些都要在生成ResultSet对象前进行设置。


其实看源码的目的是更清楚、明白地了解jdbc的整个过程。但是在看的过程中发现,自己能力有限,不能静下心来一个一个地看懂,只能草草结束;虽然从中了解了大体过程,但是提升不大,希望下次再看的时候能够有所提升。不过学习的过程中还是有些收获,在之后的一篇中会进行总结。


在写这文章之前搜了一下,其中分析jdbc源码的文章很少,而自己水平有限,所以如果有好的文章或建议还请指教,这里先谢谢!