美文网首页druid 源码之旅Druid
[druid 源码解析] 4 获取连接

[druid 源码解析] 4 获取连接

作者: AndyWei123 | 来源:发表于2021-11-11 00:21 被阅读0次

    我们回头看DataSource的接口,它里面只定义了两个方法,如下,我们今天来分析第一个也是最重要的方法 getConnection:

    public interface DataSource  extends CommonDataSource, Wrapper {
      Connection getConnection() throws SQLException;
    
      Connection getConnection(String username, String password)
        throws SQLException;
    }
    

    1.1 具体实现

    我们先看一下 DruidDatasource 的具体实现:

        @Override
        public DruidPooledConnection getConnection() throws SQLException {
            return getConnection(maxWait);
        }
    
        public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
            init();
    
            if (filters.size() > 0) {
                FilterChainImpl filterChain = new FilterChainImpl(this);
                // 遍历所有 Filter 
                return filterChain.dataSource_connect(this, maxWaitMillis);
            } else {
                return getConnectionDirect(maxWaitMillis);
            }
        }
    

    这里先调用了 init() 方法,来初始化,这个流程我们上面已经提到,假如已经初始化完成就会直接返回,接下来遍历所有的filter,这里是一种责任链模式,FilterChainImpl负责遍历所有的 Filter,主要流程是 FilterChainImpl先判断,当前filter的位置是不是最后的 ,假如是,就调用实际需要执行的方法,假如不是,就获取下一个 filter,并将自己传给 filterfilter在处理的时候是先调用 FilterChainImpl来获取实际的结果,最后自己才对结果进行处理,有点像入栈出栈流程。

    1.2 责任链模式

    一开始看责任链模式会有点绕,所以我直接写了个简单的例子来模拟这个流程,首先我们有两个接口,一个是 Filter 一个是 FilterChain:

    public interface Filter {
        public int filter(FilterChain chain);
    }
    public interface FilterChain {
        public int doFilter();
    }
    

    接着我们做 FilterChain 的实现类, 这里的关键就是他需要持有 filter 的链,然后自己定义具体链的位置,最后最重要的是这个判断。(这里可以先忽略构造方法)

    public class FilterChainImpl implements FilterChain {
    
        List<Filter> filters;
    
        int pos;
    
        public FilterChainImpl() {
            filters = new ArrayList<>();
            filters.add(new CFilter());
            filters.add(new BFilter());
            filters.add(new AFilter());
            pos = 0;
        }
    
        @Override
        public int doFilter() {
            if (pos < filters.size()) {
                getNexFilter().filter(this);
            }
            return 1;
        }
    
        public Filter getNexFilter() {
            return filters.get(pos++);
        }
    }
    

    接着我们简答地实现一个 Filter,最关键的逻辑是需要先调用 chain 去获取结果,在对结果进行处理:

    public class AFilter implements Filter {
        @Override
        public int filter(FilterChain chain) {
            int result = chain.doFilter();
            System.out.println("AFilter filter " + result);
            return 0;
        }
    }
    

    最终我们的输出如下:


    image.png

    内部获取链接

    有了上述的例子,我们其实最后调用的就是 FilterChainImplgetConnection 方法的最后一行即可,即调用了 getConnectionDirect 方法。

    public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException {
            int notFullTimeoutRetryCnt = 0;
            for (; ; ) {
                // handle notFullTimeoutRetry
                DruidPooledConnection poolableConnection;
                try {
                    // 真正去获取 connection
                    poolableConnection = getConnectionInternal(maxWaitMillis);
                } catch (GetConnectionTimeoutException ex) {
                    if (notFullTimeoutRetryCnt <= this.notFullTimeoutRetryCount && !isFull()) {
                        notFullTimeoutRetryCnt++;
                        if (LOG.isWarnEnabled()) {
                            LOG.warn("get connection timeout retry : " + notFullTimeoutRetryCnt);
                        }
                        continue;
                    }
                    throw ex;
                }
    
                if (testOnBorrow) {
                    // 测试 connection 是否可用
                    boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
                    if (!validate) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("skip not validate connection.");
                        }
                        // 假如不可用就断开链接
                        discardConnection(poolableConnection.holder);
                        continue;
                    }
                } else {
                  // 对链接进行校验
                    .....
                }
              // 产看是否需要检查活动线程,假如需要就放到  activeConnections 集合中。
                if (removeAbandoned) {
                    StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
                    poolableConnection.connectStackTrace = stackTrace;
                    poolableConnection.setConnectedTimeNano();
                    poolableConnection.traceEnable = true;
    
                    activeConnectionLock.lock();
                    try {
                        activeConnections.put(poolableConnection, PRESENT);
                    } finally {
                        activeConnectionLock.unlock();
                    }
                }
    
                if (!this.defaultAutoCommit) {
                    poolableConnection.setAutoCommit(false);
                }
    
                return poolableConnection;
            }
        }
    

    流程如下:

    • 调用getConnectionInternal获取经过各种包装的Connection,这个是获取连接的主要逻辑,支持超时时间,由DruidDataSource的maxWait参数指定,单位毫秒。
    • 如果testOnBorrow为true,则进行对连接进行校验,校验失败则进行清理并重新进入循环,否则跳到下一步。
    • 如果testWhileIdle为true,距离上次激活时间超过timeBetweenEvictionRunsMillis,则进行清理。
    • 如果removeAbandoned为true,则会把连接存放在activeConnections中,清理线程会对其定期进行处理。
      接下来,我们看一下 getConnectionInternal 方法:
    private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException {
              ......
            final long nanos = TimeUnit.MILLISECONDS.toNanos(maxWait);
            final int maxWaitThreadCount = this.maxWaitThreadCount;
    
            DruidConnectionHolder holder;
    
            for (boolean createDirect = false; ; ) {
              // 每次都是重新创建模式,就执行下面逻辑。
                if (createDirect) {
                 ........
                }
    
                try {
                // 获取锁
                    lock.lockInterruptibly();
                } catch (InterruptedException e) {
                    connectErrorCountUpdater.incrementAndGet(this);
                    throw new SQLException("interrupt", e);
                }
    
                try {
                  // 检查是否到达最大等待线程数量线程
                    if (maxWaitThreadCount > 0
                        && notEmptyWaitThreadCount >= maxWaitThreadCount) {
                        connectErrorCountUpdater.incrementAndGet(this);
                        throw new ***
                    }
                  // 查看是否有报错,有就抛出去
                   .....
    
                    connectCount++;
    // 检查创建的线程池是否已经不够了,不够就直接创建
                    if (createScheduler != null
                        && poolingCount == 0
                        && activeCount < maxActive
                        && creatingCountUpdater.get(this) == 0
                        && createScheduler instanceof ScheduledThreadPoolExecutor) {
                        ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) createScheduler;
                        if (executor.getQueue().size() > 0) {
                            createDirect = true;
                            continue;
                        }
                    }
                    // 这两个方法仅仅是有是否有超时时间决定。
                    if (maxWait > 0) {
                        holder = pollLast(nanos);
                    } else {
                        holder = takeLast();
                    }
    
                    if (holder != null) {
                        if (holder.discard) {
                            continue;
                        }
    
                        activeCount++;
                        holder.active = true;
                        if (activeCount > activePeak) {
                            activePeak = activeCount;
                            activePeakTime = System.currentTimeMillis();
                        }
                    }
                } catch (InterruptedException e) {
                    connectErrorCountUpdater.incrementAndGet(this);
                    throw new SQLException(e.getMessage(), e);
                } catch (SQLException e) {
                    connectErrorCountUpdater.incrementAndGet(this);
                    throw e;
                } finally {
                    lock.unlock();
                }
    
                break;
            }
    
            if (holder == null) {
                    // 创建错误信息
                      ....
                }
    
                String errorMessage = buf.toString();
    
                if (createError != null) {
                    throw new GetConnectionTimeoutException(errorMessage, createError);
                } else {
                    throw new GetConnectionTimeoutException(errorMessage);
                }
            }
    
            holder.incrementUseCount();
    
            DruidPooledConnection poolalbeConnection = new DruidPooledConnection(holder);
            return poolalbeConnection;
        }
    
    • 检查创建的线程池是否已经不够了,不够就直接创建。
    • 调用 pollLast(nanos); 或者 takeLast(); 这两者仅仅是是否有超时时间的区别。

    和创建连接线程协作

    我们直接来分析 takeLast();

    DruidConnectionHolder takeLast() throws InterruptedException, SQLException {
            try {
                while (poolingCount == 0) {
                    // 发送信号通知创建链接线程去创建连接
                    emptySignal(); // send signal to CreateThread create connection
    
                    if (failFast && isFailContinuous()) {
                        throw new DataSourceNotAvailableException(createError);
                    }
                    // 将等待线程加一
                    notEmptyWaitThreadCount++;
                    if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) {
                        notEmptyWaitThreadPeak = notEmptyWaitThreadCount;
                    }
                    try {
                        // 等待创建好连接
                        notEmpty.await(); // signal by recycle or creator
                    } finally {
                        notEmptyWaitThreadCount--;
                    }
                    notEmptyWaitCount++;
    
                    if (!enable) {
                        connectErrorCountUpdater.incrementAndGet(this);
                        if (disableException != null) {
                            throw disableException;
                        }
    
                        throw new DataSourceDisableException();
                    }
                }
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                notEmptySignalCount++;
                throw ie;
            }
            // 获取空闲连接数组 connections 的最后一个线程,并返回
            decrementPoolingCount();
            DruidConnectionHolder last = connections[poolingCount];
            connections[poolingCount] = null;
    
            return last;
        }
    

    这里很多细节就和我们之前的对应上了,首先是发送信号,让创建线程创建线程池,然后判断等待线程首先是否当前等待线程大于阈值,是的话就抛错。然后调用 notEmpty.await() 等待创建线程的通知。
    最后将 Connection 从活动线程借出来。

    检查链接

    我们回到上面,当拿connection 后需要检查链接是否存活,调用 testConnectionInternal 方法,最终调用 MySqlValidConnectionCheckerisValidConnection 方法。我们通过Debug 发现最终调用的就是 JDBC4MysqlpingInternal 方法, 如下:

    check
    到这里,我们完成了 getConnection 的工作。

    相关文章

      网友评论

        本文标题:[druid 源码解析] 4 获取连接

        本文链接:https://www.haomeiwen.com/subject/ojhwzltx.html