RocketMQ broker busy analysis


Author: 圣村的希望 | Published 2019-11-02 16:35

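This analysis started with a question from a former colleague: in his production environment, the Alibaba Cloud ONS log showed a broker busy exception, as follows: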

    at com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.handleRequest(HeaderExchangeHandler.java:86)
     at com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.received(HeaderExchangeHandler.java:172)
     at com.alibaba.dubbo.remoting.rpc.http.codec.HttpRequestConvertHandler.received(HttpRequestConvertHandler.java:149)
     at com.alibaba.dubbo.remoting.transport.DecodeHandler.received(DecodeHandler.java:52)
     at com.alibaba.dubbo.remoting.transport.dispatcher.ChannelEventRunnable.run(ChannelEventRunnable.java:82)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
     at java.lang.Thread.run(Thread.java:745)
    Caused by: com.aliyun.openservices.ons.api.exception.ONSClientException: defaultMQProducer send exception
     at com.aliyun.openservices.ons.api.impl.rocketmq.ProducerImpl.checkProducerException(ProducerImpl.java:208)
     at com.aliyun.openservices.ons.api.impl.rocketmq.ProducerImpl.send(ProducerImpl.java:107)
     at com.souche.optimus.mq.aliyunons.ONSProducerInvoker.send(ONSProducerInvoker.java:170)
     ... 40 more
    Caused by: com.aliyun.openservices.shade.com.alibaba.rocketmq.client.exception.MQBrokerException: CODE: 2  DESC: [TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 3008ms, size of queue: 3086
    For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
     at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.MQClientAPIImpl.processSendResponse(MQClientAPIImpl.java:551)
     at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:353)
     at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:335)
     at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:298)
     at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:696)
     at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:463)
     at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1065)
     at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1023)
     at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:212)
     at com.aliyun.openservices.ons.api.impl.rocketmq.ProducerImpl.send(ProducerImpl.java:98)
     ... 41 more
    

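With the exception in hand, the first step was to search the source code for the text of the message, "broker busy, start flow control for a while", to find where it is produced. One place is the OS_PAGECACHE_BUSY branch below. Note that this branch uses the prefix [PC_SYNCHRONIZED] and the code SYSTEM_ERROR (1), whereas the logged message has the prefix [TIMEOUT_CLEAN_QUEUE] and CODE: 2 (SYSTEM_BUSY). That variant comes from the broker's fast-failure check (BrokerFastFailure), which rejects send requests that have waited too long in the send thread pool queue (3008 ms, with 3086 requests queued, in the log above). Both mean the broker is not keeping up with writes; the analysis below follows the page cache branch.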

    // SendMessageProcessor.handlePutMessageResult(...): map the store result to a response code
    case OS_PAGECACHE_BUSY:
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
        break;
    

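So this is an OS page cache busy condition on the broker. RocketMQ persists a message by first writing it into the page cache and then having it flushed to disk, so here the error is raised at the point where the message is about to be written into the cache. In short, the broker is too busy writing into the page cache to accept another message.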
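The "write to page cache first, flush later" path can be illustrated with plain Java NIO. The sketch below is not RocketMQ code; it only assumes the general idea described above, which RocketMQ builds on memory-mapped CommitLog files: putting bytes into a MappedByteBuffer just copies them into page-cache-backed memory, and MappedByteBuffer.force() is what asks the OS to write them to disk. The class name, file name and 4 KB size are arbitrary.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class PageCacheAppendDemo {
    static final int REGION_SIZE = 4096; // arbitrary size for the demo

    /** Appends the messages to a memory-mapped file, flushes once, and returns the bytes appended. */
    static int appendAndFlush(Path file, String... messages) throws IOException {
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // Make sure the file is at least REGION_SIZE bytes so the mapped region lies inside it.
            if (channel.size() < REGION_SIZE) {
                channel.write(ByteBuffer.wrap(new byte[1]), REGION_SIZE - 1);
            }
            // Writes into this buffer land in the OS page cache, not directly on disk.
            MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, REGION_SIZE);
            for (String m : messages) {
                buffer.put(m.getBytes(StandardCharsets.UTF_8)); // an "append" is only a memory copy
            }
            int appended = buffer.position();
            buffer.force(); // ask the OS to write the dirty pages to the storage device
            return appended;
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("commitlog-demo", ".bin");
        file.toFile().deleteOnExit();
        System.out.println("bytes appended: " + appendAndFlush(file, "msg-1;", "msg-2;")); // prints "bytes appended: 12"
    }
}
```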

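    // DefaultMessageStore.putMessage(...): reject the write up front if the OS page cache is busy
    if (this.isOSPageCacheBusy()) {
        return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
    }

    // DefaultMessageStore.isOSPageCacheBusy(): has the current CommitLog lock holder kept the lock too long?
    public boolean isOSPageCacheBusy() {
        long begin = this.getCommitLog().getBeginTimeInLock();
        long diff = this.systemClock.now() - begin;
        // beginTimeInLock is reset to 0 when the lock is released, which makes diff huge;
        // the upper bound filters that case out
        return diff < 10000000
                && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills();
    }

    // CommitLog.putMessage(...): first take the lock (a spin lock by default), record when it was
    // taken, then append the message to the mapped file
    putMessageLock.lock(); // spin or ReentrantLock, depending on store config
    long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
    this.beginTimeInLock = beginLockTimestamp;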
    

How the broker decides whether OS page cache writes are busy:

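• Read the time at which the CommitLog lock was acquired. The broker appends the messages of all topics on this machine to the same CommitLog, so a lock is used to serialize concurrent writes.
• The CommitLog lock has two implementations, ReentrantLock (non-fair) and a spin lock, selected by the store config (useReentrantLockWhenPutMessage); the spin lock is the default. Both are simple, so they are not covered here.
• Compute how long the lock has been held: the current time minus the time the lock was acquired. osPageCacheBusyTimeOutMills defaults to 1000 ms (1 s), so if the write currently holding the lock has held it for more than 1 s, the page cache is considered busy.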
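The busy check described in the list can be modeled in a few lines. This is a simplified, hypothetical model, not RocketMQ's classes: a compare-and-set spin lock guards the append, the time the lock was taken is stored in beginTimeInLock and reset to 0 on release, and isOSPageCacheBusy applies the same two bounds as the broker, with the 1000 ms default hard-coded. The caller passes `now` (a wall-clock time in milliseconds) explicitly so the check is deterministic.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class CommitLogLockModel {
    static final long BUSY_TIMEOUT_MILLIS = 1000;       // the 1000 ms default of osPageCacheBusyTimeOutMills
    static final long SANITY_LIMIT_MILLIS = 10_000_000; // same upper bound as `diff < 10000000` in the broker

    private final AtomicBoolean locked = new AtomicBoolean(false);
    private volatile long beginTimeInLock = 0;           // 0 means nobody holds the lock

    /** Spin lock: keep retrying the compare-and-set until this thread wins it. */
    void lock(long now) {
        while (!locked.compareAndSet(false, true)) {
            Thread.onSpinWait(); // CPU hint while busy-waiting (Java 9+)
        }
        beginTimeInLock = now;
    }

    /** Clear the timestamp before releasing, so a free lock never looks busy. */
    void unlock() {
        beginTimeInLock = 0;
        locked.set(false);
    }

    /** Busy if the current holder has kept the lock for more than BUSY_TIMEOUT_MILLIS. */
    boolean isOSPageCacheBusy(long now) {
        long diff = now - beginTimeInLock;
        // With the lock free, diff equals `now` itself (an epoch time), far above the upper bound.
        return diff < SANITY_LIMIT_MILLIS && diff > BUSY_TIMEOUT_MILLIS;
    }

    public static void main(String[] args) {
        CommitLogLockModel commitLog = new CommitLogLockModel();
        long t0 = System.currentTimeMillis();
        System.out.println(commitLog.isOSPageCacheBusy(t0));        // false: lock is free
        commitLog.lock(t0);
        System.out.println(commitLog.isOSPageCacheBusy(t0 + 500));  // false: held for 500 ms
        System.out.println(commitLog.isOSPageCacheBusy(t0 + 1500)); // true: held for 1.5 s
        commitLog.unlock();
        System.out.println(commitLog.isOSPageCacheBusy(t0 + 1500)); // false: released again
    }
}
```

The explicit `now` parameter is a testing convenience; the broker reads its own system clock instead.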
    

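When something goes wrong on the broker, it returns a corresponding error code to the client instead of throwing the exception across the wire, a design worth borrowing. Here the broker returns ResponseCode.SYSTEM_ERROR. Importantly, RocketMQ does not propagate concrete business exceptions to the client: apart from Netty network communication errors, failures are wrapped up and delivered to the client as error codes in the response.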
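The error-code style can be sketched as follows. The enum, the Response class and the handle method are hypothetical simplifications, not broker code; only the codes 0 (SUCCESS) and 1 (SYSTEM_ERROR) and the [PC_SYNCHRONIZED] remark are taken from RocketMQ. The point is that every outcome, including an unexpected exception inside the store, leaves the handler as a code plus a remark the client can inspect.

```java
import java.util.function.Supplier;

public class ErrorCodeResponseDemo {
    // Only these numeric values follow RocketMQ's ResponseCode; everything else is simplified.
    static final int SUCCESS = 0;
    static final int SYSTEM_ERROR = 1;

    enum PutMessageStatus { PUT_OK, OS_PAGECACHE_BUSY, UNKNOWN_ERROR }

    /** What goes back over the wire: a code plus a readable remark, never an exception. */
    static final class Response {
        final int code;
        final String remark;

        Response(int code, String remark) {
            this.code = code;
            this.remark = remark;
        }
    }

    /** Translate the store's result into a response. */
    static Response toResponse(PutMessageStatus status) {
        switch (status) {
            case PUT_OK:
                return new Response(SUCCESS, null);
            case OS_PAGECACHE_BUSY:
                return new Response(SYSTEM_ERROR, "[PC_SYNCHRONIZED]broker busy, start flow control for a while");
            default:
                return new Response(SYSTEM_ERROR, "UNKNOWN_ERROR");
        }
    }

    /** Request handler: even an unexpected exception is converted into an error response. */
    static Response handle(Supplier<PutMessageStatus> store) {
        try {
            return toResponse(store.get());
        } catch (RuntimeException e) {
            return new Response(SYSTEM_ERROR, "store failed: " + e.getMessage());
        }
    }

    public static void main(String[] args) {
        Response busy = handle(() -> PutMessageStatus.OS_PAGECACHE_BUSY);
        System.out.println(busy.code + " " + busy.remark); // prints "1 [PC_SYNCHRONIZED]broker busy, start flow control for a while"
        Response crashed = handle(() -> { throw new IllegalStateException("disk full"); });
        System.out.println(crashed.code + " " + crashed.remark); // prints "1 store failed: disk full"
    }
}
```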
Next, let's look at how the client handles the broker's page cache busy response:

    private SendResult processSendResponse(
            final String brokerName,
            final Message msg,
            final RemotingCommand response
        ) throws MQBrokerException, RemotingCommandException {
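            switch (response.getCode()) {
                // FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT and SLAVE_NOT_AVAILABLE fall through to SUCCESS:
                // the broker accepted the message; only the disk flush or slave replication did not finish in time
                case ResponseCode.FLUSH_DISK_TIMEOUT:
                case ResponseCode.FLUSH_SLAVE_TIMEOUT:
                case ResponseCode.SLAVE_NOT_AVAILABLE: {
                }
                case ResponseCode.SUCCESS: {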
                    SendStatus sendStatus = SendStatus.SEND_OK;
                    switch (response.getCode()) {
                        case ResponseCode.FLUSH_DISK_TIMEOUT:
                            sendStatus = SendStatus.FLUSH_DISK_TIMEOUT;
                            break;
                        case ResponseCode.FLUSH_SLAVE_TIMEOUT:
                            sendStatus = SendStatus.FLUSH_SLAVE_TIMEOUT;
                            break;
                        case ResponseCode.SLAVE_NOT_AVAILABLE:
                            sendStatus = SendStatus.SLAVE_NOT_AVAILABLE;
                            break;
                        case ResponseCode.SUCCESS:
                            sendStatus = SendStatus.SEND_OK;
                            break;
                        default:
                            assert false;
                            break;
                    }
    
                    SendMessageResponseHeader responseHeader =
                        (SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
    
                    //...
                    
                    sendResult.setRegionId(regionId);
                    return sendResult;
                }
                default:
                    break;
            }

            // Any other code, including SYSTEM_ERROR from the page cache busy branch, ends up here
            throw new MQBrokerException(response.getCode(), response.getRemark());
        }
    

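As shown, the client wraps the SYSTEM_ERROR returned by the broker in an MQBrokerException and throws it. Finally, back to where the producer sends the message (DefaultMQProducerImpl.sendDefaultImpl):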

    // timesTotal = 1 + retryTimesWhenSendFailed for SYNC sends, 1 for ASYNC and ONEWAY
    for (; times < timesTotal; times++) {
                    String lastBrokerName = null == mq ? null : mq.getBrokerName();
                    // Pick a MessageQueue: client-side load balancing that prefers a broker
                    // other than the one used in the previous attempt
                    MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                    if (mqSelected != null) {
                        mq = mqSelected;
                        brokersSent[times] = mq.getBrokerName();
                        try {
                            beginTimestampPrev = System.currentTimeMillis();
                            // Core of the send: build the send request and send it to the selected queue
                            sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                            // Handle the result according to the communication mode
                            switch (communicationMode) {
                                case ASYNC:
                                    return null;
                                case ONEWAY:
                                    return null;
                                case SYNC:
                                    if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                        // Status is not SEND_OK: retry on another broker only if configured to
                                        if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                            continue;
                                        }
                                    }
    
                                    return sendResult;
                                default:
                                    break;
                            }
                        } catch (RemotingException e) {
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                            log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                            log.warn(msg.toString());
                            exception = e;
                            continue;
                        } catch (MQClientException e) {
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                            log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                            log.warn(msg.toString());
                            exception = e;
                            continue;
                        } catch (MQBrokerException e) {
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                            log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                            log.warn(msg.toString());
                            exception = e;
                            switch (e.getResponseCode()) {
                                case ResponseCode.TOPIC_NOT_EXIST:
                                case ResponseCode.SERVICE_NOT_AVAILABLE:
                                case ResponseCode.SYSTEM_ERROR: // page cache busy ([PC_SYNCHRONIZED]) is retried
                                case ResponseCode.NO_PERMISSION:
                                case ResponseCode.NO_BUYER_ID:
                                case ResponseCode.NOT_IN_CURRENT_UNIT:
                                    continue;
                                default: // any other code, e.g. SYSTEM_BUSY (2), is not retried
                                    if (sendResult != null) {
                                        return sendResult;
                                    }
    
                                    throw e;
                            }
                        } catch (InterruptedException e) {
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                            log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                            log.warn(msg.toString());
    
                            log.warn("sendKernelImpl exception", e);
                            log.warn(msg.toString());
                            throw e;
                        }
                    } else {
                        break;
                    }
                }
    

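So when the client gets an MQBrokerException whose response code is SYSTEM_ERROR, it continues with the next attempt. Only synchronous sends go around the loop more than once, retrying twice by default (retryTimesWhenSendFailed); asynchronous and oneway sends make a single attempt, so in this situation the message is lost unless the application deals with the failure itself (an async send reports it to its callback, while a oneway send gets no feedback at all). Also note that the CODE: 2 in the original log is SYSTEM_BUSY, which is not in the retried list: in the code above it falls into the default branch and, when no earlier attempt produced a sendResult, is thrown immediately without a retry, which is consistent with the ONSClientException in the log.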
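The retry rules in the loop above can be condensed into a small model. This is a hypothetical sketch, not client code: it assumes timesTotal is 1 + retryTimesWhenSendFailed for SYNC sends and 1 for ASYNC/ONEWAY (this is not visible in the excerpt), copies the list of retried codes from the MQBrokerException branch, and treats every attempt as failing with the same code. Numeric code values are deliberately left out.

```java
import java.util.EnumSet;
import java.util.Set;

public class SendRetryModel {
    enum Mode { SYNC, ASYNC, ONEWAY }

    // Code names from the catch block above, plus SYSTEM_BUSY, which the block does not list.
    enum BrokerCode { SYSTEM_ERROR, SYSTEM_BUSY, TOPIC_NOT_EXIST, SERVICE_NOT_AVAILABLE,
                      NO_PERMISSION, NO_BUYER_ID, NOT_IN_CURRENT_UNIT }

    static final Set<BrokerCode> RETRIED = EnumSet.of(
            BrokerCode.TOPIC_NOT_EXIST, BrokerCode.SERVICE_NOT_AVAILABLE, BrokerCode.SYSTEM_ERROR,
            BrokerCode.NO_PERMISSION, BrokerCode.NO_BUYER_ID, BrokerCode.NOT_IN_CURRENT_UNIT);

    /** Number of send attempts when every attempt is rejected with `code`. */
    static int attempts(Mode mode, BrokerCode code, int retryTimesWhenSendFailed) {
        // Assumption: only SYNC sends loop more than once; ASYNC/ONEWAY get a single pass.
        int timesTotal = mode == Mode.SYNC ? 1 + retryTimesWhenSendFailed : 1;
        for (int times = 0; times < timesTotal; times++) {
            if (!RETRIED.contains(code)) {
                return times + 1; // default branch: the exception is thrown right away
            }
            // listed code: the real loop does `continue` and picks a queue on another broker if it can
        }
        return timesTotal; // every attempt used up; the last exception is reported
    }

    public static void main(String[] args) {
        System.out.println(attempts(Mode.SYNC, BrokerCode.SYSTEM_ERROR, 2));  // 3: first try + 2 retries
        System.out.println(attempts(Mode.SYNC, BrokerCode.SYSTEM_BUSY, 2));   // 1: not in the retried list
        System.out.println(attempts(Mode.ASYNC, BrokerCode.SYSTEM_ERROR, 2)); // 1: the loop runs once
    }
}
```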
    Note:
When the broker rejects a write because the page cache is busy, the rejected message has not acquired the CommitLog lock yet, so it has not been written to the page cache, let alone to disk.
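Because a message rejected this way was never stored, the application can resend it once the broker has had time to recover. The helper below is a hypothetical sketch (sendWithBackoff and the isBusy predicate are made-up names, not ONS or RocketMQ API): it retries only failures classified as busy and doubles the pause each time. Only the busy rejection is known not to have stored the message; after other failures, such as a send timeout, the message may already be in the CommitLog and a resend can create a duplicate. Matching on the message text in main is only for the demo.

```java
import java.util.concurrent.Callable;
import java.util.function.Predicate;

public class BusyResend {

    /**
     * Calls `send`; if it fails with an exception that `isBusy` accepts, waits and tries again,
     * doubling the wait each time. Any other exception, or the last failure, is rethrown.
     */
    static <T> T sendWithBackoff(Callable<T> send, Predicate<Exception> isBusy,
                                 int maxAttempts, long initialDelayMillis) throws Exception {
        long delay = initialDelayMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                return send.call();
            } catch (Exception e) {
                if (!isBusy.test(e) || attempt >= maxAttempts) {
                    throw e;
                }
                Thread.sleep(delay); // give the broker time to drain its backlog
                delay *= 2;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        String result = sendWithBackoff(() -> {
            if (++calls[0] < 3) { // hypothetical sender that is rejected twice
                throw new IllegalStateException("broker busy, start flow control for a while");
            }
            return "SEND_OK";
        }, e -> String.valueOf(e.getMessage()).contains("broker busy"), 5, 10);
        System.out.println(result + " after " + calls[0] + " calls"); // prints "SEND_OK after 3 calls"
    }
}
```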

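    How to fix it:
This error means the broker cannot keep up with the messages being sent to it, so you can either upgrade the machine or add another broker to take over some of this broker's message queues.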


Link to this article: https://www.haomeiwen.com/subject/rwthbctx.html