最简单的demo,编写一个消息监听和消息发送。namesrv,broker,producer,consuer都是一个。本文的意思在于初学者可以根据文章的代码,操作复制出一个入门例子出来。
- producer。 该节点是用于发送消息。
- consumer。该节点用于接受发送的消息。
- namesrv。 rocketmq的生产者和消费者都不会记录broker的实际地址,所以broker的地址会放在namesrv节点。broker启动的时候,把自己的地址写进namesrv,producer和consumer启动的时候会从namesrv中读取broker的地址。
- broker。 该节点主要是接受生产者的消息,然后发送给消费者,并且还会存储记录消息。
一,下载
http://mirrors.tuna.tsinghua.edu.cn/apache/rocketmq/4.4.0/rocketmq-all-4.4.0-bin-release.zip
二,启动rocketmq服务
- 2.1 先启动nameserver
>>bin/nameserver
- 2.2 启动broker
>>bin/mqbroker -n localhost:9876
>>nohup sh mqbroker -n 127.0.0.1:9876 autoCreateTopicEnable=true > broker_log.log 2>&1 &
三,编写简单java代码
- 3.1 maven 依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
- 3.2 java代码生产者和消费者
public class ConsumerMain {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("topicName","*");
consumer.registerMessageListener(new MessageListenerConcurrently(){
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
System.out.println(list);
System.out.println(consumeConcurrentlyContext);
return null;
}
});
consumer.start();
System.out.println("消费者启动");
}
}
public class ProducerMain {
public static void main( String[] args ) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ConsumerGroup");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
producer.setSendMsgTimeout(30000);
for (int i = 0; i < 50000000; i++) {
Message msg = new Message("topicName" ,("Hello_RocketMQ " + i).getBytes("UTF-8"));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
Thread.sleep(3000);
}
System.out.println("生产者发送了");
producer.shutdown();
}
}
网友评论