美文网首页sofabolt
SOFABolt 源码分析5 - Oneway 单向通信方式的设

SOFABolt 源码分析5 - Oneway 单向通信方式的设

作者: 原水寒 | 来源:发表于2018-10-05 14:38 被阅读88次
 MyRequest request = new MyRequest();
 request.setReq("hello, bolt-server");
 client.oneway("127.0.0.1:8888", request);

注意:oneway 没有请求超时的概念,所以其调用api中没有 int timeoutMillis 参数。但是连接超时时间是有的。

一、线程模型图

image.png

总结:

  1. 客户端用户线程 user-thread 发出请求(实际上是将 netty task 压到 netty 处理队列中,netty 客户端 worker 线程进行真正的请求发出),然后 user-thread 就可以做其他事了,对于该请求的处理就结束
  2. 服务端 worker 线程接收请求,根据是否在 IO 线程执行所有操作来决定是否使用一个 Bolt 线程池(或者自定义的线程池)来处理业务,处理完成之后,结束

二、代码执行流程梯形图

2.1 客户端发出请求

-->RpcClient.oneway(String addr, Object request)
  -->RpcClientRemoting.oneway(String addr, Object request, InvokeContext invokeContext) // invokeContext = null
    -->Url url = this.addressParser.parse(addr) // 将 addr 转化为 Url
    -->RpcClientRemoting.oneway(Url url, Object request, InvokeContext invokeContext)
      <!-- 一、获取连接 -->
      -->Connection conn = getConnectionAndInitInvokeContext(url, invokeContext)
      <!-- 二、检查连接 -->
      -->this.connectionManager.check(conn) // 校验 connection 不为 null && channel 不为 null && channel 是 active 状态 && channel 可写
      <!-- 三、创建请求对象 -->
      -->RpcCommandFactory.createRequestCommand(Object requestObject)
        -->new RpcRequestCommand(Object request) // 设置唯一id + 消息类型为 Request(还有 Response 和 heartbeat)+ MyRequest request
        -->command.serialize() // 序列化
      -->requestCommand.setType(RpcCommandType.REQUEST_ONEWAY) // 设置请求响应模型
      <!-- 四、发起请求 -->
      -->BaseRemoting.oneway(Connection conn, RemotingCommand request, int timeoutMillis)
        <!-- 4.1 使用 netty 发送消息 -->
        -->conn.getChannel().writeAndFlush(request) // netty发送消息

关于连接 Connection 相关的,放在《Connection 连接设计》章节分析,此处跳过;

总结:

  1. 获取连接
  2. 检查连接
  3. 使用 RpcCommandFactory 创建请求对象 + 序列化 + 设置请求响应模型为 REQUEST_ONEWAY
  4. 使用 netty 发送消息

2.2 服务端处理请求

RpcHandler.channelRead(ChannelHandlerContext ctx, Object msg)
<!-- 一、创建上下文 -->
-->new InvokeContext()
-->new RemotingContext(ChannelHandlerContext ctx, InvokeContext invokeContext, boolean serverSide, ConcurrentHashMap<String, UserProcessor<?>> userProcessors)
<!-- 二、根据 channel 中的附加属性获取相应的 Protocol,之后使用该 Protocol 实例的 CommandHandler 处理消息 -->
-->RpcCommandHandler.handle(RemotingContext ctx, Object msg)
  <!-- 2.1 从 CommandHandler 中获取 CommandCode 为 REQUEST 的 RemotingProcessor 实例 RpcRequestProcessor,之后使用 RpcRequestProcessor 进行请求处理-->
  -->RpcRequestProcessor.process(RemotingContext ctx, RpcRequestCommand cmd, ExecutorService defaultExecutor)
    <!-- 2.1.1 反序列化clazz(感兴趣key),用于获取相应的UserProcessor;如果相应的UserProcessor==null,创建异常响应,发送给调用端,否则,继续执行 -->
    -->反序列化 clazz
    <!-- 
        2.1.2 如果 userProcessor.processInIOThread()==true,直接对请求进行反序列化,然后创建ProcessTask任务,最后直接在当前的netty worker线程中执行ProcessTask.run();
              否则,如果用户自定义了ExecutorSelector,则从众多的自定义线程池选择一个线程池,如果没定义,则使用自定义的线程池userProcessor.getExecutor(),如果最后没有自定义的线程池,则使用ProcessorManager的defaultExecutor,
              来执行ProcessTask.run()
    -->
      <!-- ProcessTask.run() -->
      -->RpcRequestProcessor.doProcess(RemotingContext ctx, RpcRequestCommand cmd)
        -->反序列化header、content
        -->dispatchToUserProcessor(RemotingContext ctx, RpcRequestCommand cmd)
          <!-- 构造用户业务上下文 -->
          -->new DefaultBizContext(remotingCtx)
          <!-- 使用用户自定义处理器处理请求 -->
          -->MyServerUserProcessor.handleRequest(BizContext bizCtx, MyRequest request)
          <!-- 创建响应 -->
          -->RemotingCommand response = RpcCommandFactory.createResponse(Object responseObject, RemotingCommand requestCmd) // 这里将response.id = requestCmd.id
          <!-- sync:序列化响应并发送 oneway:logger.debug -->
          -->RpcRequestProcessor.sendResponseIfNecessary(RemotingContext ctx, byte type, RemotingCommand response)
            -->logger.debug

总结:

服务端处理请求步骤与Sync模式几乎完全相同,只是最后Sync会 “构造响应 + 序列化响应 + 发送响应”;而 oneway 只会 “构造响应”(实际上,也不应该构造响应

  1. 创建 InvokeContext 和 RemotingContext
  1. 根据 channel 中的附加属性获取相应的 Protocol,之后使用该 Protocol 实例的 CommandHandler 处理消息
  • 从 CommandHandler 中获取 CommandCode 为 REQUEST 的 RemotingProcessor 实例 RpcRequestProcessor,之后使用 RpcRequestProcessor 进行请求处理
  • 反序列化clazz(感兴趣key),用于获取相应的UserProcessor;如果相应的 UserProcessor==null,创建异常响应,发送给调用端,否则,继续执行
  • 如果 userProcessor.processInIOThread()==true,直接对请求进行反序列化,然后创建 ProcessTask 任务,最后直接在当前的 netty worker 线程中执行 ProcessTask.run();
    否则,如果用户 UserProcessor 自定义了 ExecutorSelector,则从众多的自定义线程池选择一个线程池,如果没定义,则使用 UserProcessor 自定义的线程池 userProcessor.getExecutor(),如果还没有,则使用 RemotingProcessor 自定义的线程池 executor,如果最后没有自定义的线程池,则使用 ProcessorManager 的defaultExecutor,来执行ProcessTask.run()
  • 反序列化 header、content(如果用户自定义了 ExecutorSelector,则header的反序列化需要提前,header 会作为众多自定义线程池的选择参数)
  • 构造用户业务上下文 DefaultBizContext
  • 使用用户自定义处理器处理请求

相关文章

网友评论

    本文标题:SOFABolt 源码分析5 - Oneway 单向通信方式的设

    本文链接:https://www.haomeiwen.com/subject/wzjnoftx.html