RocketMQ broker busy analysis


Author: 圣村的希望 | Published: 2019-11-02 16:35

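  This analysis grew out of a question from a former colleague: in his production environment, the Alibaba Cloud ONS logs showed a broker busy exception. The exception looked like this: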

at com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.handleRequest(HeaderExchangeHandler.java:86)
 at com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.received(HeaderExchangeHandler.java:172)
 at com.alibaba.dubbo.remoting.rpc.http.codec.HttpRequestConvertHandler.received(HttpRequestConvertHandler.java:149)
 at com.alibaba.dubbo.remoting.transport.DecodeHandler.received(DecodeHandler.java:52)
 at com.alibaba.dubbo.remoting.transport.dispatcher.ChannelEventRunnable.run(ChannelEventRunnable.java:82)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
Caused by: com.aliyun.openservices.ons.api.exception.ONSClientException: defaultMQProducer send exception
 at com.aliyun.openservices.ons.api.impl.rocketmq.ProducerImpl.checkProducerException(ProducerImpl.java:208)
 at com.aliyun.openservices.ons.api.impl.rocketmq.ProducerImpl.send(ProducerImpl.java:107)
 at com.souche.optimus.mq.aliyunons.ONSProducerInvoker.send(ONSProducerInvoker.java:170)
 ... 40 more
Caused by: com.aliyun.openservices.shade.com.alibaba.rocketmq.client.exception.MQBrokerException: CODE: 2  DESC: [TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 3008ms, size of queue: 3086
For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
 at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.MQClientAPIImpl.processSendResponse(MQClientAPIImpl.java:551)
 at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:353)
 at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:335)
 at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:298)
 at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:696)
 at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:463)
 at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1065)
 at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1023)
 at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:212)
 at com.aliyun.openservices.ons.api.impl.rocketmq.ProducerImpl.send(ProducerImpl.java:98)
 ... 41 more

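    With the exception in hand, the first step was to search the source code for the message text "broker busy, start flow" to find where it is printed. One match is the OS_PAGECACHE_BUSY branch of the broker's send-message processing (it uses the prefix [PC_SYNCHRONIZED], while the log above shows [TIMEOUT_CLEAN_QUEUE], so the same text is printed from more than one place):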

case OS_PAGECACHE_BUSY:
    response.setCode(ResponseCode.SYSTEM_ERROR);
    response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
    break;

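    This is the broker-side OS page cache busy case. RocketMQ persists a message by first writing it into the page cache; only afterwards is it flushed to disk. So the error is raised at the point where the message would be written into the page cache. In short, the broker is reporting that it is too busy writing messages into the page cache to accept this one.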
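A minimal sketch of the "write to the page cache first, flush to disk later" idea using plain Java NIO, not RocketMQ's own classes: writing into a MappedByteBuffer only updates memory backed by the OS page cache, and MappedByteBuffer.force() explicitly asks the OS to write those dirty pages to disk (the OS also writes them back on its own schedule). The class PageCacheWriteDemo and its method are hypothetical names.

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class PageCacheWriteDemo {

    // Writes msg at the start of a memory-mapped file.
    // buf.put() only updates memory backed by the OS page cache;
    // buf.force() asks the OS to write those dirty pages to the storage device.
    static void writeAndFlush(Path file, String msg) throws IOException {
        byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // Mapping READ_WRITE past the end of the file grows the file to bytes.length.
            MappedByteBuffer buf = channel.map(FileChannel.MapMode.READ_WRITE, 0, bytes.length);
            buf.put(bytes); // lands in the page cache
            buf.force();    // explicit flush to disk
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("commitlog-demo", ".bin");
        file.toFile().deleteOnExit();
        writeAndFlush(file, "hello");
        System.out.println(new String(Files.readAllBytes(file), StandardCharsets.UTF_8));
    }
}
```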

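// DefaultMessageStore.putMessage(): reject the write if the OS page cache is busy
if (this.isOSPageCacheBusy()) {
    return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
}

// DefaultMessageStore.isOSPageCacheBusy(): is the OS page cache busy?
public boolean isOSPageCacheBusy() {
    long begin = this.getCommitLog().getBeginTimeInLock();
    long diff = this.systemClock.now() - begin;
    // begin is 0 when no write holds the lock, which makes diff huge and fails the first check
    return diff < 10000000
            && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills();
}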

// CommitLog.putMessage(): spin lock by default; acquire the lock first, then append the message to the MappedFile
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
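To make the locking step concrete, here is a minimal sketch under simplifying assumptions: the hypothetical class SimpleCommitLog uses a List in place of the mapped file, and a spin lock built on AtomicBoolean.compareAndSet in place of RocketMQ's own lock classes. Like the snippet above, it records beginTimeInLock right after taking the lock; it also resets it to 0 before releasing, so an idle lock is easy to recognize. Thread.onSpinWait() requires Java 9 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class SimpleCommitLog {

    // Minimal spin lock: lock() busy-waits on a CAS instead of parking the thread.
    static final class SpinLock {
        private final AtomicBoolean locked = new AtomicBoolean(false);

        void lock() {
            while (!locked.compareAndSet(false, true)) {
                Thread.onSpinWait(); // spin hint to the CPU (Java 9+)
            }
        }

        void unlock() {
            locked.set(false);
        }
    }

    private final SpinLock putMessageLock = new SpinLock();
    private final List<String> entries = new ArrayList<>(); // stands in for the mapped file
    // When the current writer took the lock; 0 while the lock is free.
    private volatile long beginTimeInLock = 0;

    void append(String msg) {
        putMessageLock.lock();
        try {
            beginTimeInLock = System.currentTimeMillis();
            entries.add(msg); // the "write into the page cache" step
        } finally {
            beginTimeInLock = 0; // reset before releasing, so readers see an idle lock
            putMessageLock.unlock();
        }
    }

    long getBeginTimeInLock() {
        return beginTimeInLock;
    }

    int size() {
        putMessageLock.lock();
        try {
            return entries.size();
        } finally {
            putMessageLock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SimpleCommitLog commitLog = new SimpleCommitLog();
        Thread[] writers = new Thread[4];
        for (int t = 0; t < writers.length; t++) {
            writers[t] = new Thread(() -> {
                for (int i = 0; i < 1000; i++) {
                    commitLog.append("msg-" + i);
                }
            });
            writers[t].start();
        }
        for (Thread writer : writers) {
            writer.join();
        }
        System.out.println(commitLog.size()); // 4000: appends were serialized by the lock
    }
}
```

The trade-off behind a spin lock: it avoids parking and waking threads when the critical section is very short, but waiting threads burn CPU if a writer stalls inside the lock, which is the kind of stall isOSPageCacheBusy() looks for.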

    How the broker decides whether the OS page cache is busy:

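  • Get the time at which the CommitLog lock was acquired. The broker appends the messages of every topic on this machine to the same CommitLog, so a lock is used to serialize concurrent writes.
  • The CommitLog lock has two implementations, a (non-fair) ReentrantLock and a spin lock, with the spin lock as the default. Both are simple, so they are not covered here.
  • Lock holding time = current time - lock acquisition time. The page cache busy timeout (osPageCacheBusyTimeOutMills) defaults to 1000 ms, i.e. 1 s, so if the broker has held the lock for more than 1 s while writing an earlier message, the page cache is considered busy.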
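The rule above can be reproduced as a pure function. This is a sketch assuming the default osPageCacheBusyTimeOutMills of 1000 ms; PageCacheBusyCheck is a hypothetical name. It follows the same two conditions as isOSPageCacheBusy(): the upper bound of 10,000,000 ms filters out the case where no writer holds the lock (beginTimeInLock is 0, so the difference is enormous), and the strict > means exactly 1000 ms in the lock is not yet busy.

```java
public class PageCacheBusyCheck {

    // Default value of MessageStoreConfig.osPageCacheBusyTimeOutMills (1 s).
    static final long OS_PAGE_CACHE_BUSY_TIMEOUT_MILLS = 1000;

    // Same conditions as isOSPageCacheBusy(): busy when the current writer
    // has held the CommitLog lock for longer than the timeout.
    static boolean isBusy(long nowMillis, long beginTimeInLock, long timeoutMillis) {
        long diff = nowMillis - beginTimeInLock;
        // A diff of 10,000,000 ms (about 2.8 h) or more in practice means
        // beginTimeInLock == 0, i.e. the lock is free: not busy.
        return diff < 10_000_000 && diff > timeoutMillis;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(isBusy(now, 0, OS_PAGE_CACHE_BUSY_TIMEOUT_MILLS));          // false: lock idle
        System.out.println(isBusy(now, now - 200, OS_PAGE_CACHE_BUSY_TIMEOUT_MILLS));  // false: 200 ms in lock
        System.out.println(isBusy(now, now - 1500, OS_PAGE_CACHE_BUSY_TIMEOUT_MILLS)); // true: 1.5 s in lock
    }
}
```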

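    When something goes wrong on the broker, the broker does not throw the exception outward; it returns a corresponding error code to the client, a design worth borrowing. Here the broker returns ResponseCode.SYSTEM_ERROR. The key point is that RocketMQ does not propagate business exceptions to the client: it wraps them and hands an error code back as the result. The only exceptions that reach the client directly are Netty network communication failures.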
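A minimal sketch of this "return a code, don't throw" style, assuming hypothetical PutStatus and Response types in place of RocketMQ's PutMessageStatus and RemotingCommand. The values 0 and 1 match RemotingSysResponseCode.SUCCESS and SYSTEM_ERROR; everything else is illustrative.

```java
public class BrokerResponseMapping {

    // Hypothetical subset of store results.
    enum PutStatus { PUT_OK, OS_PAGECACHE_BUSY }

    // Hypothetical response object: a numeric code plus a human-readable remark.
    static final class Response {
        final int code;
        final String remark;

        Response(int code, String remark) {
            this.code = code;
            this.remark = remark;
        }
    }

    // Values of RemotingSysResponseCode.SUCCESS and SYSTEM_ERROR.
    static final int SUCCESS = 0;
    static final int SYSTEM_ERROR = 1;

    // Broker side: each handled store result becomes a response sent back to the client.
    static Response toResponse(PutStatus status) {
        switch (status) {
            case PUT_OK:
                return new Response(SUCCESS, null);
            case OS_PAGECACHE_BUSY:
                return new Response(SYSTEM_ERROR,
                        "[PC_SYNCHRONIZED]broker busy, start flow control for a while");
            default:
                throw new IllegalArgumentException("unhandled status: " + status);
        }
    }

    public static void main(String[] args) {
        Response r = toResponse(PutStatus.OS_PAGECACHE_BUSY);
        System.out.println(r.code + " " + r.remark);
    }
}
```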
    Next, let's look at how the client handles the broker's page cache busy response:

    private SendResult processSendResponse(
        final String brokerName,
        final Message msg,
        final RemotingCommand response
    ) throws MQBrokerException, RemotingCommandException {
        switch (response.getCode()) {
            case ResponseCode.FLUSH_DISK_TIMEOUT:
            case ResponseCode.FLUSH_SLAVE_TIMEOUT:
            case ResponseCode.SLAVE_NOT_AVAILABLE: {
            }
            case ResponseCode.SUCCESS: {
                SendStatus sendStatus = SendStatus.SEND_OK;
                switch (response.getCode()) {
                    case ResponseCode.FLUSH_DISK_TIMEOUT:
                        sendStatus = SendStatus.FLUSH_DISK_TIMEOUT;
                        break;
                    case ResponseCode.FLUSH_SLAVE_TIMEOUT:
                        sendStatus = SendStatus.FLUSH_SLAVE_TIMEOUT;
                        break;
                    case ResponseCode.SLAVE_NOT_AVAILABLE:
                        sendStatus = SendStatus.SLAVE_NOT_AVAILABLE;
                        break;
                    case ResponseCode.SUCCESS:
                        sendStatus = SendStatus.SEND_OK;
                        break;
                    default:
                        assert false;
                        break;
                }

                SendMessageResponseHeader responseHeader =
                    (SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class);

                //...
                
                sendResult.setRegionId(regionId);
                return sendResult;
            }
            default:
                break;
        }

        throw new MQBrokerException(response.getCode(), response.getRemark());
    }
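Reduced to its essentials, processSendResponse() above maps the success-family codes to a SendStatus and turns every other code into an exception. This sketch assumes hypothetical SendStatus and BrokerException types, keeps only one of the success-family codes, and uses an illustrative numeric value for it; only SUCCESS = 0 and SYSTEM_ERROR = 1 are taken from RocketMQ.

```java
public class ClientResponseHandling {

    enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT }

    // Hypothetical stand-in for MQBrokerException: carries the broker's code and remark.
    static final class BrokerException extends Exception {
        final int responseCode;

        BrokerException(int responseCode, String remark) {
            super("CODE: " + responseCode + "  DESC: " + remark);
            this.responseCode = responseCode;
        }
    }

    static final int SUCCESS = 0;
    static final int SYSTEM_ERROR = 1;
    static final int FLUSH_DISK_TIMEOUT = 10; // illustrative value

    // Success-family codes become a SendStatus; any other code is thrown.
    static SendStatus processSendResponse(int code, String remark) throws BrokerException {
        switch (code) {
            case SUCCESS:
                return SendStatus.SEND_OK;
            case FLUSH_DISK_TIMEOUT:
                return SendStatus.FLUSH_DISK_TIMEOUT;
            default:
                throw new BrokerException(code, remark);
        }
    }

    public static void main(String[] args) {
        try {
            processSendResponse(SYSTEM_ERROR, "[PC_SYNCHRONIZED]broker busy, start flow control for a while");
        } catch (BrokerException e) {
            System.out.println(e.getMessage());
        }
    }
}
```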

    So the client wraps the SYSTEM_ERROR returned by the broker into an MQBrokerException and throws it. Finally, back to where the producer sends the message:

            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                // Pick a MessageQueue to send to. This is the client-side load balancing for sends; it tries to avoid the broker used by the previous attempt
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        beginTimestampPrev = System.currentTimeMillis();
                        // Core of the send: build the send-message request and send it
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        // Handle the result according to the communication mode
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    // Not stored OK: optionally retry on another broker
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }

                                return sendResult;
                            default:
                                break;
                        }
                    } catch (RemotingException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQClientException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQBrokerException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        switch (e.getResponseCode()) {
                            case ResponseCode.TOPIC_NOT_EXIST:
                            case ResponseCode.SERVICE_NOT_AVAILABLE:
                            case ResponseCode.SYSTEM_ERROR:
                            case ResponseCode.NO_PERMISSION:
                            case ResponseCode.NO_BUYER_ID:
                            case ResponseCode.NOT_IN_CURRENT_UNIT:
                                continue;
                            default:
                                if (sendResult != null) {
                                    return sendResult;
                                }

                                throw e;
                        }
                    } catch (InterruptedException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());

                        log.warn("sendKernelImpl exception", e);
                        log.warn(msg.toString());
                        throw e;
                    }
                } else {
                    break;
                }
            }

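    So when the client gets an MQBrokerException while sending, it checks the response code returned by the broker, and for SYSTEM_ERROR it continues with the next attempt. Only synchronous sends are retried (by default two more times, per retryTimesWhenSendFailed = 2); asynchronous and oneway sends are not retried by this loop, so in those two modes the message can be lost in this situation unless the application handles the failure itself.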
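The retry rule can be sketched as a small loop, assuming a hypothetical Sender interface in place of sendKernelImpl() and leaving out queue selection and fault bookkeeping. timesTotal is 1 + retryTimesWhenSendFailed for SYNC and 1 otherwise, mirroring the producer; retryTimesWhenSendFailed defaults to 2 in DefaultMQProducer. The sketch only counts attempts made by this loop: in the real client an ASYNC failure is reported through the SendCallback rather than thrown here, and after the last failed attempt the real loop wraps the error instead of rethrowing it.

```java
import java.util.Set;

public class SendRetrySketch {

    enum Mode { SYNC, ASYNC, ONEWAY }

    static final int SYSTEM_ERROR = 1;

    // Hypothetical stand-in for MQBrokerException.
    static final class BrokerException extends Exception {
        final int code;

        BrokerException(int code) {
            super("CODE: " + code);
            this.code = code;
        }
    }

    // Hypothetical stand-in for one sendKernelImpl() call; attempt is 0-based.
    interface Sender {
        String send(int attempt) throws BrokerException;
    }

    // Retryable codes in this sketch (SYSTEM_ERROR is one of the codes the real loop continues on).
    static final Set<Integer> RETRYABLE_CODES = Set.of(SYSTEM_ERROR);

    static String send(Mode mode, int retryTimesWhenSendFailed, Sender sender) throws BrokerException {
        // Only SYNC gets extra attempts.
        int timesTotal = mode == Mode.SYNC ? 1 + retryTimesWhenSendFailed : 1;
        BrokerException last = null;
        for (int times = 0; times < timesTotal; times++) {
            try {
                return sender.send(times);
            } catch (BrokerException e) {
                last = e;
                if (RETRYABLE_CODES.contains(e.code)) {
                    continue; // next attempt (the real loop also tries to pick another broker)
                }
                throw e; // non-retryable code: give up immediately
            }
        }
        throw last; // every attempt failed with a retryable code
    }

    public static void main(String[] args) {
        int[] calls = {0};
        Sender alwaysBusy = attempt -> {
            calls[0]++;
            throw new BrokerException(SYSTEM_ERROR);
        };
        for (Mode mode : Mode.values()) {
            calls[0] = 0;
            try {
                send(mode, 2, alwaysBusy);
            } catch (BrokerException e) {
                // SYNC makes 3 attempts; ASYNC and ONEWAY make 1
                System.out.println(mode + ": " + calls[0] + " attempt(s), " + e.getMessage());
            }
        }
    }
}
```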
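Note:
    When the broker reports page cache busy, the message being sent has not acquired the CommitLog lock (the busy check runs before the write takes the lock), so it has not been written to the page cache, let alone to disk.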

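How to fix it:
    This error means the broker cannot keep up with the messages being sent to it. You can either give the current machine more capacity, or add another broker to take over some of this broker's message queues.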


      Article link: https://www.haomeiwen.com/subject/rwthbctx.html