目录
1:RocketMQ 生产和消费消息重试及处理 (重要 重要 重要)
设置重试次数(报错消息存储)
消费端去重(消息唯一性)
测试集群消费与广播消费的重试
顺序消息
2:RocketMQ生产者之MessageQueueSelector实战(指定队列)(顺序消息前置学习)
3:讲解顺序消息(发送与消费)在电商 和 证券系统中应用场景
4:RocketMQ顺序消息概念
5:RocketMQ顺序消息发送 实战代码
6:RocketMQ顺序消息消费 实战代码
1:RocketMQ生产和消费消息重试及处理 (保证消息的唯一性)
生产者Producer重试(异步和SendOneWay下配置无效)
消息重投(保证数据的高可靠性),本身内部支持重试,默认次数是2,
如果网络情况比较差,或者跨集群则建改多几次
消费端重试
原因:消息处理异常、broker端到consumer端各种问题
如网络原因闪断, 消费处理失败,ACK返回失败等等问题。
注意 :
重试的间隔时间(默认重试16次):
默认:messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
1:设置重试次数
2:消费端去重:一条消息无论重试多少次,这些重试消息的Message ID, key不会改变。
3:消费重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试
public ConsumerReconsume() throws MQClientException {
DefaultMQPushConsumer defaultMQPushConsumer = creatDefaultMQPushConsumer();
// 确认问题 3:消费重试只针对集群消费方式生效 广播方式不提供失败重试特性,即消费失败后,失败消息不再重试
// defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);
//监听器
defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (messages, context) -> {
MessageExt msg = messages.get(0);
int times = msg.getReconsumeTimes();
System.out.println("重试次数:"+times);
//判断该msgId是否已经在redis中
// 解决问题 2:消费端去重:一条消息无论重试多少次,这些重试消息的Message ID, key不会改变。
//遗留问题 假如在下面逻辑成功之前 有另一个一样msgId的消息进入 那么该如何解决
//考虑再添加一个Synchronized的锁 但这个性能消耗大 同时概率极低 因此不推荐
try {
log.info(" Receive New Messages: {},{} ",Thread.currentThread().getName(), new String(messages.get(0).getBody()));
String topic = msg.getTopic();
String tags = msg.getTags();
String keys = msg.getKeys();
String body = new String(msg.getBody(), "utf-8");
log.info("topic=" + topic + ", tags=" + tags + ", keys = " + keys + ", msg = " + body);
// 使消费一定失败
if(true){
throw new Exception();
}
System.out.println("重试时的megId:"+ msg.getMsgId());
//解决问题 2: 可以把这个msgId存放到redis中设置对应的过期时间 如果这个Id已经存在则不需要重复进入该逻辑
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
log.info("rocket消费者异常:{}",e.getMessage(),e);
//times的初始值为0 等于等于2就是一共尝试了3次
//解决问题 1:设置重试次数
if (times>=2){
System.out.println("记录数据库 通知运维人员");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
System.out.println("重试次数异常:"+times);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
log.info("consumer start");
defaultMQPushConsumer.start();
}
2:RocketMQ生产者之MessageQueueSelector实战(指定队列)(顺序消息前置学习)
简介:生产消息使用MessageQueueSelector投递到Topic下指定的queue
默认topic下的queue数量是4 可以进行配置
应用场景:配合 顺序消息 分摊负载 的使用
顺序消息:消息是有序的(同步发送)
分摊负载:可以自定义规则 选择发到topic的队列
某种事件的消息 发送到 队列里面的消息特别多 导致该topic里面的4个队列都不可用
该事件可以特殊选的某个队列 那么其他3个队列就可用
注意:
1:支持同步,异步发送:
2:选择的queue数量必须小于配置的,否则会出错
//顺序发送可以根据一个订单号进行取模 然后该订单选择对应的队列
//与业务是强耦合的 需要根据业务做具体的分析
//public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
//同步发送
//选择对应的队列
SendResult sendResultSyn = defaultMQProducer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> messageQueueList, Message message, Object arg) {
//2:选择使用该topic下的哪个 queueId 该Id需要在 arg获取
int queueNum = Integer.parseInt(arg.toString());
return messageQueueList.get(queueNum);
}
// 1:arg 0 为选择队列 queueId =0
},0);
log.info("RocketMQ order 消息同步发送 结果:{}", JSON.toJSONString(sendResultSyn));
//public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback)
//异步发送
//选择对应的队列
defaultMQProducer.send(message, (messageQueueList, message1, arg) -> {
int queueNum = Integer.parseInt(arg.toString());
return messageQueueList.get(queueNum);
}, 1, new SendCallback() {
@Override
public void onSuccess(SendResult sendResultAsyn) {
log.info("RocketMQ order 消息异步发送 结果:{}", JSON.toJSONString(sendResultAsyn));
}
@Override
public void onException(Throwable throwable) {
}
});
3:讲解顺序消息(发送与消费)在电商 和 证券系统中应用场景
简介:基础介绍顺序消息和对应可以使用的场景,订单系统,
什么是顺序消息:消息的生产和消费顺序一致
全局顺序:topic下面全部消息都要有序(少用)(该topic下只有一个队列)(了解即可)
性能要求不高,所有的消息严格按照FIFO原则进行消息发布和消费的场景,吞吐量不够.
在证券处理中,以人民币兑换美元为例子,在价格相同的情况下,
有多个出价者,先出价者优先处理, 则FIFO的方式进行发布和消费
注意:也可以使用局部顺序配合其他方案处理
局部顺序:只要保证一组消息被顺序消费即可(RocketMQ使用)
电商的订单创建,同一个订单相关的创建订单消息、订单支付消息、订单交易成功、订单退款消息
(阿里巴巴集团内部电商系统均使用局部顺序消息,既保证业务的顺序,同时又能保证 业业务的高性能
顺序发布:对于指定的一个Topic,客户端将按照一定的先后顺序发送消息
顺序消费:对于指定的一个Topic,按照一定的先后顺序接收消息,即先发送的消息一定会先被客 户端接收到。
注意:
顺序消息暂不支持广播模式
顺序消息不支持异步发送方式,否则将无法严格保证顺序
4:RocketMQ顺序消息 概念
生产端保证发送消息有序,且发送到同一个Topic的同个queue里面,RocketMQ的确是能保证FIFO
例子:订单的顺序流程是:
创建、付款、物流、完成,订单号相同的消息会被先后发送到同 —个队列中,
根据MessageQueueSelector里面自定义策略,
根据同个业务id放置到同个queue里面,如订 单号取模运算再放到selector中,同一个模的值都会投递到同一条queue
消费端要在保证消费同个topic里的同个队列,
不应该用MessageListenerConcurrently 并发使用多线程去消费
应该使用MessageListenerOrderly 自带单线程消费消息,消费端分配到的queue数量是固定的,
消费者集群会锁住当前正在消费的队列集合的消息,所以会保证顺序消费。
MessageListenerOrderly
1:Consumer会平均分配Queue的数量
2:一个Consumer需要先处理完一个Consumer Queue 才能切换到其他的Consumer Queue
3 :为每个Consumer Quene加个锁,消费每个消息前,需要获得这个消息所在的Queue的锁,
这样同个时间,同个Queue的消息不被其他的Consumer Quene并发消费,但是不同Queue的消息可以并发处理
锁的机制类似于ConcurrentHashMap
小技巧:启动两个节点:
然后修改yml里面的端口 再次启动发现同一个服务可以启动两次
5:RocketMQ顺序消息发送 实战代码
private void sendOrderMessage(RocketEvent<?> rocketEvent) {
//主题
String topic = "pay_Sequence_topic";
Message message ;
try {
if (rocketEvent.getTag() != null && !"".equals(rocketEvent.getTag()) ) {
message = new Message(topic,rocketEvent.getTag(), JSON.toJSONString(rocketEvent).getBytes());
}else {
message = new Message(topic, JSON.toJSONString(rocketEvent).getBytes());
}
log.info("RocketMQ order 消息发送:{}", JSON.toJSONString(rocketEvent));
//转换为Order
Order order = JSONObject.parseObject(JSONObject.toJSONString(rocketEvent.getData()), Order.class);
//顺序发送可以根据一个订单号进行取模 然后该订单选择对应的队列
SendResult sendResultSyn = defaultMQProducer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> messageQueueList, Message message, Object arg) {
Long orderId = (Long) arg;
//根据 订单Id 取模后循序投递到队列
long queueNum = orderId%messageQueueList.size();
return messageQueueList.get((int) queueNum);
}
// 取订单Id作为入参
},order.getOrderId());
log.info("RocketMQ order 消息同步发送 结果:{}", JSON.toJSONString(sendResultSyn));
} catch (Exception e) {
log.error("RocketMQ order 消息发送异常: " + e.getMessage(), e);
}
}
6:RocketMQ顺序消息消费 实战代码
public ConsumerConsumeSequence() throws MQClientException {
//创建DefaultMQPushConsumer
DefaultMQPushConsumer defaultMQPushConsumer = creatDefaultMQPushConsumer();
//MessageListenerOrderly用于顺序消费
//该消费者在处理一个 topic下的 某个队列的时候 其他消费者不能再去处理该队列
//监听器
defaultMQPushConsumer.registerMessageListener((MessageListenerOrderly) (messages, context) -> {
try {
Message msg = messages.get(0);
log.info(" Receive New Messages: {},{} ",Thread.currentThread().getName(), new String(messages.get(0).getBody()));
String topic = msg.getTopic();
String tags = msg.getTags();
String keys = msg.getKeys();
String body = new String(msg.getBody(), "utf-8");
log.info("topic=" + topic + ", tags=" + tags + ", keys = " + keys + ", msg = " + body);
return ConsumeOrderlyStatus.SUCCESS;
} catch (UnsupportedEncodingException e) {
log.info("rocket消费者异常:{}",e.getMessage(),e);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
});
log.info("consumer start");
defaultMQPushConsumer.start();
}
项目连接
请配合项目代码食用效果更佳:
项目地址:
https://github.com/hesuijin/hesuijin-study-project
Git下载地址:
https://github.com.cnpmjs.org/hesuijin/hesuijin-study-project.git
rocketmq-module项目模块下
注意:因为需要修改相应配置 相关测试的代码
生产者代码主要在 单元测试
消费者代码主要在 项目代码 com.example.rocketmq.demo.consumer.Junit包下
网友评论