MyRequest request = new MyRequest(); request.setReq("hello, bolt-server"); client.oneway("127.0.0.1:8888", request);
注意:oneway 没有请求超时的概念,所以其调用api中没有 int timeoutMillis 参数。但是连接超时时间是有的。
一、线程模型图
image.png总结:
- 客户端用户线程 user-thread 发出请求(实际上是将 netty task 压到 netty 处理队列中,netty 客户端 worker 线程进行真正的请求发出),然后 user-thread 就可以做其他事了,对于该请求的处理就结束
- 服务端 worker 线程接收请求,根据是否在 IO 线程执行所有操作来决定是否使用一个 Bolt 线程池(或者自定义的线程池)来处理业务,处理完成之后,结束
二、代码执行流程梯形图
2.1 客户端发出请求
-->RpcClient.oneway(String addr, Object request)
-->RpcClientRemoting.oneway(String addr, Object request, InvokeContext invokeContext) // invokeContext = null
-->Url url = this.addressParser.parse(addr) // 将 addr 转化为 Url
-->RpcClientRemoting.oneway(Url url, Object request, InvokeContext invokeContext)
<!-- 一、获取连接 -->
-->Connection conn = getConnectionAndInitInvokeContext(url, invokeContext)
<!-- 二、检查连接 -->
-->this.connectionManager.check(conn) // 校验 connection 不为 null && channel 不为 null && channel 是 active 状态 && channel 可写
<!-- 三、创建请求对象 -->
-->RpcCommandFactory.createRequestCommand(Object requestObject)
-->new RpcRequestCommand(Object request) // 设置唯一id + 消息类型为 Request(还有 Response 和 heartbeat)+ MyRequest request
-->command.serialize() // 序列化
-->requestCommand.setType(RpcCommandType.REQUEST_ONEWAY) // 设置请求响应模型
<!-- 四、发起请求 -->
-->BaseRemoting.oneway(Connection conn, RemotingCommand request, int timeoutMillis)
<!-- 4.1 使用 netty 发送消息 -->
-->conn.getChannel().writeAndFlush(request) // netty发送消息
关于连接 Connection 相关的,放在《Connection 连接设计》章节分析,此处跳过;
总结:
- 获取连接
- 检查连接
- 使用 RpcCommandFactory 创建请求对象 + 序列化 + 设置请求响应模型为 REQUEST_ONEWAY
- 使用 netty 发送消息
2.2 服务端处理请求
RpcHandler.channelRead(ChannelHandlerContext ctx, Object msg)
<!-- 一、创建上下文 -->
-->new InvokeContext()
-->new RemotingContext(ChannelHandlerContext ctx, InvokeContext invokeContext, boolean serverSide, ConcurrentHashMap<String, UserProcessor<?>> userProcessors)
<!-- 二、根据 channel 中的附加属性获取相应的 Protocol,之后使用该 Protocol 实例的 CommandHandler 处理消息 -->
-->RpcCommandHandler.handle(RemotingContext ctx, Object msg)
<!-- 2.1 从 CommandHandler 中获取 CommandCode 为 REQUEST 的 RemotingProcessor 实例 RpcRequestProcessor,之后使用 RpcRequestProcessor 进行请求处理-->
-->RpcRequestProcessor.process(RemotingContext ctx, RpcRequestCommand cmd, ExecutorService defaultExecutor)
<!-- 2.1.1 反序列化clazz(感兴趣key),用于获取相应的UserProcessor;如果相应的UserProcessor==null,创建异常响应,发送给调用端,否则,继续执行 -->
-->反序列化 clazz
<!--
2.1.2 如果 userProcessor.processInIOThread()==true,直接对请求进行反序列化,然后创建ProcessTask任务,最后直接在当前的netty worker线程中执行ProcessTask.run();
否则,如果用户自定义了ExecutorSelector,则从众多的自定义线程池选择一个线程池,如果没定义,则使用自定义的线程池userProcessor.getExecutor(),如果最后没有自定义的线程池,则使用ProcessorManager的defaultExecutor,
来执行ProcessTask.run()
-->
<!-- ProcessTask.run() -->
-->RpcRequestProcessor.doProcess(RemotingContext ctx, RpcRequestCommand cmd)
-->反序列化header、content
-->dispatchToUserProcessor(RemotingContext ctx, RpcRequestCommand cmd)
<!-- 构造用户业务上下文 -->
-->new DefaultBizContext(remotingCtx)
<!-- 使用用户自定义处理器处理请求 -->
-->MyServerUserProcessor.handleRequest(BizContext bizCtx, MyRequest request)
<!-- 创建响应 -->
-->RemotingCommand response = RpcCommandFactory.createResponse(Object responseObject, RemotingCommand requestCmd) // 这里将response.id = requestCmd.id
<!-- sync:序列化响应并发送 oneway:logger.debug -->
-->RpcRequestProcessor.sendResponseIfNecessary(RemotingContext ctx, byte type, RemotingCommand response)
-->logger.debug
总结:
服务端处理请求步骤与Sync模式几乎完全相同,只是最后Sync会 “构造响应 + 序列化响应 + 发送响应”;而 oneway 只会 “构造响应”(
实际上,也不应该构造响应
)
- 创建 InvokeContext 和 RemotingContext
- 根据 channel 中的附加属性获取相应的 Protocol,之后使用该 Protocol 实例的 CommandHandler 处理消息
- 从 CommandHandler 中获取 CommandCode 为 REQUEST 的 RemotingProcessor 实例 RpcRequestProcessor,之后使用 RpcRequestProcessor 进行请求处理
- 反序列化clazz(感兴趣key),用于获取相应的UserProcessor;如果相应的 UserProcessor==null,创建异常响应,发送给调用端,否则,继续执行
- 如果 userProcessor.processInIOThread()==true,直接对请求进行反序列化,然后创建 ProcessTask 任务,最后直接在当前的 netty worker 线程中执行 ProcessTask.run();
否则,如果用户 UserProcessor 自定义了 ExecutorSelector,则从众多的自定义线程池选择一个线程池,如果没定义,则使用 UserProcessor 自定义的线程池 userProcessor.getExecutor(),如果还没有,则使用 RemotingProcessor 自定义的线程池 executor,如果最后没有自定义的线程池,则使用 ProcessorManager 的defaultExecutor,来执行ProcessTask.run()
- 反序列化 header、content(如果用户自定义了 ExecutorSelector,则header的反序列化需要提前,header 会作为众多自定义线程池的选择参数)
- 构造用户业务上下文 DefaultBizContext
- 使用用户自定义处理器处理请求
网友评论