架构图
导包
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
</dependency>
生产者常用方法
方法名 | 说明 |
---|---|
setRetryTimesWhenSendFailed | 同步方式发送消息重试次数,默认为2 |
setRetryTimesWhenSendAsyncFailed | 异步方式发送消息重试次数,默认为2 |
setSendMsgTimeout | 发送消息默认超时时间,默认3000ms |
setMaxMessageSize | 允许发送的最大消息长度,默认为4M |
setCompressMsgBodyOverHowmuch | 消息体超过该值则启用压缩,默认4k |
setRetryAnotherBrokerWhenNotStoreOK | 消息重试时选择另外一个Broker时 |
setNamesrvAddr | 设置NameServer的地址 |
send | 发送消息,可以指定回调函数,同步异步 |
sendOneway | 单向发送消息,不等待broker响应 |
shutdown | 关闭当前生产者实例并释放相关资源 |
start | 启动生产者 |
viewMessage | 根据给定的msgId查询消息,还可指定topic |
queryMessage | 按关键字查询消息 |
消费者常用方法
方法名 | 说明 |
---|---|
setNamesrvAddr | 设置NameServer的地址 |
setMessageModel | 设置消息消费模式(默认集群消费) |
setConsumeThreadMin | 消费者最小线程数量(默认20) |
setConsumeThreadMax | 消费者最大线程数量(默认20) |
setPullInterval | 推模式下任务间隔时间(推模式也是基于不断的轮训拉取的封装) |
setPullBatchSize | 推模式下任务拉取的条数,默认32条(一批批拉) |
setMaxReconsumeTimes | 消息重试次数,-1代表16次 (超过 次数成为死信消息) |
setConsumeTimeout | 消息消费超时时间(消息可能阻塞正在使用的线程的最大时间:以分钟为单位) |
普通消息
生产者思路:
- 启动生产者
- 发送消息
- 关闭生产者
消费者思路:
- 注册消费的主题
- 启动消费者
- 关闭消费者
生产者实现:
public class ProducerUtil {
private static DefaultMQProducer producer = null;
public static void start() {
producer = new DefaultMQProducer("defaultGroup");
producer.setNamesrvAddr("IP:9876");
producer.setRetryTimesWhenSendFailed(3);
try {
producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
public static ResultModel<String> send(String topic, String tags, String content) {
Message msg = new Message(topic, tags, "", content.getBytes());
try {
producer.send(msg);
return new ResultModel<>();
} catch (Exception e) {
e.printStackTrace();
}
return new ResultModel<>();
}
public static void shutDownProducer() {
if(producer != null) {
producer.shutdown();
}
}
}
消费者实现:
@Log
@Service
public class ConsumerService {
private DefaultMQPushConsumer consumer = null;
@PostConstruct
public void initMQConsumer() {
consumer = new DefaultMQPushConsumer("defaultGroup");
consumer.setNamesrvAddr("IP:9876");
consumer.setConsumeTimeout(10000);
try {
consumer.subscribe("demo", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
log.info("Message Received: " + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
@PreDestroy
public void shutDownConsumer() {
if (consumer != null) {
consumer.shutdown();
}
}
}
调用工具类测试:
顺序消息
顺序消息设计思路:
顺序消息就是指的,按顺序存入,按顺序取出。实现思路就是将同一类消息按照消费顺序放到同一种队列中去,这样就可以保证消费的先后顺序。
生产者实现:
public class ProducerUtil {
private static DefaultMQProducer producer = null;
public static void start() {
producer = new DefaultMQProducer("defaultGroup");
producer.setNamesrvAddr("49.235.73.14:9876");
producer.setRetryTimesWhenSendFailed(3);
try {
producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
public static ResultModel<String> send(String topic) {
List<Treat> treatList = new ProducerUtil().buildList();
// tags数组 使用tag区分
String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD"};
for (int i = 0; i < treatList.size(); i++) {
String body = " treat:" + treatList.get(i);
Message msg = new Message(topic, tags[i % tags.length], "KEY" + i, body.getBytes());
try {
producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
/**
* 此处代码逻辑:就是找到消息对应的队列,一类消息一种队列
* 一种医疗治疗过程对应一种队列
* 20220819001L
* 20220819002L
* 20220819003L
* 20220819004L
*/
String dbId = (String) arg;
String indexStr = dbId.substring(dbId.length() - 1);
return mqs.get(Integer.parseInt(indexStr) - 1);
}
}, treatList.get(i).getDbId());
} catch (Exception e) {
e.printStackTrace();
}
}
return new ResultModel<>();
}
public static void shutDownProducer() {
if(producer != null) {
producer.shutdown();
}
}
/**
* 医疗过程
*/
static class Treat {
private String dbId;
private String desc;
public Treat() {
}
public Treat(String dbId, String desc) {
this.dbId = dbId;
this.desc = desc;
}
public String getDbId() {
return dbId;
}
public void setDbId(String dbId) {
this.dbId = dbId;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
@Override
public String toString() {
return "Treat{" +
"dbId='" + dbId + '\'' +
", desc='" + desc + '\'' +
'}';
}
}
/**
* 生成模拟订单数据
*/
private List<Treat> buildList() {
List<Treat> treatList = new ArrayList<>();
treatList.add(new Treat("20220819004", "慢病检查"));
treatList.add(new Treat("20220819001", "孕产妇产前检查"));
treatList.add(new Treat("20220819003", "老人身体检查"));
treatList.add(new Treat("20220819001", "孕产妇产中处理"));
treatList.add(new Treat("20220819002", "婴儿身体检查"));
treatList.add(new Treat("20220819001", "孕产妇产后护理"));
treatList.add(new Treat("20220819004", "慢病治疗"));
treatList.add(new Treat("20220819003", "老人专家会诊"));
treatList.add(new Treat("20220819002", "婴儿产房护理"));
treatList.add(new Treat("20220819003", "老人定期随访"));
treatList.add(new Treat("20220819002", "婴儿定期随访"));
treatList.add(new Treat("20220819004", "慢病随访"));
treatList.add(new Treat("20220819004", "慢病观察"));
return treatList;
}
}
测试:
延时消息
延时消息指的是间隔一段时间后传给消费者。
setDelayTimeLevel(4);方法:
level(1~18个等级):1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
public static ResultModel<String> send() {
List<Treat> treatList = new ProducerUtil().buildList();
// tags数组 使用tag区分
String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD"};
for (int i = 0; i < treatList.size(); i++) {
String body = " treat:" + treatList.get(i);
Message msg = new Message(TOPIC, tags[i % tags.length], "KEY" + i, body.getBytes());
// 第四个等级 30s
msg.setDelayTimeLevel(4);
try {
producer.send(msg);
} catch (Exception e) {
e.printStackTrace();
}
}
return new ResultModel<>();
}
批量消息
批量消息就是一次性传入多个Message。
public static ResultModel<String> send() {
List<Treat> treatList = new ProducerUtil().buildList();
// tags数组 使用tag区分
String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD"};
List<Message> msgList = new ArrayList<>();
for (int i = 0; i < treatList.size(); i++) {
String body = " treat:" + treatList.get(i);
Message msg = new Message(TOPIC, tags[i % tags.length], "KEY" + i, body.getBytes());
msgList.add(msg);
}
try {
producer.send(msgList);
} catch (Exception e) {
e.printStackTrace();
}
return new ResultModel<>();
}
过滤消息
通过Tag标签来过滤不同消息,在消费者端实现。
@PostConstruct
public void initMQConsumer() {
consumer = new DefaultMQPushConsumer(PRODUCER_GROUP);
consumer.setNamesrvAddr(NAMESRV_ADDR);
consumer.setConsumeTimeout(10000);
try {
consumer.subscribe(TOPIC, "TagC");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
log.info("QueueId Received: " + msg.getQueueId() + " Message Received: " + new String(msg.getBody()));
}
} catch (Exception e) {
log.info(e.getMessage());
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
事务消息
https://www.jianshu.com/p/ed5c3c62a3c1
rocketmq顺序消息解决方案
将消息按照顺序放在同一个队列中就可以了。
rocketmq消息重复消费问题(幂等性问题)解决方案
- 业务方式:消费者业务逻辑处理,譬如消息要我们新增一条记录,那么我们业务逻辑处理就是先去查询,如果该记录已存在,那么我们就不去新增记录。
- 数据库方式:利用记录的unique索引唯一性,不让记录插入。
- mq方式:利用日志表去记录每次成功消费的id,消费前先判断下日志表中是否已经消费过(还未尝试使用过)。
rocketmq消息丢失问题解决方案
使用rocketmq提供的自身事务。
参考
https://gitee.com/apache/rocketmq/blob/master/docs/cn/client/java/API_Reference_DefaultMQProducer.md
https://blog.csdn.net/weixin_38880770/article/details/118447350
网友评论