美文网首页
四、RocketMQ案例(一)

四、RocketMQ案例(一)

作者: 恨别A鸟惊心 | 来源:发表于2019-03-31 10:54 被阅读0次

    1、订单消息

    RocketMQ使用FIFO顺序提供有序消息。

    以下示例演示了发送/接收全局和分区排序的消息。

    发送消息示例代码

    public class OrderedProducer {
        public static void main(String[] args) throws Exception {
            //Instantiate with a producer group name.
            DefaultMQProducer producer = new DefaultMQProducer("example_group_name");
            // Specify name server addresses.
            producer.setNamesrvAddr("192.168.247.132:9876");
            //Launch the instance.
            producer.start();
            String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 100; i++) {
                int orderId = i % 10;
                //Create a message instance, specifying topic, tag and message body.
                Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Integer id = (Integer) arg;
                    int index = id % mqs.size();
                    return mqs.get(index);
                }
                }, orderId);
    
                System.out.printf("%s%n", sendResult);
            }
            //server shutdown
            producer.shutdown();
        }
    }
    
    

    订阅消息示例代码

    
    public class OrderedConsumer {
        public static void main(String[] args) throws Exception {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
            consumer.setNamesrvAddr("192.168.247.132:9876");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    
            consumer.subscribe("TopicTest", "TagA || TagC || TagD");
    
            consumer.registerMessageListener(new MessageListenerOrderly() {
    
                AtomicLong consumeTimes = new AtomicLong(0);
    
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
                                                           ConsumeOrderlyContext context) {
                    context.setAutoCommit(false);
                    System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
                    this.consumeTimes.incrementAndGet();
                    if ((this.consumeTimes.get() % 2) == 0) {
                        return ConsumeOrderlyStatus.SUCCESS;
                    } else if ((this.consumeTimes.get() % 3) == 0) {
                        return ConsumeOrderlyStatus.ROLLBACK;
                    } else if ((this.consumeTimes.get() % 4) == 0) {
                        return ConsumeOrderlyStatus.COMMIT;
                    } else if ((this.consumeTimes.get() % 5) == 0) {
                        context.setSuspendCurrentQueueTimeMillis(3000);
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                    return ConsumeOrderlyStatus.SUCCESS;
    
                }
            });
    
            consumer.start();
    
            System.out.printf("Consumer Started.%n");
        }
    }
    
    

    2、Broadcasting 广播消息

    什么是Broadcasting

    Broadcasting正在向主题的所有订阅者发送消息。如果您希望所有订阅者都收到有关主题的消息,则Broadcasting是一个不错的选择。

    Producer example

    public class BroadcastProducer {
        public static void main(String[] args) throws Exception {
            DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
            producer.setNamesrvAddr("192.168.247.132:9876");
            producer.start();
    
            for (int i = 0; i < 100; i++){
                Message msg = new Message("TopicTest",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            }
            producer.shutdown();
        }
    }
    
    

    消费者的例子

    public class BroadcastConsumer {
        public static void main(String[] args) throws Exception {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
            consumer.setNamesrvAddr("192.168.247.132:9876");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    
            //set to broadcast mode
            consumer.setMessageModel(MessageModel.BROADCASTING);
    
            consumer.subscribe("TopicTest", "TagA || TagC || TagD");
    
            consumer.registerMessageListener(new MessageListenerConcurrently() {
    
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                    System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
            consumer.start();
            System.out.printf("Broadcast Consumer Started.%n");
        }
    }
    

    3、Schedule example 计划消息案例

    什么是Schedule信息?

    Schedule消息与普通消息的不同之处在于,它们将在指定的时间之后发送。

    应用

    启动consumer以等待传入的订阅消息

    ```
     import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
     import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
     import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
     import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
     import org.apache.rocketmq.common.message.MessageExt;
     import java.util.List;
    
     public class ScheduledMessageConsumer {
    
         public static void main(String[] args) throws Exception {
             // Instantiate message consumer
             DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
              consumer.setNamesrvAddr("192.168.247.132:9876");
             // Subscribe topics
             consumer.subscribe("TestTopic", "*");
             // Register message listener
             consumer.registerMessageListener(new MessageListenerConcurrently() {
                 @Override
                 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                     for (MessageExt message : messages) {
                         // Print approximate delay time period
                         System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
                                 + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
                     }
                     return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                 }
             });
             // Launch consumer
             consumer.start();
         }
     }
    
    ```
    

    发送预定的消息

    ```
     import org.apache.rocketmq.client.producer.DefaultMQProducer;
     import org.apache.rocketmq.common.message.Message;
    
     public class ScheduledMessageProducer {
    
         public static void main(String[] args) throws Exception {
             // Instantiate a producer to send scheduled messages
             DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
              producer.setNamesrvAddr("192.168.247.132:9876");
             // Launch producer
             producer.start();
             int totalMessagesToSend = 100;
             for (int i = 0; i < totalMessagesToSend; i++) {
                 Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
                 // This message will be delivered to consumer 10 seconds later.
                 message.setDelayTimeLevel(3);
                 // Send the message
                 producer.send(message);
             }
    
             // Shutdown producer after use.
             producer.shutdown();
         }
    
     }
    
    ```
    
    1. 验证

      您应该看到消息比其存储时间晚10秒消耗。

    4、Batch Example 批量消息示例

    为何Batch ?

    Batch发送消息可提高传递大量短消息的性能。

    使用限制

    同一批次的消息应该具有:相同的主题,相同的waitStoreMsgOK和没有schedule计划支持。

    此外,一批消息的总大小不应超过1MiB。

    如何使用批次

    如果您一次只发送不超过1MiB的消息集,则很容易使用Batch:

    String topic = "BatchTest";
    List<Message> messages = new ArrayList<>();
    messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
    messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
    messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
    try {
        producer.send(messages);
    } catch (Exception e) {
        e.printStackTrace();
        //handle the error
    }
    
    

    拆分成列表

    只有在发送大批量时,复杂性才会增加,您可能不确定它是否超出了大小限制(1MiB)。

    此时,您最好拆分列表:

    public class ListSplitter implements Iterator<List<Message>> {
        private final int SIZE_LIMIT = 1000 * 1000;
        private final List<Message> messages;
        private int currIndex;
        public ListSplitter(List<Message> messages) {
                this.messages = messages;
        }
        @Override public boolean hasNext() {
            return currIndex < messages.size();
        }
        @Override public List<Message> next() {
            int nextIndex = currIndex;
            int totalSize = 0;
            for (; nextIndex < messages.size(); nextIndex++) {
                Message message = messages.get(nextIndex);
                int tmpSize = message.getTopic().length() + message.getBody().length;
                Map<String, String> properties = message.getProperties();
                for (Map.Entry<String, String> entry : properties.entrySet()) {
                    tmpSize += entry.getKey().length() + entry.getValue().length();
                }
                tmpSize = tmpSize + 20; //for log overhead
                if (tmpSize > SIZE_LIMIT) {
                    //it is unexpected that single message exceeds the SIZE_LIMIT
                    //here just let it go, otherwise it will block the splitting process
                    if (nextIndex - currIndex == 0) {
                       //if the next sublist has no element, add this one and then break, otherwise just break
                       nextIndex++;  
                    }
                    break;
                }
                if (tmpSize + totalSize > SIZE_LIMIT) {
                    break;
                } else {
                    totalSize += tmpSize;
                }
    
            }
            List<Message> subList = messages.subList(currIndex, nextIndex);
            currIndex = nextIndex;
            return subList;
        }
    }
    //then you could split the large list into small ones:
    ListSplitter splitter = new ListSplitter(messages);
    while (splitter.hasNext()) {
       try {
           List<Message>  listItem = splitter.next();
           producer.send(listItem);
       } catch (Exception e) {
           e.printStackTrace();
           //handle the error
       }
    }
    

    相关文章

      网友评论

          本文标题:四、RocketMQ案例(一)

          本文链接:https://www.haomeiwen.com/subject/yldjbqtx.html