A Look at the Source Code Behind JDBC Connections to MySQL


Author: 曾泽浩 | Published: 2018-10-10 18:27

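    Introduction

    A MySQL database consists of background threads and a shared memory area, and the shared memory is shared among the running background threads. The database instance is what actually operates on the database files. On the operating system, a MySQL instance is simply a process: MySQL is a single-process, multi-threaded database. (From 《Mysql技术内幕InnoDB存储引擎》.)

    Since MySQL appears to the operating system as a process, connecting to MySQL is essentially inter-process communication. Common inter-process communication mechanisms include pipes, named pipes, TCP/IP sockets, and UNIX domain sockets.

    TCP/IP sockets are the connection method MySQL supports on every platform (that is, on every operating system), so this article uses them to walk through a JDBC connection in detail.

    The driver version used is mysql-connector-java-5.1.34.jar.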

    import com.mysql.jdbc.Connection;
    import com.mysql.jdbc.Statement;
    
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    public class SimpleTest {
        private final static String url = "jdbc:mysql://localhost/test";
        private final static String username = "root";
        private final static String password = "123456";
        public static void main(String[] args) {
    
            try {
                Class.forName("com.mysql.jdbc.Driver");
                //Establish the connection
                Connection connection = (Connection) DriverManager.getConnection(url, username, password);
                System.out.println("Connected");
                Statement statement = (Statement) connection.createStatement();
                //Run the query
                ResultSet resultSet = statement.executeQuery("select * from user");
                while (resultSet.next()) {
                    String loginname = resultSet.getString("loginname");
                    String password = resultSet.getString("password");
                    System.out.println("loginname = " + loginname + " password = " + password);
                }
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            } catch (SQLException e2) {
                e2.printStackTrace();
            }
        }
    }
    

    Registering the driver

    Class.forName("com.mysql.jdbc.Driver");
    

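    Many people are puzzled by this line: how can it alone register the driver?

    Class.forName() does two things here: it loads the class's .class file into the JVM, and it initializes the class, which runs the class's static blocks.

    For more on when a static block runs in Java, see https://www.cnblogs.com/jswang/p/7699643.html

    Sure enough, the Driver class has a static block that calls java.sql.DriverManager.registerDriver(new Driver());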

    com.mysql.jdbc.Driver.java

    public class Driver extends NonRegisteringDriver implements java.sql.Driver {
        //
        // Register ourselves with the DriverManager
        // static block: it runs when Class.forName("com.mysql.jdbc.Driver"); is executed
        static {
            try {
                java.sql.DriverManager.registerDriver(new Driver());
            } catch (SQLException E) {
                throw new RuntimeException("Can't register driver!");
            }
        }
    
        /**
         * Construct a new driver and register it with DriverManager
         * 
         * @throws SQLException
         *             if a database error occurs.
         */
        public Driver() throws SQLException {
            // Required for Class.forName().newInstance()
        }
    }
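
    To see why a single Class.forName call is enough to trigger registration, here is a minimal sketch that does not touch the real driver. FakeDriver and REGISTRY are hypothetical stand-ins for com.mysql.jdbc.Driver and DriverManager's internal list; the only real API being demonstrated is Class.forName.

```java
import java.util.ArrayList;
import java.util.List;

class StaticInitDemo {
    // Hypothetical stand-in for DriverManager's list of registered drivers.
    static final List<String> REGISTRY = new ArrayList<>();

    // Hypothetical stand-in for com.mysql.jdbc.Driver.
    static class FakeDriver {
        static {
            // Runs once, when the class is initialized.
            REGISTRY.add("FakeDriver");
        }
    }

    // Loads and initializes FakeDriver by name, then reports the registry size.
    static int load() {
        try {
            // The class literal alone does not initialize FakeDriver;
            // Class.forName(String) loads and initializes it.
            Class.forName(FakeDriver.class.getName());
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
        return REGISTRY.size();
    }

    public static void main(String[] args) {
        System.out.println("before: " + REGISTRY.size());
        System.out.println("after:  " + load());
        System.out.println("again:  " + load());
    }
}
```

    Running main should report 0, then 1, then 1: the static block runs the first time the class is initialized and never again, which is why calling Class.forName repeatedly does not register the driver repeatedly.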
    

    java.sql.DriverManager.java

    public static synchronized void registerDriver(java.sql.Driver driver)
            throws SQLException {
            
            registerDriver(driver, null);
    }
    

    registerDriver(driver, null); //registers the driver; step into the method to continue

    private final static CopyOnWriteArrayList<DriverInfo> registeredDrivers = new CopyOnWriteArrayList<>();
    public static synchronized void registerDriver(java.sql.Driver driver,
                DriverAction da)
            throws SQLException {
    
            /* Register the driver if it has not already been added to our list */
            if(driver != null) {
                //registeredDrivers is a List; the driver info is saved into it
                registeredDrivers.addIfAbsent(new DriverInfo(driver, da));
            } else {
                // This is for compatibility with the original DriverManager
                throw new NullPointerException();
            }
    
            println("registerDriver: " + driver);
    
        }
    

    These steps complete the registration of the database driver.
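
    The duplicate check in registerDriver comes from CopyOnWriteArrayList.addIfAbsent, which relies on equals(). The sketch below shows that behaviour in isolation. Entry is a hypothetical stand-in for DriverInfo, and comparing entries by the identity of the wrapped driver is an assumption of this sketch, not something shown in the excerpt above.

```java
import java.util.concurrent.CopyOnWriteArrayList;

class RegistrySketch {
    // Hypothetical stand-in for DriverInfo. Assumption: two entries are equal
    // when they wrap the very same driver object.
    static final class Entry {
        final Object driver;

        Entry(Object driver) {
            this.driver = driver;
        }

        @Override
        public boolean equals(Object other) {
            return other instanceof Entry && ((Entry) other).driver == this.driver;
        }

        @Override
        public int hashCode() {
            return System.identityHashCode(driver);
        }
    }

    static final CopyOnWriteArrayList<Entry> REGISTERED = new CopyOnWriteArrayList<>();

    // Returns true if the driver was added, false if an equal entry was already present.
    static boolean register(Object driver) {
        if (driver == null) {
            throw new NullPointerException();
        }
        return REGISTERED.addIfAbsent(new Entry(driver));
    }

    public static void main(String[] args) {
        Object driver = new Object();
        System.out.println(register(driver));   // first registration
        System.out.println(register(driver));   // same instance again
        System.out.println(REGISTERED.size());
    }
}
```

    Under that assumption, registering the same instance twice leaves a single entry, while two separately constructed driver objects would both be kept.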

    Connecting to the database

    Connection connection = (Connection) DriverManager.getConnection(url, username, password);
    

    java.sql.DriverManager.java

    public static Connection getConnection(String url,
        String user, String password) throws SQLException {
        java.util.Properties info = new java.util.Properties();
    
        if (user != null) {
            info.put("user", user);
        }
        if (password != null) {
            info.put("password", password);
        }
        //the getConnection() overload below is what actually connects, so read on
        return (getConnection(url, info, Reflection.getCallerClass()));
    }
    
    private static Connection getConnection(
            String url, java.util.Properties info, Class<?> caller) throws SQLException {
            /*
             * When callerCl is null, we should check the application's
             * (which is invoking this class indirectly)
             * classloader, so that the JDBC driver class outside rt.jar
             * can be loaded from here.
             */
            ClassLoader callerCL = caller != null ? caller.getClassLoader() : null;
            synchronized(DriverManager.class) {
                // synchronize loading of the correct classloader.
                if (callerCL == null) {
                    callerCL = Thread.currentThread().getContextClassLoader();
                }
            }
    
            if(url == null) {
                throw new SQLException("The url cannot be null", "08001");
            }
    
            println("DriverManager.getConnection(\"" + url + "\")");
    
            // Walk through the loaded registeredDrivers attempting to make a connection.
            // Remember the first exception that gets raised so we can reraise it.
            SQLException reason = null;
            //Key point
            //registeredDrivers holds the driver info registered in the previous step
            for(DriverInfo aDriver : registeredDrivers) {
                // If the caller does not have permission to load the driver then
                // skip it.
                if(isDriverAllowed(aDriver.driver, callerCL)) {
                    try {
                        println("    trying " + aDriver.driver.getClass().getName());
                        //The crux: connect() is what actually establishes the connection to the database server
                        Connection con = aDriver.driver.connect(url, info);
                        if (con != null) {
                            // Success!
                            println("getConnection returning " + aDriver.driver.getClass().getName());
                            return (con);
                        }
                    } catch (SQLException ex) {
                        if (reason == null) {
                            reason = ex;
                        }
                    }
    
                } else {
                    println("    skipping: " + aDriver.getClass().getName());
                }
    
            }
    
            // if we got here nobody could connect.
            if (reason != null)    {
                println("getConnection failed: " + reason);
                throw reason;
            }
    
            println("getConnection: no suitable driver found for "+ url);
            throw new SQLException("No suitable driver found for "+ url, "08001");
        }
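
    The loop in getConnection can be observed without a MySQL server. In the sketch below, DemoDriver and the jdbc:demo: URL prefix are made up for illustration; the driver returns null for URLs it does not recognise, which is the signal for DriverManager to move on to the next registered driver. When no driver accepts the URL, the SQLState of the resulting exception should be 08001, as in the source above.

```java
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.DriverPropertyInfo;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.Properties;
import java.util.logging.Logger;

class DriverLoopDemo {
    // Hypothetical driver that only understands URLs starting with "jdbc:demo:".
    static final class DemoDriver implements Driver {
        @Override
        public Connection connect(String url, Properties info) {
            if (!acceptsURL(url)) {
                return null; // not our URL: DriverManager tries the next driver
            }
            // A stub Connection that supports only close() and toString().
            return (Connection) Proxy.newProxyInstance(
                    DemoDriver.class.getClassLoader(),
                    new Class<?>[] { Connection.class },
                    (proxy, method, args) -> {
                        switch (method.getName()) {
                            case "close":
                                return null;
                            case "toString":
                                return "DemoConnection";
                            default:
                                throw new UnsupportedOperationException(method.getName());
                        }
                    });
        }

        @Override
        public boolean acceptsURL(String url) {
            return url != null && url.startsWith("jdbc:demo:");
        }

        @Override
        public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) {
            return new DriverPropertyInfo[0];
        }

        @Override
        public int getMajorVersion() { return 1; }

        @Override
        public int getMinorVersion() { return 0; }

        @Override
        public boolean jdbcCompliant() { return false; }

        @Override
        public Logger getParentLogger() throws SQLFeatureNotSupportedException {
            throw new SQLFeatureNotSupportedException();
        }
    }

    static {
        try {
            DriverManager.registerDriver(new DemoDriver());
        } catch (SQLException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    // Returns the connection's description on success, or the SQLState on failure.
    static String tryConnect(String url) {
        try (Connection c = DriverManager.getConnection(url)) {
            return c.toString();
        } catch (SQLException e) {
            return e.getSQLState();
        }
    }

    public static void main(String[] args) {
        System.out.println(tryConnect("jdbc:demo:anything"));
        System.out.println(tryConnect("jdbc:unknown:anything"));
    }
}
```

    The first call is served by DemoDriver; the second falls through every registered driver and ends in the "No suitable driver found" branch.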
    

    java.sql.Driver is an interface that the JDK itself does not implement; mysql-connector-java-5.1.34.jar provides an implementation, NonRegisteringDriver.

    Connection connect(String url, java.util.Properties info)
            throws SQLException;
    

    com.mysql.jdbc.NonRegisteringDriver.java

    public java.sql.Connection connect(String url, Properties info) throws SQLException {
            if (url != null) {
                if (StringUtils.startsWithIgnoreCase(url, LOADBALANCE_URL_PREFIX)) {
                    return connectLoadBalanced(url, info);
                } else if (StringUtils.startsWithIgnoreCase(url, REPLICATION_URL_PREFIX)) {
                    return connectReplicationConnection(url, info);
                }
            }
    
            Properties props = null;
    
            if ((props = parseURL(url, info)) == null) {
                return null;
            }
    
            if (!"1".equals(props.getProperty(NUM_HOSTS_PROPERTY_KEY))) {
                return connectFailover(url, info);
            }
            //Everything above is checks and dispatch
            //The key line is the one below
            try {
                Connection newConn = com.mysql.jdbc.ConnectionImpl.getInstance(host(props), port(props), props, database(props), url);
    
                return newConn;
            } catch (SQLException sqlEx) {
                // Don't wrap SQLExceptions, throw
                // them un-changed.
                throw sqlEx;
            } catch (Exception ex) {
                SQLException sqlEx = SQLError.createSQLException(
                        Messages.getString("NonRegisteringDriver.17") + ex.toString() + Messages.getString("NonRegisteringDriver.18"),
                        SQLError.SQL_STATE_UNABLE_TO_CONNECT_TO_DATASOURCE, null);
    
                sqlEx.initCause(ex);
    
                throw sqlEx;
            }
        }
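
    The null return after parseURL matters: it is how a driver tells DriverManager that a URL is not its own. The following is a rough sketch of that contract, not the driver's actual parseURL. The property keys host, port and database, and the parsing rules (no IPv6 literals, query string ignored), are simplifications assumed for this example; the defaults localhost and 3306 match the ones that appear later in coreConnect.

```java
import java.util.Properties;

class UrlSketch {
    static final String PREFIX = "jdbc:mysql://";

    // Returns null for URLs this sketch does not recognise, so a caller
    // can hand the URL to another driver.
    static Properties parse(String url) {
        if (url == null || !url.regionMatches(true, 0, PREFIX, 0, PREFIX.length())) {
            return null;
        }
        String rest = url.substring(PREFIX.length());
        int query = rest.indexOf('?');
        if (query >= 0) {
            rest = rest.substring(0, query); // query parameters are ignored here
        }
        int slash = rest.indexOf('/');
        String hostPort = slash >= 0 ? rest.substring(0, slash) : rest;
        String database = slash >= 0 ? rest.substring(slash + 1) : "";

        String host = hostPort;
        String port = "";
        int colon = hostPort.lastIndexOf(':');
        if (colon >= 0) {
            host = hostPort.substring(0, colon);
            port = hostPort.substring(colon + 1);
        }

        Properties props = new Properties();
        props.setProperty("host", host.isEmpty() ? "localhost" : host);
        props.setProperty("port", port.isEmpty() ? "3306" : port);
        props.setProperty("database", database);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(parse("jdbc:mysql://localhost/test"));
        System.out.println(parse("jdbc:postgresql://localhost/test"));
    }
}
```

    With the URL from SimpleTest, the sketch yields host localhost, port 3306 and database test; a URL with a different prefix yields null.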
    

    com.mysql.jdbc.ConnectionImpl.java

    protected static Connection getInstance(String hostToConnectTo, int portToConnectTo, Properties info, String databaseToConnectTo, String url)
            throws SQLException {
        if (!Util.isJdbc4()) {
            //Key point
            return new ConnectionImpl(hostToConnectTo, portToConnectTo, info, databaseToConnectTo, url);
        }
    
        return (Connection) Util.handleNewInstance(JDBC_4_CONNECTION_CTOR, new Object[] { hostToConnectTo, Integer.valueOf(portToConnectTo), info,
                databaseToConnectTo, url }, null);
    }
    
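    //Once this constructor finishes, the Connection is returned, but we still have not seen how the connection is established, which is a little disappointing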
    public ConnectionImpl(String hostToConnectTo, int portToConnectTo, Properties info, String databaseToConnectTo, String url) throws SQLException {
    
            this.connectionCreationTimeMillis = System.currentTimeMillis();
    
            if (databaseToConnectTo == null) {
                databaseToConnectTo = "";
            }
    
            // Stash away for later, used to clone this connection for Statement.cancel and Statement.setQueryTimeout().
            //
    
            this.origHostToConnectTo = hostToConnectTo;
            this.origPortToConnectTo = portToConnectTo;
            this.origDatabaseToConnectTo = databaseToConnectTo;
    
            try {
                Blob.class.getMethod("truncate", new Class[] { Long.TYPE });
    
                this.isRunningOnJDK13 = false;
            } catch (NoSuchMethodException nsme) {
                this.isRunningOnJDK13 = true;
            }
    
            this.sessionCalendar = new GregorianCalendar();
            this.utcCalendar = new GregorianCalendar();
            this.utcCalendar.setTimeZone(TimeZone.getTimeZone("GMT"));
    
            //
            // Normally, this code would be in initializeDriverProperties, but we need to do this as early as possible, so we can start logging to the 'correct'
            // place as early as possible...this.log points to 'NullLogger' for every connection at startup to avoid NPEs and the overhead of checking for NULL at
            // every logging call.
            //
            // We will reset this to the configured logger during properties initialization.
            //
            this.log = LogFactory.getLogger(getLogger(), LOGGER_INSTANCE_NAME, getExceptionInterceptor());
    
            // We store this per-connection, due to static synchronization issues in Java's built-in TimeZone class...
            this.defaultTimeZone = Util.getDefaultTimeZone();
    
            this.isClientTzUTC = "GMT".equalsIgnoreCase(this.defaultTimeZone.getID());
    
            this.openStatements = new HashMap<Statement, Statement>();
    
            if (NonRegisteringDriver.isHostPropertiesList(hostToConnectTo)) {
                Properties hostSpecificProps = NonRegisteringDriver.expandHostKeyValues(hostToConnectTo);
    
                Enumeration<?> propertyNames = hostSpecificProps.propertyNames();
    
                while (propertyNames.hasMoreElements()) {
                    String propertyName = propertyNames.nextElement().toString();
                    String propertyValue = hostSpecificProps.getProperty(propertyName);
    
                    info.setProperty(propertyName, propertyValue);
                }
            } else {
    
                if (hostToConnectTo == null) {
                    this.host = "localhost";
                    this.hostPortPair = this.host + ":" + portToConnectTo;
                } else {
                    this.host = hostToConnectTo;
    
                    if (hostToConnectTo.indexOf(":") == -1) {
                        this.hostPortPair = this.host + ":" + portToConnectTo;
                    } else {
                        this.hostPortPair = this.host;
                    }
                }
            }
            //Port
            this.port = portToConnectTo;
            //Database name
            this.database = databaseToConnectTo;
            //URL
            this.myURL = url;
            this.user = info.getProperty(NonRegisteringDriver.USER_PROPERTY_KEY);
            this.password = info.getProperty(NonRegisteringDriver.PASSWORD_PROPERTY_KEY);
            
            //Username and password
            if ((this.user == null) || this.user.equals("")) {
                this.user = "";
            }
    
            if (this.password == null) {
                this.password = "";
            }
    
            this.props = info;
    
            initializeDriverProperties(info);
    
            if (getUseUsageAdvisor()) {
                this.pointOfOrigin = LogUtils.findCallingClassAndMethod(new Throwable());
            } else {
                this.pointOfOrigin = "";
            }
    
            try {
                this.dbmd = getMetaData(false, false);
                initializeSafeStatementInterceptors();
                //Key point
                createNewIO(false);
                unSafeStatementInterceptors();
            } catch (SQLException ex) {
                cleanup(ex);
    
                // don't clobber SQL exceptions
                throw ex;
            } catch (Exception ex) {
                cleanup(ex);
    
                StringBuffer mesg = new StringBuffer(128);
    
                if (!getParanoid()) {
                    mesg.append("Cannot connect to MySQL server on ");
                    mesg.append(this.host);
                    mesg.append(":");
                    mesg.append(this.port);
                    mesg.append(".\n\n");
                    mesg.append("Make sure that there is a MySQL server ");
                    mesg.append("running on the machine/port you are trying ");
                    mesg.append("to connect to and that the machine this software is running on ");
                    mesg.append("is able to connect to this host/port (i.e. not firewalled). ");
                    mesg.append("Also make sure that the server has not been started with the --skip-networking ");
                    mesg.append("flag.\n\n");
                } else {
                    mesg.append("Unable to connect to database.");
                }
    
                SQLException sqlEx = SQLError.createSQLException(mesg.toString(), SQLError.SQL_STATE_COMMUNICATION_LINK_FAILURE, getExceptionInterceptor());
    
                sqlEx.initCause(ex);
    
                throw sqlEx;
            }
    
            NonRegisteringDriver.trackConnection(this);
        }
    
    //Read on
    public void createNewIO(boolean isForReconnect) throws SQLException {
            synchronized (getConnectionMutex()) {
                // Synchronization Not needed for *new* connections, but defintely for connections going through fail-over, since we might get the new connection up
                // and running *enough* to start sending cached or still-open server-side prepared statements over to the backend before we get a chance to
                // re-prepare them...
    
                Properties mergedProps = exposeAsProperties(this.props);
    
                if (!getHighAvailability()) {
                    //Key point: step into this
                    connectOneTryOnly(isForReconnect, mergedProps);
    
                    return;
                }
                
                connectWithRetries(isForReconnect, mergedProps);
            }
        }
    //Key point: connectOneTryOnly() (not shown here) goes on to call coreConnect()
    private void coreConnect(Properties mergedProps) throws SQLException, IOException {
            int newPort = 3306;
            String newHost = "localhost";
    
            String protocol = mergedProps.getProperty(NonRegisteringDriver.PROTOCOL_PROPERTY_KEY);
            //Here the connection is almost being set up: protocol decides which transport to use, a TCP/IP socket, a named pipe, or something else
            if (protocol != null) {
                // "new" style URL
    
                if ("tcp".equalsIgnoreCase(protocol)) {
                    newHost = normalizeHost(mergedProps.getProperty(NonRegisteringDriver.HOST_PROPERTY_KEY));
                    newPort = parsePortNumber(mergedProps.getProperty(NonRegisteringDriver.PORT_PROPERTY_KEY, "3306"));
                } else if ("pipe".equalsIgnoreCase(protocol)) {
                    setSocketFactoryClassName(NamedPipeSocketFactory.class.getName());
    
                    String path = mergedProps.getProperty(NonRegisteringDriver.PATH_PROPERTY_KEY);
    
                    if (path != null) {
                        mergedProps.setProperty(NamedPipeSocketFactory.NAMED_PIPE_PROP_NAME, path);
                    }
                } else {
                    // normalize for all unknown protocols
                    newHost = normalizeHost(mergedProps.getProperty(NonRegisteringDriver.HOST_PROPERTY_KEY));
                    newPort = parsePortNumber(mergedProps.getProperty(NonRegisteringDriver.PORT_PROPERTY_KEY, "3306"));
                }
            } else {
    
                String[] parsedHostPortPair = NonRegisteringDriver.parseHostPortPair(this.hostPortPair);
                newHost = parsedHostPortPair[NonRegisteringDriver.HOST_NAME_INDEX];
    
                newHost = normalizeHost(newHost);
    
                if (parsedHostPortPair[NonRegisteringDriver.PORT_NUMBER_INDEX] != null) {
                    newPort = parsePortNumber(parsedHostPortPair[NonRegisteringDriver.PORT_NUMBER_INDEX]);
                }
            }
    
            this.port = newPort;
            this.host = newHost;
    
            // reset max-rows to default value
            this.sessionMaxRows = -1;
            //The crux: step into this
            this.io = new MysqlIO(newHost, newPort, mergedProps, getSocketFactoryClassName(), getProxy(), getSocketTimeout(),
                    this.largeRowSizeThreshold.getValueAsInt());
            //Key point: once the connection is up, the password and so on still need to be verified
            this.io.doHandshake(this.user, this.password, this.database);
            if (versionMeetsMinimum(5, 5, 0)) {
                // error messages are returned according to character_set_results which, at this point, is set from the response packet
                this.errorMessageEncoding = this.io.getEncodingForHandshake();
            }
        }
    

    com.mysql.jdbc.MysqlIO.java

    public MysqlIO(String host, int port, Properties props, String socketFactoryClassName, MySQLConnection conn, int socketTimeout,
                int useBufferRowSizeThreshold) throws IOException, SQLException {
            this.connection = conn;
    
            if (this.connection.getEnablePacketDebug()) {
                this.packetDebugRingBuffer = new LinkedList<StringBuffer>();
            }
            this.traceProtocol = this.connection.getTraceProtocol();
    
            this.useAutoSlowLog = this.connection.getAutoSlowLog();
    
            this.useBufferRowSizeThreshold = useBufferRowSizeThreshold;
            this.useDirectRowUnpack = this.connection.getUseDirectRowUnpack();
    
            this.logSlowQueries = this.connection.getLogSlowQueries();
    
            this.reusablePacket = new Buffer(INITIAL_PACKET_SIZE);
            //Send packet
            this.sendPacket = new Buffer(INITIAL_PACKET_SIZE);
            //Port and host name
            this.port = port;
            this.host = host;
    
            this.socketFactoryClassName = socketFactoryClassName;
            //Socket factory
            this.socketFactory = createSocketFactory();
            this.exceptionInterceptor = this.connection.getExceptionInterceptor();
    
            try {
                //Socket connect: here, at last, we see the connection
                this.mysqlConnection = this.socketFactory.connect(this.host, this.port, props);
    
                if (socketTimeout != 0) {
                    try {
                        this.mysqlConnection.setSoTimeout(socketTimeout);
                    } catch (Exception ex) {
                        /* Ignore if the platform does not support it */
                    }
                }
    
                this.mysqlConnection = this.socketFactory.beforeHandshake();
                
                //Input stream
                if (this.connection.getUseReadAheadInput()) {
                    this.mysqlInput = new ReadAheadInputStream(this.mysqlConnection.getInputStream(), 16384, this.connection.getTraceProtocol(),
                            this.connection.getLog());
                } else if (this.connection.useUnbufferedInput()) {
                    this.mysqlInput = this.mysqlConnection.getInputStream();
                } else {
                    this.mysqlInput = new BufferedInputStream(this.mysqlConnection.getInputStream(), 16384);
                }
                //Output stream
                this.mysqlOutput = new BufferedOutputStream(this.mysqlConnection.getOutputStream(), 16384);
    
                this.isInteractiveClient = this.connection.getInteractiveClient();
                this.profileSql = this.connection.getProfileSql();
                this.autoGenerateTestcaseScript = this.connection.getAutoGenerateTestcaseScript();
    
                this.needToGrabQueryFromPacket = (this.profileSql || this.logSlowQueries || this.autoGenerateTestcaseScript);
    
                if (this.connection.getUseNanosForElapsedTime() && Util.nanoTimeAvailable()) {
                    this.useNanosForElapsedTime = true;
    
                    this.queryTimingUnits = Messages.getString("Nanoseconds");
                } else {
                    this.queryTimingUnits = Messages.getString("Milliseconds");
                }
    
                if (this.connection.getLogSlowQueries()) {
                    calculateSlowQueryThreshold();
                }
            } catch (IOException ioEx) {
                throw SQLError.createCommunicationsException(this.connection, 0, 0, ioEx, getExceptionInterceptor());
            }
        }
    

    com.mysql.jdbc.SocketFactory.java

    Socket connect(String host, int portNumber, Properties props) throws SocketException, IOException;
    

    com.mysql.jdbc.StandardSocketFactory.java

    //At last we reach the bottom layer: the familiar Socket connection
    public Socket connect(String hostname, int portNumber, Properties props) throws SocketException, IOException {
    
            if (props != null) {
                this.host = hostname;
    
                this.port = portNumber;
    
                String localSocketHostname = props.getProperty("localSocketAddress");
                InetSocketAddress localSockAddr = null;
                if (localSocketHostname != null && localSocketHostname.length() > 0) {
                    localSockAddr = new InetSocketAddress(InetAddress.getByName(localSocketHostname), 0);
                }
    
                String connectTimeoutStr = props.getProperty("connectTimeout");
    
                int connectTimeout = 0;
    
                if (connectTimeoutStr != null) {
                    try {
                        connectTimeout = Integer.parseInt(connectTimeoutStr);
                    } catch (NumberFormatException nfe) {
                        throw new SocketException("Illegal value '" + connectTimeoutStr + "' for connectTimeout");
                    }
                }
    
                if (this.host != null) {
                    InetAddress[] possibleAddresses = InetAddress.getAllByName(this.host);
    
                    if (possibleAddresses.length == 0) {
                        throw new SocketException("No addresses for host");
                    }
    
                    // save last exception to propagate to caller if connection fails
                    SocketException lastException = null;
    
                    // Need to loop through all possible addresses. Name lookup may return multiple addresses including IPv4 and IPv6 addresses. Some versions of
                    // MySQL don't listen on the IPv6 address so we try all addresses.
                    for (int i = 0; i < possibleAddresses.length; i++) {
                        try {
                            this.rawSocket = createSocket(props);
    
                            configureSocket(this.rawSocket, props);
    
                            InetSocketAddress sockAddr = new InetSocketAddress(possibleAddresses[i], this.port);
                            // bind to the local port if not using the ephemeral port
                            if (localSockAddr != null) {
                                //Bind the local address
                                this.rawSocket.bind(localSockAddr);
                            }
                            //Actually establish the connection to the database server
                            this.rawSocket.connect(sockAddr, getRealTimeout(connectTimeout));
    
                            break;
                        } catch (SocketException ex) {
                            lastException = ex;
                            resetLoginTimeCountdown();
                            this.rawSocket = null;
                        }
                    }
    
                    if (this.rawSocket == null && lastException != null) {
                        throw lastException;
                    }
    
                    resetLoginTimeCountdown();
    
                    return this.rawSocket;
                }
            }
    
            throw new SocketException("Unable to create socket");
        }
    

    At this point, the connection to the database server is finally established.
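
    Stripped of configuration, the core of StandardSocketFactory.connect is: resolve the host name, then try each address until one accepts the connection. A minimal sketch of that idea follows. connectAny and demo are hypothetical names, and demo connects to a throwaway local ServerSocket instead of a MySQL server so the example can run anywhere.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

class ConnectSketch {
    // Tries each resolved address in turn and returns the first socket that connects.
    static Socket connectAny(String host, int port, int timeoutMillis) throws IOException {
        IOException last = null;
        for (InetAddress address : InetAddress.getAllByName(host)) {
            Socket socket = new Socket();
            try {
                socket.connect(new InetSocketAddress(address, port), timeoutMillis);
                return socket;
            } catch (IOException e) {
                last = e; // remember the failure and try the next address
                try {
                    socket.close();
                } catch (IOException ignored) {
                    // nothing useful to do if closing a failed socket fails
                }
            }
        }
        throw last != null ? last : new IOException("No addresses for host " + host);
    }

    // Connects to a local listener on an ephemeral port; true if the connect succeeded.
    static boolean demo() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback);
             Socket client = connectAny(loopback.getHostAddress(), server.getLocalPort(), 2000)) {
            return client.isConnected();
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("connected: " + demo());
    }
}
```

    Against a real server the call would look like connectAny("localhost", 3306, 2000); everything MySQL-specific starts only after this socket exists.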

    Once the connection is up, the two sides still have to talk to each other, so let's keep reading.

    com.mysql.jdbc.MysqlIO.java

    Take a look if you're interested; this method is rather long.

    void doHandshake(String user, String password, String database) throws SQLException {
        //One important call inside it sends a packet to the database server, which is how the client talks to the server
        send(packet, packet.getPosition());
    }
    
    private final void send(Buffer packet, int packetLen) throws SQLException {
            try {
                if (this.maxAllowedPacket > 0 && packetLen > this.maxAllowedPacket) {
                    throw new PacketTooBigException(packetLen, this.maxAllowedPacket);
                }
    
                if ((this.serverMajorVersion >= 4)
                        && (packetLen - HEADER_LENGTH >= this.maxThreeBytes || (this.useCompression && packetLen - HEADER_LENGTH >= this.maxThreeBytes
                                - COMP_HEADER_LENGTH))) {
                    sendSplitPackets(packet, packetLen);
    
                } else {
                    this.packetSequence++;
    
                    Buffer packetToSend = packet;
                    packetToSend.setPosition(0);
                    packetToSend.writeLongInt(packetLen - HEADER_LENGTH);
                    packetToSend.writeByte(this.packetSequence);
    
                    if (this.useCompression) {
                        this.compressedPacketSequence++;
                        int originalPacketLen = packetLen;
    
                        packetToSend = compressPacket(packetToSend, 0, packetLen);
                        packetLen = packetToSend.getPosition();
    
                        if (this.traceProtocol) {
                            StringBuffer traceMessageBuf = new StringBuffer();
    
                            traceMessageBuf.append(Messages.getString("MysqlIO.57"));
                            traceMessageBuf.append(getPacketDumpToLog(packetToSend, packetLen));
                            traceMessageBuf.append(Messages.getString("MysqlIO.58"));
                            traceMessageBuf.append(getPacketDumpToLog(packet, originalPacketLen));
    
                            this.connection.getLog().logTrace(traceMessageBuf.toString());
                        }
                    } else {
    
                        if (this.traceProtocol) {
                            StringBuffer traceMessageBuf = new StringBuffer();
    
                            traceMessageBuf.append(Messages.getString("MysqlIO.59"));
                            traceMessageBuf.append("host: '");
                            traceMessageBuf.append(this.host);
                            traceMessageBuf.append("' threadId: '");
                            traceMessageBuf.append(this.threadId);
                            traceMessageBuf.append("'\n");
                            traceMessageBuf.append(packetToSend.dump(packetLen));
    
                            this.connection.getLog().logTrace(traceMessageBuf.toString());
                        }
                    }
                    //Finally the data is written to the output stream, which completes the exchange
                    this.mysqlOutput.write(packetToSend.getByteBuffer(), 0, packetLen);
                    this.mysqlOutput.flush();
                }
    
                if (this.enablePacketDebug) {
                    enqueuePacketForDebugging(true, false, packetLen + 5, this.packetHeaderBuf, packet);
                }
    
                //
                // Don't hold on to large packets
                //
                if (packet == this.sharedSendPacket) {
                    reclaimLargeSharedSendPacket();
                }
    
                if (this.connection.getMaintainTimeStats()) {
                    this.lastPacketSentTimeMs = System.currentTimeMillis();
                }
            } catch (IOException ioEx) {
                throw SQLError.createCommunicationsException(this.connection, this.lastPacketSentTimeMs, this.lastPacketReceivedTimeMs, ioEx,
                        getExceptionInterceptor());
            }
        }
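
    The two calls writeLongInt(packetLen - HEADER_LENGTH) and writeByte(this.packetSequence) fill in the packet header: a 3-byte little-endian payload length followed by a 1-byte sequence number. The sketch below builds such a header by hand. frame and payloadLength are hypothetical helpers, and the size limit is simplified: payloads of 0xFFFFFF bytes or more are rejected here, whereas the driver splits them with sendSplitPackets.

```java
class PacketHeaderSketch {
    static final int HEADER_LENGTH = 4;

    // Prepends the 4-byte header (3-byte little-endian length, 1-byte sequence) to the payload.
    static byte[] frame(byte[] payload, int sequence) {
        int length = payload.length;
        if (length >= 0xFFFFFF) {
            throw new IllegalArgumentException("payload too large for a single packet in this sketch");
        }
        byte[] packet = new byte[HEADER_LENGTH + length];
        packet[0] = (byte) (length & 0xFF);
        packet[1] = (byte) ((length >>> 8) & 0xFF);
        packet[2] = (byte) ((length >>> 16) & 0xFF);
        packet[3] = (byte) sequence;
        System.arraycopy(payload, 0, packet, HEADER_LENGTH, length);
        return packet;
    }

    // Reads the payload length back out of a framed packet.
    static int payloadLength(byte[] packet) {
        return (packet[0] & 0xFF) | ((packet[1] & 0xFF) << 8) | ((packet[2] & 0xFF) << 16);
    }

    public static void main(String[] args) {
        byte[] packet = frame(new byte[300], 1);
        System.out.println("total bytes:    " + packet.length);
        System.out.println("payload length: " + payloadLength(packet));
        System.out.println("sequence:       " + packet[3]);
    }
}
```

    The header is why send() rewinds the buffer to position 0 before writing: the first four bytes of every outgoing buffer are reserved for it.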
    

    To summarize the steps above:

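    The result of the steps above is a Connection, which is actually an instance of its implementation class, ConnectionImpl. ConnectionImpl's member variable io is a MysqlIO, and MysqlIO is what can communicate with the database server.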

    Executing SQL statements

    //We will skip this step
    Statement statement = (Statement) connection.createStatement();
    //Let's see how the SQL statement is executed
    ResultSet resultSet = statement.executeQuery("select * from user");
    

    com.mysql.jdbc.StatementImpl.java

    public java.sql.ResultSet executeQuery(String sql) throws SQLException {
        //The method is long; the key line is this one
        this.results = locallyScopedConn.execSQL(this, sql, this.maxRows, null, this.resultSetType, this.resultSetConcurrency, doStreaming,
                this.currentCatalog, cachedFields);
    }
    

    com.mysql.jdbc.ConnectionImpl.java

    This familiar class again.

    public ResultSetInternalMethods execSQL(StatementImpl callingStatement, String sql, int maxRows, Buffer packet, int resultSetType,
            int resultSetConcurrency, boolean streamResults, String catalog, Field[] cachedMetadata, boolean isBatch) throws SQLException {
        synchronized (getConnectionMutex()) {
            //
            // Fall-back if the master is back online if we've issued queriesBeforeRetryMaster queries since we failed over
            //
    
            long queryStartTime = 0;
    
            int endOfQueryPacketPosition = 0;
    
            if (packet != null) {
                endOfQueryPacketPosition = packet.getPosition();
            }
    
            if (getGatherPerformanceMetrics()) {
                queryStartTime = System.currentTimeMillis();
            }
    
            this.lastQueryFinishedTime = 0; // we're busy!
    
            if ((getHighAvailability()) && (this.autoCommit || getAutoReconnectForPools()) && this.needsPing && !isBatch) {
                try {
                    pingInternal(false, 0);
    
                    this.needsPing = false;
                } catch (Exception Ex) {
                    createNewIO(true);
                }
            }
    
            try {
                if (packet == null) {
                    String encoding = null;
    
                    if (getUseUnicode()) {
                        encoding = getEncoding();
                    }
                    // Key point: plain SQL text is handed to MysqlIO
                    return this.io.sqlQueryDirect(callingStatement, sql, encoding, null, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog,
                            cachedMetadata);
                }
                // Key point: a pre-built packet is handed to MysqlIO
                return this.io.sqlQueryDirect(callingStatement, null, null, packet, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog,
                        cachedMetadata);
            } catch (java.sql.SQLException sqlE) {
                // don't clobber SQL exceptions
    
                if (getDumpQueriesOnException()) {
                    String extractedSql = extractSqlFromPacket(sql, packet, endOfQueryPacketPosition);
                    StringBuffer messageBuf = new StringBuffer(extractedSql.length() + 32);
                    messageBuf.append("\n\nQuery being executed when exception was thrown:\n");
                    messageBuf.append(extractedSql);
                    messageBuf.append("\n\n");
    
                    sqlE = appendMessageToException(sqlE, messageBuf.toString(), getExceptionInterceptor());
                }
    
                if ((getHighAvailability())) {
                    this.needsPing = true;
                } else {
                    String sqlState = sqlE.getSQLState();
    
                    if ((sqlState != null) && sqlState.equals(SQLError.SQL_STATE_COMMUNICATION_LINK_FAILURE)) {
                        cleanup(sqlE);
                    }
                }
    
                throw sqlE;
            } catch (Exception ex) {
                if (getHighAvailability()) {
                    this.needsPing = true;
                } else if (ex instanceof IOException) {
                    cleanup(ex);
                }
    
                SQLException sqlEx = SQLError.createSQLException(Messages.getString("Connection.UnexpectedException"), SQLError.SQL_STATE_GENERAL_ERROR,
                        getExceptionInterceptor());
                sqlEx.initCause(ex);
    
                throw sqlEx;
            } finally {
                if (getMaintainTimeStats()) {
                    this.lastQueryFinishedTime = System.currentTimeMillis();
                }
    
                if (getGatherPerformanceMetrics()) {
                    long queryTime = System.currentTimeMillis() - queryStartTime;
    
                    registerQueryExecutionTime(queryTime);
                }
            }
        }
    }
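The try/catch/finally structure of execSQL is easier to see once the driver-specific details are stripped away. The sketch below is a simplified model under stated assumptions, not the driver's code: the class name, the run method and the GENERAL_ERROR_STATE value are made up for illustration, and the high-availability and cleanup branches are left out. It keeps three behaviours from the listing above: a SQLException is rethrown unchanged, any other exception is wrapped in a SQLException with the original as its cause, and the finish time is recorded in finally whether or not the query succeeded.

```java
import java.io.IOException;
import java.sql.SQLException;
import java.util.concurrent.Callable;

// Simplified model of the error handling in execSQL; not Connector/J code.
final class ExecSqlErrorHandlingSketch {
    // Illustrative SQLSTATE for a general error; an assumption, not read from the driver.
    static final String GENERAL_ERROR_STATE = "S1000";

    long lastQueryFinishedTime;

    <T> T run(Callable<T> query) throws SQLException {
        lastQueryFinishedTime = 0; // mirrors the "we're busy!" marker
        try {
            return query.call();
        } catch (SQLException sqlE) {
            throw sqlE; // don't clobber SQL exceptions
        } catch (Exception ex) {
            // Anything else is wrapped, keeping the original exception as the cause.
            SQLException wrapped = new SQLException("Unexpected exception", GENERAL_ERROR_STATE);
            wrapped.initCause(ex);
            throw wrapped;
        } finally {
            // Runs on success and on failure alike.
            lastQueryFinishedTime = System.currentTimeMillis();
        }
    }

    static String demo() {
        ExecSqlErrorHandlingSketch sketch = new ExecSqlErrorHandlingSketch();
        StringBuilder out = new StringBuilder();
        Callable<String> succeeding = () -> "ok";
        Callable<String> failing = () -> {
            throw new IOException("socket closed");
        };
        try {
            String first = sketch.run(succeeding);
            out.append(first);
            sketch.run(failing);
        } catch (SQLException e) {
            out.append(" | ").append(e.getSQLState())
               .append(" caused by ").append(e.getCause().getClass().getSimpleName());
        }
        out.append(" | timestamp recorded: ").append(sketch.lastQueryFinishedTime > 0);
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Wrapping non-SQL exceptions this way means callers of a JDBC-style API only ever have to catch SQLException, while the root cause stays reachable through getCause().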
    

    com.mysql.jdbc.MysqlIO.java, another familiar class

    final ResultSetInternalMethods sqlQueryDirect(StatementImpl callingStatement, String query, String characterEncoding, Buffer queryPacket, int maxRows,
                int resultSetType, int resultSetConcurrency, boolean streamResults, String catalog, Field[] cachedMetadata) throws Exception {
        // Another very long method, so only the key call is shown here
        // Send query command and sql query string
        Buffer resultPacket = sendCommand(MysqlDefs.QUERY, null, queryPacket, false, null, 0);
    }
    // In the end, send() is what actually communicates with the database server
    final Buffer sendCommand(int command, String extraData, Buffer queryPacket, boolean skipCheck, String extraDataCharEncoding, int timeoutMillis)
                throws SQLException {
            this.commandCount++;
    
            //
            // We cache these locally, per-command, as the checks for them are in very 'hot' sections of the I/O code and we save 10-15% in overall performance by
            // doing this...
            //
            this.enablePacketDebug = this.connection.getEnablePacketDebug();
            this.readPacketSequence = 0;
    
            int oldTimeout = 0;
    
            if (timeoutMillis != 0) {
                try {
                    oldTimeout = this.mysqlConnection.getSoTimeout();
                    this.mysqlConnection.setSoTimeout(timeoutMillis);
                } catch (SocketException e) {
                    throw SQLError.createCommunicationsException(this.connection, this.lastPacketSentTimeMs, this.lastPacketReceivedTimeMs, e,
                            getExceptionInterceptor());
                }
            }
    
            try {
    
                checkForOutstandingStreamingData();
    
                // Clear serverStatus...this value is guarded by an external mutex, as you can only ever be processing one command at a time
                this.oldServerStatus = this.serverStatus;
                this.serverStatus = 0;
                this.hadWarnings = false;
                this.warningCount = 0;
    
                this.queryNoIndexUsed = false;
                this.queryBadIndexUsed = false;
                this.serverQueryWasSlow = false;
    
                //
                // Compressed input stream needs cleared at beginning of each command execution...
                //
                if (this.useCompression) {
                    int bytesLeft = this.mysqlInput.available();
    
                    if (bytesLeft > 0) {
                        this.mysqlInput.skip(bytesLeft);
                    }
                }
    
                try {
                    clearInputStream();
    
                    //
                    // PreparedStatements construct their own packets, for efficiency's sake.
                    //
                    // If this is a generic query, we need to re-use the sending packet.
                    //
                    if (queryPacket == null) {
                        int packLength = HEADER_LENGTH + COMP_HEADER_LENGTH + 1 + ((extraData != null) ? extraData.length() : 0) + 2;
    
                        if (this.sendPacket == null) {
                            this.sendPacket = new Buffer(packLength);
                        }
    
                        this.packetSequence = -1;
                        this.compressedPacketSequence = -1;
                        this.readPacketSequence = 0;
                        this.checkPacketSequence = true;
                        this.sendPacket.clear();
    
                        this.sendPacket.writeByte((byte) command);
    
                        if ((command == MysqlDefs.INIT_DB) || (command == MysqlDefs.CREATE_DB) || (command == MysqlDefs.DROP_DB) || (command == MysqlDefs.QUERY)
                                || (command == MysqlDefs.COM_PREPARE)) {
                            if (extraDataCharEncoding == null) {
                                this.sendPacket.writeStringNoNull(extraData);
                            } else {
                                this.sendPacket.writeStringNoNull(extraData, extraDataCharEncoding, this.connection.getServerCharset(),
                                        this.connection.parserKnowsUnicode(), this.connection);
                            }
                        } else if (command == MysqlDefs.PROCESS_KILL) {
                            long id = Long.parseLong(extraData);
                            this.sendPacket.writeLong(id);
                        }
    
                        send(this.sendPacket, this.sendPacket.getPosition());
                    } else {
                        this.packetSequence = -1;
                        this.compressedPacketSequence = -1;
                        // In the end, send() is what actually communicates with the database server
                        send(queryPacket, queryPacket.getPosition()); // packet passed by PreparedStatement
                    }
                } catch (SQLException sqlEx) {
                    // don't wrap SQLExceptions
                    throw sqlEx;
                } catch (Exception ex) {
                    throw SQLError.createCommunicationsException(this.connection, this.lastPacketSentTimeMs, this.lastPacketReceivedTimeMs, ex,
                            getExceptionInterceptor());
                }
    
                Buffer returnPacket = null;
    
                if (!skipCheck) {
                    if ((command == MysqlDefs.COM_EXECUTE) || (command == MysqlDefs.COM_RESET_STMT)) {
                        this.readPacketSequence = 0;
                        this.packetSequenceReset = true;
                    }
    
                    returnPacket = checkErrorPacket(command);
                }
    
                return returnPacket;
            } catch (IOException ioEx) {
                throw SQLError.createCommunicationsException(this.connection, this.lastPacketSentTimeMs, this.lastPacketReceivedTimeMs, ioEx,
                        getExceptionInterceptor());
            } finally {
                if (timeoutMillis != 0) {
                    try {
                        this.mysqlConnection.setSoTimeout(oldTimeout);
                    } catch (SocketException e) {
                        throw SQLError.createCommunicationsException(this.connection, this.lastPacketSentTimeMs, this.lastPacketReceivedTimeMs, e,
                                getExceptionInterceptor());
                    }
                }
            }
        }
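What sendCommand writes for a plain query is small: one command byte followed by the SQL text, without a trailing NUL. The sketch below shows roughly what such a packet looks like on the wire. It is based on the MySQL client/server protocol rather than copied from the driver: a 4-byte header (3-byte little-endian payload length plus a 1-byte sequence number) followed by the payload, with 0x03 as the query command. The class and method names are hypothetical, UTF-8 is assumed as the encoding, and compression and the splitting of payloads larger than 2^24 - 1 bytes are ignored.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Illustrative only: builds the bytes of a single, uncompressed query packet.
final class QueryPacketSketch {
    static final int COM_QUERY = 0x03; // command byte for a text query

    static byte[] buildQueryPacket(String sql, int sequenceId) {
        byte[] sqlBytes = sql.getBytes(StandardCharsets.UTF_8);
        int payloadLength = 1 + sqlBytes.length; // command byte + SQL text

        ByteArrayOutputStream out = new ByteArrayOutputStream(4 + payloadLength);
        // Header: 3-byte payload length, least significant byte first
        out.write(payloadLength & 0xFF);
        out.write((payloadLength >>> 8) & 0xFF);
        out.write((payloadLength >>> 16) & 0xFF);
        // Header: 1-byte sequence number, 0 for the first packet of a command
        out.write(sequenceId & 0xFF);
        // Payload: command byte, then the SQL text
        out.write(COM_QUERY);
        out.write(sqlBytes, 0, sqlBytes.length);
        return out.toByteArray();
    }

    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < bytes.length; i++) {
            if (i > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02x", bytes[i] & 0xFF));
        }
        return sb.toString();
    }

    static String demo() {
        return toHex(buildQueryPacket("select 1", 0));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 09 00 00 00 03 73 65 6c 65 63 74 20 31
    }
}
```

In the listing above, packetSequence is reset to -1 before send() is called, which fits the sequence number of a new command starting at 0.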
    

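    That covers the whole process. There is a lot of code and it is hard to explain well, but I hope it helps. This is also my first time reading this code, so I am still not very familiar with it.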


    Article link: https://www.haomeiwen.com/subject/tmdmaftx.html