美文网首页
RocketMQ-广播消息

RocketMQ-广播消息

作者: 快点给我想个名 | 来源:发表于2019-06-23 23:21 被阅读0次
    广播消息
    • 生产者
    public class BroadcastProducer {
        public static void main(String[] args) throws Exception {
            DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
            producer.start();
    
            for (int i = 0; i < 100; i++){
                Message msg = new Message("TopicTest",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            }
            producer.shutdown();
        }
    }
    
    • 消费者
    public class BroadcastConsumer {
        public static void main(String[] args) throws Exception {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
    
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    
            //设置为广播模式
            consumer.setMessageModel(MessageModel.BROADCASTING);
    
            consumer.subscribe("TopicTest", "TagA || TagC || TagD");
    
            consumer.registerMessageListener(new MessageListenerConcurrently() {
    
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                    System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
            consumer.start();
            System.out.printf("Broadcast Consumer Started.%n");
        }
    }
    

    相关文章

      网友评论

          本文标题:RocketMQ-广播消息

          本文链接:https://www.haomeiwen.com/subject/imlgqctx.html