This analysis grew out of a question from a former colleague. In his production environment, the Alibaba Cloud ONS logs showed a broker busy exception:
```
at com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.handleRequest(HeaderExchangeHandler.java:86)
at com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.received(HeaderExchangeHandler.java:172)
at com.alibaba.dubbo.remoting.rpc.http.codec.HttpRequestConvertHandler.received(HttpRequestConvertHandler.java:149)
at com.alibaba.dubbo.remoting.transport.DecodeHandler.received(DecodeHandler.java:52)
at com.alibaba.dubbo.remoting.transport.dispatcher.ChannelEventRunnable.run(ChannelEventRunnable.java:82)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.aliyun.openservices.ons.api.exception.ONSClientException: defaultMQProducer send exception
at com.aliyun.openservices.ons.api.impl.rocketmq.ProducerImpl.checkProducerException(ProducerImpl.java:208)
at com.aliyun.openservices.ons.api.impl.rocketmq.ProducerImpl.send(ProducerImpl.java:107)
at com.souche.optimus.mq.aliyunons.ONSProducerInvoker.send(ONSProducerInvoker.java:170)
... 40 more
Caused by: com.aliyun.openservices.shade.com.alibaba.rocketmq.client.exception.MQBrokerException: CODE: 2 DESC: [TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 3008ms, size of queue: 3086
For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.MQClientAPIImpl.processSendResponse(MQClientAPIImpl.java:551)
at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:353)
at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:335)
at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:298)
at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:696)
at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:463)
at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1065)
at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1023)
at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:212)
at com.aliyun.openservices.ons.api.impl.rocketmq.ProducerImpl.send(ProducerImpl.java:98)
... 41 more
```
With the exception in hand, the first step was to search the source code for the message text `broker busy, start flow` and locate where it is printed:
```java
case OS_PAGECACHE_BUSY:
    response.setCode(ResponseCode.SYSTEM_ERROR);
    response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
    break;
```
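This is the broker-side branch for a busy OS page cache. RocketMQ persists a message by first writing it into the page cache and leaving the flush to disk to the operating system, so this error is raised at the point where the message is about to be written into the page cache. In short, the broker's writes were too busy while it was persisting messages into the page cache.

Note that this branch returns the remark prefix [PC_SYNCHRONIZED] with ResponseCode.SYSTEM_ERROR, whereas the log above carries [TIMEOUT_CLEAN_QUEUE] and CODE: 2 (SYSTEM_BUSY in RocketMQ). The same "broker busy, start flow control for a while" text is built in more than one place: in Apache RocketMQ the [TIMEOUT_CLEAN_QUEUE] variant comes from the broker's fast-failure cleanup (BrokerFastFailure), which rejects send requests that have waited too long in the send thread pool's queue. Both paths point to the same root cause, a broker that cannot keep up with incoming writes.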
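To make "write to the page cache, let the OS flush" concrete, here is a minimal Java sketch of writing through a memory-mapped file, which is roughly the mechanism RocketMQ's CommitLog files rely on (much simplified; PageCacheWriteSketch and append are hypothetical names, not RocketMQ code). Bytes put into the mapped buffer land in the page cache; they reach the disk when the OS writes the dirty pages back, or immediately if force() is called.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class PageCacheWriteSketch {
    /** Copies body into buffer at writePos and returns the next write position (append-only). */
    static int append(ByteBuffer buffer, int writePos, byte[] body) {
        ByteBuffer view = buffer.duplicate(); // own position, shared content
        view.position(writePos);
        view.put(body);
        return writePos + body.length;
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("commitlog-sketch", ".dat");
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // Map 1 KB of the file (the file is grown to fit); writes go to the page cache.
            MappedByteBuffer mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0, 1024);
            int pos = append(mapped, 0, "msg-1;".getBytes(StandardCharsets.UTF_8));
            pos = append(mapped, pos, "msg-2;".getBytes(StandardCharsets.UTF_8));
            // Without force(), the OS writes the dirty pages back on its own schedule.
            // force() asks for them to be written to the storage device now (a synchronous flush).
            mapped.force();
            System.out.println("appended bytes: " + pos);
        }
        Files.deleteIfExists(file);
    }
}
```

The append position only moves forward, mirroring the append-only CommitLog; the expensive part, the disk write, is deferred to the OS unless a synchronous flush is requested.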
```java
// Is the OS page cache busy? If so, reject the message before it is written
if (this.isOSPageCacheBusy()) {
    return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
}

// Busy when the CommitLog lock has been held longer than osPageCacheBusyTimeOutMills
public boolean isOSPageCacheBusy() {
    long begin = this.getCommitLog().getBeginTimeInLock();
    long diff = this.systemClock.now() - begin;
    return diff < 10000000
            && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills();
}

// A spin lock by default: acquire the lock first, then write the message into the mapped file buffer
putMessageLock.lock(); // spin or ReentrantLock, depending on store config
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
```
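How the broker decides whether the OS page cache is busy:
- Read the time at which the CommitLog lock was acquired. The broker appends the messages of every topic on the machine to a single CommitLog, so concurrent writes are serialized with a lock.
- The CommitLog lock has two implementations, a (non-fair) ReentrantLock and a spin lock, with the spin lock as the default. Both are simple, so I won't go into them here.
- Compute how long the lock has been held: current time minus the time the lock was acquired. The page cache timeout (osPageCacheBusyTimeOutMills) defaults to 1000 ms, i.e. 1 s, so if the broker has held the lock for more than 1 s while writing the previous message, the page cache is considered busy. The upper bound diff < 10000000 covers the case where nobody holds the lock: beginTimeInLock is reset to 0 on release, so diff then equals the whole current timestamp and the check returns false.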
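The check described in the list above can be modeled in a few lines. This sketch assumes a plain CAS spin lock (the idea behind the default lock, not RocketMQ's actual lock class) and passes the clock in explicitly so the arithmetic is easy to follow; CommitLogLockSketch and its methods are hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class CommitLogLockSketch {
    // CAS-based spin lock: a thread that fails to grab it keeps retrying instead of parking.
    private final AtomicBoolean locked = new AtomicBoolean(false);
    // Wall-clock time (ms) when the current holder took the lock; 0 while the lock is free.
    private volatile long beginTimeInLock = 0;
    private final long osPageCacheBusyTimeOutMills;

    CommitLogLockSketch(long osPageCacheBusyTimeOutMills) {
        this.osPageCacheBusyTimeOutMills = osPageCacheBusyTimeOutMills;
    }

    void lock(long nowMillis) {
        while (!locked.compareAndSet(false, true)) {
            // spin until the current holder releases the lock
        }
        beginTimeInLock = nowMillis;
    }

    void unlock() {
        beginTimeInLock = 0; // reset before releasing, so a free lock reads as begin == 0
        locked.set(false);
    }

    /** Busy when the current holder has kept the lock longer than the timeout. */
    boolean isOSPageCacheBusy(long nowMillis) {
        long diff = nowMillis - beginTimeInLock;
        // With begin == 0, diff is the whole epoch timestamp (far above 10,000,000 ms),
        // so the upper bound keeps a free lock from being reported as busy.
        return diff < 10_000_000L && diff > osPageCacheBusyTimeOutMills;
    }

    public static void main(String[] args) {
        CommitLogLockSketch commitLog = new CommitLogLockSketch(1000); // the 1000 ms default
        long now = System.currentTimeMillis();
        commitLog.lock(now);
        System.out.println(commitLog.isOSPageCacheBusy(now + 500));  // false: held 500 ms
        System.out.println(commitLog.isOSPageCacheBusy(now + 1500)); // true: held 1500 ms
        commitLog.unlock();
        System.out.println(commitLog.isOSPageCacheBusy(now + 1500)); // false: lock is free
    }
}
```

The guard only works because real timestamps are epoch milliseconds; with a clock starting near zero, a free lock would look "held" for a short time and be misreported.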
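When something goes wrong on the broker, it hands the client a corresponding error code instead of throwing the exception outward, a practice worth borrowing. Here the broker returns ResponseCode.SYSTEM_ERROR to the client. The key point is that RocketMQ does not propagate specific business exceptions to the client: it wraps them and returns them as error codes in the result. The only exceptions that reach the client as exceptions are Netty network communication failures.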
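A minimal sketch of that error-code pattern: both the store result and any unexpected exception are turned into a response code plus remark, so nothing escapes to the caller. The enum, the Response type, and handleSend are hypothetical; only the codes SUCCESS = 0 and SYSTEM_ERROR = 1 and the remark text follow RocketMQ.

```java
import java.util.function.Supplier;

class BrokerResponseSketch {
    static final int SUCCESS = 0;
    static final int SYSTEM_ERROR = 1;

    // Hypothetical subset of store outcomes.
    enum PutMessageStatus { PUT_OK, OS_PAGECACHE_BUSY, UNKNOWN_ERROR }

    // Simplified stand-in for RocketMQ's RemotingCommand response.
    static final class Response {
        final int code;
        final String remark;

        Response(int code, String remark) {
            this.code = code;
            this.remark = remark;
        }
    }

    /** Maps a store outcome to the code and remark sent back to the client. */
    static Response toResponse(PutMessageStatus status) {
        switch (status) {
            case PUT_OK:
                return new Response(SUCCESS, null);
            case OS_PAGECACHE_BUSY:
                return new Response(SYSTEM_ERROR,
                        "[PC_SYNCHRONIZED]broker busy, start flow control for a while");
            default:
                return new Response(SYSTEM_ERROR, "UNKNOWN_ERROR");
        }
    }

    /** Runs the store call; an unexpected exception also becomes an error code, not a throw. */
    static Response handleSend(Supplier<PutMessageStatus> store) {
        try {
            return toResponse(store.get());
        } catch (RuntimeException e) {
            return new Response(SYSTEM_ERROR, "store failed: " + e.getMessage());
        }
    }

    public static void main(String[] args) {
        Response busy = handleSend(() -> PutMessageStatus.OS_PAGECACHE_BUSY);
        System.out.println(busy.code + " " + busy.remark);
    }
}
```

The client never needs to know the broker's exception types; it only interprets a small, stable set of codes.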
Next, let's look at how the client handles the broker's page cache busy response:
```java
private SendResult processSendResponse(
    final String brokerName,
    final Message msg,
    final RemotingCommand response
) throws MQBrokerException, RemotingCommandException {
    switch (response.getCode()) {
        case ResponseCode.FLUSH_DISK_TIMEOUT:
        case ResponseCode.FLUSH_SLAVE_TIMEOUT:
        case ResponseCode.SLAVE_NOT_AVAILABLE: {
            // no break: these codes fall through to SUCCESS and still produce a SendResult
        }
        case ResponseCode.SUCCESS: {
            SendStatus sendStatus = SendStatus.SEND_OK;
            switch (response.getCode()) {
                case ResponseCode.FLUSH_DISK_TIMEOUT:
                    sendStatus = SendStatus.FLUSH_DISK_TIMEOUT;
                    break;
                case ResponseCode.FLUSH_SLAVE_TIMEOUT:
                    sendStatus = SendStatus.FLUSH_SLAVE_TIMEOUT;
                    break;
                case ResponseCode.SLAVE_NOT_AVAILABLE:
                    sendStatus = SendStatus.SLAVE_NOT_AVAILABLE;
                    break;
                case ResponseCode.SUCCESS:
                    sendStatus = SendStatus.SEND_OK;
                    break;
                default:
                    assert false;
                    break;
            }
            SendMessageResponseHeader responseHeader =
                (SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
            //...
            sendResult.setRegionId(regionId);
            return sendResult;
        }
        default:
            break;
    }
    // Every other code, including SYSTEM_ERROR, is turned into an MQBrokerException
    throw new MQBrokerException(response.getCode(), response.getRemark());
}
```
So the client wraps the SYSTEM_ERROR returned by the broker in an MQBrokerException and throws it. That takes us back to where the producer sends the message:
```java
for (; times < timesTotal; times++) {
    String lastBrokerName = null == mq ? null : mq.getBrokerName();
    // Pick a MessageQueue: client-side load balancing for sends, which avoids the broker used in the previous attempt
    MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); // which MessageQueue to use
    if (mqSelected != null) {
        mq = mqSelected;
        brokersSent[times] = mq.getBrokerName();
        try {
            beginTimestampPrev = System.currentTimeMillis();
            // Core of sending: build the send request and send it
            sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
            endTimestamp = System.currentTimeMillis();
            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
            // Handle the result according to the communication mode
            switch (communicationMode) {
                case ASYNC:
                    return null;
                case ONEWAY:
                    return null;
                case SYNC:
                    if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                        // Whether to retry on another broker when the message was not stored OK
                        if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                            continue;
                        }
                    }
                    return sendResult;
                default:
                    break;
            }
        } catch (RemotingException e) {
            endTimestamp = System.currentTimeMillis();
            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
            log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
            log.warn(msg.toString());
            exception = e;
            continue;
        } catch (MQClientException e) {
            endTimestamp = System.currentTimeMillis();
            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
            log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
            log.warn(msg.toString());
            exception = e;
            continue;
        } catch (MQBrokerException e) {
            endTimestamp = System.currentTimeMillis();
            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
            log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
            log.warn(msg.toString());
            exception = e;
            switch (e.getResponseCode()) {
                case ResponseCode.TOPIC_NOT_EXIST:
                case ResponseCode.SERVICE_NOT_AVAILABLE:
                case ResponseCode.SYSTEM_ERROR:
                case ResponseCode.NO_PERMISSION:
                case ResponseCode.NO_BUYER_ID:
                case ResponseCode.NOT_IN_CURRENT_UNIT:
                    continue;
                default:
                    if (sendResult != null) {
                        return sendResult;
                    }
                    throw e;
            }
        } catch (InterruptedException e) {
            endTimestamp = System.currentTimeMillis();
            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
            log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
            log.warn(msg.toString());
            log.warn("sendKernelImpl exception", e);
            log.warn(msg.toString());
            throw e;
        }
    } else {
        break;
    }
}
```
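In other words, when the client catches an MQBrokerException during a send, it reads the response code returned by the broker, and for SYSTEM_ERROR it simply continues to the next attempt. SYNC sends are retried up to twice by default (timesTotal is 1 + retryTimesWhenSendFailed, which defaults to 2), while ASYNC and ONEWAY sends get a single attempt (timesTotal is 1), so in those two modes the message can be lost in this situation unless the application handles the failure itself. A code that is not in the list, such as the CODE: 2 in the log above, falls to the default branch and the exception is thrown to the caller immediately.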
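The retry rules can be condensed into a small simulation. It is a sketch under simplifying assumptions: it picks brokers rather than message queues, treats only SYSTEM_ERROR (1) as retryable, and uses the log's CODE: 2 as an example of a code outside the retry list. SendRetrySketch, Broker, selectBroker, and BrokerException are hypothetical names, not RocketMQ APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class SendRetrySketch {
    enum Mode { SYNC, ASYNC, ONEWAY }

    static final int SUCCESS = 0;
    static final int SYSTEM_ERROR = 1;
    // Codes that make the loop move on to another attempt (reduced to SYSTEM_ERROR here).
    static final Set<Integer> RETRYABLE = new HashSet<>(Arrays.asList(SYSTEM_ERROR));

    /** Hypothetical broker stub: returns the response code for a send to brokerName. */
    interface Broker {
        int send(String brokerName);
    }

    static final class BrokerException extends Exception {
        final int code;

        BrokerException(int code) {
            super("CODE: " + code);
            this.code = code;
        }
    }

    /** Round-robin pick that skips lastBroker whenever another broker is available. */
    static String selectBroker(List<String> brokers, String lastBroker, int[] index) {
        for (int i = 0; i < brokers.size(); i++) {
            String candidate = brokers.get(Math.floorMod(index[0]++, brokers.size()));
            if (!candidate.equals(lastBroker)) {
                return candidate;
            }
        }
        // Only one broker: fall back to plain round robin.
        return brokers.get(Math.floorMod(index[0]++, brokers.size()));
    }

    /** Returns the brokers tried, in order, on success; throws once attempts run out. */
    static List<String> send(Mode mode, int retryTimesWhenSendFailed,
                             List<String> brokers, Broker broker) throws BrokerException {
        // Only SYNC sends get extra attempts; ASYNC and ONEWAY get exactly one.
        int timesTotal = mode == Mode.SYNC ? 1 + Math.max(0, retryTimesWhenSendFailed) : 1;
        List<String> tried = new ArrayList<>();
        int[] index = {0};
        String last = null;
        BrokerException lastError = null;
        for (int times = 0; times < timesTotal; times++) {
            String target = selectBroker(brokers, last, index);
            last = target;
            tried.add(target);
            int code = broker.send(target);
            if (code == SUCCESS) {
                return tried;
            }
            lastError = new BrokerException(code);
            if (!RETRYABLE.contains(code)) {
                throw lastError; // e.g. CODE: 2: not in the list, thrown at once
            }
        }
        throw lastError;
    }

    public static void main(String[] args) throws BrokerException {
        List<String> brokers = Arrays.asList("broker-a", "broker-b");
        // broker-a keeps answering SYSTEM_ERROR, broker-b accepts the message.
        System.out.println(send(Mode.SYNC, 2, brokers,
                name -> name.equals("broker-a") ? SYSTEM_ERROR : SUCCESS));
    }
}
```

With the default of two retries, a SYNC send that keeps getting SYSTEM_ERROR makes three attempts while alternating brokers, an ASYNC send fails after one, and a code outside the retry list fails on the first attempt in any mode.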
Note:
When the broker rejects a write because the page cache is busy, the message is rejected before it acquires the CommitLog lock, so it has not been written to the page cache, let alone to disk.
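How to fix it:
This error means the broker cannot keep up with the messages being sent to it, so either increase the capacity of the current machine or add another broker to take over some of this broker's message queues.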