一、调用方式
TestResponse response = (TestResponse) client.invokeSync("127.0.0.1:8888", request, 30 * 1000);
二、源码分析
1.final Connection conn = getConnectionAndInitInvokeContext(url, invokeContext);//创建连接对象
2.this.connectionManager.check(conn);//连接有效性校验
3.this.invokeSync(conn, request, invokeContext, timeoutMillis);
3.1RemotingCommand requestCommand = toRemotingCommand(request, conn, invokeContext,timeoutMillis);//初始化远程命令对象,包括序列化器(用户可以自定义)和crc(可关闭)
3.2ResponseCommand responseCommand = (ResponseCommand) super.invokeSync(conn, requestCommand,timeoutMillis);
3.2.1final InvokeFuture future = createInvokeFuture(request, request.getInvokeContext());//创建默认的InvokeFuture
3.2.2conn.addInvokeFuture(future);//将InvokeFuture设置到连接下的invokeFutureMap,考虑到连接的复用,invokeFutureMap通过每次请求的唯一ID来保存
3.2.3RemotingCommand response = future.waitResponse(timeoutMillis);//因为是同步请求,通过countDownLatch来阻塞等待异步获取到请求接口,有超时时间
3.3RpcResponseResolver.resolveResponseObject(responseCommand,
RemotingUtil.parseRemoteAddress(conn.getChannel()));//字节码反序列化
但是通过上面的代码,我们不难发现,这里并没有像netty里的inbound的channelRead,那这就要回到我们上一章讲的RcpClient的new过程了,源码如下:
RpcClient
private ConnectionFactory connectionFactory = new RpcConnectionFactory(userProcessors,this);//初始化链接工厂,这里就是RpcHandler初始化的地方,RpcHandler是在后续客户端的初始化过程中,被添加到Bootstrap里的,详细分析可以看上一章;这里还初始化了HeartbeatHandler,也是后面客户端初始化过程需要用到的,心跳设计会在后续的章节讲到
RpcHandler
channelRead(ChannelHandlerContext ctx, Object msg)
1.ProtocolCode protocolCode = ctx.channel().attr(Connection.PROTOCOL).get();
2.Protocol protocol = ProtocolManager.getProtocol(protocolCode);//获取链接的协议,协议设计会在后续讲到
3.protocol.getCommandHandler().handleCommand(new RemotingContext(ctx, new InvokeContext(), serverSide, userProcessors), msg);
3.1final RemotingProcessor processor = processorManager.getProcessor(cmd.getCmdCode());//获取远程调用处理器
3.2processor.process(ctx, cmd, processorManager.getDefaultExecutor());//使用默认的线程池处理请求,也可以用户自定义
AbstractRemotingProcessor
3.2.1ProcessTask task = new ProcessTask(ctx, msg);//每一次请求一个子任务,负责相应接收
RpcResponseProcessor.doProcess
3.2.1.1Connection conn = ctx.getChannelContext().channel().attr(Connection.CONNECTION).get();//获取当前的链接
3.2.1.2InvokeFuture future = conn.removeInvokeFuture(cmd.getId());//从invokeFutureMap获取当前请求的InvokeFuture
3.2.1.3future.putResponse(cmd)//将还未反序列化的RemotingCommand对象放入到InvokeFuture,等待用户进程获取
3.2.1.4future.executeInvokeCallback()//通知回调,但因为我们是同步模式,其实callbackListener是空的,并没有相应的回调任务
总结
SOFA的这样由一个单独的线程对inbound进去读取,再有线程池去分发处理,能一定的减少对于inbound区IO压力
网友评论