美文网首页sofabolt
SOFABolt 源码分析21 - 超时与快速失败机制的设计

SOFABolt 源码分析21 - 超时与快速失败机制的设计

作者: 原水寒 | 来源:发表于2018-10-21 22:26 被阅读117次

    SOFABolt 的超时分为两种:连接超时和调用超时。

    • 连接超时
    • 仅客户端可设置,因为只有客户端会建连
    • 连接超时时间的设置只作用于建连的时候,有两种方式可以指定 - addr 拼接属性参数 或者 url 设置超时时间属性
    • 连接超时机制底层实际上使用的 Netty 的超时参数设置方式
    • 调用超时:调用端无论是 RpcClient 还是 RpcServer,除了 oneway 模式下,剩余的三种调用模式 sync / future / callback 的方法都提供了一个必填参数 int timeoutMillis,该参数用来指定调用超时时间
      调用超时时间对于同步调用和异步调用不同
    • sync:使用 CountDownLatch # boolean await(long timeout, TimeUnit unit) 实现当前线程的实现阻塞等待
    • future 和 callback:使用 Netty 的 HashedWheelTimer 实现 - 在发出请求的时候创建 timeoutMillis 时间的超时任务 task,当在 timeoutMillis 时间内没有返回响应,则执行 task 中的内容(构造超时响应,返回给调用程序);否则,取消 task

    fail-fast 机制

    • 仅用在被调用端,即请求的处理端
    • fail-fast 机制指的是在序列化全部内容之前,会做一个判断,如果处理当前请求的 UserProcessor 开启了 fail-fast 功能,并且此时已经超时,并且不是 oneway 模式(oneway 没有超时概念),则直接丢弃该请求,不再进行后续的序列化和业务逻辑处理操作
    • fail-fast 机制是由 UserProcessor#timeoutDiscard() 指定的,如果返回 true,则打开 fail-fast,默认情况下 fail-fast 机制是打开的;如果想关闭该功能,手动覆盖 UserProcessor#timeoutDiscard() 方法即可,直接返回 false,之后是否超时就从 DefaultBizContext#isRequestTimeout() 进行判断

    一、连接超时机制

    默认情况下,连接超时时间由RpcConfigs.CONNECT_TIMEOUT_KEY来指定,默认为 1000ms,可以通过在 addr 上拼接参数或者在 url 上设置值来指定连接超时时间。

    使用姿势

    // addr 拼接方式
    String addr = "127.0.0.1:8888?_CONNECTTIMEOUT=3000";
    
    // url 设置值的方式
    Url url = new Url("127.0.0.1",8888);
    url.setConnNum(1);
    url.setConnectTimeout(3000);
    

    源码分析

    以 url 方式为例,addr 只是将 _CONNECTTIMEOUT=3000 properties 属性设置到了url.connectTimeout 中。后续还是使用 url 方式进行调用。在调用的过程中会获取或者创建连接,连接超时作用于创建连接。

    public abstract class AbstractConnectionFactory implements ConnectionFactory {
        @Override
        public Connection createConnection(Url url) throws Exception {
            // 创建连接
            Channel channel = doCreateConnection(url.getIp(), url.getPort(), url.getConnectTimeout());
            ...
            return conn;
        }
    
        protected Channel doCreateConnection(String targetIP, int targetPort, int connectTimeout) {
            // 预处理 connectTimeout,最小为 1000
            connectTimeout = Math.max(connectTimeout, 1000);
            // 设置 Netty 的连接超时时间属性
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
            // 进行连接
            ChannelFuture future = bootstrap.connect(new InetSocketAddress(targetIP, targetPort));
    
            future.awaitUninterruptibly();
            ...
            return future.channel();
        }
    }
    

    二、调用超时机制(调用端)

    调用超时时间对于同步调用和异步调用不同

    • sync:使用 CountDownLatch # boolean await(long timeout, TimeUnit unit) 实现当前线程的实现阻塞等待
    • future 和 callback:使用 Netty 的 HashedWheelTimer 实现

    调用端无论是 RpcClient 还是 RpcServer,除了 oneway 模式下,剩余的三种调用模式 sync / future / callback 的方法都提供了一个必填参数 int timeoutMillis,该参数用来指定调用超时时间

    Object invokeSync(String addr, Object request, int timeoutMillis)
    RpcResponseFuture invokeWithFuture(String addr, Object request, int timeoutMillis)
    void invokeWithCallback(String addr, Object request, InvokeCallback invokeCallback, int timeoutMillis)
    

    同步方式

    image.png
    ============================ class RpcRemoting extends BaseRemoting ============================
        public Object invokeSync(Connection conn, Object request, InvokeContext invokeContext, int timeoutMillis) {
            // 创建请求:设置 int timeout = timeoutMillis(默认为-1,表示永不超时)
            // 该 timeout 参数会用在被调用端判断是否超时,进而做 fail-fast 操作
            RemotingCommand requestCommand = toRemotingCommand(request, conn, invokeContext, timeoutMillis);
            ...
            // 发起请求
            ResponseCommand responseCommand = (ResponseCommand) super.invokeSync(conn, requestCommand, timeoutMillis);
            ...
            // 解析响应
            Object responseObject = RpcResponseResolver.resolveResponseObject(responseCommand, RemotingUtil.parseRemoteAddress(conn.getChannel()));
            return responseObject;
        }
    
    看下解析响应流程
        public static Object resolveResponseObject(ResponseCommand responseCommand, String addr) throws RemotingException {
            ... 预处理响应,服务服务端返回异常信息,直接抛出异常
            preProcess(responseCommand, addr);
            if (responseCommand.getResponseStatus() == ResponseStatus.SUCCESS) {
                return toResponseObject(responseCommand);
            } else {
                ...
            }
        }
    
        private static void preProcess(ResponseCommand responseCommand, String addr)  throws RemotingException {
            RemotingException e = null;
            // responseCommand == null 超时
            if (responseCommand == null) {
                e = new InvokeTimeoutException(msg);
            } else {
                switch (responseCommand.getResponseStatus()) {
                    case TIMEOUT:
                        e = new InvokeTimeoutException(msg);
                        break;
                    ...
                }
            }
            // 直接抛出超时异常
            if (null != e) {
                throw e;
            }
        }
    
    ============================ BaseRemoting ============================
        protected RemotingCommand invokeSync(Connection conn, RemotingCommand request, int timeoutMillis) {
            // 创建 InvokeFuture
            final InvokeFuture future = createInvokeFuture(request, request.getInvokeContext());
            // 添加 InvokeFuture 到 Connection
            conn.addInvokeFuture(future);
            // 使用 Netty 发送请求
            conn.getChannel().writeAndFlush(request)
            // 等待 timeoutMillis,如果超时,不等待结果直接返回 response,此时的 response == null,实际代码如下
            //    public ResponseCommand waitResponse(long timeoutMillis) throws InterruptedException {
            //        // 如果超时,知己返回false,this.responseCommand 直接返回 null
            //        this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            //        return this.responseCommand;
            //    }
            RemotingCommand response = future.waitResponse(timeoutMillis);
            // response == null 客户端自己创建超时响应
            if (response == null) {
                conn.removeInvokeFuture(request.getId());
                response = this.commandFactory.createTimeoutResponse(conn.getRemoteAddress());
            }
            return response;
        }
    
    

    注意:RpcRequestCommand 中的 timeout 参数会用在被调用端判断是否超时,进而做 fail-fast 操作

    HashWheelTimer 简介

    future 和 callback 异步模式的超时机制都使用了 Netty 的 HashWheelTimer 机制。

    image.png

    Netty 的 HashWheelTimer 是一个环形结构,假设有 8 个格子(格子数可以通过 ticksPerWheel 构造器参数指定,默认为 512),一个格子代表一段时间(越短 Timer 精度越高,每个格子代表的时间可以通过 tickDuration 指定,默认为 100ms,SOFABolt 指定为 10ms)。

    以上图为例,假设一个格子是 1s,则整个 wheel 能表示的时间段为 8s,假如当前指针指向 2,此时需要调度一个 3s 后执行的任务,显然应该加入到 (2+3=5) 的方格中的链表中,并且 round = 0,指针再走 3 次就可以执行了;如果任务要在 10s 后执行,应该等指针走完一个 round 零 2 格再执行,因此应放入4((2+10)%8=4)的方格中的链表中,其 round 设为 1。检查到期任务时只执行 round 为 0 的,格子上其他任务的 round 减 1。

    Netty 中有一条线程控制 HashWheelTimer 中的指针每隔 tickDuration 移动到下一个格子,执行其中 round 为 0 并且不在 cancel 队列的任务,该格子上的其他任务的 round 减 1。

    当取消一个超时任务时,直接将该任务添加到 cancel 队列,就不会被执行了。

    最后看看 HashWheelTimer 在 SOFABolt 中的最佳实践。

    public class TimerHolder {
        // 每格 10 ms
        private final static long defaultTickDuration = 10;
        // HashedWheelTimer 单例
        private static class DefaultInstance {
            static final Timer INSTANCE = new HashedWheelTimer(new NamedThreadFactory( "DefaultTimer" + defaultTickDuration, true), defaultTickDuration, TimeUnit.MILLISECONDS);
        }
    
        private TimerHolder() {
        }
    
        public static Timer getTimer() {
            return DefaultInstance.INSTANCE;
        }
    }
    
    // 使用见下边的分析
    Timeout timeout = TimerHolder.getTimer().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            InvokeFuture future = conn.removeInvokeFuture(request.getId());
            future.putResponse(commandFactory.createTimeoutResponse(conn.getRemoteAddress()));
            future.tryAsyncExecuteInvokeCallbackAbnormally();
        }
    }, timeoutMillis, TimeUnit.MILLISECONDS);
    

    异步方式(future 模式)

    image.png

    future 的阻塞点在 RpcResponseFuture.get()操作上,实际上同同步一样,底层也是阻塞在 countDownLatch.await() 上

    ============================ class RpcRemoting extends BaseRemoting ==================
        public RpcResponseFuture invokeWithFuture(Connection conn, Object request, InvokeContext invokeContext, int timeoutMillis) {
            // 创建请求:设置 int timeout = timeoutMillis(默认为-1,表示永不超时)
            // 该 timeout 参数会用在被调用端判断是否超时,进而做 fail-fast 操作
            RemotingCommand requestCommand = toRemotingCommand(request, conn, invokeContext, timeoutMillis);
            ...
            // 发起请求
            InvokeFuture future = super.invokeWithFuture(conn, requestCommand, timeoutMillis);
            // 直接返回响应,在 get 的时候,依然使用 RpcResponseResolver.resolveResponseObject 解析响应,如果超时,直接抛出超时异常
            return new RpcResponseFuture(RemotingUtil.parseRemoteAddress(conn.getChannel()), future);
        }
    
    ============================ BaseRemoting ============================
        protected InvokeFuture invokeWithFuture(Connection conn, RemotingCommand request, int timeoutMillis) {
            // 创建 InvokeFuture
            final InvokeFuture future = createInvokeFuture(request, request.getInvokeContext());
            // 添加 InvokeFuture 到 Connection
            conn.addInvokeFuture(future);
            try {
                // 创建超时任务,添加到 Netty 的 hashWheel 中,在 timeoutMillis 之后如果该超时任务没有被取消,则执行其 run 方法
                // 进而创建超时响应
                Timeout timeout = TimerHolder.getTimer().newTimeout(new TimerTask() {
                    public void run(Timeout timeout) throws Exception {
                        InvokeFuture future = conn.removeInvokeFuture(request.getId());
                        // 创建超时响应 + 设置超时响应 + 唤醒阻塞线程
                        future.putResponse(commandFactory.createTimeoutResponse(conn.getRemoteAddress()));
                    }
                }, timeoutMillis, TimeUnit.MILLISECONDS);
                // 将 Timeout 设置到 InvokeFuture 中,后续可以从 InvokeFuture 中取出 Timeout,执行取消超时任务的操作
                // 超时任务的取消有两种情况:正常返回响应 + 如下发生异常
                future.addTimeout(timeout);
                conn.getChannel().writeAndFlush(request);
            } catch (Exception e) {
                InvokeFuture f = conn.removeInvokeFuture(request.getId());
                // 如果发生异常,取消超时任务
                f.cancelTimeout();
                f.putResponse(commandFactory.createSendFailedResponse(conn.getRemoteAddress(), e));
            }
            return future;
        }
    
    ============================ RpcResponseProcessor ============================
        public void doProcess(RemotingContext ctx, RemotingCommand cmd) {
            // 获取 Connection
            Connection conn = ctx.getChannelContext().channel().attr(Connection.CONNECTION).get();
            // 获取 InvokeFuture
            InvokeFuture future = conn.removeInvokeFuture(cmd.getId());
            // 设置响应,唤醒阻塞线程
            future.putResponse(cmd);
            // 取消 timeout task
            future.cancelTimeout();
            // 回调 callback
            future.executeInvokeCallback();
        }
    

    异步方式(callback 模式)

    image.png
    ============================ class RpcRemoting extends BaseRemoting ==================
        public void invokeWithCallback(Connection conn, Object request, InvokeContext invokeContext, InvokeCallback invokeCallback, int timeoutMillis) {
            // 创建请求:设置 int timeout = timeoutMillis(默认为-1,表示永不超时)
            // 该 timeout 参数会用在被调用端判断是否超时,进而做 fail-fast 操作
            RemotingCommand requestCommand = toRemotingCommand(request, conn, invokeContext, timeoutMillis);
            ...
            // 发出请求
            super.invokeWithCallback(conn, requestCommand, invokeCallback, timeoutMillis);
        }
    
    ============================ BaseRemoting ============================
           protected void invokeWithCallback(Connection conn, RemotingCommand request, InvokeCallback invokeCallback, int timeoutMillis) {
            // 创建 InvokeFuture
            InvokeFuture future = createInvokeFuture(conn, request, request.getInvokeContext(), invokeCallback);
            // 添加 InvokeFuture 到 Connection
            conn.addInvokeFuture(future);
    
            try {
                // 创建超时任务,添加到 Netty 的 hashWheel 中,在 timeoutMillis 之后如果该超时任务没有被取消,则执行其 run 方法
                // 进而创建超时响应
                Timeout timeout = TimerHolder.getTimer().newTimeout(new TimerTask() {
                    @Override
                    public void run(Timeout timeout) throws Exception {
                        InvokeFuture future = conn.removeInvokeFuture(request.getId());
                        // 创建超时响应 + 设置超时响应
                        future.putResponse(commandFactory.createTimeoutResponse(conn.getRemoteAddress()));
                        // 想较于 future 而言,多出这一步,执行异常回调 callback.onException(e)
                        future.tryAsyncExecuteInvokeCallbackAbnormally();
                    }
    
                }, timeoutMillis, TimeUnit.MILLISECONDS);
                // 将 Timeout 设置到 InvokeFuture 中,后续可以从 InvokeFuture 中取出 Timeout,执行取消超时任务的操作
                // 超时任务的取消有两种情况:正常返回响应 + 如下发生异常
                future.addTimeout(timeout);
                conn.getChannel().writeAndFlush(request);
            } catch (Exception e) {
                InvokeFuture f = conn.removeInvokeFuture(request.getId());
                // 如果发生异常,取消超时任务
                f.cancelTimeout();
                f.putResponse(commandFactory.createSendFailedResponse(conn.getRemoteAddress(), e));
                // 想较于 future 而言,多出这一步,执行异常回调 callback.onException(e)
                f.tryAsyncExecuteInvokeCallbackAbnormally();
            }
        }
    
    ============================ RpcResponseProcessor ============================
        // 这里与 future 的处理方式一样
        public void doProcess(RemotingContext ctx, RemotingCommand cmd) {
            // 获取 Connection
            Connection conn = ctx.getChannelContext().channel().attr(Connection.CONNECTION).get();
            // 获取 InvokeFuture
            InvokeFuture future = conn.removeInvokeFuture(cmd.getId());
            // 设置响应,唤醒阻塞线程
            future.putResponse(cmd);
            // 取消 timeout task
            future.cancelTimeout();
            // 回调 callback
            future.executeInvokeCallback();
        }
    

    心跳请求:RpcHeartbeatTrigger#heartbeatTriggered(final ChannelHandlerContext ctx) 在发送了心跳请求之后,也是用了 TimeOut 做超时任务,与 Callback 使用一致,不再分析。

    三、快速失败机制(被调用端)

    以上都是在分析调用端的超时逻辑,对于三种调用方式,被调用端的处理都是一套逻辑。

    public class RpcRequestCommand extends RequestCommand {
        // 调用超时时间,调用端进行设置,默认值为 -1,代表永不超时
        private int timeout = -1;
        // 不会序列化到对端,该值会在被调用端的解码器收到该消息时,设置为收到的当前时间
        private transient long arriveTime = -1;
    }
    
    public class RpcCommandDecoder implements CommandDecoder {
        @Override
        public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            ...
            RequestCommand command;
            if (cmdCode == CommandCode.HEARTBEAT_VALUE) {
                // 如果是心跳请求消息,直接创建一个 HeartbeatCommand
                command = new HeartbeatCommand();
            } else {
                // 如果是正常请求,创建一个 RpcRequestCommand
                command = createRequestCommand(cmdCode);
            }
            ...
            out.add(command);
        }
    
        private RpcRequestCommand createRequestCommand(short cmdCode) {
            RpcRequestCommand command = new RpcRequestCommand();
            ...
            // 设置请求到达时间
            command.setArriveTime(System.currentTimeMillis());
            return command;
        }
    }
    
    public class RpcRequestProcessor {
        public void process(RemotingContext ctx, RpcRequestCommand cmd, ExecutorService defaultExecutor) {
            // 反序列化 className
            if (!deserializeRequestCommand(ctx, cmd, RpcDeserializeLevel.DESERIALIZE_CLAZZ)) {
                return;
            }
            // 获取 userProcessor
            UserProcessor userProcessor = ctx.getUserProcessor(cmd.getRequestClass());
    
            // 将 userProcessor.timeoutDiscard() 设置到 RemotingContext 中
            // userProcessor.timeoutDiscard() 用于 fail-fast,该值默认为 true,如果要关闭 fail-fast 功能,需要手动覆盖 UserProcessor#timeoutDiscard()
            ctx.setTimeoutDiscard(userProcessor.timeoutDiscard());
    
            ... 不管是 IO 线程执行还是业务线程池执行
            // ProcessTask.run() 调用 doProcess()
            executor.execute(new ProcessTask(ctx, cmd));
        }
    
        public void doProcess(final RemotingContext ctx, RpcRequestCommand cmd) throws Exception {
            long currentTimestamp = System.currentTimeMillis();
            // 设置超时时间 timeout 与到达时间 arriveTime 到 RemotingContext 中,为后续计算是否超时(ctx.isRequestTimeout())做准备
            preProcessRemotingContext(ctx, cmd, currentTimestamp);
            // 如果开启了 fail-fast 并且 (System.currentTimeMillis() - this.arriveTimestamp) > this.timeout 确实超时,直接返回
            // 不再进行后续的反序列化和业务逻辑处理
            if (ctx.isTimeoutDiscard() && ctx.isRequestTimeout()) {
                return;// then, discard this request
            }
            // decode request all
            if (!deserializeRequestCommand(ctx, cmd, RpcDeserializeLevel.DESERIALIZE_ALL)) {
                return;
            }
            // 派发请求,做真正的业务逻辑处理
            dispatchToUserProcessor(ctx, cmd);
        }
    
        private void preProcessRemotingContext(RemotingContext ctx, RpcRequestCommand cmd, long currentTimestamp) {
            // 将 RpcRequestCommand.getArriveTime()(该值在解码器中进行设置)设置到 RemotingContext 中
            ctx.setArriveTimestamp(cmd.getArriveTime());
            // 将 RpcRequestCommand.getTimeout() 设置到 RemotingContext 中
            ctx.setTimeout(cmd.getTimeout());
            ...
        }
    
        // RemotingContext#isRequestTimeout()
        public boolean isRequestTimeout() {
            // timeout = -1 表示永不超时
            // oneway 没有超时概念
            // 是否超时是按照当前时间减去响应达到时间,而非被调用端的当前时间 - 调用端的请求发送时间(这样计算就是使用了两个机器的时钟,跨机器的时钟会有时钟差)
            if (this.timeout > 0 
                    && (this.rpcCommandType != RpcCommandType.REQUEST_ONEWAY)
                    && (System.currentTimeMillis() - this.arriveTimestamp) > this.timeout) {
                return true;
            }
            return false;
        }
    }
    

    注意

    • 是否超时是按照当前时间减去响应达到时间,而非被调用端的当前时间 - 调用端的请求发送时间(这样计算就是使用了两个机器的时钟,跨机器的时钟会有时钟差)
    • fail-fast 机制是由 UserProcessor#timeoutDiscard() 指定的,如果返回 true,则打开 fail-fast,默认情况下 fail-fast 机制是打开的;如果想关闭该功能,手动覆盖 UserProcessor#timeoutDiscard() 方法即可,直接返回 false,之后是否超时就从 DefaultBizContext#isRequestTimeout() 进行判断
    DefaultBizContext#isRequestTimeout() 实际上还是调用了 RemotingContext#isRequestTimeout()
    
        private RemotingContext remotingCtx;
        @Override
        public boolean isRequestTimeout() {
            return this.remotingCtx.isRequestTimeout();
        }
    

    相关文章

      网友评论

        本文标题:SOFABolt 源码分析21 - 超时与快速失败机制的设计

        本文链接:https://www.haomeiwen.com/subject/yrsnoftx.html