前言
顺序消息:指的是消息的消费顺序和生产顺序相同
全局顺序:在某个topic下,所有的消息都要保证顺序
局部顺序:只要保证每一组消息被顺序消费即可
Rocketmq顺序消息属于局部顺序(分区顺序)
image.png1. 普通消息与顺序消息
public class Producer {
public static void main(String[] args) {
try {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
producer.start();
//顺序发送100条编号为0到99的,orderId为1 的消息
new Thread(() -> {
Integer orderId = 1;
sendMessage(producer, orderId);
}).start();
//顺序发送100条编号为0到99的,orderId为2 的消息
new Thread(() -> {
Integer orderId = 2;
sendMessage(producer, orderId);
}).start();
//sleep 30秒让消息都发送成功再关闭
Thread.sleep(1000*30);
producer.shutdown();
} catch (MQClientException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static void sendMessage(MQProducer producer, Integer orderId) {
for (int i = 0; i < 100; i++) {
try {
Message msg =
new Message("TopicTestjjj", "TagA", i + "",
(orderId + "").getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.println("message send,orderId:"+orderId +" i:" +i );
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
Normal Consumer Demo
模拟了一个消费者中多线程并行消费消息的情况,使用的消费监听器为MessageListenerConcurrently
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTestjjj", "*");
//单个消费者中多线程并行消费
consumer.setConsumeThreadMin(3);
consumer.setConsumeThreadMin(6);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
// System.out.println("收到消息," + new String(msg.getBody()));
System.out.println("queueId:"+msg.getQueueId()+",orderId:"+new String(msg.getBody())+",i:"+msg.getKeys());
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTestjjj", "TagA");
//消费者并行消费
consumer.setConsumeThreadMin(3);
consumer.setConsumeThreadMin(6);
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
// context.setAutoCommit(false);
for (MessageExt msg : msgs) {
System.out.println("queueId:"+msg.getQueueId()+",orderId:"+new String(msg.getBody())+",i:"+msg.getKeys());
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
看下结果输出,如图,同一个orderId下,编号为27的消息先于编号为25的消息被消费,不是正确的顺序消费,即普通的并行消息消费,无法保证消息消费的顺序性
image.png
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTestjjj", "TagA");
//消费者并行消费
consumer.setConsumeThreadMin(3);
consumer.setConsumeThreadMin(6);
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
// context.setAutoCommit(false);
for (MessageExt msg : msgs) {
System.out.println("queueId:"+msg.getQueueId()+",orderId:"+new String(msg.getBody())+",i:"+msg.getKeys());
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
image.png
2.RocketMQ顺序消费实战整合应用
高并发下的用户下单订单场景(比如下单时候要求顺序消费消息,除了订单消息以外,还有扣库存消息/折扣消息等等)
不同的商家对应同一主题下的不同的队列
比如有 1000个商家 一家店家有30万订单的顺序消费,其他999商家需要等这么???
解决:设置1个topic,建立1000多个消费队列,一个线程连一个messageQueue ,当然topic 可以根据城市、商家重要程度、继续划分多个topic
这样做的好处就是并行处理操作的同时,保障顺序操作
3. 顺序消费原理
image.png- 创建消息拉取任务时,消息客户端向broker端申请锁定MessageQueue,使得一个MessageQueue同一个时刻只能被一个消费客户端消费
- 消息消费时,多线程针对同一个消息队列的消费先尝试使用synchronized申请独占锁,加锁成功才能进行消费,使得一个MessageQueue同一个时刻只能被一个消费客户端中一个线程消费
4. 顺序消息重试机制
在使用顺序消息时,一定要注意其异常情况的出现,对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 版会自动不断地进行消息重试(每次间隔时间为 1 秒),重试最大值是Integer.MAX_VALUE
.这时,应用会出现消息消费被阻塞的情况。因此,建议您使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生
重要的事再强调一次:在使用顺序消息时,一定要注意其异常情况的出现!
网友评论