美文网首页
RocketMQ入门实战

RocketMQ入门实战

作者: 宏势 | 来源:发表于2023-02-02 21:05 被阅读0次

消息中间件三大特性:异步、削峰、解耦

一、 安装部署

1.安装

官网下载,解压后直接运行(单Master)

#启动namesrv
nohup sh bin/mqnamesrv &
#查看日志
tail -f ~/logs/rocketmqlogs/namesrv.log

#先启动broker
nohup sh bin/mqbroker -n localhost:9876 &
查看日志
tail -f ~/logs/rocketmqlogs/broker.log 

注意Broker的IP设置,手动修改属性 echo "brokerIP1=主机外网IP" > broker.conf
指定配置文件启动:nohup sh bin/mqbroker -n ${namesrvIp}:9876 -c conf/broker.conf
参考:https://github.com/apache/rocketmq#readme

2.部署模型

部署模型图

NameServer

NameServer是一个简单的 Topic 路由注册中心,支持 Topic、Broker 的动态注册与发现, NameServer是无状态化,节点之间无任何信息同步。

主要包括两个功能:

  • Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活
  • 路由信息管理,每个NameServer将保存关于 Broker 集群的整个路由信息和用于客户端查询的队列信息。Producer和Consumer通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。

Broker

Broker主要负责消息的存储、投递和查询以及服务高可用保证. Broker采用Master-Slave架构解决高可用问题,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master可以多个,提升消息写入能力

1.每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer
2.Producer 与 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取Topic路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态
3.Consumer 与 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的MasterSlave 建立长连接,且定时向 Master、Slave发送心跳。Consumer 既可以从 Master 订阅消息,也可以从Slave订阅消息。

部署方式

  • 单Master模式:只有一个节点,一般用于测试环境
  • 多Master模式:多个Master,提升负载能力,但是不部署Slave,单台Master故障,该Master上未被消费的消息在机器恢复之前不可消费
  • 多Master-Slave异步复制:解决单台Master故障问题,消费者可以从Slave消费,由于是异步,主备有短暂消息延迟(毫秒级)
  • 多Master-Slave同步复制: 同上,只是采用主备采用同步机制,性能略低10%左右,发送单个消息RT会略高,但是消息无延迟

一般生产环境采用的是多Master-Slave异步复制模式

二、消息模型

消息模型图

消费者组

为了消息消费能力的水平扩展,引入消费者组。

消费模式

同一个消费者组的消费模式:

  • 集群模式(默认)
    同一个 ConsumerGroup 中的 Consumer 实例是负载均衡消费,即任意一条消息只需要被消费组内的任意一个消费者处理即可
  • 广播模式
    同一个 ConsumerGroup 中的每个 Consumer 实例都处理全部的队列

1.集群模式下扩缩消费者数量也无法提升或降低消费能力,但当Topic的总队列数小于消费者的数量时,消费者将分配不到队列,即使消费者再多也无法提升消费能力
3.广播模式下扩缩消费者数量也无法提升或降低消费能力

  • 支持推Push和拉Pull两种模式

三、基本概念

队列

为了支持高并发和水平扩展,需要对 Topic 进行分区,称为队列。一个 Topic 可能有多个队列,并且可能分布在不同的 Broker 上

主题Topic&Tag

Topic 与 Tag 都是业务上用来归类的标识,区别在于 Topic 是一级分类,而 Tag 可以理解为是二级分类。可使用 Tag 可以实现对 Topic 中的消息进行过滤。通常情况下,不同的 Topic 之间的消息没有必然的联系,而 Tag 则用来区分同一个 Topic 下相互关联的消息

Keys

消息的唯一标识码,方便定位消息丢失问题。

消费位点

每个队列都会记录自己的最小位点、最大位点,针对消费者组,会有消费位点,在集群模式下,消费位点是由客户端提给交服务端保存的,在广播模式下,消费位点是由客户端自己保存的

四、代码示例

消息发送分为三种方式:同步异步单向传输,前两种是可靠的,无论是否成功都有相应,最后一种只管发送没有返回结果

