一、概要
消费者主要是去发送数据 常用的发送消息的方式有以下几种
-
同步发送
-
同步顺序发送
-
异步发送
-
异步顺序发送
二、前期准备
添加依赖
<pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="xml" cid="n15" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 30px; width: inherit; color: rgb(184, 191, 198); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;"><dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency></pre>
配置信息
<pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="yml" cid="n17" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 30px; width: inherit; color: rgb(184, 191, 198); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">rocketmq:
支持配置多个name-server地址,采用;分隔即可
name-server: 127.0.0.1:9876
producer:
group: hello-rocketmq</pre>
[图片上传失败...(image-eb1f8-1598427722705)]
三、同步栗子
核心方法
<pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="java" cid="n21" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 30px; width: inherit; color: rgb(184, 191, 198); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">// 普通发送消息
public SendResult syncSend(String destination, Object payload, long timeout, int delayLevel)
// 顺序发送消息
public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout)</pre>
参数说明
-
destination
目的地,语法格式topicName:tags
-
payload
消息体 可以是任意对象
-
timeout
超时时间
-
delayLevel
延迟基本
返回值
查看发送到RocketMQ的信息是否发送成功
发送同步消息
<pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="java" cid="n39" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 30px; width: inherit; color: rgb(184, 191, 198); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">@Service
public class RocketMqServiceImpl implements RocketMqService {
@Resource
private RocketMQTemplate template;
public void sendBaseMsg() {
template.convertAndSend("base-topic", "Hello, World!",1000);
// 等价
template.send("base-topic", MessageBuilder.withPayload("用如家思想学编程").build());
}
}
</pre>
发送同步消息自定义对象
<pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="java" cid="n41" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 30px; width: inherit; color: rgb(184, 191, 198); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">@Service
public class RocketMqServiceImpl implements RocketMqService {
@Resource
private RocketMQTemplate template;
/**
- 发送任意对象
- @param products
*/
public void sendBaseMsgObject(List<Product> products) {
// template.syncSend(baseTopic, MessageBuilder.withPayload(products).build());
SendResult sendResult = template.syncSend(baseTopic, products);
}
}</pre>
同步顺序发送消息
<pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="java" cid="n43" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 30px; width: inherit; color: rgb(184, 191, 198); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">@Service
public class RocketMqServiceImpl implements RocketMqService {
@Resource
private RocketMQTemplate template;
@Value("${base-topic}")
private String baseTopic;
@Override
public void sendBaseMsgOrder(List<Product> products) {
template.syncSendOrderly(baseTopic, products, "123");
}
}</pre>
四、异步栗子
核心方法
指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。MQ 的异步发送,需要用户实现异步发送回调接口(SendCallback),在执行消息的异步发送时,应用不需要等待服务器响应即可直接返回,通过回调接口接收务器响应,并对服务器的响应结果进行处理。
<pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="java" cid="n47" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 30px; width: inherit; color: rgb(184, 191, 198); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout,int delayLevel)
void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback,long timeout)
</pre>
发送异步消息
<pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="java" cid="n49" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 30px; width: inherit; color: rgb(184, 191, 198); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">@Service
public class RocketMqServiceImpl implements RocketMqService {
@Resource
private RocketMQTemplate template;
@Value("${base-topic}")
private String baseTopic;
public void asyncSendBaseMsgOrder(List<Product> products) {
template.asyncSendOrderly(baseTopic, products, "1234", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
}
@Override
public void onException(Throwable e) {
}
});
}
}</pre>
五、其它消息
发送延迟消息
RocketMQ
目前只支持固定精度的定时消息。
延迟级别(18个等级)1到18分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
<pre spellcheck="false" class="md-fences mock-cm md-end-block" lang="java" cid="n54" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 30px; width: inherit; color: rgb(184, 191, 198); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">@Service
public class RocketMqServiceImpl implements RocketMqService {
@Resource
private RocketMQTemplate template;
@Value("${base-topic}")
private String baseTopic;
@Override
public void sendBaseMsgOrder(List<Product> products) {
//表示延时10秒
template.syncSend(topic, products,1000,3);
}
}</pre>
单向(Oneway)发送
特点为只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。适用于某些耗时非常短,但对可靠性要求并不高的场景,如日志收集。
<pre spellcheck="false" class="md-fences mock-cm md-end-block" lang="" cid="n57" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 30px; width: inherit; color: rgb(184, 191, 198); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">@Service
public class RocketMqServiceImpl implements RocketMqService {
@Resource
private RocketMQTemplate template;
@Value("${base-topic}")
private String baseTopic;
@Override
public void sendBaseMsgOrder(List<Product> products) {
//表示延时10秒
template.sendOneWay(topic, products);
}
}</pre>
六、消配置说明
namesrvAddr
NameServer地址,可以配置多个,用逗号分隔;
brokerClusterName
所属集群名称,如果节点较多可以配置多个
brokerName
broker名称,master和slave使用相同的名称,表明他们的主从关系
brokerId
0表示Master,大于0表示不同的slave
deleteWhen
表示几点做消息删除动作,默认是凌晨4点
fileReservedTime
在磁盘上保留消息的时长,单位是小时
brokerRole
同步和异步表示Master和Slave之间同步数据的机制;有三个值:
-
SYNC_MASTER
-
ASYNC_MASTER
-
SLAVE
flushDiskType
刷盘策略,取值为:
-
ASYNC_FLUSH SYNC_FLUSH表示同步刷盘和异步刷盘;
-
SYNC_FLUSH 消息写入磁盘后才返回成功状态,ASYNC_FLUSH不需要;
listenPort
启动监听的端口号
storePathRootDir
存储消息的根目录
网友评论