发送消息的几种模式:
- 同步发送:可靠性高,但延迟较高。
- 异步发送:提高性能,但需要额外处理回调。
- 单向发送:延迟最低,但没有消息确认。
- 顺序发送:保证消息顺序,但影响性能。
- 广播发送:每个消费者都消费消息,适用于广播场景。
- 事务消息:支持分布式事务,适合保证消息一致性。
springboot集成rocketmq的语法:
同步发送:
rocketMQTemplate.syncSend(topic, message)
异步发送:
rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("Send success: " + sendResult);
}
@Override
public void onException(Throwable e) {
e.printStackTrace();
}
});
单向发送:
rocketMQTemplate.sendOneWay(topic, message);
顺序发送:
// 使用 MessageQueueSelector 来控制消息发送到某个队列,从而保证顺序性
rocketMQTemplate.send(topic, message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 根据某个参数决定消息发送到哪个队列
int index = Math.abs(msg.getKeys().hashCode()) % mqs.size();
return mqs.get(index);
}
});
广播发送:
rocketMQTemplate.setMessageModel(MessageModel.BROADCASTING);
事务消息:
// 发送事务消息
SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(topic, message, null);
tag的使用的坑:
使用相同的topic和group,使用不同的tag区分消息,会出现指定消费tagA的消息还会消费tagB的消息,
- 轮询消费
是由于消息队列的负载均衡机制 和 消费者组的共享消费 导致的。虽然消费者配置了不同的 tag,消息队列的分配不受 tag 影响,导致多个消费者可能从相同的消息队列中获取不同 tag 的消息。 - 避免轮询消费
可以通过调整消费者组配置、增加消息队列数量、确保每个消费者配置不同的 selectorExpression 来减少或避免轮询消费现象。
参考:
网友评论