2022-03-17 RocketMQ Getting-Started Study Notes

Author: kikop | Published: 2022-03-17 20:19

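1 Overview

1.1 Scope

These notes, built on Spring Boot, cover basic RocketMQ operations and split the code into modules so that business systems can be extended easily. They cover the following:

  1. Extracting a common module (mycommon-rocketmq) that provides message production and consumption
  2. Calling that module from a business module (myunreadserver)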

1.2 RocketMQ dependency version

        <!--2.rocketmq-->
        <!--starter v2.0.4 corresponds to RocketMQ v4.5.2-->
        <!--starter v2.2.1 corresponds to RocketMQ v4.9-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.1</version>
        </dependency>

2 Code examples

2.1 Common module

2.1.1 ConsumerUtils

package com.kikop.utils;

import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * @author kikop
 * @version 1.0
 * @project mycommon-rocketmq
 * @file ConsumerUtils
 * @desc
 * @date 2022/3/16
 * @time 9:30
 * @by IDE IntelliJ IDEA
 */
@Component
public class ConsumerUtils {

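    // RocketMQTemplate automatically picks up the producer and consumer
    // beans from the container
    @Autowired
    public RocketMQTemplate rocketMQTemplate;

    /**
     * Receives messages by polling the lite pull consumer on a background thread.
     *
     * @param rocketMQListener business callback invoked with each polled batch (possibly empty)
     * @param pollTimeoutMills poll timeout in milliseconds. While messages are available, poll
     *                         returns right away; when idle, this is how long each poll waits
     *                         before checking the queue again
     * @throws MQClientException
     */
    public void getMessage(RocketMQListener<List<MessageExt>> rocketMQListener, long pollTimeoutMills) throws MQClientException {

        // Consume on a separate thread; otherwise the loop would block the creation of other beans
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    DefaultLitePullConsumer defaultLitePullConsumer = rocketMQTemplate.getConsumer();
                    while (defaultLitePullConsumer.isRunning()) {

                        // A queue poll without a timeout does not block.
                        // A poll with a timeout is interruptible, yields the CPU while waiting,
                        // and wakes up when the timeout elapses.
                        // Internally the head of a LinkedBlockingQueue is retrieved and removed;
                        // if nothing becomes available within the timeout, the consumer returns an empty list.
                        List<MessageExt> messageExts = defaultLitePullConsumer.poll(pollTimeoutMills);

                        // 1. Business callback
                        rocketMQListener.onMessage(messageExts);
                        // 2. Manual commit after consuming (not needed here: auto-commit is the default)
                        // defaultLitePullConsumer.commitSync();
                    }
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }).start();
    }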
}
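
The timed-poll behaviour described in the comments above can be tried out with the JDK alone. The following is a minimal sketch, assuming a plain LinkedBlockingQueue as a stand-in for the consumer's internal cache; the class name TimedPollDemo and the method pollOnce are hypothetical and not part of RocketMQ. Note that the raw queue returns null on timeout, whereas the consumer wraps that case into an empty list.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: it models only the timed poll, not RocketMQ itself.
final class TimedPollDemo {

    /** Waits up to timeoutMillis for one element; returns null if none arrives in time. */
    static String pollOnce(BlockingQueue<String> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            // The wait is interruptible: restore the flag and report "nothing received".
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("msg-0");

        // An element is already queued, so this returns without waiting.
        System.out.println(pollOnce(queue, 5000));

        // The queue is now empty, so this waits for the timeout and then yields null.
        long start = System.nanoTime();
        String none = pollOnce(queue, 200);
        long waitedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println(none + " after about " + waitedMillis + " ms");
    }
}
```

This is why the timeout acts as an idle interval: it only costs time when there is nothing to consume.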

2.1.2 ProducerUtils

package com.kikop.utils;

import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author kikop
 * @version 1.0
 * @project mycommon-rocketmq
 * @file ProducerUtils
 * @desc
 * @date 2022/3/16
 * @time 9:30
 * @by IDE IntelliJ IDEA
 */
@Component
public class ProducerUtils {

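    // RocketMQTemplate automatically picks up the producer and consumer
    // beans from the container
    @Autowired
    public RocketMQTemplate rocketMQTemplate;

    /**
     * Sends a reliable synchronous message.
     *
     * @param topic  destination, in the form topic:tag
     * @param strMsg message body
     * @return the send result returned by the broker
     */
    public SendResult syncSend(String topic, String strMsg) {

        // Reliable synchronous send: the call returns a SendResult.
        // Synchronous sending means the sender waits for the broker's response
        // before sending the next message. It suits many scenarios, such as
        // important email notifications, registration SMS notifications and marketing SMS.
        //
        // Argument 1: destination, topic:tag
        // Argument 2: message body, which may be an object
        // Argument 3: timeout in milliseconds
        SendResult result = rocketMQTemplate.syncSend(topic, strMsg, 10000);
        return result;
    }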
}
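
The first argument of syncSend is a single string in the form topic:tag. Below is a minimal sketch of building and splitting such a destination, assuming the tag is optional and that the string is split at the first colon. The class Destinations and its methods are hypothetical helpers for illustration, not part of rocketmq-spring.

```java
// Hypothetical helper: the names are illustrative and not part of rocketmq-spring.
final class Destinations {

    /** Builds "topic:tag", or just "topic" when no tag is given. */
    static String of(String topic, String tag) {
        if (topic == null || topic.trim().isEmpty()) {
            throw new IllegalArgumentException("topic must not be blank");
        }
        return (tag == null || tag.trim().isEmpty()) ? topic : topic + ":" + tag;
    }

    /** Splits a destination at the first ':' into {topic, tag}; the tag is "" when absent. */
    static String[] parse(String destination) {
        String[] parts = destination.split(":", 2);
        return new String[] {parts[0], parts.length > 1 ? parts[1] : ""};
    }

    public static void main(String[] args) {
        String destination = of("ur-unread-topic4", "order4");
        System.out.println(destination);            // ur-unread-topic4:order4
        System.out.println(parse(destination)[0]);  // ur-unread-topic4
        System.out.println(parse(destination)[1]);  // order4
    }
}
```

Keeping the concatenation in one place avoids scattering the ":" separator across the business code.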

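2.1.3 spring.factories

# Extension point: when the external business system is a Spring Boot project,
# this registers the module's beans automatically,
# which matters especially when the two projects use different base packages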
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.kikop.config.MyRocketMQConfig

2.2 Business module

2.2.1 MyConsumerConfig

package com.kikop.config;


import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.Assert;

/**
 * @author kikop
 * @version 1.0
 * @project myunreadserver
 * @file MyConsumerConfig
 * @desc
 * @date 2022/3/16
 * @time 9:30
 * @by IDE IntelliJ IDEA
 */
@Configuration
public class MyConsumerConfig {


    /**
     * RocketMQ configuration properties
     */
    @Autowired
    public RocketMQProperties rocketMQProperties;

    // @Bean registers this consumer with the container, as MyProducerConfig does for the
    // producer; without the annotation Spring never calls this method
    @Bean
    public DefaultLitePullConsumer defaultLitePullConsumer(RocketMQProperties rocketMQProperties) throws MQClientException {


        RocketMQProperties.Consumer consumerConfig = rocketMQProperties.getConsumer();
        String nameServer = rocketMQProperties.getNameServer();
        String groupName = consumerConfig.getGroup();

        // Name of the topic to consume from
        String topicName = consumerConfig.getTopic();
        Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
        Assert.hasText(groupName, "[rocketmq.consumer.group] must not be null");
        Assert.hasText(topicName, "[rocketmq.consumer.topic] must not be null");
        String accessChannel = rocketMQProperties.getAccessChannel();
        MessageModel messageModel = MessageModel.valueOf(consumerConfig.getMessageModel());

        // Design note: convert the configured String into the enum the API requires
        SelectorType selectorType = SelectorType.valueOf(consumerConfig.getSelectorType());
        String selectorExpression = consumerConfig.getSelectorExpression();
        String ak = consumerConfig.getAccessKey();
        String sk = consumerConfig.getSecretKey();
        int pullBatchSize = consumerConfig.getPullBatchSize();
        DefaultLitePullConsumer litePullConsumer = RocketMQUtil.createDefaultLitePullConsumer(nameServer, accessChannel, groupName, topicName, messageModel, selectorType, selectorExpression, ak, sk, pullBatchSize);
        litePullConsumer.setEnableMsgTrace(consumerConfig.isEnableMsgTrace());
        litePullConsumer.setCustomizedTraceTopic(consumerConfig.getCustomizedTraceTopic());

        // Where to start consuming:
        // pull from the head of the queue or from the tail
        // litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        return litePullConsumer;
    }

}
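
The configuration above turns plain strings into enums with valueOf, which only accepts the exact constant name. The following is a minimal sketch of a more forgiving conversion, assuming a local stand-in enum so that it runs on the JDK alone. DemoSelectorType, EnumConfigDemo and parseSelectorType are hypothetical names, not the starter's API.

```java
import java.util.Locale;

// Hypothetical stand-in for the starter's SelectorType enum, so the sketch needs only the JDK.
enum DemoSelectorType { TAG, SQL92 }

final class EnumConfigDemo {

    /** Converts a configured string to the enum, tolerating case and surrounding whitespace. */
    static DemoSelectorType parseSelectorType(String configured) {
        try {
            return DemoSelectorType.valueOf(configured.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException ex) {
            // Re-throw with the property name so the misconfiguration is easy to locate.
            throw new IllegalArgumentException(
                    "[rocketmq.consumer.selector-type] unsupported value: " + configured, ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(parseSelectorType("TAG"));   // TAG
        System.out.println(parseSelectorType(" tag ")); // TAG
        try {
            parseSelectorType("SelectorType.TAG");      // a qualified name is not a constant name
        } catch (IllegalArgumentException ex) {
            System.out.println(ex.getMessage());
        }
    }
}
```

A value such as SelectorType.TAG therefore has to be written as plain TAG in the properties file.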

2.2.2 MyProducerConfig

package com.kikop.config;


import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/**
 * @author kikop
 * @version 1.0
 * @project myunreadserver
 * @file MyProducerConfig
 * @desc
 * @date 2022/3/16
 * @time 9:30
 * @by IDE IntelliJ IDEA
 */
@Configuration
public class MyProducerConfig {

//    @Value("${myunread.topic}")
//    public String topic;
//
//    @Value("${myunread.producer.group}")
//    public String strProducerGroup;


    /**
     * RocketMQ configuration properties
     */
    @Autowired
    public RocketMQProperties rocketMQProperties;


    @Bean
    DefaultMQProducer defaultMQProducer() {
        RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();
        String nameServer = rocketMQProperties.getNameServer();
        String groupName = producerConfig.getGroup();
        Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
        Assert.hasText(groupName, "[rocketmq.producer.group] must not be null");
        String accessChannel = rocketMQProperties.getAccessChannel();
        String ak = rocketMQProperties.getProducer().getAccessKey();
        String sk = rocketMQProperties.getProducer().getSecretKey();
        boolean isEnableMsgTrace = rocketMQProperties.getProducer().isEnableMsgTrace();
        String customizedTraceTopic = rocketMQProperties.getProducer().getCustomizedTraceTopic();
        DefaultMQProducer producer = RocketMQUtil.createDefaultMQProducer(groupName, ak, sk, isEnableMsgTrace, customizedTraceTopic);


        producer.setNamesrvAddr(nameServer);
        if (!StringUtils.isEmpty(accessChannel)) {
            producer.setAccessChannel(AccessChannel.valueOf(accessChannel));
        }

        producer.setSendMsgTimeout(producerConfig.getSendMessageTimeout());
        producer.setRetryTimesWhenSendFailed(producerConfig.getRetryTimesWhenSendFailed());
        producer.setRetryTimesWhenSendAsyncFailed(producerConfig.getRetryTimesWhenSendAsyncFailed());
        producer.setMaxMessageSize(producerConfig.getMaxMessageSize());
        producer.setCompressMsgBodyOverHowmuch(producerConfig.getCompressMessageBodyThreshold());
        producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryNextServer());

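        // The default createTopicKey is TBW102; automatically created topics inherit its route settings.
        // Otherwise sending fails with: No route info of this topic: ur-unread-topic
        // producer.setCreateTopicKey(rocketMQProperties.getConsumer().getTopic()
        //          + ":" + rocketMQProperties.getConsumer().getSelectorExpression());

        // TODO: the official producer cannot be given a topic in advance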
        return producer;
    }
}

2.2.3 MyRocketMQListener

package com.kikop.mycomponent.custom;

import com.kikop.utils.DateUtils;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.core.RocketMQListener;

import java.util.List;

/**
 * @author kikop
 * @version 1.0
 * @project myunreadserver
 * @file MyRocketMQListener
 * @desc Custom consumer-side listener for business processing
 * @date 2022/3/16
 * @time 9:30
 * @by IDE IntelliJ IDEA
 */
public class MyRocketMQListener implements RocketMQListener<List<MessageExt>> {

    /**
     * Receives MQ messages.
     *
     * @param messageExts the polled batch; empty when the poll timed out
     */
    @Override
    public void onMessage(List<MessageExt> messageExts) { // when idle, called once per poll timeout (5 seconds here)
        if (messageExts.size() >= 1) {
            // Prints "<time> data received, count: <n>"
            System.out.println(String.format("%s 收到数据,数据量: %d", DateUtils.getTime(), messageExts.size()));
        } else {
            // Prints "<time> no data to consume"
            System.out.println(String.format("%s 没有可消费的数据", DateUtils.getTime()));
        }

    }
}

2.2.4 ConsumerUnReadServiceByCommon

package com.kikop.mycomponent.custom;


import com.kikop.utils.ConsumerUtils;
import org.apache.rocketmq.client.exception.MQClientException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;


/**
 * @author kikop
 * @version 1.0
 * @project myunreadserver
 * @file ConsumerUnReadServiceByCommon
 * @desc
 * @date 2022/3/16
 * @time 9:30
 * @by IDE IntelliJ IDEA
 */
@Component
public class ConsumerUnReadServiceByCommon {


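    /**
     * Poll timeout in milliseconds. While messages are available the consumer keeps polling
     * without waiting; when idle, this is the interval at which it checks the queue again.
     */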
    @Value("${myunread.consumer.polltimeoutmills}")
    private long pollTimeOutMills;

    @Autowired
    public ConsumerUtils consumerUtils;

    @PostConstruct
    public void regisConsumerData() throws MQClientException {
        consumerUtils.getMessage(new MyRocketMQListener(),pollTimeOutMills);
    }
}

2.2.5 UnReadService

package com.kikop.service;


/**
 * @author kikop
 * @version 1.0
 * @project myunreadserver
 * @file UnReadService
 * @desc
 * @date 2022/3/16
 * @time 9:30
 * @by IDE IntelliJ IDEA
 */
public interface UnReadService {

    void syncSendMsg(String strMsg);
}

2.2.6 UnReadServiceImpl

package com.kikop.service.impl;

import com.kikop.service.UnReadService;
import com.kikop.utils.DateUtils;
import com.kikop.utils.ProducerUtils;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * @author kikop
 * @version 1.0
 * @project myunreadserver
 * @file UnReadServiceImpl
 * @desc
 * @date 2022/3/16
 * @time 9:30
 * @by IDE IntelliJ IDEA
 */
@Component
public class UnReadServiceImpl implements UnReadService {

//    @Value("${myunread.topic}")
//    public String topic;
//
//    @Value("${myunread.selectorExpression}")
//    public String tag;

    @Autowired
    public RocketMQProperties rocketMQProperties;

    @Autowired
    public ProducerUtils producerUtil;

    /**
     * Sends to the configured topic (optionally with a tag).
     *
     * @param strMsg message body
     */
    @Override
    public void syncSendMsg(String strMsg) {

        System.out.println(String.format("%s start syncSendMsg", DateUtils.getTime()));

        SendResult sendResult = producerUtil.syncSend(rocketMQProperties.getConsumer().getTopic()
                + ":" + rocketMQProperties.getConsumer().getSelectorExpression(), strMsg);
        System.out.println(String.format("%s getMsgId:%s", DateUtils.getTime(), sendResult.getMsgId()));

        System.out.println(String.format("%s end syncSendMsg", DateUtils.getTime()));

    }
}

2.2.7 Configuration

2.2.7.1 application.yml

spring:
# 1. Active profile: dev
 profiles:
  active: dev

2.2.7.2 application-dev.properties

server.port=9999
server.servlet.context-path=/myunreadserver
rocketmq.name-server=127.0.0.1:9876
# Default producer, used by RocketMQTemplate
rocketmq.producer.group=ur-producer-group4
rocketmq.producer.send-message-timeout=3000
# Default consumer, used by RocketMQTemplate
rocketmq.consumer.group=ur-consumer-group4
rocketmq.consumer.topic=ur-unread-topic4
rocketmq.consumer.selector-type=TAG
rocketmq.consumer.selector-expression=order4
## Custom topic
#myunread.topic=ur-unread-topic
## Custom producer settings
#myunread.producer.group=ur-producer-group
## Custom consumer settings
#myunread.consumer.group=ur-consumer-group
#myunread.consumer.group2=ur-consumer-group2
#myunread.selectorType=SelectorType.TAG
#myunread.selectorExpression=ur
# Poll timeout in milliseconds: while messages are available the consumer keeps polling;
# when idle, this is the interval at which it checks the queue again
myunread.consumer.polltimeoutmills=5000
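
The project reads the poll timeout with @Value. For readers who want to see the same lookup without Spring, here is a minimal sketch using java.util.Properties, assuming a fallback of 5000 ms and rejecting negative values. The class PollTimeoutConfigDemo, its methods and the fallback value are assumptions for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper for illustration; the project itself injects the value with @Value.
final class PollTimeoutConfigDemo {

    static final String KEY = "myunread.consumer.polltimeoutmills";
    static final long DEFAULT_POLL_TIMEOUT_MILLIS = 5000L; // assumed fallback

    /** Parses properties-file text into a Properties object. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return props;
    }

    /** Returns the configured timeout, or the fallback when the key is missing or blank. */
    static long pollTimeoutMillis(Properties props) {
        String raw = props.getProperty(KEY);
        if (raw == null || raw.trim().isEmpty()) {
            return DEFAULT_POLL_TIMEOUT_MILLIS;
        }
        long value = Long.parseLong(raw.trim());
        if (value < 0) {
            throw new IllegalArgumentException(KEY + " must not be negative: " + value);
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println(pollTimeoutMillis(parse(KEY + "=250"))); // 250
        System.out.println(pollTimeoutMillis(parse("# not set")));  // 5000
    }
}
```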

2.3 Test

package com.kikop;

import com.kikop.service.UnReadService;
import com.kikop.utils.spring.SpringUtils;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.stereotype.Controller;

import java.util.concurrent.TimeUnit;


/**
 * @author kikop
 * @version 1.0
 * @project myunreadserver
 * @file MyUnreadServerApplication
 * @desc
 * @date 2022/3/17
 * @time 16:30
 * @by IDE IntelliJ IDEA
 */
@SpringBootApplication
@Controller
// Example of scanning several packages: enable the line below when a dependency's @Component classes live in a different base package
//@ComponentScan({"com.kikopxxx", "com.kikop"})
public class MyUnreadServerApplication implements CommandLineRunner {


    public static void main(String[] args) {
        ConfigurableApplicationContext configurableApplicationContext =
                SpringApplication.run(MyUnreadServerApplication.class, args);

    }

    @Override
    public void run(String... args) throws Exception {

        // 1. Send synchronous messages with a tag
        UnReadService unReadService = SpringUtils.getBean(UnReadService.class);
        for (int i = 0; i < 3; i++) {
            unReadService.syncSendMsg(String.format("Hello, my name is Dahai: %d!", i));
            TimeUnit.SECONDS.sleep(3);
        }
    }
}
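
Console output:

2022-03-17 20:16:13 start syncSendMsg
2022-03-17 20:16:13 getMsgId:7F00000106B414DAD5DC56BF3D620000
2022-03-17 20:16:13 end syncSendMsg
2022-03-17 20:16:13 收到数据,数据量: 1

The last line is printed by MyRocketMQListener and means "data received, count: 1".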

3 Miscellaneous

3.1 The producer's createTopicKey setting

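createTopicKey serves as the route for the topic a message is sent to.

Parameter: createTopicKey
Default: TBW102
Purpose: when a message is sent to a topic that does not yet exist on the broker, the topic is created automatically. This requires a key, and that key supplies the default route for the topic being sent to.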
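
The fallback described above can be pictured with a small model. This is a simplified sketch, assuming routes are held in a map and that automatic creation can be switched on or off. RouteFallbackDemo, findRoute and the autoCreate flag are hypothetical and do not reproduce the RocketMQ client code.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of the lookup; it is not the RocketMQ client code.
final class RouteFallbackDemo {

    static final String CREATE_TOPIC_KEY = "TBW102";

    /** Returns the topic's own queues, or the key topic's queues when auto-creation is allowed. */
    static List<String> findRoute(Map<String, List<String>> routes, String topic, boolean autoCreate) {
        List<String> queues = routes.get(topic);
        if (queues != null && !queues.isEmpty()) {
            return queues;
        }
        List<String> template = routes.get(CREATE_TOPIC_KEY);
        if (autoCreate && template != null && !template.isEmpty()) {
            return template; // the new topic inherits the key topic's route
        }
        throw new IllegalStateException("No route info of this topic: " + topic);
    }

    public static void main(String[] args) {
        Map<String, List<String>> routes = new HashMap<>();
        routes.put(CREATE_TOPIC_KEY, Arrays.asList("broker-a:0", "broker-a:1"));

        // Unknown topic, auto-creation on: falls back to the TBW102 route.
        System.out.println(findRoute(routes, "ur-unread-topic", true));

        // Unknown topic, auto-creation off: the lookup fails.
        try {
            findRoute(routes, "ur-unread-topic", false);
        } catch (IllegalStateException ex) {
            System.out.println(ex.getMessage());
        }
    }
}
```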

3.2 DefaultMQProducerImpl code analysis

E:\workdirectory\mavenRep\LocalMavenRep\org\apache\rocketmq\rocketmq-client\4.9.1\rocketmq-client-4.9.1-sources.jar!\org\apache\rocketmq\client\impl\producer\DefaultMQProducerImpl.java
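// DefaultMQProducerImpl (excerpt, other lines omitted)
private SendResult sendDefaultImpl(/* parameters omitted */) {
    // Look up the route for the topic; the fallback key defaults to TBW102
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        // ...
    }
    // ...
}

// TopicPublishInfo (excerpt): a route is usable only if it has at least one message queue
public boolean ok() {
    return null != this.messageQueueList && !this.messageQueueList.isEmpty();
}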

      Article link: https://www.haomeiwen.com/subject/gahbdrtx.html