美文网首页
rocketmq入门的demo

rocketmq入门的demo

作者: xhrg | 来源:发表于2020-03-31 19:22 被阅读0次

    最简单的demo,编写一个消息监听和消息发送。namesrv,broker,producer,consuer都是一个。本文的意思在于初学者可以根据文章的代码,操作复制出一个入门例子出来。

    • producer。 该节点是用于发送消息。
    • consumer。该节点用于接受发送的消息。
    • namesrv。 rocketmq的生产者和消费者都不会记录broker的实际地址,所以broker的地址会放在namesrv节点。broker启动的时候,把自己的地址写进namesrv,producer和consumer启动的时候会从namesrv中读取broker的地址。
    • broker。 该节点主要是接受生产者的消息,然后发送给消费者,并且还会存储记录消息。

    一,下载

    http://mirrors.tuna.tsinghua.edu.cn/apache/rocketmq/4.4.0/rocketmq-all-4.4.0-bin-release.zip

    二,启动rocketmq服务

    • 2.1 先启动nameserver
    >>bin/nameserver
    
    • 2.2 启动broker
    >>bin/mqbroker -n localhost:9876 
    >>nohup sh mqbroker -n 127.0.0.1:9876 autoCreateTopicEnable=true > broker_log.log 2>&1 &
    

    三,编写简单java代码

    • 3.1 maven 依赖
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.4.0</version>
    </dependency>
    
    • 3.2 java代码生产者和消费者
    public class ConsumerMain {
        public static void main(String[] args) throws  Exception {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");
            consumer.setNamesrvAddr("127.0.0.1:9876");
            consumer.subscribe("topicName","*");
            consumer.registerMessageListener(new MessageListenerConcurrently(){
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    System.out.println(list);
                    System.out.println(consumeConcurrentlyContext);
                    return null;
                }
            });
            consumer.start();
            System.out.println("消费者启动");
        }
    }
    
    public class ProducerMain {
        public static void main( String[] args ) throws     Exception {
             DefaultMQProducer producer = new DefaultMQProducer("ConsumerGroup");
             producer.setNamesrvAddr("127.0.0.1:9876");
             producer.start();
             producer.setSendMsgTimeout(30000);
             for (int i = 0; i < 50000000; i++) {
                 Message msg = new Message("topicName" ,("Hello_RocketMQ " + i).getBytes("UTF-8"));
                 SendResult sendResult = producer.send(msg);
                 System.out.printf("%s%n", sendResult);
                 Thread.sleep(3000);
             }
             System.out.println("生产者发送了");
             producer.shutdown();
        }
    }
    

    相关文章

      网友评论

          本文标题:rocketmq入门的demo

          本文链接:https://www.haomeiwen.com/subject/rqyguhtx.html