本篇文章代码基于:SpringBoot整合RocketMQhttps://www.jianshu.com/p/0028969d5e17
1、生产者核心配置
compressMsgBodyOverHowmuch :消息超过默认字节4096后进行压缩
retryTimesWhenSendFailed : 失败重发次数
maxMessageSize : 最大消息配置,默认128k
topicQueueNums : 主题下面的队列数量,默认是4
autoCreateTopicEnable : 是否自动创建主题Topic, 开发建议为true,生产要为false
defaultTopicQueueNums : 自动创建服务器不存在的topic,默认创建的队列数
autoCreateSubscriptionGroup: 是否允许 Broker 自动创建订阅组,建议线下开发开启,线上关闭
brokerClusterName : 集群名称
brokerId : 0表示Master主节点 大于0表示从节点
brokerIP1 : Broker服务地址
brokerRole : broker角色 ASYNC_MASTER/ SYNC_MASTER/ SLAVE
deleteWhen : 每天执行删除过期文件的时间,默认每天凌晨4点
flushDiskType :刷盘策略, 默认为 ASYNC_FLUSH(异步刷盘), 另外是SYNC_FLUSH(同步刷盘)
listenPort : Broker监听的端口号
mapedFileSizeCommitLog : 单个conmmitlog文件大小,默认是1GB
mapedFileSizeConsumeQueue:ConsumeQueue每个文件默认存30W条,可以根据项目调整
storePathRootDir : 存储消息以及一些配置信息的根目录 默认为用户的 ${HOME}/store
storePathCommitLog:commitlog存储目录默认为${storePathRootDir}/commitlog
storePathIndex: 消息索引存储路径
syncFlushTimeout : 同步刷盘超时时间
diskMaxUsedSpaceRatio : 检测可用的磁盘空间大小,超过后会写入报错
2、消息发送状态
RocketMQ源码中,有一个SendStatus枚举类,定义了消息投递的状态
package org.apache.rocketmq.client.producer;
public enum SendStatus {
SEND_OK,
FLUSH_DISK_TIMEOUT,
FLUSH_SLAVE_TIMEOUT,
SLAVE_NOT_AVAILABLE,
}
FLUSH_DISK_TIMEOUT
没有在规定时间内完成刷盘 (刷盘策略需要为SYNC_FLUSH 才会出这个错误)
FLUSH_SLAVE_TIMEOUT
主从模式下,broker是SYNC_MASTER, 没有在规定时间内完成主从同步
SLAVE_NOT_AVAILABLE
从模式下,broker是SYNC_MASTER, 但是没有找到被配置成Slave的Broker
SEND_OK
发送成功,没有发生上面的三种问题
3、生产和消费消息重试及处理
3.1、生产者Producer重试
消息重投(保证数据的高可靠性),本身内部支持重试,默认次数是2
//生产者投递消息重复次数
producer.setRetryTimesWhenSendFailed(3);
3.2、消费端重试
原因:消息处理异常、broker端到consumer端各种问题,如网络原因闪断,消费处理失败,ACK返回失败等等问题。
package com.ljessie.rocketmqdemo.jms;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
import java.util.List;
@Component
public class PayConsumer {
private DefaultMQPushConsumer consumer;
public PayConsumer() throws MQClientException {
consumer = new DefaultMQPushConsumer(JMSConfig.consumerGroup);
consumer.setNamesrvAddr(JMSConfig.nameServer);
//从队首开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//订阅主题,第二个参数*代表注册这个主题下面的所有tag
consumer.subscribe(JMSConfig.TOPIC,"*");
//默认是集群模式,可以更改为广播,更改为广播之后,不支持重试
// consumer.setMessageModel(MessageModel.BROADCASTING);
//配置消费监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
MessageExt message = list.get(0);
int retryTimes = message.getReconsumeTimes();
try {
System.out.println("重试次数:"+retryTimes);
if(1==1){
throw new Exception();
}
System.out.println("Consumer消费成功:"+new String(message.getBody(),"utf-8"));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
System.out.println("消费异常");
//如果重复次数超过两次不成功,则人工介入。广播方式不提供重试机制。
if(retryTimes >= 2){
System.out.println("复次数超过两次不成功,人工介入");
//告诉Broker消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
});
consumer.start();
}
}
控制台打印:
java.lang.Exception
重试次数:0
at com.ljessie.rocketmqdemo.jms.PayConsumer$1.consumeMessage(PayConsumer.java:40)
消费异常
at org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:417)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
重试次数:1
java.lang.Exception
消费异常
at com.ljessie.rocketmqdemo.jms.PayConsumer$1.consumeMessage(PayConsumer.java:40)
at org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:417)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
重试次数:2
消费异常
复次数超过两次不成功,人工介入
4、异步发送消息和回调
异步发送:不会重试,发送总次数等于1。
在PayController添加下面的方法:
/**
* 异步发送消息
* @param txt
* @return
*/
@RequestMapping("async")
public Object async(String txt){
Message message = new Message(JMSConfig.TOPIC,"tag_async",("Hello Async RocketMQ:"+txt).getBytes());
//设置延迟消息
// message.setDelayTimeLevel(2);
try {
payProducer.getProducer().send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("Producer发送成功:"+sendResult);
}
@Override
public void onException(Throwable throwable) {
//补偿机制,根据业务需求,设置是否重试
}
});
} catch (MQClientException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
5、OneWay发送消息及多种场景对比
SYNC :同步
应用场景:重要通知邮件、报名短信通知、营销短信系统等
ASYNC :异步
应用场景:对RT时间敏感,可以支持更高的并发,回调成功触发相对应的业务,比如注册成功后通知积分系统发放优惠券
ONEWAY : 无需要等待响应
官方文档:https://rocketmq.apache.org/docs/simple-example/
使用场景:主要是日志收集,适用于某些耗时非常短,但对可靠性要求并不高的场景, 也就是LogServer, 只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。
6、延迟消息
Producer 将消息发送到消息队列 RocketMQ 服务端,但并不期望这条消息立马投递,而是推迟到在当前时间点之后的某一个时间投递到 Consumer 进行消费,该消息即定时消息,目前支持固定精度的消息。
延迟消息的级别 MessageStoreConfig.java 属性 messageDelayLevel
"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
设置第2个级别,会立刻发送成功,但是在5秒后,Consumer才会消费此消息。
message.setDelayTimeLevel(2);
使用场景
通过消息触发一些定时任务,比如在某一固定时间点向用户发送提醒消息
消息生产和消费有时间窗口要求:比如在交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。 如支付未完成,则关闭订单。如已完成支付则忽略。
7、指定消息队列MessageQueueSelector
生产消息使用MessageQueueSelector投递到Topic下指定的queue。
PayController添加下面代码
/**
* 指定消息队列
* @return
*/
@RequestMapping("queue_selector")
public Object queueSelector(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
Message message = new Message(JMSConfig.TOPIC,"tag_msg",("Hello MsgQueueSelector RocketMQ:"+text).getBytes());
//第三个参数指定第几个队列
SendResult sendResult = payProducer.getProducer().send(message, new MessageQueueSelector() {
/**
*第三个参数Object,就是send()方法里面的第三个参数2
*/
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
int queueNum = Integer.valueOf(o.toString());
return list.get(queueNum);
}
},2);
System.out.println("Producer发送成功:"+sendResult);
return sendResult;
}
访问:http://localhost:8080/api/queue_selector?text=77777
控制台打印:
Producer发送成功:SendResult [sendStatus=SEND_OK, msgId=C0A80A67677C18B4AAC2831E75F50000, offsetMsgId=C0A80A6900002A9F000000000002E758, messageQueue=MessageQueue [topic=JESSIE_PAY_TEST_TOPIC, brokerName=broker-a, queueId=2], queueOffset=1]
重试次数:0
Consumer消费成功:Hello MsgQueueSelector RocketMQ:77777
queueId = 2,消息投递到第三个队列里面去了。
指定消息队列.jpg
控制台里面,查看Topic的status,第三个队列中的maxOffset加了1。
8、顺序消息
顺序消息:消息的生产和消费顺序一致。
全局顺序:topic下面全部消息都要有序(少用)
性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景,并行度成为消息系统的瓶颈, 吞吐量不够.
在证券处理中,以人民币兑换美元为例子,在价格相同的情况下,先出价者优先处理,则可以通过全局顺序的方式按照 FIFO 的方式进行发布和消费
局部顺序:只要保证一组消息被顺序消费即可(RocketMQ使用)
性能要求高
电商的订单创建,同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息、订单交易成功消息 都会按照先后顺序来发布和消费。
顺序发布:对于指定的一个 Topic,客户端将按照一定的先后顺序发送消息
顺序消费:对于指定的一个 Topic,按照一定的先后顺序接收消息,即先发送的消息一定会先被客户端接收到。
注意:
顺序消息暂不支持广播模式
顺序消息不支持异步发送方式,否则将无法严格保证顺序
8.1、生产者
生产端保证发送消息有序,且发送到同一个Topic的同个queue里面
例子:订单的顺序流程是:创建、付款、物流、完成,订单号相同的消息会被先后发送到同一个队列中
ProductOrder 模拟订单
package com.ljessie.rocketmqdemo.domain;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
/**
* 简写订单
*/
public class ProductOrder implements Serializable {
private long orderId;
private String type;
public ProductOrder(){}
public ProductOrder(long orderId, String type) {
this.orderId = orderId;
this.type = type;
}
public long getOrderId() {
return orderId;
}
public void setOrderId(long orderId) {
this.orderId = orderId;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
/**
* 创建模拟数据
* @return
*/
public static List<ProductOrder> getProductOrderList(){
List<ProductOrder> list = new ArrayList<>();
list.add(new ProductOrder(1,"创建订单"));
list.add(new ProductOrder(2,"创建订单"));
list.add(new ProductOrder(1,"支付订单"));
list.add(new ProductOrder(2,"支付订单"));
list.add(new ProductOrder(3,"创建订单"));
list.add(new ProductOrder(1,"完成订单"));
list.add(new ProductOrder(3,"支付订单"));
list.add(new ProductOrder(2,"完成订单"));
list.add(new ProductOrder(3,"完成订单"));
return list;
}
@Override
public String toString() {
return "ProductOrder{" +
"orderId=" + orderId +
", type='" + type + '\'' +
'}';
}
}
PayController添加代码
/**
* 把同一个订单ID的不同消息,投放到相同的消息队列里面
* @return
* @throws InterruptedException
* @throws RemotingException
* @throws MQClientException
* @throws MQBrokerException
*/
@RequestMapping("send_msg_orderly")
public Object sendByOrder() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
List<ProductOrder> productOrders = ProductOrder.getProductOrderList();
List<SendResult> sendResults = new ArrayList<>();
for(int i = 0; i<productOrders.size();i++){
ProductOrder order = productOrders.get(i);
Message message = new Message(JMSConfig.ORDERLY_TOPIC,"",
order.getOrderId()+"",order.toString().getBytes());
SendResult sendResult = payProducer.getProducer().send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
long id = (long) o;
int index = (int) (id % list.size());
return list.get(index);
}
},order.getOrderId());
sendResults.add(sendResult);
System.out.println("prodoctOrder:"+order.toString()+"----->msgQueue:"+sendResult.getMessageQueue());
}
return sendResults;
}
8.2、消费者
package com.ljessie.rocketmqdemo.jms;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
import java.util.List;
@Component
public class PayOrderlyConsumer {
private DefaultMQPushConsumer consumer;
public PayOrderlyConsumer() throws MQClientException {
consumer = new DefaultMQPushConsumer(JMSConfig.consumerOrdlerlyGroup);
consumer.setNamesrvAddr(JMSConfig.nameServer);
//消费地点,从最后一个开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//订阅主题
consumer.subscribe(JMSConfig.ORDERLY_TOPIC,"*");
//配置消费监听器
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeConcurrentlyContext) {
MessageExt message = list.get(0);
int retryTimes = message.getReconsumeTimes();
try {
System.out.println("Consumer消费成功:"+new String(message.getBody(),"utf-8"));
} catch (Exception e) {
System.out.println("重试次数:"+retryTimes);
//如果重复次数超过两次不成功,则人工介入。广播方式不提供重试机制。
if(retryTimes >= 2){
//告诉Broker消费成功
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
e.printStackTrace();
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
}
}
网友评论