美文网首页
RocketMQ生产者

RocketMQ生产者

作者: 杨健kimyeung | 来源:发表于2020-08-26 15:42 被阅读0次

    官方文档

    一、概要

    消费者主要是去发送数据 常用的发送消息的方式有以下几种

    • 同步发送

    • 同步顺序发送

    • 异步发送

    • 异步顺序发送

    二、前期准备

    添加依赖

    <pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="xml" cid="n15" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 30px; width: inherit; color: rgb(184, 191, 198); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;"><dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.4</version>
    </dependency></pre>

    配置信息

    <pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="yml" cid="n17" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 30px; width: inherit; color: rgb(184, 191, 198); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">rocketmq:

    支持配置多个name-server地址,采用;分隔即可

    name-server: 127.0.0.1:9876
    producer:
    group: hello-rocketmq</pre>

    [图片上传失败...(image-eb1f8-1598427722705)]

    三、同步栗子

    核心方法

    <pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="java" cid="n21" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 30px; width: inherit; color: rgb(184, 191, 198); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">// 普通发送消息
    public SendResult syncSend(String destination, Object payload, long timeout, int delayLevel)
    // 顺序发送消息
    public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout)</pre>

    参数说明

    • destination

      目的地,语法格式topicName:tags

    • payload

      消息体 可以是任意对象

    • timeout

      超时时间

    • delayLevel

      延迟基本

    返回值

    查看发送到RocketMQ的信息是否发送成功

    发送同步消息

    <pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="java" cid="n39" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 30px; width: inherit; color: rgb(184, 191, 198); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">@Service
    public class RocketMqServiceImpl implements RocketMqService {
    @Resource
    private RocketMQTemplate template;

    public void sendBaseMsg() {
    template.convertAndSend("base-topic", "Hello, World!",1000);
    // 等价
    template.send("base-topic", MessageBuilder.withPayload("用如家思想学编程").build());
    }
    }
    ​</pre>

    发送同步消息自定义对象

    <pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="java" cid="n41" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 30px; width: inherit; color: rgb(184, 191, 198); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">@Service
    public class RocketMqServiceImpl implements RocketMqService {
    @Resource
    private RocketMQTemplate template;

    /**

    • 发送任意对象
    • @param products
      */
      public void sendBaseMsgObject(List<Product> products) {
      // template.syncSend(baseTopic, MessageBuilder.withPayload(products).build());
      SendResult sendResult = template.syncSend(baseTopic, products);
      }
      }</pre>

    同步顺序发送消息

    <pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="java" cid="n43" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 30px; width: inherit; color: rgb(184, 191, 198); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">@Service
    public class RocketMqServiceImpl implements RocketMqService {
    @Resource
    private RocketMQTemplate template;
    @Value("${base-topic}")
    private String baseTopic;
    @Override
    public void sendBaseMsgOrder(List<Product> products) {
    template.syncSendOrderly(baseTopic, products, "123");
    }
    }</pre>

    四、异步栗子

    核心方法

    指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。MQ 的异步发送,需要用户实现异步发送回调接口(SendCallback),在执行消息的异步发送时,应用不需要等待服务器响应即可直接返回,通过回调接口接收务器响应,并对服务器的响应结果进行处理。

    <pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="java" cid="n47" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 30px; width: inherit; color: rgb(184, 191, 198); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout,int delayLevel)

    void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback,long timeout)
    </pre>

    发送异步消息

    <pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="java" cid="n49" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 30px; width: inherit; color: rgb(184, 191, 198); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">@Service
    public class RocketMqServiceImpl implements RocketMqService {
    @Resource
    private RocketMQTemplate template;
    @Value("${base-topic}")
    private String baseTopic;
    public void asyncSendBaseMsgOrder(List<Product> products) {
    template.asyncSendOrderly(baseTopic, products, "1234", new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
    }
    @Override
    public void onException(Throwable e) {
    }
    });

    }
    }</pre>

    五、其它消息

    发送延迟消息

    RocketMQ 目前只支持固定精度的定时消息。

    延迟级别(18个等级)1到18分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

    <pre spellcheck="false" class="md-fences mock-cm md-end-block" lang="java" cid="n54" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 30px; width: inherit; color: rgb(184, 191, 198); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">@Service
    public class RocketMqServiceImpl implements RocketMqService {
    @Resource
    private RocketMQTemplate template;
    @Value("${base-topic}")
    private String baseTopic;
    @Override
    public void sendBaseMsgOrder(List<Product> products) {
    //表示延时10秒
    template.syncSend(topic, products,1000,3);
    }
    }</pre>

    单向(Oneway)发送

    特点为只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。适用于某些耗时非常短,但对可靠性要求并不高的场景,如日志收集。

    <pre spellcheck="false" class="md-fences mock-cm md-end-block" lang="" cid="n57" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 30px; width: inherit; color: rgb(184, 191, 198); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">@Service
    public class RocketMqServiceImpl implements RocketMqService {
    @Resource
    private RocketMQTemplate template;
    @Value("${base-topic}")
    private String baseTopic;
    @Override
    public void sendBaseMsgOrder(List<Product> products) {
    //表示延时10秒
    template.sendOneWay(topic, products);
    }
    }</pre>

    六、消配置说明

    namesrvAddr

    NameServer地址,可以配置多个,用逗号分隔;

    brokerClusterName

    所属集群名称,如果节点较多可以配置多个

    brokerName

    broker名称,master和slave使用相同的名称,表明他们的主从关系

    brokerId

    0表示Master,大于0表示不同的slave

    deleteWhen

    表示几点做消息删除动作,默认是凌晨4点

    fileReservedTime

    在磁盘上保留消息的时长,单位是小时

    brokerRole

    同步和异步表示Master和Slave之间同步数据的机制;有三个值:

    • SYNC_MASTER

    • ASYNC_MASTER

    • SLAVE

    flushDiskType

    刷盘策略,取值为:

    • ASYNC_FLUSH SYNC_FLUSH表示同步刷盘和异步刷盘;

    • SYNC_FLUSH 消息写入磁盘后才返回成功状态,ASYNC_FLUSH不需要;

    listenPort

    启动监听的端口号

    storePathRootDir

    存储消息的根目录

    相关文章

      网友评论

          本文标题:RocketMQ生产者

          本文链接:https://www.haomeiwen.com/subject/gqffsktx.html