美文网首页
腾讯云TDMQ(RocketMQ)版介绍及入门

腾讯云TDMQ(RocketMQ)版介绍及入门

作者: 文景大大 | 来源:发表于2022-01-05 17:14 被阅读0次

    一、入门介绍

    1.1 TDMQ简介

    TDMQ总共有4个版本,分别是RocketMQ版、Pulsar版、RabbitMQ版、CMQ版;其中Pulsar是一款基于 Apache 顶级开源项目 Pulsar 自研的金融级分布式消息中间件,具备跨城高一致、高可靠、高并发的特性,RabbitMQ是Erlang编写的,CMQ是腾讯自研的,RocketMQ也是腾讯基于开源项目自研的;这里面除了Pulsar之外,其它目前都是公测阶段,不收费,但是也意味着可能不稳定。

    由于我们公司主要使用的就是开源RocketMQ,而且中台项目都是基于Spring Cloud Stream的,所以要无缝低成本切换的话,只有RocketMQ版本比较适合,本文就此展开介绍。

    TDMQ for RocketMQ是一款腾讯自主研发的消息队列服务,兼容 Apache RocketMQ 的各个组件与概念,支持 RocketMQ 4.6.1及以上版本的客户端零改造接入,同时具备计算存储分离灵活扩缩容的底层优势。能较好地应对各类营销活动带来的流量冲击,非常适用于顺序性以及事务性要求较高的场景,在电商交易、金融结算等领域有着十分广泛的应用。

    1.2 产品优势

    • 兼容开源4.3.0及以上版本,对于 4.6.1及以上版本的客户端支持零改造接入;
    • 服务(Broker)和存储(Bookie)分离,整体架构采用云原生无状态设计,用户可以按量使用和按需扩展,体验更加 Serverless 化,用户对底层资源无感知。
    • 多层级的资源结构,不仅基于命名空间做了虚拟隔离,也可以在集群维度做物理隔离。支持在命名空间维度为客户端配置权限校验,区分不同环境的客户端,方便灵活。
    • 对于消息数据采用分片的方式进行持久化,不容易产生数据倾斜等问题。当由于扩容、机器故障等导致的节点新增、删除时,不会触发重平衡而导致整个集群的吞吐急剧下降。
    • 支持普通消息、顺序消息、延时消息等多种消息类型,支持消息重试和死信机制,满足各类业务场景。
    • 单机最高可支持上万级别的生产消费吞吐,分布式架构,无状态服务,可以横向扩容来增强整个集群的吞吐。

    1.3 应用场景

    • 异步解耦,实现高效的异步通信和应用解耦,确保主站业务的连续性。
    • 削峰填谷,承担一个缓冲器的角色,将上游突增的请求集中收集,下游可以根据自己的实际处理能力来消费请求消息。
    • 顺序收发,提供一种专门应对需要严格按照顺序执行消息的功能,即保证消息 FIFO。
    • 分布式事务一致性,自动重推和海量堆积能力来实现事务补偿,实现最终一致性。
    • 分布式缓存同步,广播消费模式会被所有节点消费一次,相当于把信息同步到需要的每台机器上,可以取代缓存的作用。
    • 大数据分析, 与流式计算引擎相结合,可以很方便地实现业务数据的实时分析。

    1.4 使用限制

    • 同一地域内集群数量上限5个;
    • 同一集群内命名空间数量上限10个;
    • 单个命名空间TPS上限8000,生产和消费带宽上限400Mbps;
    • 单个命名空间内Topic上限1000;
    • 单个Topic生产者数量上限1000个,消费者数量上限500个;
    • 单个命名空间内Group上限10000个;
    • 消息最大保留时长15天;
    • 消息最大延迟时间40天;
    • 消息大小上限5MB;
    • 消费位点重置最长15天;

    1.5 收费标准

    2022年5月之前处于公测阶段,申请资格成功后,可以免费使用。

    1.6 资源介绍

    • 集群,用户手动创建一个可伸缩、免运维、零配置的RocketMQ集群,接入地址目前只支持内网接入,公网接入需要等到公测结束后才支持;通常建议不同环境创建不同的集群,达到物理隔离的目的;
    集群创建
    • 命名空间,在集群内部可以创建不同的命名空间,来逻辑隔离不同的消息,通常建议不同的项目群使用不同的命名空间;允许设置TTL时长、消息持久化策略、时长、空间等;
    命名空间创建
    • 角色与授权,在角色管理里面先创建需要的角色,然后在命名空间里面给角色赋权生产或者消费的权限;
    角色创建 角色赋权
    • Topic,消息的Topic需要手动创建,允许设置Topic中消息的类型、分区数量;
    Topic创建
    • Group,消费者组也需要手动创建,可以设置消费组消费开关和广播开关;
    Group创建

    1.7 消息与轨迹查询

    当一条消息从生产者发送到 TDMQ RocketMQ 版服务端,再由消费者进行消费,TDMQ RocketMQ 版会完整记录这条消息中间的流转过程,并以消息轨迹的形式呈现在控制台。消息轨迹记录了消息从生产端到 TDMQ RocketMQ 版服务端,最后到消费端的整个过程,包括各阶段的时间(精确到微秒)、执行结果、生产者 IP、消费者 IP 等。

    消息轨迹查询示意图

    使用限制如下:

    • 消息查询最多可以查询近3天的消息;
    • 一次性最多可以查询65536条消息;

    1.8 配置告警

    • 指标
      • 消息生产速率
      • 消息生产流量
      • 生产者数据量
      • 消息堆积数量
    • 告警频次
      • 分钟级别
      • 小时级别
      • 天级别
    • 通知模板
      • 接收对象可以是用户或者用户组;
      • 接收渠道可以是邮件、短信、微信、电话;

    1.9 TDMQ与开源RocketMQ对比

    对比图

    二、开发指南

    2.1 Java SDK

    首先需要导入RocketMQ的依赖:

    <!-- in your <dependencies> block -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.6.1</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-acl</artifactId>
        <version>4.6.1</version>
    </dependency>
    

    然后,我们创建一个生产者:

    // 实例化消息生产者Producer
    DefaultMQProducer producer = new DefaultMQProducer(
        namespace, 
        groupName,
        new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)) // ACL权限
    );
    // 设置NameServer的地址
    producer.setNamesrvAddr(nameserver);
    // 启动Producer实例
    producer.start();
    

    然后发送消息分为如下几种方式:

    • 同步发送

      同一个线程同步阻塞式地进行消息的发送,比较适合需要根据上一次发送结果来做逻辑判断,以决定下一次是否发送的场景。

      for (int i = 0; i < 10; i++) {
           // 创建消息实例,设置topic和消息内容
           Message msg = new Message(topic_name, "TAG", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
           // 发送消息
           SendResult sendResult = producer.send(msg);
           System.out.printf("%s%n", sendResult);
      }
      
    • 异步发送

      不同的线程同时并发地进行消息的发送,比较适合各个消息之间无依赖的场景,发送效率得到了提升。

      // 设置发送失败后不重试
      producer.setRetryTimesWhenSendAsyncFailed(0);
      // 设置发送消息的数量
      int messageCount = 10;
      final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
      for (int i = 0; i < messageCount; i++) {
           try {
                   final int index = i;
                   // 创建消息实体,设置topic和消息内容
                   Message msg = new Message(topic_name, "TAG", ("Hello rocketMq " + index).getBytes(RemotingHelper.DEFAULT_CHARSET));
                   producer.send(msg, new SendCallback() {
                           @Override
                           public void onSuccess(SendResult sendResult) {
                                   // 消息发送成功逻辑
                                   countDownLatch.countDown();
                                   System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                           }
      
                           @Override
                           public void onException(Throwable e) {
                                   // 消息发送失败逻辑
                                   countDownLatch.countDown();
                                   System.out.printf("%-10d Exception %s %n", index, e);
                                   e.printStackTrace();
                           }
                   });
           } catch (Exception e) {
                   e.printStackTrace();
           }
      }
      countDownLatch.await(5, TimeUnit.SECONDS);
      
    • 单向发送

      同一个线程同步阻塞式地进行消息的发送,但是发送者不用等待broker的消息确认返回,即开始下一个消息的发送,比较快速,适合于对消息少量丢失不敏感的场景。

      for (int i = 0; i < 10; i++) {
           // 创建消息实例,设置topic和消息内容
           Message msg = new Message(topic_name, "TAG", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
           // 发送单向消息
           producer.sendOneway(msg);
      }
      

    然后再设置消费者,分为如下两种:

    • push

      // 实例化消费者
      DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(
           namespace,                                                  
           groupName,                                              
           new AclClientRPCHook(new SessionCredentials(accessKey, secretKey))); //ACL权限
      // 设置NameServer的地址
      pushConsumer.setNamesrvAddr(nameserver);
      
      // 订阅topic
      pushConsumer.subscribe(topic_name, "*");
      // 设置消费模式:CLUSTERING-集群负载消费(默认);BROADCASTING-广播消费
      //consumer.setMessageModel(MessageModel.CLUSTERING);
      // 注册回调实现类来处理从broker拉取回来的消息
      pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
           // 消息处理逻辑
           System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
           // 标记该消息已经被成功消费, 根据消费情况,返回处理状态
           return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
      });
      // 启动消费者实例
      pushConsumer.start();
      
    • pull

      // 实例化消费者
      DefaultLitePullConsumer pullConsumer = new DefaultLitePullConsumer(
           namespace,                                               
           groupName,                                             
           new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)));
      // 设置NameServer的地址
      pullConsumer.setNamesrvAddr(nameserver);
      // 设置从第一个偏移量开始消费
      pullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
      
      // 订阅topic
      pullConsumer.subscribe(topic_name, "*");
      // 设置消费模式:CLUSTERING-集群负载消费(默认);BROADCASTING-广播消费
      //consumer.setMessageModel(MessageModel.CLUSTERING);
      // 启动消费者实例
      pullConsumer.start();
      try {
           System.out.printf("Consumer Started.%n");
           while (true) {
                   // 拉取消息
                   List<MessageExt> messageExts = pullConsumer.poll();
                   System.out.printf("%s%n", messageExts);
           }
      } finally {
           pullConsumer.shutdown();
      }
      

    2.2 Spring Boot Starter

    首先引入依赖:

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.1</version>
    </dependency>
    

    然后增加配置信息如下:

    server:
      port: 8082
    
    #rocketmq配置信息
    rocketmq:
      # tdmq-rocketmq服务接入地址
      name-server: rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876
      # 生产者配置
      producer:
        # 生产者组名
        group: group111
        # 角色密钥
        access-key: eyJrZXlJZC....
        # 已授权的角色名称
        secret-key: admin
      # 消费者公共配置
      consumer:
        # 角色密钥
        access-key: eyJrZXlJZC....
        # 已授权的角色名称
        secret-key: admin
    
      # 用户自定义配置
      namespace: rocketmq-xxx|namespace1
      producer1:
        topic: testdev1
      consumer1:
        group: group111
        topic: testdev1
        subExpression: TAG1
      consumer2:
        group: group222
        topic: testdev1
        subExpression: TAG2
    

    然后如下是一个发送消息的示例:

    /**
     * Description: 消息生产者
     */
    @Service
    public class SendMessage {
        // 需要使用topic全称,所以进行topic名称的拼接,也可以自己设置  格式:namespace全称%topic名称
        @Value("${rocketmq.namespace}%${rocketmq.producer1.topic}")
        private String topic;
    
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
        /**
         * 同步发送
         *
         * @param message 消息内容
         * @param tags    订阅tags
         */
        public void syncSend(String message, String tags) {
            // springboot不支持使用header传递tags,根据要求,需要在topic后进行拼接 formats: `topicName:tags`,不拼接标识无tag
            String destination = StringUtils.isBlank(tags) ? topic : topic + ":" + tags;
            SendResult sendResult = rocketMQTemplate.syncSend(destination,
                    MessageBuilder.withPayload(message)
                            .setHeader(MessageConst.PROPERTY_KEYS, "yourKey")   // 指定业务key
                            .build());
            System.out.printf("syncSend1 to topic %s sendResult=%s %n", topic, sendResult);
        }
    }
    

    如下是一个消费者示例:

    @Service
    @RocketMQMessageListener(
            // 消费组,格式:namespace全称%group名称
            consumerGroup = "${rocketmq.namespace}%${rocketmq.consumer1.group}",  
            // 需要使用topic全称,所以进行topic名称的拼接,也可以自己设置  格式:namespace全称%topic名称
            topic = "${rocketmq.namespace}%${rocketmq.consumer1.topic}",
            selectorExpression = "${rocketmq.consumer1.subExpression}" // 订阅表达式, 不配置表示订阅所有消息
    )
    public class MessageConsumer implements RocketMQListener<String> {
    
        @Override
        public void onMessage(String message) {
            System.out.println("Tag1Consumer receive message:" + message);
        }
    }
    

    2.3 Spring Cloud Stream

    如果不熟悉Stream使用的可以先参考文章Stream使用入门 - 简书 (jianshu.com)

    首先引入依赖:

    <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.1</version>
    </dependency>
    <dependency>
         <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-acl</artifactId>
         <version>4.7.1</version>
    </dependency>
    
    <!--spring-cloud-starter-stream-rocketmq 里面的 RocketMQ 版本较老,需要排除掉,然后单独引用新的版本-->
    <dependency>
         <groupId>com.alibaba.cloud</groupId>
         <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
         <version>2.2.5-RocketMQ-RC1</version>
         <exclusions>
                 <exclusion>
                         <groupId>org.apache.rocketmq</groupId>
                         <artifactId>rocketmq-client</artifactId>
                 </exclusion>
                 <exclusion>
                         <groupId>org.apache.rocketmq</groupId>
                         <artifactId>rocketmq-acl</artifactId>
                 </exclusion>
         </exclusions>
    </dependency>
    

    然后是stream的配置:

    spring:
     cloud:
         stream:
             rocketmq:
                 bindings:
                     # channel名称, 与spring.cloud.stream.bindings下的channel名称对应
                     Topic-test1:
                         consumer:
                             # 订阅的tag类型,根据消费者实际情况进行配置(默认是订阅所有消息)
                             subscription: TAG1
                     # channel名称
                     Topic-test2:
                         consumer:
                             subscription: TAG2
                 binder:
                     # 服务地址全称
                     name-server: rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876
                     # 角色名称
                     secret-key: admin
                     # 角色密钥
                     access-key: eyJrZXlJZ...
                     # namespace全称
                     namespace: rocketmq-xxx|namespace1
                     # 生成者group名称
                     group: group1
             bindings:
                 # channel名称
                 Topic-send:
                     # 指定topic, 对应创建的topic名称
                     destination: topic1
                     content-type: application/json
                     # 要使用group名称
                     group: group1
                 # channel名称
                 Topic-test1:
                     destination: topic1
                     content-type: application/json
                     group: group1
                 # channel名称
                 Topic-test2:
                     destination: topic1
                     content-type: application/json
                     group: group2
    

    如下是channel的示例:

    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.messaging.MessageChannel;
    
    /**
    * 自定义通道 Binder
    */
    public interface CustomChannelBinder {
    
         /**
            * 发送消息(消息生产者)
            * 绑定配置中的channel名称
            */
         @Output("Topic-send")
         MessageChannel sendChannel();
    
    
         /**
            * 接收消息1(消费者1)
            * 绑定配置中的channel名称
            */
         @Input("Topic-test1")
         MessageChannel testInputChannel1();
    
         /**
            * 接收消息2(消费者2)
            * 绑定配置中的channel名称
            */
         @Input("Topic-test2")
         MessageChannel testInputChannel2();
    }
    

    在配置类或启动类上添加相应注解,如果有多个binder配置,都要在此注解中进行指定。

    @EnableBinding({CustomChannelBinder.class})
    

    发送消息示例:

    @Autowired
    private CustomChannelBinder channelBinder;
    
    Message<String> message = MessageBuilder.withPayload("This is a new message.").build();
    channelBinder.sendChannel().send(message);
    

    接收消息示例:

    @Service
    public class TestStreamConsumer {
         private final Logger logger = LoggerFactory.getLogger(DemoApplication.class);
             /**
            * 监听channel (配置中的channel名称)
            *
            * @param messageBody 消息内容
            */
         @StreamListener("Topic-test1")
         public void receive(String messageBody) {
                 logger.info("Receive1: 通过stream收到消息,messageBody = {}", messageBody);
         }
             /**
            * 监听channel (配置中的channel名称)
            *
            * @param messageBody 消息内容
            */
         @StreamListener("Topic-test2")
         public void receive2(String messageBody) {
                 logger.info("Receive2: 通过stream收到消息,messageBody = {}", messageBody);
         }
    }
    

    三、参考文档

    消息队列 RocketMQ 版 产品概述 - 产品简介 - 文档中心 - 腾讯云 (tencent.com)

    RocketMQ开发者使用指南入门 - 简书 (jianshu.com)

    相关文章

      网友评论

          本文标题:腾讯云TDMQ(RocketMQ)版介绍及入门

          本文链接:https://www.haomeiwen.com/subject/evhkcrtx.html