美文网首页
RocketMQ 基础

RocketMQ 基础

作者: Tinyspot | 来源:发表于2024-01-18 09:38 被阅读0次

    1. RocketMQ

    1.1 设计理念

    消息中间件的难题:如何保证消息一定能被消息消费者消费,并且保证只消费一次
    RocketMQ 的设计者给出的解决办法是不解决这个难题,而是退而求其次,只保证消息至少被消费一次,但不承诺消息不会被消费者多次消费,其消费的幂等由消费者实现

    1.2 启动顺序

    • 首先启动 NameServer,再启动 Broker 向 NameServer 注册(NameServer 对 Broker 进行心跳检测)
    • NameServer是RocketMQ的轻量级注册中心,负责存储和管理Broker集群以及Topic路由信息
    • Broker是RocketMQ的消息处理节点,负责接收生产者发送的消息并存储消息,以及响应消费者的消息读取请求

    2. 路由中心NameServer

    2.1 路由注册

    • (1) Broker发送心跳包
    • (2) NameServer处理心跳包

    3. Topic

    • Group 标识一类消息的生产者、消费者;一个分组对应一个主题
    • RocketMQ 基于订阅发布机制,一个Topic拥有多个消息队列,一个Broker为每一主题默认创建4个读队列4个写队列

    4. 消息发送

    • 同步发送:发送消息后,同步响应
    • 异步发送:发送消息后,异步响应
    • 单向发送:无返回值
    public void demo() throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");
        producer.start();
    
        Message msg = new Message(topic, tag, byte[]);
    
        // 单向发送
        producer.sendOneway(msg);
    
        // 同步发送
        SendResult result = producer.send(msg);
    
        // 异步回调
        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                String msgId = sendResult.getMsgId;
            }
            @Override
            public void onException(Throwable e) {
                String msgId = sendResult.getMsgId;
            }
        });
    }
    

    5. 消息消费

    • 两种消费模式
      • 拉模式 (Pull Consumer)
      • 推模式 (Push Consumer)

    5.1 拉模式

    • 消费者(Consumer) 主动从 Broker (消息服务器) 中拉取消息,它会周期性地向 Broker 发送请求,询问是否有新的消息可以消费
    • 消费者通过 DefaultMQPullConsumer 实现拉取逻辑,它会定期或根据需要调用 pull() 方法从 Broker 获取消息队列中的消息

    5.2 推模式 (Push Consumer)

    • 在 RocketMQ 中,Push 模式实际上是基于 Pull 模式的一种封装优化,它并不是传统意义上的服务端主动推送消息给客户端。
    • 在推模式下,虽然消费者看起来像是被动接收消息,但实际上内部仍然是通过定期拉取的方式来实现的,只是对用户隐藏了这一细节
    • DefaultMQPushConsumer 作为推模式的消费者,在内部使用定时任务或者监听器机制自动执行拉取操作,对使用者而言更像是消息被推送过来
    public void consumer() throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("my_topic", "tag_test");
        // CONSUME_FROM_LAST_OFFSET(默认值):从最新的消息开始消费,即忽略之前的所有消息
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        // 批量消费的消息数量上限
        consumer.setConsumeMessageBatchMaxSize(1);
    
        // 消息事件监听
        consumer.registerMessageListener((MessageListenerConcurrently) (messages, context) -> {
            try {
                for (MessageExt message : messages) {
                    logger.info("consumer_message, topic={}, tags={}, msgId={}, properties={}, body={}",
                            message.getTopic(), message.getTags(), message.getMsgId(), message.getProperties(), new String(message.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            } catch (Exception e) {
                // 消息重试
                logger.error("consumer_message_exception_retry, topic=my_topic, tags=tag_test, exception={}", e.getMessage());
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });
    
        consumer.start();
        logger.info("consumer_message_success");
    }
    

    相关文章

      网友评论

          本文标题:RocketMQ 基础

          本文链接:https://www.haomeiwen.com/subject/aairodtx.html