SOFABolt 设计了通用的通信框架线程模型设计(如下图所示),并且设计了极其精细的线程池定制点,使用这些定制点,我们可以方便的实现
- 为不同的 Protocol 指定不同的默认线程池
- 使用不同的线程池来处理请求与响应(SOFABolt 是双工的,以 server 为例,可以处理 client 发来的请求,也可以直接向 client 发起请求,之后需要处理 client 返回的响应,默认情况下,处理 client 发来的请求和响应是使用同一个线程池实现的)
- 使用不同的线程池来处理不同请求类型的数据
- 使用不同的线程池根据 header 的内容来处理同一种请求类型数据(实际上,可以根据 header 的内容和请求数据类型进行高度定制)
一、通用的线程模型
![](https://img.haomeiwen.com/i5842684/9dc62b7cc1338ab6.png)
以默认线程池配置情况下的 Sync 同步调用模式为例,四种调用类型的默认线程池配置情况下的线程模型图详细分析见:
SOFABolt 源码分析4 - Sync 同步通信方式的设计
SOFABolt 源码分析5 - Oneway 单向通信方式的设计
SOFABolt 源码分析6 - Future异步通信方式的设计
SOFABolt 源码分析7 - Callback 异步通信方式的设计
二、精细的线程池定制点
SOFABolt 包含两类线程池:
Netty IO 线程池:
服务端:boss 线程池 + worker 线程池
客户端:worker 线程池
SOFABolt 线程池
附:由于 SOFABolt 是全双工的,不仅 client 可以向 server 发请求,server 也可以直接向 client 发请求。所以两端的线程池种类是相同的,但是从请求处理和响应处理的角度来讲,是不同的。
- 处理请求:
UserProcessor#ExecutorSelector
UserProcessor#getExecutor()
RemotingProcessor#executor
ProcessorManager#defaultExecutor- 处理响应:
RemotingProcessor#executor
ProcessorManager#defaultExecutor
请求处理线程池选择流程
![](https://img.haomeiwen.com/i5842684/eb44daa78b731681.png)
步骤:
- 首先 netty worker 线程池接收请求消息进行处理,如果消息是 List类型(是批量提交过来的 - 具体见 AbstractBatchDecoder 解码处理器),则进行2,否则(是单条消息)进行3
- 如果“bolt.rpc.dispatch-msg-list-in-default-executor”设置为 true(默认为true),则使用 ProcessorManager#defaultExecutor 线程池进行3的处理,否则,直接在当前线程中循环 List 消息列表,进行3的处理
- 接下来就要选择线程池来做反序列化以及具体业务逻辑的处理了,先判断 userProcessor.processInIOThread() == true,默认为 false,如果是,直接在当前线程(如果是单条消息,是 worker 线程,如果是 List 消息,且使用了 ProcessorManager#defaultExecutor 进行消息的处理,则当前线程池defaultExecutor中的线程)进行处理;如果不是
- 判断 userProcessor.getExecutorSelector() != null,如果不是 null,则使用这个自己实现的线程选择器来选择(根据请求数据类型和 header 中的内容)自定义的线程池;如果为 null,则
- 判断 userProcessor.getExecutor() != null,如果不是 null,则使用 userProcessor.getExecutor() 这个自定义的线程池;如果为 null,则
- 判断 RemotingProcessor.getExecutor() != null,如果不是 null,则使用 RemotingProcessor.getExecutor() 这个自定义的线程池,如果为 null,则
- 使用 ProcessorManager#defaultExecutor 默认线程池来处理业务逻辑
- 最后如果 userProcessor instanceof AsyncUserProcessor,则通常会使用我们内置的线程池进行业务逻辑的处理,当然也可以不用线程池,由业务方自己决定
重要
- 如果消息是List类型,推荐使用 ProcessorManager#defaultExecutor 线程池(也是默认配置),这样可以不用占用worker IO线程
- 推荐 userProcessor.processInIOThread() 设置为 false,反序列化和业务逻辑处理交给具体的线程池进行处理,永远不要阻塞 netty IO 线程
- userProcessor.getExecutorSelector() 的设置可以实现: 使用不同的线程池根据 header 的内容来处理同一种请求类型数据(实际上,可以根据 header 的内容和请求数据类型进行高度定制)
- userProcessor.getExecutor() 的设置可以实现:使用不同的线程池来处理不同请求类型的数据(因为不同的请求类型可以使用不同的 UserProcessor,每一个 UserProcessor 都可以设置自己的 executor)
- RemotingProcessor.getExecutor() 的设置可以实现:使用不同的线程池来处理请求与响应(SOFABolt 是双工的,以 server 为例,可以处理 client 发来的请求,也可以直接向 client 发起请求,之后需要处理 client 返回的响应,默认情况下,处理 client 发来的请求和响应是使用同一个线程池实现的)
- ProcessorManager#defaultExecutor 可以进行全局设置,即对所有的 Protocol 中的 ProcessorManager#defaultExecutor 都有效(默认),也可以单独为不同的 Protocol 设置不同的 ProcessorManager#defaultExecutor
总结
从上述分析可以知道,按照下边的线程池定义的顺序,越往上,线程池隔离的粒度越细。UserProcessor#ExecutorSelector
UserProcessor#getExecutor() 用户处理器自定义线程池
RemotingProcessor#executor
ProcessorManager#defaultExecutor
响应处理线程池选择流程
![](https://img.haomeiwen.com/i5842684/16428106ea03ba55.png)
步骤:
与请求处理线程池的选择流程不同,响应不需要 UserProcessor 的处理,所以没有 UserProcessor 相关的内容,且反序列化和响应业务的处理只能在新的线程池中进行,不能在 IO 线程进行。
- 首先 netty worker 线程池接收请求消息进行处理,如果消息是 List类型(是批量提交过来的 - 具体见 AbstractBatchDecoder 解码处理器),则进行2,否则(是单条消息)进行3
- 如果“bolt.rpc.dispatch-msg-list-in-default-executor”设置为 true(默认为true),则使用 ProcessorManager#defaultExecutor 线程池进行3的处理,否则,直接在当前线程中循环 List 消息列表,进行3的处理
- 接下来就要选择线程池来做反序列化以及具体业务逻辑的处理了,判断 RemotingProcessor.getExecutor() != null,如果不是 null,则使用 RemotingProcessor.getExecutor() 这个自定义的线程池,如果为 null,则
- 使用 ProcessorManager#defaultExecutor 默认线程池来处理业务逻辑
对于响应:
RemotingProcessor#executor
ProcessorManager#defaultExecutor
总结:不管对于请求处理的线程池还是对于响应处理的线程池,其实都是
Processor 没有定义线程池,则使用 ProcessorManager 的默认线程池。
2.1 ProcessorManager 默认线程池
2.1.1 全局设置 ProcessorManager 默认线程池
注意:设置是全局的,对所有协议有效
在创建 Protocol 实例的时候,会创建 RpcCommandHandler 实例。
=============================== RpcCommandHandler =================================
public RpcCommandHandler(CommandFactory commandFactory) {
this.commandFactory = commandFactory;
this.processorManager = new ProcessorManager();
...
}
=============================== ProcessorManager =================================
private ExecutorService defaultExecutor;
private int minPoolSize = ConfigManager.default_tp_min_size();
private int maxPoolSize = ConfigManager.default_tp_max_size();
private int queueSize = ConfigManager.default_tp_queue_size();
private long keepAliveTime = ConfigManager.default_tp_keepalive_time();
public ProcessorManager() {
defaultExecutor = new ThreadPoolExecutor(minPoolSize, maxPoolSize, keepAliveTime,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueSize), new NamedThreadFactory(
"Bolt-default-executor", true));
}
设置参数的方式:(系统属性级别,所以具有两种设置方式)
-Dxxx = 88
System.setProperty("xxx", "qqq")
- minPoolSize 最小线程数,默认 20
-Dbolt.tp.min = 20
System.setProperty("bolt.tp.min", "20") 或者 key 指定为 Configs.TP_MIN_SIZE
- maxPoolSize 最大线程数,默认 400
-Dbolt.tp.max = 400
System.setProperty(Configs.TP_MAX_SIZE, "400")
- queueSize 队列大小,默认 600
-Dbolt.tp.queue = 600
System.setProperty(Configs.TP_QUEUE_SIZE, "600")
- keepAliveTime 线程保活时间,默认 60s
-Dbolt.tp.keepalive = 60
System.setProperty(Configs.TP_KEEPALIVE_TIME, "60")
2.1.2 为不同的协议设置不同的 ProcessorManager 默认线程池
RpcServer server = new RpcServer(8888);
server.start(); // 启动服务
server.registerDefaultExecutor(RpcProtocol.PROTOCOL_CODE, Executors.newCachedThreadPool());
server.registerDefaultExecutor(RpcProtocolV2.PROTOCOL_CODE, Executors.newCachedThreadPool());
注意:线程池的设置要在 RpcServer 启动之后。原因是:registerDefaultExecutor 方法的底层是调用 ProcessorManager#registerDefaultExecutor,ProcessorManager 的设置是在 RpcServer#initRpcRemoting() 中进行初始化的,RpcServer#initRpcRemoting() 是在 RpcServer.start() 中进行的。
疑问:线程池的设置要在 RpcServer 启动之后,如果在 RpcServer 启动之后在设置 registerDefaultExecutor 之前来了一个请求,请求的处理就放在原始的默认线程池去做了,与用户的初衷不同。所以,SOFABolt 的 init() 和 start() 应该分开,提供类似于如下的代码,这样我们就可以在 init() 之后进行线程池的设置了,设置完成之后再 start()。这个问题会在 SOFABolt 1.6.0解决:https://github.com/alipay/sofa-bolt/issues/100
init() {
doInit();
}
startOnly() {
doStart();
}
start() {
init();
startOnly();
}
源码分析
================================== RpcServer ==================================
public void registerDefaultExecutor(byte protocolCode, ExecutorService executor) {
ProtocolManager.getProtocol(ProtocolCode.fromBytes(protocolCode)).getCommandHandler().registerDefaultExecutor(executor);
}
================================== RpcCommandHandler ==================================
ProcessorManager processorManager;
public void registerDefaultExecutor(ExecutorService executor) {
this.processorManager.registerDefaultExecutor(executor);
}
================================== ProcessorManager ==================================
private ExecutorService defaultExecutor;
public void registerDefaultExecutor(ExecutorService executor) {
this.defaultExecutor = executor;
}
2.2 设置 RemotingProcessor 自定义线程池
RpcServer server = new RpcServer(8888);
server.start(); // 启动服务
server.registerProcessor(RpcProtocol.PROTOCOL_CODE, RpcCommandCode.RPC_REQUEST, new RpcRequestProcessor(Executors.newCachedThreadPool()));
注意:线程池的设置要在 RpcServer 启动之后。
2.3 设置 UserProcessor 自定义线程池
public class MyServerUserProcessor extends SyncUserProcessor<MyRequest> {
@Override
public Object handleRequest(BizContext bizCtx, MyRequest request) throws Exception {
......
}
@Override
public String interest() {
return MyRequest.class.getName();
}
/**
* UserProcessor 线程池
*/
@Override
public Executor getExecutor() {
return Executors.newCachedThreadPool();
}
}
2.4 设置 UserProcessor 自定义线程池选择器
需求:实现在请求类型都是 MyRequest 的情况下,对于“serviceA”服务,使用单独的线程池处理业务逻辑,其他服务使用另外的线程池。
线程池选择器
public class MyExecutorSelector implements UserProcessor.ExecutorSelector {
public static final String SERVICE_A = "serviceA";
// 专门为 serviceA 设置的线程池
private ThreadPoolExecutor executor0;
// 其他业务的线程池
private ThreadPoolExecutor executor1;
public MyExecutorSelector() {
executor0 = (ThreadPoolExecutor) Executors.newCachedThreadPool();
executor1 = (ThreadPoolExecutor) Executors.newCachedThreadPool();
}
@Override
public Executor select(String requestClass, Object requestHeader) {
Assert.assertNotNull(requestClass);
Assert.assertNotNull(requestHeader);
if (StringUtils.equals(SERVICE_A, (String) requestHeader)) {
return executor0;
} else {
return executor1;
}
}
}
自定义 header 序列化器
public class MyCustomHeaderSerializer extends DefaultCustomSerializer {
/**
* 序列化请求头
* 1. 将自定义信息写入 InvokeContext
* 2. 从 InvokeContext 中获取想要的信息(例如,serviceKey)并写入header
*/
@Override
public <T extends RequestCommand> boolean serializeHeader(T request, InvokeContext invokeContext) throws SerializationException {
if (request instanceof RpcRequestCommand) {
RpcRequestCommand requestCommand = (RpcRequestCommand) request;
String serviceKey = invokeContext.get("serviceKey");
if (StringUtils.isNotBlank(serviceKey)) {
try {
requestCommand.setHeader(serviceKey.getBytes("UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
return true;
}
return false;
}
/**
* 反序列化请求头
*/
@Override
public <T extends RequestCommand> boolean deserializeHeader(T request) throws DeserializationException {
if (request instanceof RpcRequestCommand) {
RpcRequestCommand requestCommand = (RpcRequestCommand) request;
byte[] header = requestCommand.getHeader();
try {
requestCommand.setRequestHeader(new String(header, "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return true;
}
return false;
}
}
服务端
public class MyServer {
static {
// 1. 注册自定义 header 序列化器
CustomSerializerManager.registerCustomSerializer(MyRequest.class.getName(), new MyCustomHeaderSerializer());
}
static RpcServer server = null;
public static boolean start() {
server = new RpcServer(8888);
// 2. 设置自定义的线程池选择器
MyServerUserProcessor processor = new MyServerUserProcessor();
processor.setExecutorSelector(new MyExecutorSelector());
server.registerUserProcessor(processor);
return server.start();
}
public static void main(String[] args) {
if (MyServer.start()) {
System.out.println("server start success!");
} else {
System.out.println("server start fail!");
}
}
}
客户端
public class MyClient {
static {
// 1. 注册 header 自定义序列化器
CustomSerializerManager.registerCustomSerializer(MyRequest.class.getName(), new MyCustomHeaderSerializer());
}
private static RpcClient client;
public static void start() {
client = new RpcClient();
client.init();
}
public static void main(String[] args) throws RemotingException, InterruptedException {
MyClient.start();
// 2. 设置 context,在 MyCustomHeaderSerializer 中会将该兴趣的"serviceKey"的值序列化到 header 中
InvokeContext context = new InvokeContext();
context.put("serviceKey", "serviceA");
MyRequest request = new MyRequest();
request.setReq("hello, bolt-server");
MyResponse response = (MyResponse) client.invokeSync("127.0.0.1:8888", request, context,300 * 1000);
}
}
关于序列化相关的内容,在《序列化的设计》进行分析。
网友评论