RocketMQ Source Code: HA (High Availability)


Author: 激情的狼王 | Published 2018-05-09 15:47

    On the server side, RocketMQ's HA comes down to how Namesrv and Broker are deployed. HA is achieved through clustering, so let's look at each in turn.

    Namesrv High Availability

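    A Namesrv cluster achieves high availability by running multiple Namesrv instances. The relationship between the nodes is unusual:

    Namesrv nodes have no relationship with one another. They neither communicate nor exchange data, and exist purely to share the load. When one node goes down, the others are unaffected and keep serving; the Namesrv cluster becomes unavailable only if every node is down.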

    1. How a Broker registers with Namesrv

    public RegisterBrokerResult registerBrokerAll(/* many parameters, omitted */) {
        RegisterBrokerResult registerBrokerResult = null;
        // Step 1: fetch the addresses of all Namesrv nodes
        List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
        if (nameServerAddressList != null) {
            for (String namesrvAddr : nameServerAddressList) {
                try {
                    // Step 2: register this broker with the current Namesrv node
                    RegisterBrokerResult result = this.registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,
                        haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills);
                    if (result != null) {
                        registerBrokerResult = result;
                    }

                    log.info("register broker to name server {} OK", namesrvAddr);
                } catch (Exception e) {
                    // A failure on one node does not stop registration with the others
                    log.warn("registerBroker Exception, {}", namesrvAddr, e);
                }
            }
        }

        return registerBrokerResult;
    }
    

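    1. Fetch the full list of Namesrv nodes, nameServerAddressList.
    2. Loop over the Namesrv nodes and register this broker with every one of them. Because each node receives the same registrations, all Namesrv nodes end up holding the same data without ever talking to each other.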
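To make the fan-out concrete, here is a minimal in-memory simulation of the loop above. `NameServerStub`, `BrokerRegistrar.registerToAll` and the route table are hypothetical stand-ins, not RocketMQ classes: each stub either accepts a registration or throws to simulate a node that is down, and the broker carries on after a failure, as the `try/catch` inside the real loop does.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for one Namesrv node: it keeps its own route table
// and shares nothing with the other nodes.
class NameServerStub {
    final String addr;
    boolean down; // set to true to simulate a crashed node
    final Map<String, String> brokerTable = new HashMap<>(); // brokerName -> brokerAddr

    NameServerStub(String addr) {
        this.addr = addr;
    }

    String register(String brokerName, String brokerAddr) {
        if (down) {
            throw new IllegalStateException("name server " + addr + " unreachable");
        }
        brokerTable.put(brokerName, brokerAddr);
        return "OK from " + addr;
    }
}

class BrokerRegistrar {
    // Register with every node; a failure on one node is logged and skipped.
    static List<String> registerToAll(List<NameServerStub> nodes, String brokerName, String brokerAddr) {
        List<String> okResults = new ArrayList<>();
        for (NameServerStub node : nodes) {
            try {
                okResults.add(node.register(brokerName, brokerAddr));
            } catch (Exception e) {
                System.out.println("registerBroker exception, " + node.addr + ": " + e.getMessage());
            }
        }
        return okResults;
    }
}
```

The node that was down simply misses this round; since brokers repeat their registration periodically, it picks the route up again on a later round once it is back.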

    2. How a Producer accesses Namesrv

    private Channel getAndCreateNameserverChannel() throws InterruptedException {
        ···
        ···
        final List<String> addrList = this.namesrvAddrList.get();
        if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                // Reuse the currently chosen Namesrv while its channel is healthy
                addr = this.namesrvAddrChoosed.get();
                if (addr != null) {
                    ChannelWrapper cw = this.channelTables.get(addr);
                    if (cw != null && cw.isOK()) {
                        return cw.getChannel();
                    }
                }

                // Otherwise walk the list round-robin until a channel can be created
                if (addrList != null && !addrList.isEmpty()) {
                    for (int i = 0; i < addrList.size(); i++) {
                        int index = this.namesrvIndex.incrementAndGet();
                        index = Math.abs(index);
                        index = index % addrList.size();
                        String newAddr = addrList.get(index);

                        this.namesrvAddrChoosed.set(newAddr);
                        log.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);
                        Channel channelNew = this.createChannel(newAddr);
                        if (channelNew != null)
                            return channelNew;
                    }
                }
            } catch (Exception e) {
                log.error("getAndCreateNameserverChannel: create name server channel exception", e);
            } finally {
                this.lockNamesrvChannel.unlock();
            }
        } else {
            log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
        }

        return null;
    }
    

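    Producers and consumers connect to Namesrv the same way: they keep using the currently chosen node as long as its channel is healthy; otherwise they walk the Namesrv list round-robin (via namesrvIndex) and switch to the first node they can open a channel to.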
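The selection rule can be isolated from the Netty channel handling. Below is a hedged sketch: `NamesrvSelector` is a hypothetical class, and the `canConnect` predicate stands in for "the existing channel is OK" / "`createChannel` returned non-null" in the real method. Unlike the original, it records the chosen address only after a successful connection.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

// Hypothetical simplification of getAndCreateNameserverChannel: stick with the
// chosen address while it works, otherwise rotate through the list.
class NamesrvSelector {
    private final List<String> addrList;
    private final AtomicInteger namesrvIndex = new AtomicInteger(0);
    private String chosen; // last address that worked

    NamesrvSelector(List<String> addrList) {
        this.addrList = addrList;
    }

    synchronized String select(Predicate<String> canConnect) {
        if (chosen != null && canConnect.test(chosen)) {
            return chosen;
        }
        for (int i = 0; i < addrList.size(); i++) {
            // floorMod keeps the index non-negative even after int overflow
            int index = Math.floorMod(namesrvIndex.incrementAndGet(), addrList.size());
            String candidate = addrList.get(index);
            if (canConnect.test(candidate)) {
                chosen = candidate;
                return candidate;
            }
        }
        return null; // every name server is unreachable
    }
}
```

`Math.floorMod` is used instead of `Math.abs(index) % size` because `Math.abs(Integer.MIN_VALUE)` is still negative, which would produce a negative list index once the counter wraps around.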

    Broker High Availability

    A Broker master-slave group consists of 1 Master node and N Slave nodes.
    The Master serves both reads and writes; the Slaves serve reads only.

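    Within each master-slave group, the Master continuously sends new CommitLog data to the Slaves, and each Slave continuously reports back to the Master the CommitLog position it has synchronized up to.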

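    A Master node is one of two types, synchronous or asynchronous (brokerRole = SYNC_MASTER or ASYNC_MASTER). When a Producer sends a message, a synchronous Master waits until a Slave has stored it before returning the send result; an asynchronous Master returns without waiting.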
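The difference between the two Master types is essentially whether the send path blocks on the Slave's acknowledged offset. The following is a minimal sketch of that idea using `wait`/`notifyAll`; `ReplicationTracker` and its methods are hypothetical names, not RocketMQ's own classes.

```java
// Hypothetical model of SYNC_MASTER vs ASYNC_MASTER acknowledgement.
class ReplicationTracker {
    private long slaveAckOffset = 0; // highest CommitLog offset the slave has confirmed

    // Called whenever the slave reports how far it has stored.
    synchronized void onSlaveReport(long offset) {
        if (offset > slaveAckOffset) {
            slaveAckOffset = offset;
            notifyAll(); // wake up senders waiting in waitForSlave
        }
    }

    // SYNC_MASTER path: block until the slave has stored up to msgEndOffset,
    // or give up when the timeout expires (returns false).
    synchronized boolean waitForSlave(long msgEndOffset, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (slaveAckOffset < msgEndOffset) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return false; // slave did not confirm in time
            }
            try {
                wait(remaining);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    // ASYNC_MASTER returns immediately; SYNC_MASTER waits for the slave.
    boolean sendResult(boolean syncMaster, long msgEndOffset, long timeoutMillis) {
        return !syncMaster || waitForSlave(msgEndOffset, timeoutMillis);
    }
}
```

In RocketMQ itself a timeout on this path does not discard the message, which is already on the Master; the Producer receives a `FLUSH_SLAVE_TIMEOUT` send status instead of `SEND_OK`.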

    1. Master-side logic (the read loop of HAConnection's ReadSocketService)

    public void run() {
        HAConnection.log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                this.selector.select(1000);
                // Step 1: read the CommitLog offset reported by the Slave
                boolean ok = this.processReadEvent();
                if (!ok) {
                    HAConnection.log.error("processReadEvent error");
                    break;
                }
                // Step 2: stop if nothing has been read from the Slave for too long
                long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
                if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
                    log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);
                    break;
                }
            } catch (Exception e) {
                HAConnection.log.error(this.getServiceName() + " service has exception.", e);
                break;
            }
        }
        ···
        ···
        ···
    }
    

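    1. this.processReadEvent() reads the offset reported by the Slave, i.e. how far its CommitLog has been synchronized, which tells the Master where to continue sending from.
    2. Check whether the connection has timed out: if nothing has been read from the Slave for longer than haHousekeepingInterval, the loop breaks out.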
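Both steps can be sketched without sockets. This assumes, as far as I can tell from the versions this article covers, that each Slave report is a single 8-byte long and that the Master only cares about the newest complete one; `SlaveOffsetReader` and its fields are hypothetical names.

```java
import java.nio.ByteBuffer;

// Hypothetical simplification of the master's handling of slave reports:
// each report is one 8-byte long (the slave's max CommitLog offset).
class SlaveOffsetReader {
    private int processPosition = 0; // bytes already consumed
    long slaveAckOffset = -1;        // latest offset reported by the slave
    long lastReadTimestamp;

    SlaveOffsetReader(long now) {
        this.lastReadTimestamp = now;
    }

    // buf holds all bytes read from the socket so far (position = bytes written).
    void onBytesRead(ByteBuffer buf, long now) {
        if (buf.position() - processPosition >= 8) {
            // Keep only the newest complete report; a trailing partial report
            // stays in the buffer for the next read.
            int end = buf.position() - (buf.position() % 8);
            slaveAckOffset = buf.getLong(end - 8);
            processPosition = end;
        }
        lastReadTimestamp = now;
    }

    // Step 2: the slave has been silent longer than the housekeeping interval.
    boolean isExpired(long now, long housekeepingIntervalMillis) {
        return now - lastReadTimestamp > housekeepingIntervalMillis;
    }
}
```

Skipping straight to the newest report is safe because offsets only grow: an older report carries no information the newer one lacks.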

    2. WriteSocketService transfers new CommitLog data to the Slave

    (Image not preserved: 01.png)
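To illustrate what WriteSocketService puts on the wire, here is a hedged sketch. It assumes the framing used by HAConnection in RocketMQ 4.x, to the best of my knowledge: a 12-byte header (an 8-byte physical CommitLog offset followed by a 4-byte body size), then the body; a frame with an empty body serves as a heartbeat. `HaFrame` is a hypothetical class covering both the Master's encoding and the Slave's decoding.

```java
import java.nio.ByteBuffer;

// Hypothetical encoder/decoder for one master -> slave transfer frame:
// [8-byte CommitLog offset][4-byte body size][body bytes]
class HaFrame {
    static final int HEADER_SIZE = 8 + 4;

    final long offset;
    final byte[] body;

    HaFrame(long offset, byte[] body) {
        this.offset = offset;
        this.body = body;
    }

    ByteBuffer encode() {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE + body.length);
        buf.putLong(offset);
        buf.putInt(body.length);
        buf.put(body);
        buf.flip(); // ready to be written to the socket
        return buf;
    }

    // Returns null while buf does not yet hold a complete frame (partial read);
    // in that case buf's position is left unchanged.
    static HaFrame decode(ByteBuffer buf) {
        if (buf.remaining() < HEADER_SIZE) {
            return null;
        }
        int start = buf.position();
        long offset = buf.getLong(start);
        int size = buf.getInt(start + 8);
        if (buf.remaining() < HEADER_SIZE + size) {
            return null;
        }
        byte[] body = new byte[size];
        buf.position(start + HEADER_SIZE);
        buf.get(body);
        return new HaFrame(offset, body);
    }
}
```

Carrying the offset in every frame lets the receiver check that the data continues exactly where its local CommitLog ends before appending it.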

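    3. The Slave loop (HAService.HAClient)
    It receives the CommitLog data transferred from the Master and reports back to the Master the physical CommitLog offset it has synchronized up to locally.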

    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                // Step 1: connect to the Master and, when due, report the local max offset
                if (this.connectMaster()) {

                    if (this.isTimeToReportOffset()) {
                        boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                        if (!result) {
                            this.closeMaster();
                        }
                    }

                    this.selector.select(1000);
                    // Step 2: read the CommitLog data sent by the Master and append it locally
                    boolean ok = this.processReadEvent();
                    if (!ok) {
                        this.closeMaster();
                    }

                    // Report again right away if the local offset has advanced
                    if (!reportSlaveMaxOffsetPlus()) {
                        continue;
                    }
                    // Step 3: close the connection if the Master has been silent too long
                    long interval =
                        HAService.this.getDefaultMessageStore().getSystemClock().now()
                            - this.lastWriteTimestamp;
                    if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
                        .getHaHousekeepingInterval()) {
                        log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
                            + "] expired, " + interval);
                        this.closeMaster();
                        log.warn("HAClient, master not response some time, so close connection");
                    }
                } else {
                    // Could not connect to a Master: retry after 5 seconds
                    this.waitForRunning(1000 * 5);
                }
            } catch (Exception e) {
                log.warn(this.getServiceName() + " service has exception. ", e);
                this.waitForRunning(1000 * 5);
            }
        }

        log.info(this.getServiceName() + " service end");
    }
    

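    1. If the reporting interval has elapsed, report the local max offset to the Master. The report doubles as a heartbeat; if it fails, the connection to the Master is closed.
    2. Like the Master's method of the same name, processReadEvent() handles incoming data: here it reads the CommitLog data sent by the Master and appends it locally, advancing the local offset. reportSlaveMaxOffsetPlus() then reports the new offset immediately if it has moved.
    3. If the Master has not sent data for longer than haHousekeepingInterval, close the connection.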
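The three steps are mostly timing decisions, so they can be modelled as a small policy object. This is a hypothetical simplification, not HAClient itself: the two intervals play the roles of `haSendHeartbeatInterval` and `haHousekeepingInterval` in MessageStoreConfig, and, unlike the original (which tracks both with a single `lastWriteTimestamp` field), this sketch keeps separate timestamps for reports and Master data for clarity.

```java
// Hypothetical model of the slave's timing decisions: when to report its
// offset (doubling as a heartbeat) and when to give up on the master.
class SlaveReportPolicy {
    private final long heartbeatIntervalMillis;
    private final long housekeepingIntervalMillis;
    private long lastReportTimestamp;
    private long lastMasterDataTimestamp;
    private long reportedOffset;

    SlaveReportPolicy(long heartbeatIntervalMillis, long housekeepingIntervalMillis,
                      long now, long localMaxOffset) {
        this.heartbeatIntervalMillis = heartbeatIntervalMillis;
        this.housekeepingIntervalMillis = housekeepingIntervalMillis;
        this.lastReportTimestamp = now;
        this.lastMasterDataTimestamp = now;
        this.reportedOffset = localMaxOffset;
    }

    // Step 1: a periodic report, even if nothing changed, keeps the link alive.
    boolean isTimeToReport(long now) {
        return now - lastReportTimestamp > heartbeatIntervalMillis;
    }

    // After step 2: report right away if appending data moved the local offset.
    boolean shouldReportProgress(long localMaxOffset) {
        return localMaxOffset > reportedOffset;
    }

    void markReported(long now, long offset) {
        lastReportTimestamp = now;
        reportedOffset = offset;
    }

    void markMasterData(long now) {
        lastMasterDataTimestamp = now;
    }

    // Step 3: the master has been silent too long, so the connection should close.
    boolean isMasterExpired(long now) {
        return now - lastMasterDataTimestamp > housekeepingIntervalMillis;
    }
}
```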

    4. Producer sending messages: choosing a message queue

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        // Is the sendLatencyFaultEnable switch on?
        if (this.sendLatencyFaultEnable) {
            try {
                // Round-robin over the queues: return one whose broker is available
                // and whose brokerName equals lastBrokerName (or lastBrokerName is null)
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }
                // No match: take a relatively good broker, without checking its availability
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    // The broker has no writable queue for this topic: stop tracking it
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }
            // Fallback: pick a queue in round-robin order
            return tpInfo.selectOneMessageQueue();
        }
        // Switch off: pick a queue in round-robin order, avoiding lastBrokerName
        // (the broker of the previous failed attempt); availability is not checked
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }
    

    Here is the queue-selection logic when sendLatencyFaultEnable is on:

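    1. First, pick an available queue whose broker == lastBrokerName, i.e. a queue whose broker has not been put into the fault-tolerance object latencyFaultTolerance because of excessive latency.
    2. If step 1 finds no suitable queue, drop the broker == lastBrokerName condition and send to a relatively good broker chosen by latencyFaultTolerance.pickOneAtLeast().
    3. If that fails as well, pick a queue in round-robin order.
    When the switch is off, the Producer simply picks a queue in round-robin order, skipping queues on lastBrokerName, the broker of the previous failed attempt.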
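The core of latencyFaultTolerance is "a broker that was slow recently is avoided for a while". Here is a hedged sketch of that idea: `LatencyAwareSelector` is hypothetical, the threshold arrays mirror the MQFaultStrategy defaults as I understand them and should be treated as illustrative, each queue is represented only by its broker name, and the lastBrokerName condition and the `pickOneAtLeast()` step are omitted.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simplification of latency-based fault avoidance: a broker whose
// last send was slow is treated as unavailable for a while.
class LatencyAwareSelector {
    // Illustrative thresholds: latency >= LATENCY_MAX[i] ms => avoid for NOT_AVAILABLE[i] ms.
    private static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    private static final long[] NOT_AVAILABLE = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    private final Map<String, Long> availableAfter = new HashMap<>(); // brokerName -> timestamp
    private int sendWhichQueue = 0;

    static long notAvailableDuration(long latencyMillis) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (latencyMillis >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE[i];
            }
        }
        return 0L;
    }

    // Called after each send with the observed latency.
    void updateFaultItem(String brokerName, long latencyMillis, long now) {
        availableAfter.put(brokerName, now + notAvailableDuration(latencyMillis));
    }

    boolean isAvailable(String brokerName, long now) {
        Long until = availableAfter.get(brokerName);
        return until == null || now >= until;
    }

    // queueBrokers holds one broker name per queue. Round-robin from the next
    // index, skipping unavailable brokers; fall back to plain round-robin.
    int selectQueue(List<String> queueBrokers, long now) {
        int index = sendWhichQueue++;
        for (int i = 0; i < queueBrokers.size(); i++) {
            int pos = Math.floorMod(index + i, queueBrokers.size());
            if (isAvailable(queueBrokers.get(pos), now)) {
                return pos;
            }
        }
        return Math.floorMod(index, queueBrokers.size());
    }
}
```

Fast sends (under about 100 ms here) never pause a broker, so only brokers that are clearly struggling are skipped, and the pause grows with the observed latency.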

    Summary

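    1. RocketMQ makes Brokers highly available by running multiple Broker master-slave groups as a cluster.
    2. Different Broker master-slave groups have no relationship with one another: they neither communicate nor synchronize data.
    3. Namesrv nodes relate to one another the same way Broker master-slave groups do: they are independent, share the load, and neither communicate nor synchronize data.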


    Original link: https://www.haomeiwen.com/subject/ppbnrftx.html