SOFABolt 的超时分为两种:连接超时和调用超时。
- 连接超时
- 仅客户端可设置,因为只有客户端会建连
- 连接超时时间的设置只作用于建连的时候,有两种方式可以指定 -
addr 拼接属性参数
或者url 设置超时时间属性
- 连接超时机制底层实际上使用的 Netty 的超时参数设置方式
- 调用超时:调用端无论是 RpcClient 还是 RpcServer,除了 oneway 模式下,剩余的三种调用模式 sync / future / callback 的方法都提供了一个必填参数 int timeoutMillis,该参数用来指定调用超时时间
调用超时时间对于同步调用和异步调用不同
- sync:使用
CountDownLatch # boolean await(long timeout, TimeUnit unit)
实现当前线程的实现阻塞等待- future 和 callback:使用 Netty 的 HashedWheelTimer 实现 -
在发出请求的时候创建 timeoutMillis 时间的超时任务 task,当在 timeoutMillis 时间内没有返回响应,则执行 task 中的内容(构造超时响应,返回给调用程序);否则,取消 task
fail-fast 机制
- 仅用在被调用端,即请求的处理端
- fail-fast 机制指的是在序列化全部内容之前,会做一个判断,如果处理当前请求的 UserProcessor 开启了 fail-fast 功能,并且此时已经超时,并且不是 oneway 模式(oneway 没有超时概念),则直接丢弃该请求,不再进行后续的序列化和业务逻辑处理操作
- fail-fast 机制是由 UserProcessor#timeoutDiscard() 指定的,如果返回 true,则打开 fail-fast,默认情况下 fail-fast 机制是打开的;如果想关闭该功能,手动覆盖 UserProcessor#timeoutDiscard() 方法即可,直接返回 false,之后是否超时就从 DefaultBizContext#isRequestTimeout() 进行判断
一、连接超时机制
默认情况下,连接超时时间由
RpcConfigs.CONNECT_TIMEOUT_KEY
来指定,默认为 1000ms,可以通过在 addr 上拼接参数或者在 url 上设置值来指定连接超时时间。
使用姿势
// addr 拼接方式
String addr = "127.0.0.1:8888?_CONNECTTIMEOUT=3000";
// url 设置值的方式
Url url = new Url("127.0.0.1",8888);
url.setConnNum(1);
url.setConnectTimeout(3000);
源码分析
以 url 方式为例,addr 只是将 _CONNECTTIMEOUT=3000 properties 属性设置到了url.connectTimeout 中。后续还是使用 url 方式进行调用。在调用的过程中会获取或者创建连接,连接超时作用于创建连接。
public abstract class AbstractConnectionFactory implements ConnectionFactory {
@Override
public Connection createConnection(Url url) throws Exception {
// 创建连接
Channel channel = doCreateConnection(url.getIp(), url.getPort(), url.getConnectTimeout());
...
return conn;
}
protected Channel doCreateConnection(String targetIP, int targetPort, int connectTimeout) {
// 预处理 connectTimeout,最小为 1000
connectTimeout = Math.max(connectTimeout, 1000);
// 设置 Netty 的连接超时时间属性
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
// 进行连接
ChannelFuture future = bootstrap.connect(new InetSocketAddress(targetIP, targetPort));
future.awaitUninterruptibly();
...
return future.channel();
}
}
二、调用超时机制(调用端)
调用超时时间对于同步调用和异步调用不同
- sync:使用 CountDownLatch # boolean await(long timeout, TimeUnit unit) 实现当前线程的实现阻塞等待
- future 和 callback:使用 Netty 的 HashedWheelTimer 实现
调用端无论是 RpcClient 还是 RpcServer,除了 oneway 模式下,剩余的三种调用模式 sync / future / callback 的方法都提供了一个必填参数 int timeoutMillis,该参数用来指定调用超时时间
Object invokeSync(String addr, Object request, int timeoutMillis)
RpcResponseFuture invokeWithFuture(String addr, Object request, int timeoutMillis)
void invokeWithCallback(String addr, Object request, InvokeCallback invokeCallback, int timeoutMillis)
同步方式
image.png============================ class RpcRemoting extends BaseRemoting ============================
public Object invokeSync(Connection conn, Object request, InvokeContext invokeContext, int timeoutMillis) {
// 创建请求:设置 int timeout = timeoutMillis(默认为-1,表示永不超时)
// 该 timeout 参数会用在被调用端判断是否超时,进而做 fail-fast 操作
RemotingCommand requestCommand = toRemotingCommand(request, conn, invokeContext, timeoutMillis);
...
// 发起请求
ResponseCommand responseCommand = (ResponseCommand) super.invokeSync(conn, requestCommand, timeoutMillis);
...
// 解析响应
Object responseObject = RpcResponseResolver.resolveResponseObject(responseCommand, RemotingUtil.parseRemoteAddress(conn.getChannel()));
return responseObject;
}
看下解析响应流程
public static Object resolveResponseObject(ResponseCommand responseCommand, String addr) throws RemotingException {
... 预处理响应,服务服务端返回异常信息,直接抛出异常
preProcess(responseCommand, addr);
if (responseCommand.getResponseStatus() == ResponseStatus.SUCCESS) {
return toResponseObject(responseCommand);
} else {
...
}
}
private static void preProcess(ResponseCommand responseCommand, String addr) throws RemotingException {
RemotingException e = null;
// responseCommand == null 超时
if (responseCommand == null) {
e = new InvokeTimeoutException(msg);
} else {
switch (responseCommand.getResponseStatus()) {
case TIMEOUT:
e = new InvokeTimeoutException(msg);
break;
...
}
}
// 直接抛出超时异常
if (null != e) {
throw e;
}
}
============================ BaseRemoting ============================
protected RemotingCommand invokeSync(Connection conn, RemotingCommand request, int timeoutMillis) {
// 创建 InvokeFuture
final InvokeFuture future = createInvokeFuture(request, request.getInvokeContext());
// 添加 InvokeFuture 到 Connection
conn.addInvokeFuture(future);
// 使用 Netty 发送请求
conn.getChannel().writeAndFlush(request)
// 等待 timeoutMillis,如果超时,不等待结果直接返回 response,此时的 response == null,实际代码如下
// public ResponseCommand waitResponse(long timeoutMillis) throws InterruptedException {
// // 如果超时,知己返回false,this.responseCommand 直接返回 null
// this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
// return this.responseCommand;
// }
RemotingCommand response = future.waitResponse(timeoutMillis);
// response == null 客户端自己创建超时响应
if (response == null) {
conn.removeInvokeFuture(request.getId());
response = this.commandFactory.createTimeoutResponse(conn.getRemoteAddress());
}
return response;
}
注意:RpcRequestCommand 中的 timeout 参数会用在被调用端判断是否超时,进而做 fail-fast 操作
HashWheelTimer 简介
future 和 callback 异步模式的超时机制都使用了 Netty 的 HashWheelTimer 机制。
image.pngNetty 的 HashWheelTimer 是一个环形结构,假设有 8 个格子(格子数可以通过 ticksPerWheel 构造器参数指定,默认为 512),一个格子代表一段时间(越短 Timer 精度越高,每个格子代表的时间可以通过 tickDuration 指定,默认为 100ms,SOFABolt 指定为 10ms)。
以上图为例,假设一个格子是 1s,则整个 wheel 能表示的时间段为 8s,假如当前指针指向 2,此时需要调度一个 3s 后执行的任务,显然应该加入到 (2+3=5) 的方格中的链表中,并且 round = 0,指针再走 3 次就可以执行了;如果任务要在 10s 后执行,应该等指针走完一个 round 零 2 格再执行,因此应放入4((2+10)%8=4)的方格中的链表中,其 round 设为 1。检查到期任务时只执行 round 为 0 的,格子上其他任务的 round 减 1。
Netty 中有一条线程控制 HashWheelTimer 中的指针每隔 tickDuration 移动到下一个格子,执行其中 round 为 0 并且不在 cancel 队列的任务,该格子上的其他任务的 round 减 1。
当取消一个超时任务时,直接将该任务添加到 cancel 队列,就不会被执行了。
最后看看 HashWheelTimer 在 SOFABolt 中的最佳实践。
public class TimerHolder {
// 每格 10 ms
private final static long defaultTickDuration = 10;
// HashedWheelTimer 单例
private static class DefaultInstance {
static final Timer INSTANCE = new HashedWheelTimer(new NamedThreadFactory( "DefaultTimer" + defaultTickDuration, true), defaultTickDuration, TimeUnit.MILLISECONDS);
}
private TimerHolder() {
}
public static Timer getTimer() {
return DefaultInstance.INSTANCE;
}
}
// 使用见下边的分析
Timeout timeout = TimerHolder.getTimer().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
InvokeFuture future = conn.removeInvokeFuture(request.getId());
future.putResponse(commandFactory.createTimeoutResponse(conn.getRemoteAddress()));
future.tryAsyncExecuteInvokeCallbackAbnormally();
}
}, timeoutMillis, TimeUnit.MILLISECONDS);
异步方式(future 模式)
image.pngfuture 的阻塞点在 RpcResponseFuture.get()操作上,实际上同同步一样,底层也是阻塞在 countDownLatch.await() 上
============================ class RpcRemoting extends BaseRemoting ==================
public RpcResponseFuture invokeWithFuture(Connection conn, Object request, InvokeContext invokeContext, int timeoutMillis) {
// 创建请求:设置 int timeout = timeoutMillis(默认为-1,表示永不超时)
// 该 timeout 参数会用在被调用端判断是否超时,进而做 fail-fast 操作
RemotingCommand requestCommand = toRemotingCommand(request, conn, invokeContext, timeoutMillis);
...
// 发起请求
InvokeFuture future = super.invokeWithFuture(conn, requestCommand, timeoutMillis);
// 直接返回响应,在 get 的时候,依然使用 RpcResponseResolver.resolveResponseObject 解析响应,如果超时,直接抛出超时异常
return new RpcResponseFuture(RemotingUtil.parseRemoteAddress(conn.getChannel()), future);
}
============================ BaseRemoting ============================
protected InvokeFuture invokeWithFuture(Connection conn, RemotingCommand request, int timeoutMillis) {
// 创建 InvokeFuture
final InvokeFuture future = createInvokeFuture(request, request.getInvokeContext());
// 添加 InvokeFuture 到 Connection
conn.addInvokeFuture(future);
try {
// 创建超时任务,添加到 Netty 的 hashWheel 中,在 timeoutMillis 之后如果该超时任务没有被取消,则执行其 run 方法
// 进而创建超时响应
Timeout timeout = TimerHolder.getTimer().newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
InvokeFuture future = conn.removeInvokeFuture(request.getId());
// 创建超时响应 + 设置超时响应 + 唤醒阻塞线程
future.putResponse(commandFactory.createTimeoutResponse(conn.getRemoteAddress()));
}
}, timeoutMillis, TimeUnit.MILLISECONDS);
// 将 Timeout 设置到 InvokeFuture 中,后续可以从 InvokeFuture 中取出 Timeout,执行取消超时任务的操作
// 超时任务的取消有两种情况:正常返回响应 + 如下发生异常
future.addTimeout(timeout);
conn.getChannel().writeAndFlush(request);
} catch (Exception e) {
InvokeFuture f = conn.removeInvokeFuture(request.getId());
// 如果发生异常,取消超时任务
f.cancelTimeout();
f.putResponse(commandFactory.createSendFailedResponse(conn.getRemoteAddress(), e));
}
return future;
}
============================ RpcResponseProcessor ============================
public void doProcess(RemotingContext ctx, RemotingCommand cmd) {
// 获取 Connection
Connection conn = ctx.getChannelContext().channel().attr(Connection.CONNECTION).get();
// 获取 InvokeFuture
InvokeFuture future = conn.removeInvokeFuture(cmd.getId());
// 设置响应,唤醒阻塞线程
future.putResponse(cmd);
// 取消 timeout task
future.cancelTimeout();
// 回调 callback
future.executeInvokeCallback();
}
异步方式(callback 模式)
image.png============================ class RpcRemoting extends BaseRemoting ==================
public void invokeWithCallback(Connection conn, Object request, InvokeContext invokeContext, InvokeCallback invokeCallback, int timeoutMillis) {
// 创建请求:设置 int timeout = timeoutMillis(默认为-1,表示永不超时)
// 该 timeout 参数会用在被调用端判断是否超时,进而做 fail-fast 操作
RemotingCommand requestCommand = toRemotingCommand(request, conn, invokeContext, timeoutMillis);
...
// 发出请求
super.invokeWithCallback(conn, requestCommand, invokeCallback, timeoutMillis);
}
============================ BaseRemoting ============================
protected void invokeWithCallback(Connection conn, RemotingCommand request, InvokeCallback invokeCallback, int timeoutMillis) {
// 创建 InvokeFuture
InvokeFuture future = createInvokeFuture(conn, request, request.getInvokeContext(), invokeCallback);
// 添加 InvokeFuture 到 Connection
conn.addInvokeFuture(future);
try {
// 创建超时任务,添加到 Netty 的 hashWheel 中,在 timeoutMillis 之后如果该超时任务没有被取消,则执行其 run 方法
// 进而创建超时响应
Timeout timeout = TimerHolder.getTimer().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
InvokeFuture future = conn.removeInvokeFuture(request.getId());
// 创建超时响应 + 设置超时响应
future.putResponse(commandFactory.createTimeoutResponse(conn.getRemoteAddress()));
// 想较于 future 而言,多出这一步,执行异常回调 callback.onException(e)
future.tryAsyncExecuteInvokeCallbackAbnormally();
}
}, timeoutMillis, TimeUnit.MILLISECONDS);
// 将 Timeout 设置到 InvokeFuture 中,后续可以从 InvokeFuture 中取出 Timeout,执行取消超时任务的操作
// 超时任务的取消有两种情况:正常返回响应 + 如下发生异常
future.addTimeout(timeout);
conn.getChannel().writeAndFlush(request);
} catch (Exception e) {
InvokeFuture f = conn.removeInvokeFuture(request.getId());
// 如果发生异常,取消超时任务
f.cancelTimeout();
f.putResponse(commandFactory.createSendFailedResponse(conn.getRemoteAddress(), e));
// 想较于 future 而言,多出这一步,执行异常回调 callback.onException(e)
f.tryAsyncExecuteInvokeCallbackAbnormally();
}
}
============================ RpcResponseProcessor ============================
// 这里与 future 的处理方式一样
public void doProcess(RemotingContext ctx, RemotingCommand cmd) {
// 获取 Connection
Connection conn = ctx.getChannelContext().channel().attr(Connection.CONNECTION).get();
// 获取 InvokeFuture
InvokeFuture future = conn.removeInvokeFuture(cmd.getId());
// 设置响应,唤醒阻塞线程
future.putResponse(cmd);
// 取消 timeout task
future.cancelTimeout();
// 回调 callback
future.executeInvokeCallback();
}
心跳请求:RpcHeartbeatTrigger#heartbeatTriggered(final ChannelHandlerContext ctx) 在发送了心跳请求之后,也是用了 TimeOut 做超时任务,与 Callback 使用一致,不再分析。
三、快速失败机制(被调用端)
以上都是在分析调用端的超时逻辑,对于三种调用方式,被调用端的处理都是一套逻辑。
public class RpcRequestCommand extends RequestCommand {
// 调用超时时间,调用端进行设置,默认值为 -1,代表永不超时
private int timeout = -1;
// 不会序列化到对端,该值会在被调用端的解码器收到该消息时,设置为收到的当前时间
private transient long arriveTime = -1;
}
public class RpcCommandDecoder implements CommandDecoder {
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
...
RequestCommand command;
if (cmdCode == CommandCode.HEARTBEAT_VALUE) {
// 如果是心跳请求消息,直接创建一个 HeartbeatCommand
command = new HeartbeatCommand();
} else {
// 如果是正常请求,创建一个 RpcRequestCommand
command = createRequestCommand(cmdCode);
}
...
out.add(command);
}
private RpcRequestCommand createRequestCommand(short cmdCode) {
RpcRequestCommand command = new RpcRequestCommand();
...
// 设置请求到达时间
command.setArriveTime(System.currentTimeMillis());
return command;
}
}
public class RpcRequestProcessor {
public void process(RemotingContext ctx, RpcRequestCommand cmd, ExecutorService defaultExecutor) {
// 反序列化 className
if (!deserializeRequestCommand(ctx, cmd, RpcDeserializeLevel.DESERIALIZE_CLAZZ)) {
return;
}
// 获取 userProcessor
UserProcessor userProcessor = ctx.getUserProcessor(cmd.getRequestClass());
// 将 userProcessor.timeoutDiscard() 设置到 RemotingContext 中
// userProcessor.timeoutDiscard() 用于 fail-fast,该值默认为 true,如果要关闭 fail-fast 功能,需要手动覆盖 UserProcessor#timeoutDiscard()
ctx.setTimeoutDiscard(userProcessor.timeoutDiscard());
... 不管是 IO 线程执行还是业务线程池执行
// ProcessTask.run() 调用 doProcess()
executor.execute(new ProcessTask(ctx, cmd));
}
public void doProcess(final RemotingContext ctx, RpcRequestCommand cmd) throws Exception {
long currentTimestamp = System.currentTimeMillis();
// 设置超时时间 timeout 与到达时间 arriveTime 到 RemotingContext 中,为后续计算是否超时(ctx.isRequestTimeout())做准备
preProcessRemotingContext(ctx, cmd, currentTimestamp);
// 如果开启了 fail-fast 并且 (System.currentTimeMillis() - this.arriveTimestamp) > this.timeout 确实超时,直接返回
// 不再进行后续的反序列化和业务逻辑处理
if (ctx.isTimeoutDiscard() && ctx.isRequestTimeout()) {
return;// then, discard this request
}
// decode request all
if (!deserializeRequestCommand(ctx, cmd, RpcDeserializeLevel.DESERIALIZE_ALL)) {
return;
}
// 派发请求,做真正的业务逻辑处理
dispatchToUserProcessor(ctx, cmd);
}
private void preProcessRemotingContext(RemotingContext ctx, RpcRequestCommand cmd, long currentTimestamp) {
// 将 RpcRequestCommand.getArriveTime()(该值在解码器中进行设置)设置到 RemotingContext 中
ctx.setArriveTimestamp(cmd.getArriveTime());
// 将 RpcRequestCommand.getTimeout() 设置到 RemotingContext 中
ctx.setTimeout(cmd.getTimeout());
...
}
// RemotingContext#isRequestTimeout()
public boolean isRequestTimeout() {
// timeout = -1 表示永不超时
// oneway 没有超时概念
// 是否超时是按照当前时间减去响应达到时间,而非被调用端的当前时间 - 调用端的请求发送时间(这样计算就是使用了两个机器的时钟,跨机器的时钟会有时钟差)
if (this.timeout > 0
&& (this.rpcCommandType != RpcCommandType.REQUEST_ONEWAY)
&& (System.currentTimeMillis() - this.arriveTimestamp) > this.timeout) {
return true;
}
return false;
}
}
注意
- 是否超时是按照当前时间减去响应达到时间,而非被调用端的当前时间 - 调用端的请求发送时间(这样计算就是使用了两个机器的时钟,跨机器的时钟会有时钟差)
- fail-fast 机制是由 UserProcessor#timeoutDiscard() 指定的,如果返回 true,则打开 fail-fast,默认情况下 fail-fast 机制是打开的;如果想关闭该功能,手动覆盖 UserProcessor#timeoutDiscard() 方法即可,直接返回 false,之后是否超时就从
DefaultBizContext#isRequestTimeout()
进行判断
DefaultBizContext#isRequestTimeout() 实际上还是调用了 RemotingContext#isRequestTimeout()
private RemotingContext remotingCtx;
@Override
public boolean isRequestTimeout() {
return this.remotingCtx.isRequestTimeout();
}
网友评论