sofa-bolt是蚂蚁开源的一款基于Netty的网络通信框架。在Netty的基础上对网络编程常见问题进行了一层简单封装,让中间件开发者更关注于中间件产品本身。项目地址:sofa-bolt
大体功能为:
- 连接管理
- 请求处理
在网络编程时,IO模型的选择自然是首先需要考虑的,sofa-bolt作为蚂蚁内部中间件底层框架,当然是服务于高性能,高并发的场景。这种场景在Java体系下,必然选择非阻塞IO复用了,那么自然选择基于Netty进行开发
连接管理
连接管理,通常需要处理:
- 连接创建与销毁
- 心跳管理
- 空闲连接管理
- 断线重连
- 慢连接处理
- 作为一个框架,当然还需要把各种连接事件分派给用户进行定制
第一个需要明确的是sofa-bolt使用的长连接,这很明显,因为它服务于相对稳定的内部网络,同时对RT有比较高的要求。另外,长连接排查问题也相对更有迹可循
连接创建
可以用socket(localIp,localPort, remoteIp,remotePort )代表一个连接,在Netty中用Channel来表示,在sofa-bolt使用Connection对象来抽象一个连接,一个连接在client跟server端各用一个connection对象表示
有了Connection这个抽象之后,自然的需要提供接口来管理Connection, 这个接口就是ConnectionFactory 那么Connection是如何跟Netty进行联动呢,我们知道在Netty中,client连接到server后,server会回调initChannel
方法,在这个方法我们会初始化各种事件handler, sofa-bolt就在这里创建Connection,并在Netty的Channel对象上打上Connection标,后续通过Channel就可以直接找到这个Connection
protected void initChannel(SocketChannel channel) {
...
createConnection(channel);
}
private void createConnection(SocketChannel channel) {
new Connection(channel,..);
...
}
Connection(channel, ...){
this.channel.attr(CONNECTION).set(this);
}
客户端的处理方式为:
ChannelFuture future = bootstrap.connect(...);
future.awaitUninterruptibly();
Channel channel = future.channel();
Connection conn = new Connection(channel,...);
连接事件分派
sofa-bolt是一个底层的通信框架,需要将连接事情分派给用户自行处理,在sofa-bolt中连接事件有:
- CREATE:连接创建
-
CLOSE:连接销毁
有ConnectionEventHandler转给ConnectionEventListener,再有ConnectionEventListener分派给用户自定义的ConnectionEventProcessor,为啥会有一个listen不得而知。
因为用户的事件逻辑不可预知,放到了一个线程池中运行,防止阻塞IO
心跳
因为使用的是长连接,所以需要知道连接是否还正常,有多种情况可以导致系统认为连接还处在,但是实际上该连接已经死亡
- 中间某个链路断了,此时发送数据要么timeout,要么返回主机不可达
- 对端主机崩溃,同上
- 对端主机崩溃后又重启了服务,返回RST报文
Tcp本身提供了KeepAlive机制,但使用这个功能需要操作系统做相关配置,同时,它不能发现应用进程导致的连接问题。比如,进程还在,但是卡在IO上,或者再FullGC又或者各种原因已经假死了,这个时候实际上也已经无法响应请求了。
所以,通常应用层会自己去做心跳功能,其主要目的不仅仅为了探测网络是否正常,更重要的是探测对端服务进程是否还能正常提供服务。
bolt采用client端定时发送心跳请求,服务端收到了响应心跳,使用了是Netty的IdleHandler机制,具体的心跳协议就不做描述了。
pipeline.addLast("heartbeatHandler", heartbeatHandler);
## heartbeatHandler
public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
ProtocolCode protocolCode = ctx.channel().attr(Connection.PROTOCOL).get();
Protocol protocol = ProtocolManager.getProtocol(protocolCode);
protocol.getHeartbeatTrigger().heartbeatTriggered(ctx);
} else {
super.userEventTriggered(ctx, evt);
}
}
# 心跳发送伪码
heartbeatTriggered()
boolean ack = send()
if ack==false:
failcount++
if failcount > maxCount:
closeConn()
else
failcount = 0
说到Netty的IdleState其实就是在每个连接channelActive
创建了最多3个(write,read,all)任务放到NioEventLoop的任务队列里去。这对于客户端来说,连接比较少,压力应该不大
空闲连接管理
通常一个服务端需要维护很多连接,如果一个连接很长一段时间啥也没干(心跳算不算呢? bolt好像算),那么维持这个连接的收益也不大还白白浪费了服务器资源,那么需要把它关掉。通常做法也有两种:
- 利用Netty的IdleState,每个连接一个定时任务。bolt采用的就是这种方式,简单方便,连接不是巨多特别适用
- 还有一种是自己写一个定时任务,用时间轮管理所有空闲连接,比如muduo
对于Client来说,它会不停发心跳维持连接,但是如果一直没干正事,光在那发心跳包也是蛮浪费,所以有些框架client端也会close这种空闲连接,但是在bolt中好像没有看到
慢连接
这里的慢连接是指,接受请求的一方处理比较慢,比如消息的Consumer方处理特别慢(过载,DB走错索引,bug导致FullGC等等)
Consumer方线程模型会导致不同的情况:
- IO线程直接做业务处理,那么IO线程会被阻塞,消息没有被read,导致数据堆积到tcp的接收缓冲区,进一步导致发送方的发送缓冲器满,消息堆积到应用层的buffer中(Netty的ChannelOutboundBuffer),不做处理的话,导致发送方触发大量GC,直到OOM
- 通常的模式是,IO线程收到数据,解包后交个一个业务线程池处理。这里又有两种case。 第一是业务线程池采用无界队列,那么请求对象会一直堆积到队列中,直到OOM。第二是采用的有界队列,队列满后,采用拒绝策略,直接抛异常
所以,发送方跟接收方都需要处理慢连接,而处理方式就是在发送数据的时候,看下目前堆积在应用层(Netty的ChannelOutboundBuffer)的待发送数据还有多少, 在Netty已经提供了一整套完整的水位控制机制,我们只需要设置好高低水位,然后再发送数据的时候,判断下是否可以发送即可(channel.isWritable
), bolt直接采用了该方法
断线自动重连
Rpc框架会在请求的时候获取连接,获取不到或者连接失效了,重连一次即可,一般也不需要自动重连。但是bolt不仅仅为rpc服务,所以它提供了自动重连的功能,根据根据实际场景选择是否开启。
重连实际上是一个比较简单的事,只需要记住一定要重用bootstrap跟workgroup即可。
它的入口是channelInactive
,在这里把重连事件放到一个任务队列去跑即可
请求处理
对于bolt这种网络框架而已,请求的处理涉及到协议的设计,如何将请求转交给用户的代码进行等等问题。
回调超时机制
nio中io跟业务调用天生是异步,所谓的同步也只是框架在biz线程中调用了future,get(timeout)
,biz线程会阻塞等待,如果超时了,框架会移除相关的数据。
那,还有一种callback
的回调方式,我们的业务向框架注册了一个回调函数,框架收到IO响应后执行回调代码。发现没有,回调的时机是收到IO响应,那如果IO一直没有响应呢,或者想要一个超时的控制呢,所以框架提供一个后台的线程去扫描这些callback是否超时了。
翻了Dubbo等框架发现并没有类似的功能(可能没仔细没找到),solf-bolt提供了这个功能,使用Netty自带的HashedWheelTimer
, 也就是时间轮定时器。在设置进行一个callback调用时,将这个callback任务放到HashedWheelTimer队列中,有HashedWheelTimer的后台线程去扫描是否超时
网友评论