RocketMQ broker master-slave replication (HA strategy)


Author: 圣村的希望 | Published: 2018-12-07 21:32

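For high availability, RocketMQ lets you attach one or more slave brokers to a master broker, so that one broker going down does not stop the distributed message queue from working. In the broker configuration, brokerId 0 denotes the master and a brokerId greater than 0 denotes a slave (slaves are usually configured with 1). brokerRole determines the broker's role: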

    public enum BrokerRole {
        ASYNC_MASTER, // master that replicates asynchronously
        SYNC_MASTER,  // master that replicates synchronously
        SLAVE;        // slave broker
    }
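The pairing of brokerId and brokerRole can be expressed as a small rule. The sketch below is hypothetical: RoleRules and resolveBrokerId are made-up names, and the enum is redeclared only so the example compiles on its own. As far as I know, RocketMQ's broker startup code applies an equivalent check. It forces a master to brokerId 0 and rejects a slave whose brokerId is not positive.

```java
// Hypothetical sketch: how brokerRole and brokerId fit together.
class RoleRules {
    // Redeclared here only so the example is self-contained.
    enum BrokerRole { ASYNC_MASTER, SYNC_MASTER, SLAVE }

    static final long MASTER_ID = 0L; // brokerId reserved for the master

    // Returns the effective brokerId, or throws if a slave is misconfigured.
    static long resolveBrokerId(BrokerRole role, long configuredId) {
        switch (role) {
            case ASYNC_MASTER:
            case SYNC_MASTER:
                return MASTER_ID; // a master always ends up with brokerId 0
            case SLAVE:
                if (configuredId <= 0) {
                    throw new IllegalArgumentException("Slave's brokerId must be > 0");
                }
                return configuredId; // commonly 1, but any positive id works
            default:
                throw new IllegalStateException("unknown role: " + role);
        }
    }

    public static void main(String[] args) {
        System.out.println(resolveBrokerId(BrokerRole.SYNC_MASTER, 5)); // 0
        System.out.println(resolveBrokerId(BrokerRole.SLAVE, 1));       // 1
    }
}
```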
    

Broker master-slave replication copies two kinds of information:
  1. broker metadata;
  2. the master's messages (CommitLog data).

1. Broker metadata synchronization:

    // During BrokerController initialization: a slave schedules a task that syncs metadata from its master
    if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
        // If the master's HA address is configured, set masterAddress on HAService's haClient right away
        if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
            this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
            this.updateMasterHAServerAddrPeriodically = false;
        } else {
            // Otherwise the address is taken from the registration result in start()
            this.updateMasterHAServerAddrPeriodically = true;
        }
        // Sync metadata 10 s after startup, then every 60 s
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.slaveSynchronize.syncAll();
                } catch (Throwable e) {
                    log.error("ScheduledTask syncAll slave exception", e);
                }
            }
        }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
    } else {
        // On a master: periodically log how far the slave lags behind
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.printMasterAndSlaveDiff();
                } catch (Throwable e) {
                    log.error("schedule printMasterAndSlaveDiff error.", e);
                }
            }
        }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
    }
    

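During broker initialization, a broker configured as a slave starts synchronizing metadata from its master. If the broker's configuration file specifies the master's HA address (haMasterAddress), that address is written directly into the masterAddress field of HAService's haClient. Otherwise, the slave's masterAddress is set later, in the broker's start() method. The synchronization itself runs as a scheduled task, first after 10 seconds and then every 60 seconds. The actual work happens in the SlaveSynchronize class.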
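The scheduled task wraps syncAll() in a try/catch for Throwable, and that detail matters. The Javadoc of ScheduledExecutorService.scheduleAtFixedRate says that if any execution throws, subsequent executions are suppressed. The hypothetical sketch below shows this with a stand-in syncAll() that fails on its first call. PeriodicSync and runDemo are made-up names, and the intervals are shortened so the demo finishes quickly.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the slave's periodic metadata sync.
// syncAll() stands in for SlaveSynchronize.syncAll(); it only counts calls
// and fails on the first one to simulate a master that is not reachable yet.
class PeriodicSync {
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();
    final AtomicInteger attempts = new AtomicInteger();

    void syncAll() {
        if (attempts.incrementAndGet() == 1) {
            throw new IllegalStateException("master not reachable yet");
        }
    }

    // RocketMQ uses initialDelay = 10 s and period = 60 s; here both are parameters.
    void start(long initialDelayMs, long periodMs) {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                syncAll();
            } catch (Throwable e) {
                // Log and carry on: an exception escaping the task would
                // suppress every later execution of it.
                System.err.println("syncAll failed: " + e.getMessage());
            }
        }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
    }

    void stop() {
        scheduler.shutdown();
        try {
            scheduler.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Runs the scheduler with short intervals and returns how many attempts were made.
    static int runDemo(long periodMs, long runForMs) {
        PeriodicSync sync = new PeriodicSync();
        sync.start(0, periodMs);
        try {
            Thread.sleep(runForMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        sync.stop();
        return sync.attempts.get();
    }

    public static void main(String[] args) {
        System.out.println("attempts: " + runDemo(20, 200));
    }
}
```

Without the catch block, the first failure would be the last attempt. With it, the task keeps running despite the failed first call.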

    // In start(), when the broker registers itself, a slave takes its master's addresses from the result
    if (registerBrokerResult != null) {
        // Only if haMasterAddress was not configured: use the HA address returned by registration
        if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
            this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
        }

        // Master address used by SlaveSynchronize for metadata synchronization
        this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
        if (checkOrderConfig) {
            this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
        }
    }
    

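This is the code path in start() that sets masterAddress when it was not configured explicitly. The same path gives SlaveSynchronize the master address to pull metadata from.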
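The two ways a slave obtains its HA master address can be condensed into one small class. The sketch below is hypothetical. MasterAddressResolver and onRegisterResult are made-up names, and the addresses in the demo are invented examples. It keeps the `length() >= 6` test from the initialization code, and it lets a registration result override the address only when none was configured.

```java
// Hypothetical sketch of how a slave picks the address its HAClient connects to.
class MasterAddressResolver {
    private volatile String haMasterAddress;                 // address used by HAClient
    private final boolean updateMasterHAServerAddrPeriodically;

    MasterAddressResolver(String configuredHaMasterAddress) {
        // Same test as the initialization code: very short strings count as "not configured".
        if (configuredHaMasterAddress != null && configuredHaMasterAddress.length() >= 6) {
            this.haMasterAddress = configuredHaMasterAddress;
            this.updateMasterHAServerAddrPeriodically = false;
        } else {
            this.updateMasterHAServerAddrPeriodically = true;
        }
    }

    // Called with the haServerAddr from the broker registration result.
    void onRegisterResult(String haServerAddr) {
        if (updateMasterHAServerAddrPeriodically && haServerAddr != null) {
            this.haMasterAddress = haServerAddr;
        }
    }

    String haMasterAddress() {
        return haMasterAddress;
    }

    public static void main(String[] args) {
        MasterAddressResolver fixed = new MasterAddressResolver("10.0.0.1:10912");
        fixed.onRegisterResult("10.0.0.2:10912");
        System.out.println(fixed.haMasterAddress());   // 10.0.0.1:10912 (configuration wins)

        MasterAddressResolver learned = new MasterAddressResolver(null);
        learned.onRegisterResult("10.0.0.2:10912");
        System.out.println(learned.haMasterAddress()); // 10.0.0.2:10912
    }
}
```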

    public void syncAll() {
        this.syncTopicConfig();             // topic configuration
        this.syncConsumerOffset();          // consumer offsets
        this.syncDelayOffset();             // delay-queue (scheduled message) offsets
        this.syncSubscriptionGroupConfig(); // subscription group configuration
    }
    

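SlaveSynchronize.syncAll() performs all of the metadata synchronization: topic configuration, consumer offsets, delay offsets and subscription group configuration. Each item is ultimately fetched from the master broker over RocketMQ's Netty-based remoting layer.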
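The data flow of syncAll() is easiest to see without the networking. The sketch below is hypothetical. MasterSnapshot stands in for the four remoting responses, and each local table simply takes the master's entries with putAll. The real sync methods may also compare data versions or treat individual tables differently, so this only illustrates the direction of the copy.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of what syncAll() does with the data it pulls from the master.
class SlaveMetadataSync {
    // Stand-in for the four responses fetched from the master.
    static class MasterSnapshot {
        final Map<String, String> topicConfigs = new HashMap<>();       // topic -> config
        final Map<String, Long> consumerOffsets = new HashMap<>();      // "topic@group" -> offset
        final Map<Integer, Long> delayOffsets = new HashMap<>();        // delay level -> offset
        final Map<String, String> subscriptionGroups = new HashMap<>(); // group -> config
    }

    // The slave's local copies of the same tables.
    final Map<String, String> topicConfigs = new HashMap<>();
    final Map<String, Long> consumerOffsets = new HashMap<>();
    final Map<Integer, Long> delayOffsets = new HashMap<>();
    final Map<String, String> subscriptionGroups = new HashMap<>();

    // Copies each table from the master over the local one (the master wins on conflicts).
    void syncAll(MasterSnapshot master) {
        topicConfigs.putAll(master.topicConfigs);             // syncTopicConfig()
        consumerOffsets.putAll(master.consumerOffsets);       // syncConsumerOffset()
        delayOffsets.putAll(master.delayOffsets);             // syncDelayOffset()
        subscriptionGroups.putAll(master.subscriptionGroups); // syncSubscriptionGroupConfig()
    }

    public static void main(String[] args) {
        MasterSnapshot master = new MasterSnapshot();
        master.topicConfigs.put("TopicA", "readQueueNums=8");
        master.consumerOffsets.put("TopicA@GroupA", 42L);

        SlaveMetadataSync slave = new SlaveMetadataSync();
        slave.consumerOffsets.put("TopicA@GroupA", 7L); // stale value on the slave
        slave.syncAll(master);
        System.out.println(slave.consumerOffsets.get("TopicA@GroupA")); // 42
    }
}
```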
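2. Message synchronization (CommitLog replication):
Replicating message bodies is more involved because it has to happen in near real time. If the slave falls behind the master's CommitLog and the master then goes down, messages that never reached the slave can be lost. That is why RocketMQ offers both synchronous and asynchronous replication, selected by the brokerRole described above. Message replication mainly involves HAService (with its HAClient and GroupTransferService), HAConnection and WaitNotifyObject.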

    • Synchronous replication: when the master broker handles a message sent by a producer, CommitLog.putMessage(msg) also performs master-slave replication:
    // In CommitLog.putMessage(): handle HA (master-slave replication)
    handleHA(result, putMessageResult, msg);
    
    public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
            // Only a SYNC_MASTER replicates synchronously, i.e. waits for the slave
            if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
                HAService service = this.defaultMessageStore.getHaService();
                if (messageExt.isWaitStoreMsgOK()) {
                    // Determine whether to wait
                    if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
                        GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                        service.putRequest(request);
                        service.getWaitNotifyObject().wakeupAll();
                        boolean flushOK =
                            request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                        if (!flushOK) {
                            log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
                                + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
                            putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                        }
                    }
                    // Slave problem
                    else {
                        // Tell the producer, slave not available
                        putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
                    }
                }
            }
    
        }
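Stripped of the store plumbing, the SYNC_MASTER branch of handleHA() above makes a three-way decision. The class below is a simplified, hypothetical sketch: SyncReplicationGate is a made-up name, and the LongPredicate stands in for request.waitForFlush(...). The isSlaveOK() check mirrors HAService as far as I know. A slave must be connected, and the master's write position must be less than haSlaveFallbehindMax bytes ahead of push2SlaveMaxOffset, the highest offset a slave has acknowledged. In the versions I have looked at, haSlaveFallbehindMax defaults to 256 MB, and the demo uses that value.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongPredicate;

// Simplified, hypothetical sketch of the SYNC_MASTER branch of handleHA().
class SyncReplicationGate {
    enum Status { PUT_OK, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }

    final AtomicInteger connectionCount = new AtomicInteger(); // connected slaves
    final AtomicLong push2SlaveMaxOffset = new AtomicLong();   // highest offset a slave has acked
    final long haSlaveFallbehindMax;                            // tolerated lag in bytes

    SyncReplicationGate(long haSlaveFallbehindMax) {
        this.haSlaveFallbehindMax = haSlaveFallbehindMax;
    }

    // A slave counts as OK if one is connected and it is not too far behind.
    boolean isSlaveOK(long masterPutWhere) {
        return connectionCount.get() > 0
            && masterPutWhere - push2SlaveMaxOffset.get() < haSlaveFallbehindMax;
    }

    // nextOffset = wroteOffset + wroteBytes; waitForSlave models request.waitForFlush(...).
    Status handleHA(long nextOffset, LongPredicate waitForSlave) {
        if (!isSlaveOK(nextOffset)) {
            return Status.SLAVE_NOT_AVAILABLE; // the producer is told immediately, no waiting
        }
        return waitForSlave.test(nextOffset) ? Status.PUT_OK : Status.FLUSH_SLAVE_TIMEOUT;
    }

    public static void main(String[] args) {
        SyncReplicationGate gate = new SyncReplicationGate(256L * 1024 * 1024);
        System.out.println(gate.handleHA(1_000, off -> true));  // SLAVE_NOT_AVAILABLE (no slave)
        gate.connectionCount.set(1);
        System.out.println(gate.handleHA(1_000, off -> true));  // PUT_OK
        System.out.println(gate.handleHA(1_000, off -> false)); // FLUSH_SLAVE_TIMEOUT
    }
}
```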
    

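    handleHA() first wraps the offset the slave must reach (wroteOffset + wroteBytes) in a GroupCommitRequest. It hands that request to HAService's groupTransferService through putRequest(). Next, it calls getWaitNotifyObject().wakeupAll(), which wakes the master-side WriteSocketService threads so they push the new CommitLog data to the slave. Finally, it blocks in request.waitForFlush() for at most syncFlushTimeout. The request is then checked on the master, not on the slave, by GroupTransferService. That service moves pending requests into its requestsRead list and compares each one with push2SlaveMaxOffset, the highest offset a slave has acknowledged: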

    private void doWaitTransfer() {
        synchronized (this.requestsRead) {
            if (!this.requestsRead.isEmpty()) {
                for (CommitLog.GroupCommitRequest req : this.requestsRead) {
                    // Already done if a slave has acknowledged this offset
                    boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                    // Otherwise wait up to 5 times, 1 s each, for the slave to catch up
                    for (int i = 0; !transferOK && i < 5; i++) {
                        this.notifyTransferObject.waitForRunning(1000);
                        transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                    }

                    if (!transferOK) {
                        log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
                    }

                    // Wake the producer thread blocked in request.waitForFlush()
                    req.wakeupCustomer(transferOK);
                }

                this.requestsRead.clear();
            }
        }
    }
    

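    GroupTransferService is started together with HAService. It calls doWaitTransfer() in a loop to process the GroupCommitRequests queued on the master. Each request gets up to five 1-second waits for push2SlaveMaxOffset to reach its offset. After that, req.wakeupCustomer(transferOK) wakes the thread blocked in request.waitForFlush(), so handleHA() can return. If transferOK is false, waitForFlush() reports failure and the put status becomes FLUSH_SLAVE_TIMEOUT.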
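The hand-off between the producer thread and GroupTransferService is a wait/notify pattern, and it can be sketched with standard concurrency tools. The code below is simplified and hypothetical. A CountDownLatch plays the role of the request's wait point. notifyTransferSome() models what happens when a slave acknowledges an offset. The write/read lists are swapped so that new requests can queue while older ones are checked. The demo shortens the 1-second wait step to 10 ms.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Simplified, hypothetical sketch of the GroupTransferService hand-off.
class GroupTransferSketch {
    static class GroupCommitRequest {
        final long nextOffset;                             // offset the slave must reach
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile boolean transferOK;

        GroupCommitRequest(long nextOffset) { this.nextOffset = nextOffset; }

        void wakeupCustomer(boolean ok) {                  // called by the service thread
            this.transferOK = ok;
            latch.countDown();
        }

        boolean waitForFlush(long timeoutMs) throws InterruptedException { // called by handleHA
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS) && transferOK;
        }
    }

    final AtomicLong push2SlaveMaxOffset = new AtomicLong();
    private final Object notifyTransferObject = new Object();
    private final long waitStepMs;                         // RocketMQ waits 1000 ms per step
    private List<GroupCommitRequest> requestsWrite = new ArrayList<>();
    private List<GroupCommitRequest> requestsRead = new ArrayList<>();

    GroupTransferSketch(long waitStepMs) { this.waitStepMs = waitStepMs; }

    synchronized void putRequest(GroupCommitRequest req) { requestsWrite.add(req); }

    // Called when a slave acknowledges an offset.
    void notifyTransferSome(long ackedOffset) {
        push2SlaveMaxOffset.accumulateAndGet(ackedOffset, Math::max);
        synchronized (notifyTransferObject) {
            notifyTransferObject.notifyAll();
        }
    }

    private synchronized void swapRequests() {
        List<GroupCommitRequest> tmp = requestsWrite;
        requestsWrite = requestsRead;
        requestsRead = tmp;
    }

    // One pass of the service loop: take the queued requests, wait up to 5 steps for each.
    void doWaitTransfer() throws InterruptedException {
        swapRequests();
        for (GroupCommitRequest req : requestsRead) {
            boolean transferOK = push2SlaveMaxOffset.get() >= req.nextOffset;
            for (int i = 0; !transferOK && i < 5; i++) {
                synchronized (notifyTransferObject) {
                    notifyTransferObject.wait(waitStepMs);
                }
                transferOK = push2SlaveMaxOffset.get() >= req.nextOffset;
            }
            req.wakeupCustomer(transferOK);
        }
        requestsRead.clear();
    }

    // Producer-side demo: what waitForFlush() reports once the slave has acked `acked`.
    static boolean demo(long acked, long nextOffset) {
        GroupTransferSketch service = new GroupTransferSketch(10); // 10 ms steps keep it fast
        GroupCommitRequest req = new GroupCommitRequest(nextOffset);
        service.putRequest(req);
        Thread worker = new Thread(() -> {
            try {
                service.doWaitTransfer();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        service.notifyTransferSome(acked);                 // the slave reports its progress
        try {
            return req.waitForFlush(1_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(200, 150)); // true: the slave is past offset 150
        System.out.println(demo(100, 150)); // false: still behind after 5 waits
    }
}
```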

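    • Asynchronous replication: the transport between slave and master is the same in both modes. An ASYNC_MASTER simply does not make the producer wait. Replication is driven by the slave. When the slave first starts, it reports its maxOffset to the master, and a background thread keeps reporting it by default every 5 seconds. The slave also reports whenever it has written new data (reportSlaveMaxOffsetPlus() below).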
    public HAService(final DefaultMessageStore defaultMessageStore) throws IOException {
            this.defaultMessageStore = defaultMessageStore;
            this.acceptSocketService =
                new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
            this.groupTransferService = new GroupTransferService();
            this.haClient = new HAClient();
        }
    
    // HAClient.run(): the slave-side replication loop
    public void run() {
                log.info(this.getServiceName() + " service started");
    
                while (!this.isStopped()) {
                    try {
                        if (this.connectMaster()) { // connect to the master
    
                            if (this.isTimeToReportOffset()) { // report maxOffset to the master every 5 s (haSendHeartbeatInterval)
                                boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                                if (!result) {
                                    this.closeMaster();
                                }
                            }
    
                            this.selector.select(1000);
    
                            boolean ok = this.processReadEvent();
                            if (!ok) {
                                this.closeMaster();
                            }
    
                            if (!reportSlaveMaxOffsetPlus()) {
                                continue;
                            }
    
                            long interval =
                                HAService.this.getDefaultMessageStore().getSystemClock().now()
                                    - this.lastWriteTimestamp;
                            if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
                                .getHaHousekeepingInterval()) {
                                log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
                                    + "] expired, " + interval);
                                this.closeMaster();
                                log.warn("HAClient, master not response some time, so close connection");
                            }
                        } else {
                            this.waitForRunning(1000 * 5);
                        }
                    } catch (Exception e) {
                        log.warn(this.getServiceName() + " service has exception. ", e);
                        this.waitForRunning(1000 * 5);
                    }
                }
    
                log.info(this.getServiceName() + " service end");
            }
    

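    HAClient is created together with HAService. It is a background thread that, by default, reports the slave's maxOffset to the master every 5 seconds. This report is what starts message replication, and a failed report closes the connection. The first move in the exchange has to come from the slave, because the master does not know the slave's maxOffset and therefore does not know where to start sending from.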
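The report itself is tiny. The hypothetical sketch below encodes the slave's maxOffset as a single 8-byte long in ByteBuffer's default big-endian order, which matches how I understand HAClient's report buffer. It also checks the heartbeat interval the same way isTimeToReportOffset() does, by comparing the time elapsed since the last write with haSendHeartbeatInterval. SlaveOffsetReport and its methods are made-up names.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the slave's offset report: one 8-byte long
// (ByteBuffer's default big-endian order), sent once the heartbeat interval has passed.
class SlaveOffsetReport {
    static final long HA_SEND_HEARTBEAT_INTERVAL_MS = 5_000; // the 5 s default

    // Builds the buffer the slave would write to its SocketChannel.
    static ByteBuffer encode(long slaveMaxOffset) {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.putLong(slaveMaxOffset);
        buf.flip(); // position = 0, limit = 8: ready to be written
        return buf;
    }

    // What the master reads back out of the same 8 bytes.
    static long decode(ByteBuffer buf) {
        return buf.getLong(buf.position());
    }

    // Report once more than the heartbeat interval has passed since the last write.
    static boolean isTimeToReportOffset(long nowMs, long lastWriteTimestampMs) {
        return nowMs - lastWriteTimestampMs > HA_SEND_HEARTBEAT_INTERVAL_MS;
    }

    public static void main(String[] args) {
        ByteBuffer report = encode(1_048_576L);
        System.out.println(report.remaining());                  // 8
        System.out.println(decode(report));                      // 1048576
        System.out.println(isTimeToReportOffset(12_000, 6_000)); // true
    }
}
```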

    class AcceptSocketService extends ServiceThread {
    @Override
            public void run() {
                log.info(this.getServiceName() + " service started");
    
                while (!this.isStopped()) {
                    try {
                        this.selector.select(1000);
                        Set<SelectionKey> selected = this.selector.selectedKeys();
    
                        if (selected != null) {
                            for (SelectionKey k : selected) {
                                if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                                    SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
    
                                    if (sc != null) {
                                        HAService.log.info("HAService receive new connection, "
                                            + sc.socket().getRemoteSocketAddress());
    
                                        try {
                                            HAConnection conn = new HAConnection(HAService.this, sc);
                                            conn.start();
                                            HAService.this.addConnection(conn);
                                        } catch (Exception e) {
                                            log.error("new HAConnection exception", e);
                                            sc.close();
                                        }
                                    }
                                } else {
                                    log.warn("Unexpected ops in select " + k.readyOps());
                                }
                            }
    
                            selected.clear();
                        }
                    } catch (Exception e) {
                        log.error(this.getServiceName() + " service has exception.", e);
                    }
                }
    
                log.info(this.getServiceName() + " service end");
            }
    }
    

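    HAService also creates an AcceptSocketService object, another background thread. It listens on the HA port (haListenPort) and accepts incoming slave connections. Each accepted socket is wrapped in an HAConnection that handles the traffic for that slave.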
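The accept loop above is the standard Java NIO selector pattern. The self-contained, hypothetical sketch below binds an ephemeral loopback port in place of haListenPort and connects one client that plays the slave. It runs select rounds until that connection is accepted. Removing each key through the iterator has the same effect as the original's selected.clear(). The point where RocketMQ would create an HAConnection is only marked with a comment.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Minimal NIO accept loop in the style of AcceptSocketService (hypothetical sketch).
class AcceptLoopSketch {
    // One select() round: accept every pending connection.
    static List<SocketChannel> acceptOnce(Selector selector, long timeoutMs) throws IOException {
        List<SocketChannel> accepted = new ArrayList<>();
        selector.select(timeoutMs);
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey k = it.next();
            it.remove(); // same effect as selected.clear() in the original loop
            if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
                if (sc != null) {
                    sc.configureBlocking(false);  // HAConnection also uses non-blocking mode
                    sc.socket().setTcpNoDelay(true);
                    accepted.add(sc);             // RocketMQ would wrap sc in an HAConnection here
                }
            }
        }
        return accepted;
    }

    // Opens a server on an ephemeral loopback port, connects one client, returns the accept count.
    static int demo() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // stands in for haListenPort
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);

            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) { // the "slave"
                int total = 0;
                for (int round = 0; round < 10 && total == 0; round++) {
                    for (SocketChannel conn : acceptOnce(selector, 100)) {
                        total++;
                        conn.close();
                    }
                }
                return total;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("accepted: " + demo());
    }
}
```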

    public HAConnection(final HAService haService, final SocketChannel socketChannel) throws IOException {
            this.haService = haService;
            this.socketChannel = socketChannel;
            this.clientAddr = this.socketChannel.socket().getRemoteSocketAddress().toString();
            this.socketChannel.configureBlocking(false);
            this.socketChannel.socket().setSoLinger(false, -1);
            this.socketChannel.socket().setTcpNoDelay(true);
            this.socketChannel.socket().setReceiveBufferSize(1024 * 64);
            this.socketChannel.socket().setSendBufferSize(1024 * 64);
            this.writeSocketService = new WriteSocketService(this.socketChannel);
            this.readSocketService = new ReadSocketService(this.socketChannel);
            this.haService.getConnectionCount().incrementAndGet();
        }
    

    Instantiating HAConnection creates two services. WriteSocketService writes CommitLog data to the slave over the channel. ReadSocketService reads the offsets the slave reports back.

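    When the master receives the maxOffset reported by a slave, ReadSocketService handles it. The first report is stored in slaveRequestOffset, which starts at -1. Every report also updates slaveAckOffset and is passed to HAService.notifyTransferSome(), which advances push2SlaveMaxOffset for synchronous replication. Once WriteSocketService sees that slaveRequestOffset is no longer -1, it knows where to start sending CommitLog data to the slave broker.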
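The master-side bookkeeping for one slave can be sketched as follows. This is a hypothetical simplification: SlaveOffsetTracker is a made-up name, and the buffer is assumed to be filled from position 0. Only the newest complete 8-byte report in the read buffer is used. The first report fixes slaveRequestOffset, and every report moves slaveAckOffset forward. A trailing partial report is left for the next read.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the master-side bookkeeping for one slave connection.
class SlaveOffsetTracker {
    volatile long slaveRequestOffset = -1; // -1: the slave has not said where to start yet
    volatile long slaveAckOffset = -1;     // latest offset the slave has confirmed
    private int processPosition = 0;       // bytes of the read buffer already consumed

    // Consumes the 8-byte reports received so far; only the newest complete one matters.
    // Assumes the buffer was filled from position 0.
    boolean processRead(ByteBuffer readBuffer) {
        if (readBuffer.position() - processPosition < 8) {
            return false; // no complete report yet
        }
        int pos = readBuffer.position() - (readBuffer.position() % 8);
        long readOffset = readBuffer.getLong(pos - 8);
        processPosition = pos;

        slaveAckOffset = readOffset; // in RocketMQ this value is also passed to notifyTransferSome()
        if (slaveRequestOffset < 0) {
            slaveRequestOffset = readOffset; // first report: where the transfer begins
        }
        return true;
    }

    // WriteSocketService keeps waiting while this is false.
    boolean canStartTransfer() {
        return slaveRequestOffset != -1;
    }

    public static void main(String[] args) {
        SlaveOffsetTracker tracker = new SlaveOffsetTracker();
        ByteBuffer buf = ByteBuffer.allocate(64);
        buf.putLong(1_000L);                 // first report from the slave
        tracker.processRead(buf);
        buf.putLong(2_000L).putLong(3_000L); // two later reports arrive together
        tracker.processRead(buf);
        System.out.println(tracker.slaveRequestOffset + " " + tracker.slaveAckOffset); // 1000 3000
    }
}
```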


Original link: https://www.haomeiwen.com/subject/oekxhqtx.html