SOFABolt 的设计非常优雅,灵活性极高,
代码很 clean
!!!在前边的源码分析中,分析了 SOFABolt 1.5.1 的全部源码,具体目录见 SOFABolt 源码分析,本节会高度概括 SOFABolt 的各组件的设计。一、四种通信模型
二、线程池隔离技术
三、高可扩展的协议框架
四、编解码与序列化
五、连接管理与心跳
六、超时与快速失败
七、双工通信
一、四种通信模型
同步 sync:SOFABolt 源码分析4 - Sync 同步通信方式设计
image.png
- 客户端用户线程 user-thread 发出请求(实际上是将 netty task 压到 netty 处理队列中,netty 客户端 worker 线程进行真正的请求发出),然后阻塞等待响应
- 服务端 worker 线程接收请求,根据是否在 IO 线程执行所有操作来决定是否使用一个 Bolt 线程池(或者自定义的线程池)来处理业务
- 服务端返回响应后,客户端 worker 线程接收到响应,将响应转发给 Bolt 线程池(或者自定义的线程池)
- Bolt 线程池(或者自定义的线程池)中的线程将响应设置到相应的 InvokeFuture 中,之后唤醒阻塞的 user-thread
- user-thread 进行反序列化和响应的抽取,最后返回给调用处
单向 oneway:SOFABolt 源码分析5 - Oneway 单向通信方式设计
image.png
- 客户端用户线程 user-thread 发出请求(实际上是将 netty task 压到 netty 处理队列中,netty 客户端 worker 线程进行真正的请求发出),然后 user-thread 就可以做其他事了,对于该请求的处理就结束
- 服务端 worker 线程接收请求,根据是否在 IO 线程执行所有操作来决定是否使用一个 Bolt 线程池(或者自定义的线程池)来处理业务,处理完成之后,结束
客户端要注意控制请求发送的节奏,避免压垮服务端
异步 future:SOFABolt 源码分析6 - Future异步通信方式设计
image.png
future 与 sync 只在“发出请求后是否阻塞等待”处不同,其他相同(包括阻塞点都是 InvokeFuture.waitResponse() 阻塞在一个 CountDownLatch 上)
- 客户端用户线程 user-thread 发出请求(实际上是将 netty task 压到 netty 处理队列中,netty 客户端 worker 线程进行真正的请求发出),然后直接返回一个 RpcResponseFuture 实例(该实例是 InvokeFuture 的代理类),之后 user-thread 就可以去做其他事了,当调用 RpcResponseFuture.get() 时会阻塞等待响应(底层调用的是 InvokeFuture.waitResponse())
- 服务端 worker 线程接收请求,根据是否在 IO 线程执行所有操作来决定是否使用一个 Bolt 线程池(或者自定义的线程池)来处理业务
- 服务端返回响应后,客户端 worker 线程接收到响应,将响应转发给 Bolt 线程池(或者自定义的线程池)
- Bolt 线程池(或者自定义的线程池)中的线程将响应设置到相应的 InvokeFuture 中,之后唤醒阻塞的 user-thread
- user-thread 进行反序列化和响应的抽取,最后返回给调用处
异步 callback:SOFABolt 源码分析7 - Callback 异步通信方式设计
image.png
- 客户端用户线程 user-thread 发出请求(实际上是将 netty task 压到 netty 处理队列中,netty 客户端 worker 线程进行真正的请求发出),之后 user-thread 就可以去做其他事了
- 服务端 worker 线程接收请求,根据是否在 IO 线程执行所有操作来决定是否使用一个 Bolt 线程池(或者自定义的线程池)来处理业务
- 服务端返回响应后,客户端 worker 线程接收到响应,将响应转发给 Bolt 线程池(或者自定义的线程池)
- Bolt 线程池(或者自定义的线程池)中的线程将响应设置到相应的 InvokeFuture 中,取消超时任务,并封装 CallbackTask 回调任务
- 如果自定义的回调函数 InvokeCallback 实现类自己实现了线程池,则使用该线程池来执行 CallbackTask,否则,由当前线程直接执行 CallbackTask。在 CallbackTask 任务中,如果是失败响应,则执行 InvokeCallback#onException 回调函数;如果是成功响应,反序列化响应消息,抽取实际响应信息,然后执行 InvokeCallback#onResponse 回调函数。
二、线程池隔离技术
SOFABolt 源码分析10 - 精细的线程模型的设计
SOFABolt 设计了通用的通信框架线程模型设计(如 “一、四种通信模型” 所示),并且设计了极其精细的线程池定制点,使用这些定制点,可以方便的实现各种维度的线程池隔离
- 为不同的 Protocol 协议指定不同的默认线程池 -
ProcessorManager # defaultExecutor
- 使用不同的线程池来处理请求与响应(SOFABolt 是双工的,以 server 为例,可以处理 client 发来的请求,也可以直接向 client 发起请求,之后需要处理 client 返回的响应,默认情况下,处理 client 发来的请求和响应是使用同一个线程池实现的) -
RemotingProcessor # executor
- 使用不同的线程池来处理不同请求类型的数据 -
UserProcessor # executor
- 使用不同的线程池根据 header 的内容来处理同一种请求类型数据(实际上,可以根据 header 的内容和请求数据类型进行高度定制) -
UserProcessor # executorSelector
请求处理线程池选择流程
image.png
响应处理线程池选择流程
image.png
三、高可扩展的协议框架
协议框架:SOFABolt 源码分析18 - Protocol 私有协议的设计
image.png
每一种协议都会包含四个点:编解码器、业务处理器、心跳管理器以及命令工厂
命令及其处理器框架:
image.png
SOFABolt 源码分析8 - RemotingCommand 命令协议的设计
SOFABolt 源码分析9 - UserProcessor 自定义处理器的设计
- 使用命令模式封装 Request、Response 以及 HeartBeat
- 使用相应的命令处理器处理命令:命令处理器分为两层,一层 RemotingProcessor 用在 remoting 层,SOFABolt 用其来处理 Response、HeartBeat 以及 Request 的分发;另外一层 UserProcessor 用在业务层,用于真正的业务逻辑的处理
四、编解码与序列化
4.1 编解码
image.png
4.1.1 编码
本质:序列化会将业务数据转化为 byte[],编码按照私有协议将 byte[] 写入到 ByteBuf 中
基本流程
- 判断传入的数据是否是 Serializable 类型(该类型由 MessageToByteEncoder 的泛型指定),如果不是,直接传播给 pipeline 中的下一个 handler;否则
- 创建一个 ByteBuf 实例,用于存储最终的编码数据
- 从 channel 的附加属性中获取协议标识 protocolCode,之后从协议管理器中获取相应的 Protocol 对象
- 从 Protocol 对象中获取相应的 CommandEncoder 实现类实例,使用 CommandEncoder 实现类实例按照 SOFABolt 源码分析18 - Protocol 私有协议的设计 所介绍的协议规则将数据写入到第二步创建好的 ByteBuf 实例中
- 如果原始数据是 ReferenceCounted 实现类,则释放原始数据
- 如果 ByteBuf 中有数据了,则传播给 pipeline 中的下一个 handler;否则,释放该 ByteBuf 对象,传递一个空的 ByteBuf 给下一个 handler
4.1.2 解码
本质:将 byte[] 按照私有协议转化为中间数据,再通过反序列化将请求信息转化为业务数据
基本流程
- 创建或者从 Netty 的回收池中获取一个 RecyclableArrayList 实例,用于存储最终的解码数据
- 将传入的 ByteBuf 添加到 Cumulator 累加器实例中
- 之后
不断
的从 ByteBuf 中读取数据:首先解码出 protocolCode,之后从协议管理器中获取相应的协议对象,再从协议对象中获取相应的 CommandDecoder 实现类实例 -Netty 的 ByteToMessageDecoder 具备 accumulate 批量解包能力,可以尽可能的从 socket 里读取字节,然后同步调用 decode 方法,解码出业务对象,并组成一个 List
- 使用 CommandDecoder 实现类实例按照上文所介绍的协议规则进行解码,将解码好的数据放到 RecyclableArrayList 实例中,需要注意的是在解码之前必须先记录当前 ByteBuf 的 readerIndex,如果发现数据不够一个整包长度(发生了拆包粘包问题),则将当前 ByteBuf 的 readerIndex 复原到解码之前,然后直接返回,等待读取更多的数据
- 为了防止发送端发送数据太快导致OOM,会清理 Cumulator 累加器实例或者其空间,将已经读取的字节删除,向左压缩 ByteBuf 空间
- 判断 RecyclableArrayList 中的元素个数,如果是1个,则将这个元素单个发送给 pipeline 的下一个 handler;如果元素大于1个,则将整个 RecyclableArrayList 以 List 形式发送给 pipeline 的下一个 handler。- 这就是 SOFABolt 相较于 Netty 改进的地方,提供了
批量提交
的功能(Netty 本身的做法是循环遍历该 List ,依次提交到 ChannelPipeline 进行处理。Bolt 是将提交的内容从单个 command ,改为整个 List 一起提交,如此能减少 pipeline 的执行次数,同时提升吞吐量。这个模式在低并发场景,并没有什么优势,而在高并发场景下对提升吞吐量有不小的性能提升
)- 回收 RecyclableArrayList 实例
4.2 序列化
SOFABolt 源码分析20 - Serializer 序列化机制设计
image.png
调用入口
- 当发起请求时,例如 invokeSync() 时,RpcRemoting 会先对请求数据进行序列化,之后编码发送
- 当收到请求时,对请求消息进行解码,然后 RpcRequestProcessor 会对解码后的请求数据进行精细的反序列化;
- 处理请求完成之后,RpcRequestProcessor 会对响应消息进行序列化,之后编码发送
- 收到响应消息后,对响应消息进行解码,然后会在 RpcInvokeCallbackListener 或者 RpcResponseResolver 中对解码后的响应消息进行反序列化
五、连接管理与心跳
5.1 连接管理
SOFABolt 源码分析12 - Connection 连接管理设计
image.png
SOFABolt 源码分析13 - Connection 事件处理机制的设计
SOFABolt 源码分析14 - Connection 连接监控机制的设计
- Connection 连接元数据:包裹了 Netty channel 实例
- ConnectionFactory 连接工厂:创建连接、检测连接等
- ConnectionPool 连接池:存储 { uniqueKey, List<Connection> } ,uniqueKey 默认为 ip:port;包含 ConnectionSelectStrategy,从 pool 中选择 Connection
- ConnectionEventHandler 和 ConnectionEventListener:事件处理器和监听器
- ConnectionManager 连接管理器:是对外的门面,包含所有与 Connection 相关的对外的接口操作
- Scanner 扫描器:Bolt 提供的一个统一的扫描器,用于执行一些后台任务
5.2 心跳技术
image.png
客户端基本流程
- 在 15s 内没有读或者写事件,IdleStateHandler 就会发布一个 IdleStateEvent 事件
- HeartbeatHandler 进行该事件的处理:
- 首先从当前 Channel 的附属属性中获取相关的 ProtocolCode
- 再从 ProtocolManager 中获取 ProtocolCode 的 Protocol 实现类
- 再从 Protocol 实现类获取 HeartbeatTrigger 实例,最终调用该实例进行 IdleStateEvent 的处理
- HeartbeatTrigger 处理 IdleStateEvent 事件
- 首先从当前 Channel 的附属属性中获取已经发送心跳但是没有接收到响应的次数 heartbeatTimes,如果 heartbeatTimes 已经大于 3 次,则直接关闭连接,否则
- 从当前 Channel 的附属属性中获取心跳开关,如果关闭了心跳,则直接返回,表示对 IdleStateEvent 不做任何处理;如果开启了心跳
- 创建心跳请求命令 HeartbeatCommand + 创建本次请求的 InvokeFuture 对象 + 将 InvokeFuture 对象加入到当前的 Connection 中
InvokeFuture 中会设置心跳响应回调函数:当接收到了正常的心跳响应后,将 heartbeatTimes 置为 0;否则,将该连接的heartbeatTimes+1
- 使用 Netty 发送 HeartbeatCommand 到服务端
- 设置超时任务(1s内没有接收到心跳响应,则直接返回超时失败响应,实现快速失败)
服务端基本流程
- 在 90s 内没有读或者写事件,IdleStateHandler 就会发布一个 IdleStateEvent 事件(如果客户端还正常,那么在 90s 内,会发送至少 6 次心跳,那么服务端将不会触发 IdleStateEvent 事件)
- ServerIdleHandler 进行该事件的处理:直接关闭连接
心跳处理流程
- 心跳请求的处理:服务端接收到 HeartbeatCommand 后,构造心跳响应 HeartbeatAckCommand,之后使用 Netty 返回 HeartbeatAckCommand 给客户端
- 心跳响应的处理:客户端接收到 HeartbeatAckCommand 后,设置心跳响应消息到 InvokeFuture + 取消超时任务 + 执行 InvokeFuture 中的回调方法
注意
- 只有客户端会主动发送心跳请求;但是双端都会开启空闲检测
- 心跳除了上述应用端提供的这种之外,还有 tcp 提供的 keepAlive
六、超时与快速失败
6.1 超时机制
SOFABolt 的超时分为两种:连接超时和调用超时。
- 连接超时
- 仅客户端可设置,因为只有客户端会建连
- 连接超时时间的设置只作用于建连的时候
- 连接超时机制底层实际上使用的 Netty 的超时参数设置方式
- 调用超时:调用端无论是 RpcClient 还是 RpcServer,除了 oneway 模式下,剩余的三种调用模式 sync / future / callback 的方法都提供了一个必填参数 int timeoutMillis,该参数用来指定调用超时时间
调用超时时间对于同步调用和异步调用不同
- sync:使用
CountDownLatch # boolean await(long timeout, TimeUnit unit)
实现当前线程的实现阻塞等待- future 和 callback:使用 Netty 的 HashedWheelTimer 实现 -
在发出请求的时候创建 timeoutMillis 时间的超时任务 task,当在 timeoutMillis 时间内没有返回响应,则执行 task 中的内容(构造超时响应,返回给调用程序);否则,取消 task
6.2 快速失败(fail-fast)机制
- 仅用在被调用端,即请求的处理端
- fail-fast 机制指的是在序列化全部内容之前,会做一个判断,如果处理当前请求用户业务逻辑处理器 UserProcessor 开启了 fail-fast 功能,并且此时已经超时,并且不是 oneway 模式(oneway 没有超时概念),则直接丢弃该请求,不再进行后续的序列化和业务逻辑处理操作
七、双工通信
image.png
SOFABolt 提供了双工通信能力,使得不仅客户端可以调用服务端,服务端也可以主动调用客户端(当然,客户端也就需要可以注册 UserProcessor 的功能)
结语:到目前为止,SOFABolt 源码就分析完成了,整个框架设计的非常优雅,代码非常 clean,扩展点极其灵活。但是无论多么牛逼的框架,总会存在一些改进的地方,例如 SOFABolt
网友评论