本文背景
去年11月我才接触mycat代码的时候,公司就有严重的连接泄露问题需要排查,本文是基于那次排查对代码的理解整理而成。
- 文中的连接池指的的是后端连接的连接池,因为前端连接的连接池在业务进程那里啊。
- 本文基于dble代码,dble在mycat连接池做了一些调整,代码有一些差异。
水平有限,如果有错误欢迎指正,个人邮箱 tankilo@126.com
DBLE(Mycat)连接池的独特之处
1. 后端连接NIO
mycat的连接池因为后端也使用原生nio,和一般jdbc连接池(例如druid)有很大的不同,讲究祖传手艺纯手工。。。
据我所知,目前这些分库分表中间件,后端连接也使用nio的只有mycat一家?欢迎指正。
2. 一个mysql上的多个Database共享一个连接池
一个MySQLDataSource(PhysicalDatasource)对应一个mysql上多个Database的连接,通过PhysicalDatasource#conMap保存空闲可用连接,key为database物理分库名称来区分,避免每个database固定连接带来的空闲浪费。
关键API
操作 | 代码 |
---|---|
借出方式一 | com.actiontech.dble.backend.datasource.PhysicalDatasource#getConnection(java.lang.String, boolean, com.actiontech.dble.backend.mysql.nio.handler.ResponseHandler, java.lang.Object) |
借出方式二 | com.actiontech.dble.backend.datasource.PhysicalDatasource#getConnection(java.lang.String, boolean, java.lang.Object) |
归还 | com.actiontech.dble.backend.mysql.nio.MySQLConnection#release |
关闭物理连接 | com.actiontech.dble.backend.mysql.nio.MySQLConnection#close com.actiontech.dble.backend.mysql.nio.MySQLConnection#quit |
借出方式一 异步
public void getConnection(String schema, boolean autocommit, final ResponseHandler handler, final Object attachment) throws IOException {
注意方法的返回值是void, 异步拿连接的时候,getConnection方法是没有返回值,连接池通过调用你的回调函数,告诉你可以使用连接了。
调用回调函数的线程在有空闲连接的时候是用户线程,在没空闲连接需要创建的时候是NIO Sub Reactor线程。
下面仔细介绍下:
1. 有空闲连接的话
(1)获取空闲连接
com.actiontech.dble.backend.ConMap#tryTakeCon(String schema, boolean autocommit)
获取逻辑是按照schema和autocommit的顺序,先获取符合要求的链接,没有符合的时候,就随意拿一个连接,因为后面还会重新检查并否覆盖一次database之类的配置。
具体逻辑:
-
先根据【物理库名称schema】作为key到获取对应物理库的空闲连接队列
如果可以获取到队列,就再按照autocommit先获取符合要求的链接,获取不到就随意拿链接
-
如果没有对应【物理库名称schema的链接】,就随机选取一个库的链接
(2)更新借出标志和上次使用时间
拿到空闲连接后,调用com.actiontech.dble.backend.datasource.PhysicalDatasource#takeCon
更新下借出标志和上次使用时间,这个时间点会用在sql执行超时的时候。
conn.setBorrowed(true);
......
conn.setLastTime(System.currentTimeMillis());
(3)调用用户的ResponseHandler#connectionAcquired, 通知连接已经拿到
下面会直接调用传入参数ResponseHandler的connectionAcquired方法,你可以在这个方法里使用MySQLConnection的方法执行sql,注意这些网络通信都是nio异步的,所以MySQLConnection的方法也会立刻返回。
2. 没有空闲连接的话,创建新的链接
使用新的线程异步创建连接,并且使用com.actiontech.dble.backend.mysql.nio.handler.DelegateResponseHandler包装一下用户传入的handler,因为连接池自己在把新创建的连接给用户的回调函数之前,会和拿空闲连接一样做些更新借出标志和上次使用时间的工作。
private void createNewConnection(final ResponseHandler handler, final Object attachment,
final String schema) throws IOException {
// aysn create connection
DbleServer.getInstance().getComplexQueryExecutor().execute(new Runnable() {
public void run() {
try {
createNewConnection(new DelegateResponseHandler(handler) {
@Override
public void connectionError(Throwable e, BackendConnection conn) {
// 监控告警
.................................
handler.connectionError(e, conn);
}
@Override
public void connectionAcquired(BackendConnection conn) {
// 也是设置一些变量,然后再调用户的ResponseHandler#connectionAcquired
takeCon(conn, handler, attachment, schema);
}
}, schema);
} catch (IOException e) {
handler.connectionError(e, null);
}
}
});
}
具体创建连接的操作是com.actiontech.dble.backend.mysql.nio.MySQLDataSource#createNewConnection,详细流程可以看后文单个连接的生命周期
新创建的链接的会经过三个回调的包装,【MySQLConnectionAuthenticator(NIOHandler)】-->【DelegateResponseHandler匿名内部类(ResponseHandler)】-->【用户传入的ResponseHandler】
而上面已经存在的空闲连接就直接走【用户传入的ResponseHandler】
在NIO Sub Reactor线程里回调,线程堆栈示例
"$_NIO_REACTOR_BACKEND-1-RW@3428" prio=5 tid=0x2b nid=NA runnable
java.lang.Thread.State: RUNNABLE
at com.actiontech.dble.sqlengine.SQLJob.connectionAcquired(SQLJob.java:87)
at com.actiontech.dble.backend.datasource.PhysicalDatasource.takeCon(PhysicalDatasource.java:333)
at com.actiontech.dble.backend.datasource.PhysicalDatasource.access$200(PhysicalDatasource.java:37)
at com.actiontech.dble.backend.datasource.PhysicalDatasource$1$1.connectionAcquired(PhysicalDatasource.java:353)
at com.actiontech.dble.backend.mysql.nio.MySQLConnectionAuthenticator.handle(MySQLConnectionAuthenticator.java:61)
at com.actiontech.dble.net.AbstractConnection.handle(AbstractConnection.java:262)
at com.actiontech.dble.net.AbstractConnection.onReadData(AbstractConnection.java:321)
at com.actiontech.dble.net.NIOSocketWR.asyncRead(NIOSocketWR.java:171)
at com.actiontech.dble.net.AbstractConnection.asyncRead(AbstractConnection.java:272)
at com.actiontech.dble.net.NIOReactor$RW.executeKeys(NIOReactor.java:119)
at com.actiontech.dble.net.NIOReactor$RW.run(NIOReactor.java:90)
at java.lang.Thread.run(Thread.java:748)
借出方式二 同步
public BackendConnection getConnection(String schema, boolean autocommit, final Object attachment) throws IOException {
- 有空闲连接的话直接返回 com.actiontech.dble.backend.datasource.PhysicalDatasource#conMap。
更新一下连接的借出标志和最后使用时间,直接返回MySQLConnection。
2. 没有空闲连接的时候
因为用户没有传入responseHandler,所以需要用一个占位的临时ResponseHandler,在同步创建连接的时候是com.actiontech.dble.backend.mysql.nio.handler.NewConnectionRespHandler。
用户线程阻塞com.actiontech.dble.backend.mysql.nio.handler.NewConnectionRespHandler#getBackConn方法上, 等待异步创建连接操作完成,具体实现是通过NewConnectionRespHandler里的java.util.concurrent.locks.Condition完成阻塞和唤醒,正常操作。
获取连接的线程阻塞在Condition#await()
上
"main@1" prio=5 tid=0x1 nid=NA waiting
java.lang.Thread.State: WAITING
at sun.misc.Unsafe.park(Unsafe.java:-1)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at com.actiontech.dble.backend.mysql.nio.handler.NewConnectionRespHandler.getBackConn(NewConnectionRespHandler.java:29)
at com.actiontech.dble.backend.datasource.PhysicalDatasource.getConnection(PhysicalDatasource.java:415)
at com.actiontech.dble.DbleServer.startup(DbleServer.java:494)
at com.actiontech.dble.DbleStartup.main(DbleStartup.java:30)
占位的NewConnectionRespHandler会在connectionAcquired方法里调用Condition#signal
"$_NIO_REACTOR_BACKEND-3-RW@3425" prio=5 tid=0x2c nid=NA runnable
java.lang.Thread.State: RUNNABLE
at com.actiontech.dble.backend.mysql.nio.handler.NewConnectionRespHandler.connectionAcquired(NewConnectionRespHandler.java:60)
at com.actiontech.dble.backend.mysql.nio.MySQLConnectionAuthenticator.handle(MySQLConnectionAuthenticator.java:61)
at com.actiontech.dble.net.AbstractConnection.handle(AbstractConnection.java:262)
at com.actiontech.dble.net.AbstractConnection.onReadData(AbstractConnection.java:321)
at com.actiontech.dble.net.NIOSocketWR.asyncRead(NIOSocketWR.java:171)
at com.actiontech.dble.net.AbstractConnection.asyncRead(AbstractConnection.java:272)
at com.actiontech.dble.net.NIOReactor$RW.executeKeys(NIOReactor.java:119)
at com.actiontech.dble.net.NIOReactor$RW.run(NIOReactor.java:90)
at java.lang.Thread.run(Thread.java:748)
获取连接的线程通过NewConnectionRespHandler#backConn拿到连接,更新一下连接的借出标志和最后使用时间,然后返回MySQLConnection。
这就是同步从连接池获取连接的逻辑,连接池自动帮你把异步转成了同步。
正常归还
com.actiontech.dble.backend.mysql.nio.MySQLConnection#release
一个典型的jdbc连接池,会返回实现jdbc接口的包装类,这样在你调用java.sql.Connection#close
的时候,其实走的是连接池的实现,连接不会被物理关闭,而是归还到连接池里面去。
在mycat里面,nio纯手工,所以是调用MySQLConnection#release方法来归还连接。
同步借出连接后的归还
自己写了示例使用代码,如下:
PhysicalDatasource datasource = DbleServer.getInstance().getConfig().getDataNodes().get("dn1").getDbPool().getWriteSources()[0];
BackendConnection backendConnection = null;
try {
backendConnection = datasource.getConnection("db1", true, null);
} finally {
if (null != backendConnection) {
backendConnection.release();
}
}
异步借出后的连接归还
异步借出的时候MySQLConnection#release 在ResponseHandler的函数里进行调用,那么仔细看一下com.actiontech.dble.backend.mysql.nio.handler.ResponseHandler的接口定义。
public interface ResponseHandler {
void connectionError(Throwable e, BackendConnection conn);
void connectionAcquired(BackendConnection conn);
void errorResponse(byte[] err, BackendConnection conn);
void okResponse(byte[] ok, BackendConnection conn);
void fieldEofResponse(byte[] header, List<byte[]> fields, List<FieldPacket> fieldPackets, byte[] eof,
boolean isLeft, BackendConnection conn);
boolean rowResponse(byte[] rowNull, RowDataPacket rowPacket, boolean isLeft, BackendConnection conn);
void rowEofResponse(byte[] eof, boolean isLeft, BackendConnection conn);
void writeQueueAvailable();
void connectionClose(BackendConnection conn, String reason);
}
归还的地方其实和mysql协议的命令执行阶段,命令发出后客户端接受到的响应有关系,不同的终点在不同的方法里归还。
- OK包(ResponseHandler#okResponse)
- ERROR包(ResponseHandler#errorResponse)
- Result Set包(在ResponseHandler#rowEofResponse读完结果集后归还)
除此之外,也可以借出后就立马归还,请看本文连接池初始化部分com.actiontech.dble.backend.mysql.nio.handler.GetConnectionHandler
关闭物理连接
物理连接关闭是一个不能忽视的操作,需要进行资源的清理。
需要注意不能重复扣减一些变量,例如连接池的总数等。
连接池的初始化
入口 com.actiontech.dble.backend.datasource.PhysicalDBPool#initSource
首先连接池最小连接数如果设置的比这个【dataHost上关联的的database数量+1】还小的话,例如下面的minCon="1"
,小于关联的4个database。
<dataNode name="dn1" dataHost="localhost1" database="db1"/>
<dataNode name="dn2" dataHost="localhost1" database="db2"/>
<dataNode name="dn3" dataHost="localhost1" database="db3"/>
<dataNode name="dn4" dataHost="localhost1" database="db4"/>
<dataHost balance="0" maxCon="1000" minCon="1" name="localhost1" switchType="1" slaveThreshold="100">
<heartbeat>show slave status</heartbeat>
<writeHost host="hostS1" url="localhost:3306" password="root123" user="root"/>
</dataHost>
那么连接池初始化大小initSize会被设置为【dataHost上关联的的database数量+1】,加1是因为有的sql不需要在mysql上指定database就可以执行,例如SHOW VARIABLES,所以创建一个【database为空】的后端连接,所以上面xml配置对应的initSize会被设置为5。
前面说到异步创建连接是需要一个com.actiontech.dble.backend.mysql.nio.handler.ResponseHandler的,这里使用的是com.actiontech.dble.backend.mysql.nio.handler.GetConnectionHandler
这里的逻辑首先会检查是否有【database为空的连接】,没有的话,就创建一个, GetConnectionHandler#finishedCount的完成数目也会加1.
然后创建initSize-1个后端连接,这里会让连接平均分布在dataHost上关联的每个database上。
for (int i = 0; i < initSize - 1; i++) {
try {
ds.initMinConnection(this.schemas[i % schemas.length], true, getConHandler, null);
} catch (Exception e) {
LOGGER.warn(getMessage(index, " init connection error."), e);
}
}
NIO Sub Reactor线程会回调com.actiontech.dble.backend.mysql.nio.handler.GetConnectionHandler#connectionAcquired
可以看到拿到连接后,没有使用就直接归还到连接池中了。
@Override
public void connectionAcquired(BackendConnection conn) {
successCons.add(conn);
finishedCount.addAndGet(1);
LOGGER.info("connected successfully " + conn);
conn.release();
}
初始化连接池的线程会在默认超时时间内,间隔检查GetConnectionHandler#finishedCount是否达到任务目标数量,不会无限期等待。
连接池的扩容和收缩
(1)出现一些后端连接池异常造成连接被关闭,或者业务繁忙,连接池缺少空闲连接后,在每次获取连接的时候才创建连接会增加业务等待时间,可以考虑一个后台任务去扩容连接到minCon。
(2)在业务高峰的低谷时刻,长期空闲的链接可以被关闭,连接池容量收缩到minCon。
(3)一些执行太久的连接,需要按照超时时间强制关闭。
这一切都是为了保持连接池的可用性。
扩容
低于minCon阈值批量补充空闲连接
后台定时器线程负责检查是否调用PhysicalDatasource#createByIdleLittle方法
DBLE这边默认值是SystemConfig#dataNodeIdleCheckPeriod 5分钟检查一次所有连接池的空闲连接。
"Timer0@3992" daemon prio=5 tid=0x3a nid=NA runnable
java.lang.Thread.State: RUNNABLE
at com.actiontech.dble.backend.datasource.PhysicalDatasource.createByIdleLittle(PhysicalDatasource.java:241)
at com.actiontech.dble.backend.datasource.PhysicalDatasource.connectionHeatBeatCheck(PhysicalDatasource.java:192)
at com.actiontech.dble.backend.datasource.PhysicalDBPool.heartbeatCheck(PhysicalDBPool.java:456)
at com.actiontech.dble.DbleServer$9$1.run(DbleServer.java:916)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
这里会把空闲连接数目按照一定规则和minCon,maxCon对比,判断是否要补充连接。
com.actiontech.dble.backend.datasource.PhysicalDatasource#createByIdleLittle
这里是使用同步获取连接时候使用com.actiontech.dble.backend.mysql.nio.handler.NewConnectionRespHandler
可以看到simpleHandler.getBackConn().release();
,连接拿到后被立马释放。
private void createByIdleLittle(int idleCons, int createCount) {
......................
for (int i = 0; i < createCount; i++) {
NewConnectionRespHandler simpleHandler = new NewConnectionRespHandler();
try {
if (this.createNewCount()) {
// creat new connection
this.createNewConnection(simpleHandler, null, schemas[i % schemas.length]);
simpleHandler.getBackConn().release();
} else {
break;
}
......................
}
}
单次借出连接造成的扩容
前面关键API说过了两种借出方式
收缩
超过minCon阈值的空闲连接进行批量回收
com.actiontech.dble.backend.datasource.PhysicalDatasource#closeByIdleMany
会按照一定规则,确定一些需要被回收的空闲连接,调用ClosableConnection#close关闭连接
DBLE这边默认值是SystemConfig#dataNodeIdleCheckPeriod 5分钟检查一次所有连接池的空闲连接。
单个连接的超时回收
代码位置 com.actiontech.dble.net.NIOProcessor#backendCheck
(1)借出的连接 执行超时
连接池被借出的时间点(MySQLConnection#lastTime ),如果距离现在超过了SystemConfig#sqlExecuteTimeout秒的话(默认300s), 以sql timeout的原因关闭废弃连接。
(2)空闲连接 空闲超时
检查AbstractConnection#lastWriteTime和AbstractConnection#lastReadTime中较近的一次,如果距离现在超过了com.actiontech.dble.config.model.SystemConfig#DEFAULT_IDLE_TIMEOUT(默认30分钟),那么以idle timeout的原因关闭。
连接泄露
连接泄露的定义是某个使用方借出连接后,永远不会归还,这就发生了连接泄露。如果使用方在一定时间内调用频率过高,会直接导致连接池超过连接池的maxCon。
去年我才接手mycat代码时,发现在某个场景下存在严重的连接泄露问题。当时为了减小干扰,我把
idleTimeout增加到很大的时间,这样发现了另外的问题,环境放了一阵后,居然连接池全部满了。
在我分析后,发现是mycat已经修复的一个bug。。。嗯,这肯定功能测试都没测过。
https://github.com/MyCATApache/Mycat-Server/commit/b3198c8c2288014f80f6b3862c0b43f399d2c589
mycat这里NewConnectionRespHandler用来在连接空闲连接不足时,把连接补充到minCon,但是拿到连接后没有归还连接池,所以造成了连接泄露。但是本身有空闲连接超时清理线程,所以idleTimeout增加到很大的时间后才暴露出来。
注意,这里的代码dble已经和mycat很不同了,举这个例子只是说明,mycat系的这种nio线程池,需要谨慎处理连接的归还。
还有你可以看到mycat代码管理的混乱,这个问题居然没有issue记录,导致在我来之前,其他同事也没有发现并且同步过来。如果是你【铁了心拦不住地】要学习mycat代码,我更建议你选择dble!
后端连接生命周期
对象创建
前面提到过com.actiontech.dble.backend.mysql.nio.MySQLDataSource#createNewConnection,现在分析一下流程
com.actiontech.dble.backend.mysql.nio.MySQLConnectionFactory#make 负责构造后端连接对象,返回还未发起连接操作的Native TCP连接 包装类,com.actiontech.dble.backend.mysql.nio.MySQLConnection
看下类图,可以看到后端连接,manger前端连接,server前端连接,还挺清晰的。
image.png
(MySQLConnection) AbstractConnection#handler
前面说过创建连接时的调用链,MySQLConnectionAuthenticator(NIOHandler)-->DelegateResponseHandler匿名内部类(ResponseHandler)-->用户传入的ResponseHandler
对象创建后交给 com.actiontech.dble.net.NIOConnector#postConnect,加入到NIOConnector的待处理队列里connectQueue,NIOConnector的Reactor线程会自己从待处理队列里取出MySQLConnection对象处理。
private void connect(Selector finalSelector) {
AbstractConnection c;
while ((c = connectQueue.poll()) != null) {
try {
SocketChannel channel = (SocketChannel) c.getChannel();
channel.register(finalSelector, SelectionKey.OP_CONNECT, c);
channel.connect(new InetSocketAddress(c.host, c.port));
} catch (Exception e) {
LOGGER.warn("error:", e);
c.close(e.toString());
if (c instanceof BackendAIOConnection) {
((BackendAIOConnection) c).onConnectFailed(e);
}
}
}
}
当TCP连接建立成功,NIOConnector的reactor线程会调用com.actiontech.dble.net.NIOConnector#finishConnect(java.nio.channels.SelectionKey, java.lang.Object),
将建立好的连接交给Sub Reactor(NIOReactor类)处理后续读写事件,这里也是放到NIOReactor.RW#registerQueue的待处理队列里,异步注册。
com.actiontech.dble.backend.mysql.nio.MySQLConnectionAuthenticator#handle
握手认证阶段,处理mysql发来的握手初始化消息,并且发送登陆认证消息给mysql,代码如下:
default:
packet = source.getHandshake();
if (packet == null) {
processHandShakePacket(data);
// send auth packet
source.authenticate();
break;
} else {
throw new RuntimeException("Unknown Packet!");
}
如果mysql返回OK Packet,处理认证成功的响应,并将后续的AbstractConnection#handler设置为com.actiontech.dble.backend.mysql.nio.MySQLConnectionHandler,处理mysql协议的命令执行阶段。
代码如下:
@Override
public void handle(byte[] data) {
try {
switch (data[4]) {
case OkPacket.FIELD_COUNT:
..................................
// execute auth response
source.setHandler(new MySQLConnectionHandler(source));
source.setAuthenticated(true);
..................................
if (listener != null) {
listener.connectionAcquired(source);
}
break;
同时上面调用【DelegateResponseHandler匿名内部类(ResponseHandler)】的connectionAcquired方法,通知到mysql的链接已经建立完成。
最终通过【DelegateResponseHandler匿名内部类(ResponseHandler)】调用到【用户传入的ResponseHandler】,用户可以在ResponseHandler#connectionAcquired里面开始执行sql了。
线程堆栈示例如下:
"$_NIO_REACTOR_BACKEND-1-RW@3423" prio=5 tid=0x2a nid=NA runnable
java.lang.Thread.State: RUNNABLE
at com.actiontech.dble.sqlengine.SQLJob.connectionAcquired(SQLJob.java:87)
at com.actiontech.dble.backend.datasource.PhysicalDatasource.takeCon(PhysicalDatasource.java:335)
at com.actiontech.dble.backend.datasource.PhysicalDatasource.access$200(PhysicalDatasource.java:37)
at com.actiontech.dble.backend.datasource.PhysicalDatasource$1$1.connectionAcquired(PhysicalDatasource.java:355)
at com.actiontech.dble.backend.mysql.nio.MySQLConnectionAuthenticator.handle(MySQLConnectionAuthenticator.java:61)
at com.actiontech.dble.net.AbstractConnection.handle(AbstractConnection.java:262)
at com.actiontech.dble.net.AbstractConnection.onReadData(AbstractConnection.java:321)
at com.actiontech.dble.net.NIOSocketWR.asyncRead(NIOSocketWR.java:171)
at com.actiontech.dble.net.AbstractConnection.asyncRead(AbstractConnection.java:272)
at com.actiontech.dble.net.NIOReactor$RW.executeKeys(NIOReactor.java:119)
at com.actiontech.dble.net.NIOReactor$RW.run(NIOReactor.java:90)
at java.lang.Thread.run(Thread.java:748)
总结
清楚连接池的使用方法和连接的生命周期,有助于在连接池出现连接泄露的时候,从原理上有效的去排查出问题。mycat后端连接池容易出bug,我觉得根因出在mycat代码本身功能组件边界就不清晰,很难拆开来单独用,不好进行细粒度的单元测试。
总的来说,mycat的nio异步后端连接池令我印象深刻,和传统的jdbc bio同步连接池有很大的不同。虽然bio的jdbc也有整合rxjava的非阻塞连接池,但是明显mycat的这种更加纯粹。
网友评论