美文网首页druidDruid
Druid 获取连接流程

Druid 获取连接流程

作者: 晴天哥_王志 | 来源:发表于2022-02-13 11:17 被阅读0次

    连接获取流程

    • 在druidDataSource中有一个重入锁和衍生的两个condition,一个监控连接池是否为空,一个监控连接池不为空。
    • 在druidDataSource中有两个线程,一个生成连接,一个回收连接。在创建、获取、回收的时候都会使用这些锁和condition。
    • 每次获取Connection都会调用init,内部使用inited标识DataSource是否已经初始化OK。
    • 每次获取 Connection 都会需要进行加锁保证线程安全,所有操作都在加锁后执行。
    • 如果连接池内没有连接了,则调用empty.signal(),通知CreateThread创建连接,并且等待指定的时间,被唤醒之后再去查看是否有可用连接。
    • 应用侧通过 getConnection 获取连接,如果连接为空那么就通知 CreateConnectionThread 负责创建连接。

    连接获取源码

        public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException {
            int notFullTimeoutRetryCnt = 0;
            for (;;) {
                // handle notFullTimeoutRetry
                DruidPooledConnection poolableConnection;
                try {
                    // 通过getConnectionInternal来获取连接
                    poolableConnection = getConnectionInternal(maxWaitMillis);
                } catch (GetConnectionTimeoutException ex) {
                    throw ex;
                }
              
                // 检测连接的可用性等操作
                if (testOnBorrow) {
                    // 获取连接的时候检测连接可用性
                } else {
                    Connection realConnection = poolableConnection.conn;
    
                    if (testWhileIdle) {
                        // 检测是否超过空闲时间
                    }
                }
    
                // 省略部分代码
                return poolableConnection;
            }
        }
    
    • 调用getConnectionInternal获取经过各种包装的Connection,这个是获取连接的主要逻辑,支持超时时间(由DruidDataSource的maxWait参数指定,单位毫秒)。
    • 如果testOnBorrow为true,则进行对连接进行校验,校验失败则进行清理并重新进入循环,否则跳到下一步。
    • 如果testWhileIdle为true,距离上次激活时间超过timeBetweenEvictionRunsMillis,则进行清理。
    • 如果removeAbandoned为true,则会把连接存放在activeConnections中,清理线程会对其定期进行处理。
        private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException {
    
            final long nanos = TimeUnit.MILLISECONDS.toNanos(maxWait);
            final int maxWaitThreadCount = this.maxWaitThreadCount;
    
            // 创建数据库连接的逻辑
            DruidConnectionHolder holder;
            for (boolean createDirect = false;;) {
                try {
                    lock.lockInterruptibly();
                } catch (InterruptedException e) {
                    throw new SQLException("interrupt", e);
                }
    
                try {
                    connectCount++;
                    // 获取连接对象
                    if (maxWait > 0) {
                        holder = pollLast(nanos);
                    } else {
                        holder = takeLast();
                    }
    
                    if (holder != null) {
                        activeCount++;
                        if (activeCount > activePeak) {
                            activePeak = activeCount;
                            activePeakTime = System.currentTimeMillis();
                        }
                    }
                } catch (InterruptedException e) {
                    throw new SQLException(e.getMessage(), e);
                } catch (SQLException e) {
                    throw e;
                } finally {
                    lock.unlock();
                }
                break;
            }
    
            holder.incrementUseCount();
    
            DruidPooledConnection poolalbeConnection = new DruidPooledConnection(holder);
            return poolalbeConnection;
        }
    
    • 通过lock.lockInterruptibly()获取锁,然后通过pollLast / takeLast来获取连接。
        DruidConnectionHolder takeLast() throws InterruptedException, SQLException {
            try {
                // 信号量的通知
                while (poolingCount == 0) {
                    emptySignal(); // send signal to CreateThread create connection
    
                    try {
                        notEmpty.await(); // signal by recycle or creator
                    } finally {
                        notEmptyWaitThreadCount--;
                    }
                }
            } catch (InterruptedException ie) {
                throw ie;
            }
    
            // 从connections获取最后一个连接
            decrementPoolingCount(); // 下标减一后返回最后一个连接
            DruidConnectionHolder last = connections[poolingCount];
            connections[poolingCount] = null;
    
            return last;
        }
    
        private DruidConnectionHolder pollLast(long nanos) throws InterruptedException, SQLException {
            long estimate = nanos;
    
            for (;;) {
                if (poolingCount == 0) {
                    emptySignal(); // send signal to CreateThread create connection
    
                    try {
                        long startEstimate = estimate;
                        estimate = notEmpty.awaitNanos(estimate); 
                        notEmptyWaitCount++;
                        notEmptyWaitNanos += (startEstimate - estimate);
                    } catch (InterruptedException ie) {
                        throw ie;
                    } finally {
                        notEmptyWaitThreadCount--;
                    }
    
                    if (poolingCount == 0) {
                        if (estimate > 0) {
                            continue;
                        }
                        waitNanosLocal.set(nanos - estimate);
                        return null;
                    }
                }
    
                // 从connections获取最后一个连接
                decrementPoolingCount();// 下标减一后返回最后一个连接
                DruidConnectionHolder last = connections[poolingCount];
                connections[poolingCount] = null;
    
                long waitNanos = nanos - estimate;
                last.setLastNotEmptyWaitNanos(waitNanos);
    
                return last;
            }
        }
    
    • 如果连接通过 poolingCount=0判断连接池为空的情况下通过emptySignal来通知CreateThread创建连接,并且等待指定的时间,被唤醒之后再去查看是否有可用连接。
    • 如果连接池不为空就从连接池对象connections获取一个可用连接。
    • take和poll存在等待时间的差异,take会多次尝试获取连接,获取成功才会返回。

    CreateConnectionThread创建连接

        public class CreateConnectionThread extends Thread {
    
            public CreateConnectionThread(String name){
                super(name);
                this.setDaemon(true);
            }
    
            public void run() {
    
                initedLatch.countDown();
    
                long lastDiscardCount = 0;
                int errorCount = 0;
                for (;;) {
                    try {
                        lock.lockInterruptibly();
                    } catch (InterruptedException e2) {
                        break;
                    }
    
                    long discardCount = DruidDataSource.this.discardCount;
                    boolean discardChanged = discardCount - lastDiscardCount > 0;
                    lastDiscardCount = discardCount;
    
                    try {
                        boolean emptyWait = true;
    
                        // 判断是否需要等待连接池为空的信号量 empty.wait()
                        if (emptyWait) {
                            // 必须存在线程等待,才创建连接
                            if (poolingCount >= notEmptyWaitThreadCount //
                                    && (!(keepAlive && activeCount + poolingCount < minIdle))
                                    && !isFailContinuous()
                            ) {
                                empty.await();
                            }
    
                            // 防止创建超过maxActive数量的连接
                            if (activeCount + poolingCount >= maxActive) {
                                empty.await();
                                continue;
                            }
                        }
    
                    } catch (InterruptedException e) {
                        break;
                    } finally {
                        lock.unlock();
                    }
    
                    // 负责创建连接
                    PhysicalConnectionInfo connection = null;
                    try {
                        connection = createPhysicalConnection();
                    } catch (Error e) {
                        setFailContinuous(true);
                        break;
                    }
    
                    // 添加新建的连接
                    boolean result = put(connection);
    
                    errorCount = 0; // reset errorCount
                }
            }
        }
    
    • CreateConnectionThread是一个守护线程,在需要时创建连接。
    • 创建好物理连接之后,需要使用DruidConnectionHolder代理实际的物理连接,该对象持有DruidDataSource的引用,调用Connection最终会调用DruidDataSource的recyle(DruidPooledConnection conn)回收该连接,创建物理连接的过程是不加锁的,避免影响性能。
    • 创建好连接之后,还需要把该连接put到连接池中,重新进行加锁。
        protected boolean put(PhysicalConnectionInfo physicalConnectionInfo) {
            DruidConnectionHolder holder = null;
            try {
                holder = new DruidConnectionHolder(DruidDataSource.this, physicalConnectionInfo);
            } catch (SQLException ex) {
                lock.lock();
                try {
                    if (createScheduler != null) {
                        clearCreateTask(physicalConnectionInfo.createTaskId);
                    }
                } finally {
                    lock.unlock();
                }
                LOG.error("create connection holder error", ex);
                return false;
            }
    
            return put(holder, physicalConnectionInfo.createTaskId);
        }
    
        private boolean put(DruidConnectionHolder holder, long createTaskId) {
            lock.lock();
            try {
                if (poolingCount >= maxActive) {
                    return false;
                }
                // 放置新增的连接
                connections[poolingCount] = holder;
                incrementPoolingCount();
    
                if (poolingCount > poolingPeak) {
                    poolingPeak = poolingCount;
                    poolingPeakTime = System.currentTimeMillis();
                }
                // 通知获取连接的线程
                notEmpty.signal();
                notEmptySignalCount++;
    
                if (createScheduler != null) {
                    clearCreateTask(createTaskId);
    
                    if (poolingCount + createTaskCount < notEmptyWaitThreadCount //
                        && activeCount + poolingCount + createTaskCount < maxActive) {
                        emptySignal();
                    }
                }
            } finally {
                lock.unlock();
            }
            return true;
        }
    
    • put过程是将连接存放在connections索引,并且通知notEmpty取走连接,也就是需要获取连接的线程。

    相关文章

      网友评论

        本文标题:Druid 获取连接流程

        本文链接:https://www.haomeiwen.com/subject/ypyxlrtx.html