参考:https://www.cnblogs.com/kiwifly/p/11546008.html
前提:安装了docker
启动nameserver
创建nameserver的日志和数据存放路径
/home/rocketmq/namesrv/logs
/home/rocketmq/namesrv/store
启动命令
docker run -d --name rmqnamesrv -p 9876:9876 \
-v /home/rocketmq/namesrv/logs:/opt/logs \
-v /home/rocketmq/namesrv/store:/opt/store \
rocketmqinc/rocketmq \
sh mqnamesrv
启动broker
创建broker的日志和数据存放的路径以及配置
/home/rocketmq/broker/logs
/home/rocketmq/broker/store
/home/rocketmq/broker/conf/broker.conf
在broker.conf中写入配置
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
brokerIP1= 我的服务器外网IP
namesrvAddr=我的服务器外网IP:9876
启动命令
docker run -d --name rmqbroker -p 10911:10911 -p 10909:10909 \
-v /home/rocketmq/broker/logs:/root/logs \
-v /home/rocketmq/broker/store:/root/store \
-v /home/rocketmq/broker/conf/broker.conf:/opt/rocketmq/conf/broker.conf \
--link rmqnamesrv:namesrv \
-e "NAMESRV_ADDR=namesrv:9876" \
rocketmqinc/rocketmq \
sh mqbroker -c /opt/rocketmq/conf/broker.conf
启动rocketmq的控制台
docker run -d -e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=我的服务器IP:9876 -Drocketmq.config.isVIPChannel=false" -p 8082:8080 -t styletang/rocketmq-console-ng
打开浏览器看看

Java入门测试
maven依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.1.0-incubating</version>
</dependency>
生产者
public class Producer {
public static void main(String[] args) {
DefaultMQProducer producer = new DefaultMQProducer("Producer");
producer.setNamesrvAddr("我的服务器ip:9876");
try {
producer.start();
Message msg = new Message("test-topic","tag","1"."Just for test.".getBytes());
SendResult result = producer.send(msg);
System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus());
} catch (Exception e) {
e.printStackTrace();
}finally{
producer.shutdown();
}
}
}
消费者
/**
 * Minimal RocketMQ push-consumer example.
 *
 * <p>Registers a consumer in group "PushConsumer", subscribes to topic
 * "test-topic" filtered by tag "tag", and prints the topic, body, keys and
 * tags of every message handed to the listener.
 */
public class Consumer {
    /**
     * Configures the consumer, registers the listener and starts consuming.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer");
        // Placeholder: replace with the real name server address (host:9876).
        consumer.setNamesrvAddr("我的服务器ip:9876");
        try {
            consumer.subscribe("test-topic", "tag");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
                        ConsumeConcurrentlyContext context) {
                    // Handle every message in the batch: returning CONSUME_SUCCESS
                    // acknowledges the whole list, so reading only the first element
                    // would silently drop the rest when a batch holds more than one.
                    for (MessageExt msg : list) {
                        System.out.println("topic = " + msg.getTopic());
                        // Decode explicitly as UTF-8 instead of the platform default charset.
                        System.out.println("body: "
                                + new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8));
                        System.out.println("keys = " + msg.getKeys());
                        System.out.println("tags = " + msg.getTags());
                        System.out.println("-----------------------------------------------");
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
网友评论