美文网首页
4、RocketMQ基础-消息发送样例

4、RocketMQ基础-消息发送样例

作者: 站得高看得远 | 来源:发表于2021-08-02 10:11 被阅读0次

    消息发送样例

    1. 导入MQ客户端依赖
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.4.0</version>
    </dependency>
    

    2、* 消息发送者步骤分析

    1.创建消息生产者producer,并制定生产者组名
    2.指定Nameserver地址
    3.启动producer
    4.创建消息对象,指定主题Topic、Tag和消息体
    5.发送消息
    6.关闭生产者producer
    
      • 消息消费者步骤分析
    1.创建消费者Consumer,制定消费者组名
    2.指定Nameserver地址
    3.订阅主题Topic和Tag
    4.设置回调函数,处理消息
    5.启动消费者consumer
    

    基本样例

    消息发送

    发送同步消息

    这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。

    public class SyncProducer {
        public static void main(String[] args) throws Exception {
            // 实例化消息生产者Producer
            DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            // 设置NameServer的地址
            producer.setNamesrvAddr("localhost:9876");
            // 启动Producer实例
            producer.start();
            for (int i = 0; i < 100; i++) {
                // 创建消息,并指定Topic,Tag和消息体
                Message msg = new Message("TopicTest" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                // 发送消息到一个Broker
                SendResult sendResult = producer.send(msg);
                // 通过sendResult返回消息是否成功送达
                System.out.printf("%s%n", sendResult);
            }
            // 如果不再发送消息,关闭Producer实例。
            producer.shutdown();
        }
    }
    

    发送异步消息

    异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。

    public class AsyncProducer {
        public static void main(String[] args) throws Exception {
            // 实例化消息生产者Producer
            DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            // 设置NameServer的地址
            producer.setNamesrvAddr("localhost:9876");
            // 启动Producer实例
            producer.start();
            producer.setRetryTimesWhenSendAsyncFailed(0);
            for (int i = 0; i < 100; i++) {
                    final int index = i;
                    // 创建消息,并指定Topic,Tag和消息体
                    Message msg = new Message("TopicTest",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    // SendCallback接收异步返回结果的回调
                    producer.send(msg, new SendCallback() {
                        @Override
                        public void onSuccess(SendResult sendResult) {
                            System.out.printf("%-10d OK %s %n", index,
                                sendResult.getMsgId());
                        }
                        @Override
                        public void onException(Throwable e) {
                          System.out.printf("%-10d Exception %s %n", index, e);
                          e.printStackTrace();
                        }
                    });
            }
            // 如果不再发送消息,关闭Producer实例。
            producer.shutdown();
        }
    }
    

    单向发送消息

    这种方式主要用在不特别关心发送结果的场景,例如日志发送。

    public class OnewayProducer {
        public static void main(String[] args) throws Exception{
            // 实例化消息生产者Producer
            DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            // 设置NameServer的地址
            producer.setNamesrvAddr("localhost:9876");
            // 启动Producer实例
            producer.start();
            for (int i = 0; i < 100; i++) {
                // 创建消息,并指定Topic,Tag和消息体
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                // 发送单向消息,没有任何返回结果
                producer.sendOneway(msg);
    
            }
            // 如果不再发送消息,关闭Producer实例。
            producer.shutdown();
        }
    }
    

    消费消息

    负载均衡模式

    消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同

    public static void main(String[] args) throws Exception {
        // 实例化消息生产者,指定组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        // 指定Namesrv地址信息.
        consumer.setNamesrvAddr("localhost:9876");
        // 订阅Topic
        consumer.subscribe("Test", "*");
        //负载均衡模式消费
        consumer.setMessageModel(MessageModel.CLUSTERING);
        // 注册回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", 
                                  Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消费者
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
    

    广播模式

    消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的

    public static void main(String[] args) throws Exception {
        // 实例化消息生产者,指定组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        // 指定Namesrv地址信息.
        consumer.setNamesrvAddr("localhost:9876");
        // 订阅Topic
        consumer.subscribe("Test", "*");
        //广播模式消费
        consumer.setMessageModel(MessageModel.BROADCASTING);
        // 注册回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", 
                                  Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消息者
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
    

    顺序消息

    消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。

    顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

    下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。

    顺序消息生产

    /**
    * Producer,发送顺序消息
    */
    public class Producer {
    
       public static void main(String[] args) throws Exception {
           DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    
           producer.setNamesrvAddr("127.0.0.1:9876");
    
           producer.start();
    
           String[] tags = new String[]{"TagA", "TagC", "TagD"};
    
           // 订单列表
           List<OrderStep> orderList = new Producer().buildOrders();
    
           Date date = new Date();
           SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
           String dateStr = sdf.format(date);
           for (int i = 0; i < 10; i++) {
               // 加个时间前缀
               String body = dateStr + " Hello RocketMQ " + orderList.get(i);
               Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());
    
               SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                   @Override
                   public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                       Long id = (Long) arg;  //根据订单id选择发送queue
                       long index = id % mqs.size();
                       return mqs.get((int) index);
                   }
               }, orderList.get(i).getOrderId());//订单id
    
               System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
                   sendResult.getSendStatus(),
                   sendResult.getMessageQueue().getQueueId(),
                   body));
           }
    
           producer.shutdown();
       }
    
       /**
        * 订单的步骤
        */
       private static class OrderStep {
           private long orderId;
           private String desc;
    
           public long getOrderId() {
               return orderId;
           }
    
           public void setOrderId(long orderId) {
               this.orderId = orderId;
           }
    
           public String getDesc() {
               return desc;
           }
    
           public void setDesc(String desc) {
               this.desc = desc;
           }
    
           @Override
           public String toString() {
               return "OrderStep{" +
                   "orderId=" + orderId +
                   ", desc='" + desc + '\'' +
                   '}';
           }
       }
    
       /**
        * 生成模拟订单数据
        */
       private List<OrderStep> buildOrders() {
           List<OrderStep> orderList = new ArrayList<OrderStep>();
    
           OrderStep orderDemo = new OrderStep();
           orderDemo.setOrderId(15103111039L);
           orderDemo.setDesc("创建");
           orderList.add(orderDemo);
    
           orderDemo = new OrderStep();
           orderDemo.setOrderId(15103111065L);
           orderDemo.setDesc("创建");
           orderList.add(orderDemo);
    
           orderDemo = new OrderStep();
           orderDemo.setOrderId(15103111039L);
           orderDemo.setDesc("付款");
           orderList.add(orderDemo);
    
           orderDemo = new OrderStep();
           orderDemo.setOrderId(15103117235L);
           orderDemo.setDesc("创建");
           orderList.add(orderDemo);
    
           orderDemo = new OrderStep();
           orderDemo.setOrderId(15103111065L);
           orderDemo.setDesc("付款");
           orderList.add(orderDemo);
    
           orderDemo = new OrderStep();
           orderDemo.setOrderId(15103117235L);
           orderDemo.setDesc("付款");
           orderList.add(orderDemo);
    
           orderDemo = new OrderStep();
           orderDemo.setOrderId(15103111065L);
           orderDemo.setDesc("完成");
           orderList.add(orderDemo);
    
           orderDemo = new OrderStep();
           orderDemo.setOrderId(15103111039L);
           orderDemo.setDesc("推送");
           orderList.add(orderDemo);
    
           orderDemo = new OrderStep();
           orderDemo.setOrderId(15103117235L);
           orderDemo.setDesc("完成");
           orderList.add(orderDemo);
    
           orderDemo = new OrderStep();
           orderDemo.setOrderId(15103111039L);
           orderDemo.setDesc("完成");
           orderList.add(orderDemo);
    
           return orderList;
       }
    }
    

    顺序消费消息

    public class ScheduledMessageConsumer {
       public static void main(String[] args) throws Exception {
          // 实例化消费者
          DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
          // 订阅Topics
          consumer.subscribe("TestTopic", "*");
          // 注册消息监听者
          consumer.registerMessageListener(new MessageListenerConcurrently() {
              @Override
              public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                  for (MessageExt message : messages) {
                      // Print approximate delay time period
                      System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
                  }
                  return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
              }
          });
          // 启动消费者
          consumer.start();
      }
    }
    

    发送延时消息

    public class ScheduledMessageProducer {
       public static void main(String[] args) throws Exception {
          // 实例化一个生产者来产生延时消息
          DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
          // 启动生产者
          producer.start();
          int totalMessagesToSend = 100;
          for (int i = 0; i < totalMessagesToSend; i++) {
              Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
              // 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
              message.setDelayTimeLevel(3);
              // 发送消息
              producer.send(message);
          }
           // 关闭生产者
          producer.shutdown();
      }
    }
    

    验证

    您将会看到消息的消费比存储时间晚10秒

    使用限制

    // org/apache/rocketmq/store/config/MessageStoreConfig.java
    private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
    

    现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18

    批量消息

    批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。

    发送批量消息

    如果您每次只发送不超过4MB的消息,则很容易使用批处理,样例如下:

    String topic = "BatchTest";
    List<Message> messages = new ArrayList<>();
    messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
    messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
    messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
    try {
       producer.send(messages);
    } catch (Exception e) {
       e.printStackTrace();
       //处理error
    }
    

    如果消息的总长度可能大于4MB时,这时候最好把消息进行分割

    public class ListSplitter implements Iterator<List<Message>> {
       private final int SIZE_LIMIT = 1024 * 1024 * 4;
       private final List<Message> messages;
       private int currIndex;
       public ListSplitter(List<Message> messages) {
               this.messages = messages;
       }
        @Override 
        public boolean hasNext() {
           return currIndex < messages.size();
       }
        @Override 
        public List<Message> next() {
           int nextIndex = currIndex;
           int totalSize = 0;
           for (; nextIndex < messages.size(); nextIndex++) {
               Message message = messages.get(nextIndex);
               int tmpSize = message.getTopic().length() + message.getBody().length;
               Map<String, String> properties = message.getProperties();
               for (Map.Entry<String, String> entry : properties.entrySet()) {
                   tmpSize += entry.getKey().length() + entry.getValue().length();
               }
               tmpSize = tmpSize + 20; // 增加日志的开销20字节
               if (tmpSize > SIZE_LIMIT) {
                   //单个消息超过了最大的限制
                   //忽略,否则会阻塞分裂的进程
                   if (nextIndex - currIndex == 0) {
                      //假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
                      nextIndex++;
                   }
                   break;
               }
               if (tmpSize + totalSize > SIZE_LIMIT) {
                   break;
               } else {
                   totalSize += tmpSize;
               }
    
           }
           List<Message> subList = messages.subList(currIndex, nextIndex);
           currIndex = nextIndex;
           return subList;
       }
    }
    //把大的消息分裂成若干个小的消息
    ListSplitter splitter = new ListSplitter(messages);
    while (splitter.hasNext()) {
      try {
          List<Message>  listItem = splitter.next();
          producer.send(listItem);
      } catch (Exception e) {
          e.printStackTrace();
          //处理error
      }
    }
    

    过滤消息

    在大多数情况下,TAG是一个简单而有用的设计,其可以来选择您想要的消息。例如:

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
    consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
    

    消费者将接收包含TAGA或TAGB或TAGC的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。在这种情况下,可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算。在RocketMQ定义的语法下,可以实现一些简单的逻辑。下面是一个例子:

    ------------
    | message  |
    |----------|  a > 5 AND b = 'abc'
    | a = 10   |  --------------------> Gotten
    | b = 'abc'|
    | c = true |
    ------------
    ------------
    | message  |
    |----------|   a > 5 AND b = 'abc'
    | a = 1    |  --------------------> Missed
    | b = 'abc'|
    | c = true |
    ------------
    

    SQL基本语法

    RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。

    • 数值比较,比如:>,>=,<,<=,BETWEEN,=;
    • 字符比较,比如:=,<>,IN;
    • IS NULL 或者 IS NOT NULL;
    • 逻辑符号 AND,OR,NOT;

    常量支持类型为:

    • 数值,比如:123,3.1415;
    • 字符,比如:'abc',必须用单引号包裹起来;
    • NULL,特殊的常量
    • 布尔值,TRUEFALSE

    只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:

    public void subscribe(finalString topic, final MessageSelector messageSelector)
    

    消息生产者

    发送消息时,你能通过putUserProperty来设置消息的属性

    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    producer.start();
    Message msg = new Message("TopicTest",
       tag,
       ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
    );
    // 设置一些属性
    msg.putUserProperty("a", String.valueOf(i));
    SendResult sendResult = producer.send(msg);
    
    producer.shutdown();
    

    消息消费者

    用MessageSelector.bySql来使用sql筛选消息

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
    // 只有订阅的消息有这个属性a, a >=0 and a <= 3
    consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
       @Override
       public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
           return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
       }
    });
    consumer.start();
    

    事务消息

    流程分析

    事务消息流程图

    上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。

    1. 事务消息发送及提交
      1. 发送消息(half消息)。
      2. 服务端响应消息写入结果。
      3. 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。
      4. 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
    2. 事务补偿
      1. 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
      2. Producer收到回查消息,检查回查消息对应的本地事务的状态
      3. 根据本地事务状态,重新Commit或者Rollback。其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。
    3. 事务消息状态
      事务消息共有三种状态,提交状态、回滚状态、中间状态:
    • TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
    • TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
    • TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。

    发送事务消息

    1. 创建事务性生产者

      使用 TransactionMQProducer类创建生产者,并指定唯一的 ProducerGroup,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。回传的事务状态在请参考前一节。

      public class Producer {
          public static void main(String[] args) throws MQClientException, InterruptedException {
              //创建事务监听器
              TransactionListener transactionListener = new TransactionListenerImpl();
              //创建消息生产者
              TransactionMQProducer producer = new TransactionMQProducer("group6");
              producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");
              //生产者这是监听器
              producer.setTransactionListener(transactionListener);
              //启动消息生产者
              producer.start();
              String[] tags = new String[]{"TagA", "TagB", "TagC"};
              for (int i = 0; i < 3; i++) {
                  try {
                      Message msg = new Message("TransactionTopic", tags[i % tags.length], "KEY" + i,
                              ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                      SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                      System.out.printf("%s%n", sendResult);
                      TimeUnit.SECONDS.sleep(1);
                  } catch (MQClientException | UnsupportedEncodingException e) {
                      e.printStackTrace();
                  }
              }
              //producer.shutdown();
          }
      }
      
    2. 实现事务的监听接口
      当发送半消息成功时,我们使用 executeLocalTransaction 方法来执行本地事务。它返回前一节中提到的三个事务状态之一。checkLocalTranscation 方法用于检查本地事务状态,并回应消息队列的检查请求。它也是返回前一节中提到的三个事务状态之一。

    public class TransactionListenerImpl implements TransactionListener {
    
        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            System.out.println("执行本地事务");
            if (StringUtils.equals("TagA", msg.getTags())) {
                return LocalTransactionState.COMMIT_MESSAGE;
            } else if (StringUtils.equals("TagB", msg.getTags())) {
                return LocalTransactionState.ROLLBACK_MESSAGE;
            } else {
                return LocalTransactionState.UNKNOW;
            }
    
        }
    
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            System.out.println("MQ检查消息Tag【"+msg.getTags()+"】的本地事务执行结果");
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    }
    

    使用限制

    1. 事务消息不支持延时消息和批量消息。
    2. 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。
    3. 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionMsgTimeout 参数。
    4. 事务性消息可能不止一次被检查或消费。
    5. 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
    6. 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。

    相关文章

      网友评论

          本文标题:4、RocketMQ基础-消息发送样例

          本文链接:https://www.haomeiwen.com/subject/uuvdvltx.html