public class SyncProducer {
  public static void main(String[] args) throws Exception {
    // 初始化一个producer并设置Producer group name
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); //(1)
    // 设置NameServer地址
    producer.setNamesrvAddr("localhost:9876");  //(2)
    // 启动producer
    producer.start();
    for (int i = 0; i < 100; i++) {
      // 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
      Message msg = new Message("TopicTest" /* Topic */,
        "TagA" /* Tag */,
        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
        );   //(3)
      // 利用producer进行发送,并同步等待发送结果
      SendResult sendResult = producer.send(msg);   //(4)
      System.out.printf("%s%n", sendResult);
    }
    // 一旦producer不再使用,关闭producer
    producer.shutdown();
  }
}

Push消费

public class Consumer {
  public static void main(String[] args) throws InterruptedException, MQClientException {
    // 初始化consumer,并设置consumer group name
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
   
    // 设置NameServer地址 
    consumer.setNamesrvAddr("localhost:9876");
    //设置消费模式,默认是集群模式
    //consumer.setMessageModel(MessageModel.BROADCASTING);
    //订阅一个或多个topic,并指定tag过滤条件,这里指定*表示接收所有tag的消息
    consumer.subscribe("TopicTest", "*");
    //注册回调接口来处理从Broker中收到的消息
    consumer.registerMessageListener(new MessageListenerConcurrently() {
      @Override
      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        System.out.println(new String(msgs.get(0).getBody(),StandardCharsets.UTF_8))
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        // 返回消息消费状态,ConsumeConcurrentlyStatus.CONSUME_SUCCESS为消费成功
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
      }
    });
    // 启动Consumer
    consumer.start();
    System.out.printf("Consumer Started.%n");
  }
}

Pull 消费

public class LitePullConsumerSubscribe {
    public static volatile boolean running = true;
    public static void main(String[] args) throws Exception {
        DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
        litePullConsumer.subscribe("TopicTest", "*");
        litePullConsumer.setPullBatchSize(20);
        litePullConsumer.start();
        try {
            while (running) {
                List<MessageExt> messageExts = litePullConsumer.poll();
                System.out.printf("%s%n", messageExts);
            }
        } finally {
            litePullConsumer.shutdown();
        }
    }
}

控制台

rocketmq-dashboard

官网:https://github.com/apache/rocketmq-dashboard

官网:https://rocketmq.apache.org/
Github: https://github.com/apache/rocketmq

相关文章

  • rocketmq总目录

    实战 rocketmq最简单的入门demo rocketmq的常用概念,接口和方法 rocketmq的正式部署 高...

  • Apache RocketMQ从入门到实战

    《Apache RocketMQ从入门到实战》在 2017 年听到阿里巴巴将 RocketMQ 捐赠给 Apach...

  • RocketMQ

    RocketMQ实战(一)RocketMQ实战(二)RocketMQ实战(三):分布式事务RocketMQ实战(四...

  • RocketMQ实战入门

    (一)安装与启动 下载rocketmq-4.8.0二进制包,启动name server和broker: 注:Roc...

  • RocketMQ事务消息与分布式事务

    在《RocketMQ实战入门》[https://www.jianshu.com/p/1e91225c41a9]里我...

  • RocketMQ实战 - 快速入门

    RocketMQ 是阿里开源的(目前已捐赠给Apache了)一款高性能、高吞吐量的分布式消息中间件。 参考资料 十...

  • RocketMQ实战(三):分布式事务

    接 《RocketMQ实战(一)》,《RocketMQ实战(二)》,本篇博客主要讨论的话题是:顺序消费、RMQ在分...

  • RocketMQ实战(四)

    前言 这将是RocketMQ实战系列的最后一篇文章,该系列的文章列表如下: 《RocketMQ实战(一)》 《Ro...

  • RocketMQ 实战之快速入门

    最近 RocketMQ 刚刚上生产环境,闲暇之时在这里做一些分享,主要目的是让初学者能快速上手RocketMQ。 ...

  • RocketMQ入门

    RocketMQ入门 1. RocketMQ简介 RocketMQ是阿里开源的消息中间件,它是纯java开发,具有...

网友评论

      本文标题:RocketMQ入门实战

      本文链接:https://www.haomeiwen.com/subject/xnxmcdtx.html