美文网首页dubbo
Dubbo 2.6.5 心跳机制

Dubbo 2.6.5 心跳机制

作者: 晴天哥_王志 | 来源:发表于2020-02-23 11:38 被阅读0次

    开篇

    • 这篇文章的目的是理解下Dubbo 在consumer和provider两侧的心跳检测机制。
    • consumer侧的心跳检测以Connection作为维度进行,每一个连接生成一个心跳检测任务。
    • provider侧心跳检测以server端口监听作为维度进行检测,每个端口监听生成一个心跳检测任务。
    • consumer侧的心跳检测任务的初始化在HeaderExchangeClient当中。
    • provider侧的心跳检测任务的初始化在HeaderExchangeServer当中。
    • provider和consumer在处理发送接收请求过程中会记录读写的时间戳,根据该时间戳判断是否超过心跳检测时间。

    HeaderExchangeClient

    /**
     * Consumer-side exchange client that schedules one heartbeat task per client connection.
     *
     * <p>The heartbeat interval and timeout are read from the client's URL parameters in the
     * constructor. {@code stopHeartbeatTimer()} and the {@code ExchangeClient} methods are not
     * shown in this excerpt.
     */
    public class HeaderExchangeClient implements ExchangeClient {

        private static final Logger logger = LoggerFactory.getLogger(HeaderExchangeClient.class);

        // Static scheduler, therefore shared by every HeaderExchangeClient instance.
        private static final ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(
                2,
                new NamedThreadFactory("dubbo-remoting-client-heartbeat", true));

        private final Client client;
        private final ExchangeChannel channel;

        // Handle of the currently scheduled heartbeat task; assigned in startHeartbeatTimer().
        private ScheduledFuture<?> heartbeatTimer;
        // Heartbeat interval in ms; when it is not positive no heartbeat task is scheduled.
        private int heartbeat;
        // Read-idle timeout in ms handed to HeartBeatTask; defaults to three heartbeat intervals.
        private int heartbeatTimeout;

        /**
         * Wraps the given client and optionally starts its heartbeat task.
         *
         * @param client        the underlying client; must not be {@code null}
         * @param needHeartbeat whether to start the heartbeat timer right away
         * @throws IllegalArgumentException if {@code client} is {@code null}
         * @throws IllegalStateException    if the configured timeout is less than twice the interval
         */
        public HeaderExchangeClient(Client client, boolean needHeartbeat) {
            if (client == null) {
                throw new IllegalArgumentException("client == null");
            }
            this.client = client;
            this.channel = new HeaderExchangeChannel(client);

            // Only a "1.0."-prefixed dubbo version gets a non-zero default interval.
            String dubboVersion = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
            boolean versionIs10x = dubboVersion != null && dubboVersion.startsWith("1.0.");
            int defaultHeartbeat = versionIs10x ? Constants.DEFAULT_HEARTBEAT : 0;

            this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, defaultHeartbeat);
            this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
            if (heartbeatTimeout < heartbeat * 2) {
                throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
            }

            if (needHeartbeat) {
                startHeartbeatTimer();
            }
        }


        /**
         * Stops any previous timer, then schedules a new {@link HeartBeatTask} with a fixed delay
         * of {@code heartbeat} ms (also used as the initial delay). Does not schedule anything
         * when {@code heartbeat} is not positive.
         */
        private void startHeartbeatTimer() {
            stopHeartbeatTimer();
            if (heartbeat <= 0) {
                return;
            }
            // The task only ever inspects this client: it is exposed as a single-element collection.
            HeartBeatTask.ChannelProvider selfProvider = new HeartBeatTask.ChannelProvider() {
                @Override
                public Collection<Channel> getChannels() {
                    return Collections.<Channel>singletonList(HeaderExchangeClient.this);
                }
            };
            HeartBeatTask task = new HeartBeatTask(selfProvider, heartbeat, heartbeatTimeout);
            heartbeatTimer = scheduled.scheduleWithFixedDelay(task, heartbeat, heartbeat, TimeUnit.MILLISECONDS);
        }
    }
    
    • HeaderExchangeClient的startHeartbeatTimer()方法会创建heartbeatTimer对象。
    • heartbeatTimer对象是线程池调度HeartBeatTask后返回的ScheduledFuture,任务按heartbeat间隔周期执行。
    • 具体的任务心跳检测在HeartBeatTask当中实现。

    HeaderExchangeServer

    /**
     * Provider-side exchange server that schedules one heartbeat task per server instance.
     *
     * <p>The task covers every channel returned by {@code getChannels()}. Both
     * {@code stopHeartbeatTimer()} and {@code getChannels()} are not shown in this excerpt.
     */
    public class HeaderExchangeServer implements ExchangeServer {

        protected final Logger logger = LoggerFactory.getLogger(getClass());

        // Per-instance single-thread scheduler used for the heartbeat task.
        private final ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(
                1, new NamedThreadFactory("dubbo-remoting-server-heartbeat", true));

        private final Server server;

        // Handle of the currently scheduled heartbeat task; assigned in startHeartbeatTimer().
        private ScheduledFuture<?> heartbeatTimer;
        // Heartbeat interval in ms; defaults to 0, in which case no heartbeat task is scheduled.
        private int heartbeat;
        // Read-idle timeout in ms handed to HeartBeatTask; defaults to three heartbeat intervals.
        private int heartbeatTimeout;
        private AtomicBoolean closed = new AtomicBoolean(false);

        /**
         * Wraps the given server, reads the heartbeat settings from its URL and starts the timer.
         *
         * @param server the underlying server; must not be {@code null}
         * @throws IllegalArgumentException if {@code server} is {@code null}
         * @throws IllegalStateException    if the configured timeout is less than twice the interval
         */
        public HeaderExchangeServer(Server server) {
            if (server == null) {
                throw new IllegalArgumentException("server == null");
            }
            this.server = server;

            this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
            int defaultTimeout = heartbeat * 3;
            this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, defaultTimeout);
            if (heartbeatTimeout < heartbeat * 2) {
                throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
            }

            startHeartbeatTimer();
        }

        /**
         * Stops any previous timer, then schedules a new {@link HeartBeatTask} with a fixed delay
         * of {@code heartbeat} ms (also used as the initial delay). Does not schedule anything
         * when {@code heartbeat} is not positive.
         */
        private void startHeartbeatTimer() {
            stopHeartbeatTimer();
            if (heartbeat <= 0) {
                return;
            }
            // Channels are looked up on every call and handed out as a read-only view.
            HeartBeatTask.ChannelProvider serverChannels = new HeartBeatTask.ChannelProvider() {
                @Override
                public Collection<Channel> getChannels() {
                    return Collections.unmodifiableCollection(
                            HeaderExchangeServer.this.getChannels());
                }
            };
            HeartBeatTask task = new HeartBeatTask(serverChannels, heartbeat, heartbeatTimeout);
            heartbeatTimer = scheduled.scheduleWithFixedDelay(task, heartbeat, heartbeat, TimeUnit.MILLISECONDS);
        }
    }
    
    • HeaderExchangeServer的startHeartbeatTimer()方法会创建heartbeatTimer对象。
    • heartbeatTimer对象是线程池调度HeartBeatTask后返回的ScheduledFuture,任务按heartbeat间隔周期执行。
    • 具体的任务心跳检测在HeartBeatTask当中实现。

    HeartBeatTask

    /**
     * Runnable that inspects a set of channels and, based on their last read/write timestamps,
     * sends heartbeat requests and handles read-idle timeouts.
     *
     * <p>Timestamps are read from the channel attributes
     * {@code HeaderExchangeHandler.KEY_READ_TIMESTAMP} and
     * {@code HeaderExchangeHandler.KEY_WRITE_TIMESTAMP}; a channel without a timestamp is
     * neither pinged nor timed out.
     */
    final class HeartBeatTask implements Runnable {

        private static final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class);

        // Source of the channels to inspect on every run.
        private final ChannelProvider channelProvider;

        // Idle period in ms after which a heartbeat request is sent.
        private final int heartbeat;

        // Read-idle period in ms after which the channel is reconnected or closed.
        private final int heartbeatTimeout;

        /**
         * @param provider         supplies the channels to check on each run
         * @param heartbeat        idle period (ms) that triggers a heartbeat request
         * @param heartbeatTimeout read-idle period (ms) that triggers reconnect/close
         */
        HeartBeatTask(ChannelProvider provider, int heartbeat, int heartbeatTimeout) {
            this.channelProvider = provider;
            this.heartbeat = heartbeat;
            this.heartbeatTimeout = heartbeatTimeout;
        }

        /**
         * Checks every open channel once. Failures on one channel are logged and do not stop
         * the remaining channels from being processed; nothing is propagated out of this method.
         */
        @Override
        public void run() {
            try {
                long now = System.currentTimeMillis();
                for (Channel channel : channelProvider.getChannels()) {
                    if (channel.isClosed()) {
                        continue;
                    }
                    try {
                        Long lastRead = (Long) channel.getAttribute(
                                HeaderExchangeHandler.KEY_READ_TIMESTAMP);
                        Long lastWrite = (Long) channel.getAttribute(
                                HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);
                        // Either direction being idle for longer than one interval triggers a ping.
                        if ((lastRead != null && now - lastRead > heartbeat)
                                || (lastWrite != null && now - lastWrite > heartbeat)) {
                            Request req = new Request();
                            req.setVersion(Version.getProtocolVersion());
                            req.setTwoWay(true);
                            req.setEvent(Request.HEARTBEAT_EVENT);
                            channel.send(req);
                            if (logger.isDebugEnabled()) {
                                logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
                                        + ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms");
                            }
                        }
                        // Only the read timestamp decides whether the channel has timed out.
                        if (lastRead != null && now - lastRead > heartbeatTimeout) {
                            logger.warn("Close channel " + channel
                                    + ", because heartbeat read idle time out: " + heartbeatTimeout + "ms");
                            if (channel instanceof Client) {
                                try {
                                    ((Client) channel).reconnect();
                                } catch (Exception e) {
                                    // Best effort: keep processing the other channels, but do not
                                    // hide the failure from operators.
                                    logger.warn("Failed to reconnect channel " + channel
                                            + " after heartbeat read idle time out, cause: " + e.getMessage(), e);
                                }
                            } else {
                                channel.close();
                            }
                        }
                    } catch (Throwable t) {
                        logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
                    }
                }
            } catch (Throwable t) {
                logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t);
            }
        }

        /** Supplies the channels a {@link HeartBeatTask} should inspect. */
        interface ChannelProvider {
            Collection<Channel> getChannels();
        }

    }
    
    • HeartBeatTask会根据channel的lastRead或lastWrite时间戳判断是否需要发送心跳,并根据lastRead判断是否读超时:超时后Client类型的channel会重新连接,其他channel则直接关闭。
    • 针对需要重发心跳的逻辑就是简单的发送HEARTBEAT_EVENT心跳类型的报文即可。
    • lastRead和lastWrite时间戳会在HeartbeatHandler的读写事件中被更新。
    • Dubbo的心跳报文只在需要时才发送,因为正常报文的收发同样会刷新上次读写时间戳。

    HeartbeatHandler

    /**
     * Channel handler delegate that records read/write timestamps on the channel and answers
     * heartbeat messages itself instead of passing them to the wrapped handler.
     *
     * <p>The timestamps are stored as channel attributes under {@link #KEY_READ_TIMESTAMP} and
     * {@link #KEY_WRITE_TIMESTAMP}.
     */
    public class HeartbeatHandler extends AbstractChannelHandlerDelegate {

        private static final Logger logger = LoggerFactory.getLogger(HeartbeatHandler.class);

        public static String KEY_READ_TIMESTAMP = "READ_TIMESTAMP";

        public static String KEY_WRITE_TIMESTAMP = "WRITE_TIMESTAMP";

        /**
         * @param handler the handler that receives every non-heartbeat event
         */
        public HeartbeatHandler(ChannelHandler handler) {
            super(handler);
        }

        /** Initializes both timestamps to "now" before delegating the connect event. */
        @Override
        public void connected(Channel channel) throws RemotingException {
            setReadTimestamp(channel);
            setWriteTimestamp(channel);
            handler.connected(channel);
        }

        /** Removes both timestamps before delegating the disconnect event. */
        @Override
        public void disconnected(Channel channel) throws RemotingException {
            clearReadTimestamp(channel);
            clearWriteTimestamp(channel);
            handler.disconnected(channel);
        }

        /** Refreshes the write timestamp, then delegates the sent event. */
        @Override
        public void sent(Channel channel, Object message) throws RemotingException {
            setWriteTimestamp(channel);
            handler.sent(channel, message);
        }

        /**
         * Refreshes the read timestamp for every incoming message. Heartbeat requests are
         * answered here (when two-way) and heartbeat responses are dropped after logging;
         * neither reaches the wrapped handler. All other messages are delegated.
         */
        @Override
        public void received(Channel channel, Object message) throws RemotingException {
            setReadTimestamp(channel);
            if (isHeartbeatRequest(message)) {
                Request req = (Request) message;
                if (req.isTwoWay()) {
                    Response res = new Response(req.getId(), req.getVersion());
                    res.setEvent(Response.HEARTBEAT_EVENT);
                    channel.send(res);
                    // The interval is only needed for the debug message, so look it up lazily.
                    if (logger.isDebugEnabled()) {
                        int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
                        logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()
                                + ", cause: The channel has no data-transmission exceeds a heartbeat period"
                                + (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));
                    }
                }
                return;
            }
            if (isHeartbeatResponse(message)) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Receive heartbeat response in thread " + Thread.currentThread().getName());
                }
                return;
            }
            handler.received(channel, message);
        }

        /** Stores the current time as the channel's last-read timestamp. */
        private void setReadTimestamp(Channel channel) {
            channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        }

        /** Stores the current time as the channel's last-write timestamp. */
        private void setWriteTimestamp(Channel channel) {
            channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());
        }

        /** Removes the last-read timestamp attribute from the channel. */
        private void clearReadTimestamp(Channel channel) {
            channel.removeAttribute(KEY_READ_TIMESTAMP);
        }

        /** Removes the last-write timestamp attribute from the channel. */
        private void clearWriteTimestamp(Channel channel) {
            channel.removeAttribute(KEY_WRITE_TIMESTAMP);
        }
    }
    
    • HeartbeatHandler在连接、发送、接收事件中会刷新lastRead和/或lastWrite的值(连接时两者都刷新,发送只刷新lastWrite,接收只刷新lastRead,断开连接时两者都被清除),保证心跳检测正常。
    • HeartbeatHandler的received操作当中会直接解析心跳报文并发送心跳响应。

    Handler的封装关系

    Handler的封装关系

    相关文章

      网友评论

        本文标题:Dubbo 2.6.5 心跳机制

        本文链接:https://www.haomeiwen.com/subject/tnpdqhtx.html