1. RocketMQ
1.1 设计理念
消息中间件的难题:如何保证消息一定能被消息消费者消费,并且保证只消费一次
RocketMQ 的设计者给出的解决办法是不解决这个难题,而是退而求其次,只保证消息至少被消费一次,但不承诺消息不会被消费者多次消费,其消费的幂等由消费者实现
1.2 启动顺序
-
首先启动 NameServer,再启动 Broker 向 NameServer 注册
(NameServer 对 Broker 进行心跳检测) - NameServer是RocketMQ的轻量级注册中心,负责存储和管理Broker集群以及Topic路由信息
- Broker是RocketMQ的消息处理节点,负责接收生产者发送的消息并存储消息,以及响应消费者的消息读取请求
2. 路由中心NameServer
2.1 路由注册
- (1) Broker发送心跳包
- (2) NameServer处理心跳包
3. Topic
- Group 标识一类消息的生产者、消费者;一个分组对应一个主题
- RocketMQ 基于订阅发布机制,一个Topic拥有多个消息队列,一个Broker为每一主题默认创建4个读队列4个写队列
4. 消息发送
- 同步发送:发送消息后,同步响应
- 异步发送:发送消息后,异步响应
- 单向发送:无返回值
public void demo() throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer.start();
Message msg = new Message(topic, tag, byte[]);
// 单向发送
producer.sendOneway(msg);
// 同步发送
SendResult result = producer.send(msg);
// 异步回调
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
String msgId = sendResult.getMsgId;
}
@Override
public void onException(Throwable e) {
String msgId = sendResult.getMsgId;
}
});
}
5. 消息消费
- 两种消费模式
- 拉模式 (Pull Consumer)
- 推模式 (Push Consumer)
5.1 拉模式
- 消费者(Consumer) 主动从 Broker (消息服务器) 中拉取消息,它会周期性地向 Broker 发送请求,询问是否有新的消息可以消费
- 消费者通过
DefaultMQPullConsumer
实现拉取逻辑,它会定期或根据需要调用pull()
方法从 Broker 获取消息队列中的消息
5.2 推模式 (Push Consumer)
- 在 RocketMQ 中,Push 模式实际上是基于 Pull 模式的一种封装优化,它并不是传统意义上的服务端主动推送消息给客户端。
- 在推模式下,虽然消费者看起来像是被动接收消息,但实际上内部仍然是通过定期拉取的方式来实现的,只是对用户隐藏了这一细节
-
DefaultMQPushConsumer
作为推模式的消费者,在内部使用定时任务或者监听器机制自动执行拉取操作,对使用者而言更像是消息被推送过来
public void consumer() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("my_topic", "tag_test");
// CONSUME_FROM_LAST_OFFSET(默认值):从最新的消息开始消费,即忽略之前的所有消息
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// 批量消费的消息数量上限
consumer.setConsumeMessageBatchMaxSize(1);
// 消息事件监听
consumer.registerMessageListener((MessageListenerConcurrently) (messages, context) -> {
try {
for (MessageExt message : messages) {
logger.info("consumer_message, topic={}, tags={}, msgId={}, properties={}, body={}",
message.getTopic(), message.getTags(), message.getMsgId(), message.getProperties(), new String(message.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 消息重试
logger.error("consumer_message_exception_retry, topic=my_topic, tags=tag_test, exception={}", e.getMessage());
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
consumer.start();
logger.info("consumer_message_success");
}
网友评论