消费端重试超过16次的消息,会进入死信队列,此类消息需要人工接入或是程序定时处理下未成功的数据。
之前需要到RocketMq-Console-Ng的死信队列主题下查看,不是很方便。
一般套路会思考是否要修改rockmq的相关处理的源码,修改记录数据源,但是这种对代码侵入性太大,不便于灵活维护。
经过思考决定采用如下aop机制,在数据库中再记录一遍数据。
@Aspect
@Component
public class DlLogAspect {
/** 日志打印 */
private Logger logger = LoggerFactory.getLogger(DlLogAspect.class);
/** rocketmq默认重试16次 */
private static final Integer MAXTIMES = Integer.valueOf("16");
/** 主键服务 */
@Autowired
private IIdService idService;
/** 日志记录服务 */
@Autowired
private IMessageErrorLogService messageErrorLogService;
/**
* 定义切点,捕获所有消息处理方法
* @param messageExt messageExt
*/
@Pointcut("execution(* org.apache.rocketmq.spring.core.RocketMQListener.onMessage(..)) && args(messageExt)")
public void dlMessageLog(MessageExt messageExt) { }
/**
* 后置通知
* @param messageExt messageExt
*/
@Before("dlMessageLog(messageExt)")
public void dlListener(MessageExt messageExt) {
try {
dlMessageLogCreate(messageExt);
} catch (Exception e) {
e.printStackTrace();
logger.error("死信队列记录出错" + e);
}
}
/**
* 死信消息创建
* @param message message
*/
private void dlMessageLogCreate(MessageExt message) {
int retryTimes = message.getReconsumeTimes();
// 打印重试次数,便于在ES中查看,及时发现错误
if (retryTimes > NumberConsts.INT_1) {
logger.error("topic={}, tags={}, msgId={}, body={} 的重试次数为{}", message.getTopic(), message.getTags(),
message.getMsgId(), new String(message.getBody()), retryTimes);
}
if (retryTimes >= MAXTIMES) {
//到达最大重试次数,将会进入死信队列,记录入库,以便再次消费
logger.error("消息重试次数达到最大值.消息内容为" + new String(message.getBody()));
MessageErrorLog messageErrorLog = new MessageErrorLog(idService.getUniqueId(), message.getTopic(),
message.getTags(), new String(message.getBody()));
// 插入数据库中,便于人工接入回溯问题
messageErrorLogService.insert(messageErrorLog);
}
}
}
网友评论