接下来我们配合着官方提供的demo,进行实际的消息发送学习,主要学习发送方式、发送参数的含义,以及发送中的一些问题。
消息发送的方式
- 可靠同步发送,需要等待服务器响应结果。
- 可靠异步发送,不等待服务器响应结果直接返回,当收到Broker的响应结果后调用SendCallback回调函数。
- 单向发送,不等待响应结果,不调用回调函数。
实战使用
在实际使用中,我们通常会选择可靠同步发送,因为我们快速的得到成功和失败的反馈。
业务场景:用户A下单创建订单,付款、完成订单
在这过程中,可能会产生三条消息,那在发送消息可能会因为负载均衡的策略,被分配到不同的消息队列中去
RocketMQ常用的两种平均分配算法
1. AllocateMessageQueueAveragely
平均分配,按照总数除以消费者个数进行,对每个消费者进行分配
2. AllocateMessageQueueAveragelyByCircle 轮流平均分配,按照消费者个数,进行轮询分配
所以为了保证局部顺序消息,只要保证每一组消息被顺序消费即可,我们需要考虑修改MessageQueueSelector方法,保证消息投递到同一个队列中。
// 实例化消息生产者Producer
org.apache.rocketmq.client.producer.DefaultMQProducer producer = new org.apache.rocketmq.client.producer.DefaultMQProducer("please_rename_unique_group_name");
// 设置NameServer的地址
producer.setNamesrvAddr("xxx:9876");
//设置重试次数
producer.setRetryTimesWhenSendFailed(3);
producer.setSendMsgTimeout(5000);
private static SendResult sendMsg(DefaultMQProducer producer, Order order) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
Message msg = null;
try {
msg = new Message("orderTest","TagA", order.getOrderNo(),
JSON.toJSONString(order).getBytes(RemotingHelper.DEFAULT_CHARSET));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
// 发送消息到一个Broker
SendResult sendResult = producer.send(msg, (list, message, arg) -> {
if (message == null || list.isEmpty()) {
return null;
}
// 取模
int index = Math.abs(arg.hashCode()) % list.size();
//模拟超时
//if (order.getId() == 1L){
try {
Thread.sleep(6000);
} catch (Exception e) {
}
}//
return list.get(Math.max(index, 0));
}, order.getId());
String result = JSONObject.toJSONString(sendResult);
System.out.println(result);
return sendResult;
}
- List mqs:消息要发送的Topic下所有的分区
- Message msg:消息对象
- 额外的参数:用户可以传递自己的参数
发送结果
数据按照我们修改的MessageQueueSelector,进行了队列的选择,正式我们想要的结果,这样我们就可以按照队列的顺序消费。
发送结果参数
messageQueue对象
String topic = msg.getTopic();
if (StringUtils.isNotEmpty(this.clientConfig.getNamespace())) {
topic = NamespaceUtil.withoutNamespace(topic, this.clientConfig.getNamespace());
}
MessageQueue messageQueue = new MessageQueue(topic, brokerName, responseHeader.getQueueId());
主要有brokerName,topic和QueueId组成,RocketMQ支持顺序投递,利用MessageQueueSelector,将相同的Key投递到同一个队列中,保证局部顺序。
全局msgId生成规则
规则:IP+进程PID+类加载器HashCode+自增
初始化值
static {
byte[] ip;
try {
ip = UtilAll.getIP();
} catch (Exception e) {
ip = createFakeIP();
}
LEN = ip.length + 2 + 4 + 4 + 2;
// 分配大小 ip长度+2+4
ByteBuffer tempBuffer = ByteBuffer.allocate(ip.length + 2 + 4);
tempBuffer.put(ip);
// 进程PID
tempBuffer.putShort((short) UtilAll.getPid());
// 类加载器hashCode
tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode());
FIX_STRING = UtilAll.bytes2string(tempBuffer.array()).toCharArray();
setStartTime(System.currentTimeMillis());
COUNTER = new AtomicInteger(0);
}
构建UniqID
public static String createUniqID() {
char[] sb = new char[LEN * 2];
System.arraycopy(FIX_STRING, 0, sb, 0, FIX_STRING.length);
long current = System.currentTimeMillis();
if (current >= nextStartTime) {
setStartTime(current);
}
int diff = (int)(current - startTime);
if (diff < 0 && diff > -1000_000) {
// may cause by NTP
diff = 0;
}
int pos = FIX_STRING.length;
UtilAll.writeInt(sb, pos, diff);
pos += 8;
UtilAll.writeShort(sb, pos, COUNTER.getAndIncrement());
return new String(sb);
}
获取MsgId
String uniqMsgId = MessageClientIDSetter.getUniqID(msg);
if (msg instanceof MessageBatch) {
StringBuilder sb = new StringBuilder();
for (Message message : (MessageBatch) msg) {
sb.append(sb.length() == 0 ? "" : ",").append(MessageClientIDSetter.getUniqID(message));
}
uniqMsgId = sb.toString();
}
queueOffset Key中为剩余的消费偏移量
responseHeader.getQueueOffset();
消息发送参数的作用
Keys
消息发送的时候设置Keys为订单编号,我们可以在RocketMQ-Console里面查询消息
Tags标签
标签的作用 :对Topic中的消息进行过滤,选择处理
下面我们做一个Test消费的测试
public static void main(String[] args) throws Exception {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// 设置NameServer的地址
consumer.setNamesrvAddr("xxx:9876");
// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
consumer.subscribe("TopicTest", "");
//设置 负载均衡 | 广播模式 默认是负载均衡
consumer.setMessageModel(MessageModel.CLUSTERING);
/**
* 消息顺序:
* 全局消息顺序
* 局部消息顺序
*/
//设置回调函数处理消息
consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), list);
list.forEach(msg ->{
System.out.println("Receive message[msgId=" + msg.getMsgId() + "] " + (System.currentTimeMillis() - msg.getStoreTimestamp()) + "ms later");
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
}
在上面发送消息的时候Tags参数设定为TagA,在消费端将tags参数设置为空,则拉取不到消息
使用MessageQueueSelector,报错却没有重发消息,怎么办?
if (order.getId() == 1L){
try {
Thread.sleep(6000);
} catch (Exception e) {
}
}
//设置重试次数
producer.setRetryTimesWhenSendFailed(3);
producer.setSendMsgTimeout(5000);
上面的代码中,进行了重试的设置,同时也在代码中设置了超时的场景
代码很快就抛出了异常信息,发现我们设置的重试设置没有作用,没有进行消息重发,也没有对队列阻塞
缺陷:
-
设置的重试没有生效,发送顺序消息无法利用集群的Failover特性,因为不能更换MessageQueue进行重试
-
因为发送的路由策略导致的热点问题,可能某一些MessageQueue的数据量特别大
-
没有消息重发,也没有队列阻塞
解决方案
典型的就是消息发送失败后存在数据库中,然后定时调度,最终将消息发送到 MQ
但是我觉得这个方案还是存在缺陷的,重新发送消息后,消息的顺序性就发生变动了,这个问题需要思考
总结
今天这篇文章,主要介绍了同步发送顺序消息,以及发送消息中的主要参数的作用和生成规则,
最后的问题,在后面的学习中在进行思考,来完善使用MessageQueueSelector,报错却没有重发消息的问题。
作者:叫我小郭_
链接:https://juejin.cn/post/7102776605528817671
来源:稀土掘金
网友评论