下载构建
// 下载地址
https://rocketmq.apache.org/dowloading/releases
# 我这里是下bin版本 就是二进制版本 只需要解压zip文件
cd 指定解压目录
# 启动 NameServer
nohup sh bin/mqnamesrv &
# 启动 日志
tail -f ~/logs/rocketmqlogs/namesrv.log
启动成功
1.png
# 启动 Broker
nohup sh bin/mqbroker -n localhost:9876 &
# 启动 日志
tail -f ~/logs/rocketmqlogs/broker.log
启动成功
2
sh bin/mqshutdown broker //停止 broker
sh bin/mqshutdown namesrv //停止 nameserver
Rocketmq管理后台
# 下载地址
https://github.com/apache/rocketmq
# 用idea 打开rocketmq-console
# 下载依赖
# 打开http://localhost:8080/
打开成功
1.png
编写生产者和消费者
写配置
# application.properties
# 消费者的组名
apache.rocketmq.consumer.PushConsumer=PushConsumer
# 同步生产者的组名
apache.rocketmq.producer.syncProducerGroup=syncProducerGroup
# 异步生产者的组名
apache.rocketmq.producer.asyncProducerGroup=asyncProducerGroup
# NameServer地址
apache.rocketmq.namesrvAddr=localhost:9876
生产者
@Component
public class RocketMQClient {
/**
* 生产者的组名
*/
@Value("${apache.rocketmq.producer.syncProducerGroup}")
private String syncProducerGroup;
/**
* 生产者的组名
*/
@Value("${apache.rocketmq.producer.asyncProducerGroup}")
private String asyncProducerGroup;
/**
* NameServer 地址
*/
@Value("${apache.rocketmq.namesrvAddr}")
private String namesrvAddr;
/**
* 同步发送
*/
@PostConstruct
public void SyncProducer() {
//生产者的组名
DefaultMQProducer producer = new DefaultMQProducer(syncProducerGroup);
//指定NameServer地址,多个地址以 ; 隔开
producer.setNamesrvAddr(namesrvAddr);
// 同步发送消息重试次数,默认为 2
producer.setRetryTimesWhenSendFailed(3);
try {
/**
* Producer对象在使用之前必须要调用start初始化,初始化一次即可
* 注意:切记不可以在每次发送消息时,都调用start方法
*/
producer.start();
//创建一个消息实例,包含 topic、tag 和 消息体
//如下:topic 为 "demo",tag 为 "push"
Message message = new Message("demo", "push", "发送消息----同步信息-----".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 目前RocketMQ只支持固定精度级别的定时消息,服务器按照1-N定义了如下级别:
// “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;
// ;若要发送定时消息,在应用层初始化Message消息对象之后,
// 调用setDelayTimeLevel(int level)方法来设置延迟级别,按照序列取相应的延迟级别,例如level=2,则延迟为5s:
message.setDelayTimeLevel(2);
SendResult result = producer.send(message);
System.out.println("发送同步响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.shutdown();
}
}
/**
* 异步发送
*/
@PostConstruct
public void AsyncProducer() {
//生产者的组名
DefaultMQProducer producer = new DefaultMQProducer(asyncProducerGroup);
//指定NameServer地址,多个地址以 ; 隔开
producer.setNamesrvAddr(namesrvAddr);
try {
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
Message msg = new Message("demo",
"push",
"发送消息----异步信息-----".getBytes(RemotingHelper.DEFAULT_CHARSET));
//重点在这里 异步发送回调
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送异步响应:MsgId:" + sendResult.getMsgId() + ",发送状态:" + sendResult.getSendStatus());
producer.shutdown();
}
@Override
public void onException(Throwable e) {
e.printStackTrace();
producer.shutdown();
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* onewag
*/
@PostConstruct
public void OnewayProducer() {
//生产者的组名
DefaultMQProducer producer = new DefaultMQProducer(syncProducerGroup);
//指定NameServer地址,多个地址以 ; 隔开
producer.setNamesrvAddr(namesrvAddr);
try {
/**
* Producer对象在使用之前必须要调用start初始化,初始化一次即可
* 注意:切记不可以在每次发送消息时,都调用start方法
*/
producer.start();
Message message = new Message("demo", "push", "发送消息----单向信息-----".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.sendOneway(message);
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.shutdown();
}
}
}
# 消费者
@Component
public class RocketMQServer {
/**
* 消费者的组名
*/
@Value("${apache.rocketmq.consumer.PushConsumer}")
private String consumerGroup;
/**
* NameServer 地址
*/
@Value("${apache.rocketmq.namesrvAddr}")
private String namesrvAddr;
@PostConstruct
public void defaultMQPushConsumer() {
//消费者的组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
//指定NameServer地址,多个地址以 ; 隔开
consumer.setNamesrvAddr(namesrvAddr);
try {
//订阅PushTopic下Tag为push的消息
consumer.subscribe("demo", "push");
//设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
//如果非第一次启动,那么按照上次消费的位置继续消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
try {
for (MessageExt messageExt : list) {
System.out.println("messageExt: " + messageExt);//输出消息内容
String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("消费响应:msgId : " + messageExt.getMsgId() + ", msgBody : " + messageBody);//输出消息内容
}
} catch (Exception e) {
e.printStackTrace();
// 接收失败重试
if (list.get(0).getReconsumeTimes() == 3){
// 重试3次
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
}else {
return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
});
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
rocketmq信息
rocketmq信息
rocketmq管理后台发送主题
mq发送.png
rocketmq管理后台发送主题的状态
mq状态.png
idea 接受mq
mq接收.png
rocketmq管理后台查看管理后台
mq管理.png
docker安装rocketmq
网友评论