美文网首页
SOFA Bolt源码分析 3-Sync同步请求

SOFA Bolt源码分析 3-Sync同步请求

作者: 折浪君 | 来源:发表于2019-03-06 20:47 被阅读0次

    一、调用方式

    TestResponse response = (TestResponse) client.invokeSync("127.0.0.1:8888", request, 30 * 1000);
    

    二、源码分析

    1.final Connection conn = getConnectionAndInitInvokeContext(url, invokeContext);//创建连接对象
    2.this.connectionManager.check(conn);//连接有效性校验
    3.this.invokeSync(conn, request, invokeContext, timeoutMillis);
        3.1RemotingCommand requestCommand = toRemotingCommand(request, conn, invokeContext,timeoutMillis);//初始化远程命令对象,包括序列化器(用户可以自定义)和crc(可关闭)
        3.2ResponseCommand responseCommand = (ResponseCommand) super.invokeSync(conn, requestCommand,timeoutMillis);
          3.2.1final InvokeFuture future = createInvokeFuture(request, request.getInvokeContext());//创建默认的InvokeFuture
          3.2.2conn.addInvokeFuture(future);//将InvokeFuture设置到连接下的invokeFutureMap,考虑到连接的复用,invokeFutureMap通过每次请求的唯一ID来保存
          3.2.3RemotingCommand response = future.waitResponse(timeoutMillis);//因为是同步请求,通过countDownLatch来阻塞等待异步获取到请求接口,有超时时间
        3.3RpcResponseResolver.resolveResponseObject(responseCommand,
                RemotingUtil.parseRemoteAddress(conn.getChannel()));//字节码反序列化
    

    但是通过上面的代码,我们不难发现,这里并没有像netty里的inbound的channelRead,那这就要回到我们上一章讲的RcpClient的new过程了,源码如下:
    RpcClient

    private ConnectionFactory connectionFactory = new RpcConnectionFactory(userProcessors,this);//初始化链接工厂,这里就是RpcHandler初始化的地方,RpcHandler是在后续客户端的初始化过程中,被添加到Bootstrap里的,详细分析可以看上一章;这里还初始化了HeartbeatHandler,也是后面客户端初始化过程需要用到的,心跳设计会在后续的章节讲到
    

    RpcHandler

    channelRead(ChannelHandlerContext ctx, Object msg)
    1.ProtocolCode protocolCode = ctx.channel().attr(Connection.PROTOCOL).get();
    2.Protocol protocol = ProtocolManager.getProtocol(protocolCode);//获取链接的协议,协议设计会在后续讲到
    3.protocol.getCommandHandler().handleCommand(new RemotingContext(ctx, new InvokeContext(), serverSide, userProcessors), msg);
        3.1final RemotingProcessor processor = processorManager.getProcessor(cmd.getCmdCode());//获取远程调用处理器
        3.2processor.process(ctx, cmd, processorManager.getDefaultExecutor());//使用默认的线程池处理请求,也可以用户自定义
            AbstractRemotingProcessor
            3.2.1ProcessTask task = new ProcessTask(ctx, msg);//每一次请求一个子任务,负责相应接收
                RpcResponseProcessor.doProcess
                3.2.1.1Connection conn = ctx.getChannelContext().channel().attr(Connection.CONNECTION).get();//获取当前的链接
                3.2.1.2InvokeFuture future = conn.removeInvokeFuture(cmd.getId());//从invokeFutureMap获取当前请求的InvokeFuture
                3.2.1.3future.putResponse(cmd)//将还未反序列化的RemotingCommand对象放入到InvokeFuture,等待用户进程获取
                3.2.1.4future.executeInvokeCallback()//通知回调,但因为我们是同步模式,其实callbackListener是空的,并没有相应的回调任务
                
    

    总结
    SOFA的这样由一个单独的线程对inbound进去读取,再有线程池去分发处理,能一定的减少对于inbound区IO压力

    相关文章

      网友评论

          本文标题:SOFA Bolt源码分析 3-Sync同步请求

          本文链接:https://www.haomeiwen.com/subject/noffpqtx.html