美文网首页Java 程序员Java
RocketMQ普通消息实战演练

RocketMQ普通消息实战演练

作者: 马小莫QAQ | 来源:发表于2022-09-04 15:54 被阅读0次

    之前研究了RocketMQ的源码,在这里将各种消息发送与消费的demo进行举例,方便以后使用的时候CV。

    普通消息同步发送

    同步消息是指发送出消息后,同步等待,直到接收到Broker发送成功的响应才会继续发送下一个消息。这个方式可以确保消息发送到Broker成功,一些重要的消息可以使用此方式,比如重要的通知。

    public static void main(String[] args) throws Exception {
        //实例化消息生产者对象
        DefaultMQProducer producer = new DefaultMQProducer("group_luke");
        //设置NameSever地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        //启动Producer实例
        producer.start();
        for (int i = 0; i < 10; i++) {
            Message msg = new Message("topic_luke", "tag", ("这是第"+i+"条消息。").getBytes(StandardCharsets.UTF_8));
    
            //同步发送方式
            SendResult send = producer.send(msg);
            //确认返回
            System.out.println(send);
        }
        //关闭producer
        producer.shutdown();
    }
    

    普通消息异步发送

    异步消息发送方在发送了一条消息后,不等接收方发回响应,接着进行第二条消息发送。发送方通过回调接口的方式接收服务器响应,并对响应结果进行处理。

    public static void main(String[] args) throws Exception {
        //实例化消息生产者对象
        DefaultMQProducer producer = new DefaultMQProducer("group_luke");
        //设置NameSever地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        //启动Producer实例
        producer.start();
        for (int i = 0; i < 10; i++) {
            Message msg = new Message("topic_luke", "tag", ("这是第"+i+"条消息。").getBytes(StandardCharsets.UTF_8));
            //SendCallback会接收异步返回结果的回调
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(sendResult);
                }
                @Override
                public void onException(Throwable throwable) {
                    throwable.printStackTrace();
                }
            });
        }
        //若是过早关闭producer,会抛出The producer service state not OK, SHUTDOWN_ALREADY的错
        Thread.sleep(10000);
        //关闭producer
        producer.shutdown();
    }
    

    普通消息单向发送

    单项发送不关心发送的结果,只发送请求不等待应答。发送消息耗时极短。

    public static void main(String[] args) throws Exception {
        //实例化消息生产者对象
        DefaultMQProducer producer = new DefaultMQProducer("group_luke");
        //设置NameSever地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        //启动Producer实例
        producer.start();
        for (int i = 0; i < 10; i++) {
            Message msg = new Message("topic_luke", "tag", ("这是第"+i+"条消息。").getBytes(StandardCharsets.UTF_8));
    
            //同步发送方式
            producer.sendOneway(msg);
        }
        //关闭producer
        producer.shutdown();
    }
    

    集群消费模式

    消费者采用负载均衡的方式消费消息,同一个Group下的多个Consumer共同消费Queue里的Message,每个Consumer处理的消息不同。一个Consumer Group中的各个Consumer实例分共同消费消息,即一条消息只会投递到一个Group下面的一个实例,并且只消费一遍。

    例如某个Topic有3个队列,其中一个Consumer Group 有 3 个实例,那么每个实例只消费其中的1个队列。集群消费模式是消费者默认的消费方式。

    public static void main(String[] args) throws Exception {
        //实例化消息消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_luke");
        //指定nameserver地址
        consumer.setNamesrvAddr("127.0.0.1:9876");
        //订阅topic,"*"表示所有tag
        consumer.subscribe("topic_luke","*");
    
        consumer.setMessageModel(MessageModel.CLUSTERING);
        // 注册回调实现类来处理从broker拉取回来的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @SneakyThrows
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody()));
                }
                // 标记该消息已经被成功消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者实例
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
    

    广播消费模式

    广播消费模式中把消息对一个Group下的各个Consumer实例都投递一遍。也就是说消息也会被 Group 中的每个Consumer都消费一次。实际上,是一个消费组下的每个消费者实例都获取到了topic下面的每个Message Queue去拉取消费。所以消息会投递到每个消费者实例。

    public static void main(String[] args) throws Exception {
        //实例化消息消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_luke");
        //指定nameserver地址
        consumer.setNamesrvAddr("127.0.0.1:9876");
        //订阅topic,"*"表示所有tag
        consumer.subscribe("topic_luke","*");
    
        consumer.setMessageModel(MessageModel.BROADCASTING);
        // 注册回调实现类来处理从broker拉取回来的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @SneakyThrows
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody()));
                }
                // 标记该消息已经被成功消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者实例
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
    

    作者:奔跑的毛球
    链接:https://juejin.cn/post/7134336877431586830
    来源:稀土掘金

    相关文章

      网友评论

        本文标题:RocketMQ普通消息实战演练

        本文链接:https://www.haomeiwen.com/subject/rprmnrtx.html