美文网首页IT必备技能
Spring Cloud Alibaba之消息中间件 - Roc

Spring Cloud Alibaba之消息中间件 - Roc

作者: 匆匆岁月 | 来源:发表于2019-10-28 18:03 被阅读0次

    MQ消息队列对比

    CentOS7上搭建RocketMQ

    环境要求:

    • CentOS 7.2
    • 64位JDK1.8+
    • 4G+的可用磁盘空间

    1、下载RocketMQ的二进制包,我这里使用的是4.5.1版本,下载地址如下:

    http://rocketmq.apache.org/release_notes/release-notes-4.5.1/

    使用wget命令下载:

    [root@study-01 ~]# cd /usr/local/src
    [root@study-01 /usr/local/src]# wget http://mirror.bit.edu.cn/apache/rocketmq/4.5.1/rocketmq-all-4.5.1-bin-release.zip
    

    2、解压下载好的压缩包,并移动到合适的目录下:

    [root@study-01 /usr/local/src]# unzip rocketmq-all-4.5.1-bin-release.zip
    [root@study-01 /usr/local/src]# mv rocketmq-all-4.5.1-bin-release /usr/local/rocketmq-4.5.1
    

    注:若没有安装unzip命令则使用如下命令安装:
    yum install -y unzip

    3、进入rocketmq的根目录并查看是否包含如下目录及文件:

    [root@study-01 /usr/local/src]# cd /usr/local/rocketmq-4.5.1
    [root@study-01 /usr/local/rocketmq-4.5.1]# ls
    benchmark  bin  conf  lib  LICENSE  NOTICE  README.md
    

    4、没问题后,使用如下命令启动Name Server:

    [root@study-01 /usr/local/rocketmq-4.5.1]# nohup sh bin/mqnamesrv &
    [1] 2448
    [root@study-01 /usr/local/rocketmq-4.5.1]# 
    

    5、查看默认的9876端口是否被监听,以验证Name Server是否启动成功:

    [root@study-01 /usr/local/rocketmq-4.5.1]# netstat -lntp |grep java
    tcp6       0      0 :::9876                 :::*                    LISTEN      2454/java           
    [root@study-01 /usr/local/rocketmq-4.5.1]#
    

    6、启动Broker:

    [root@study-01 /usr/local/rocketmq-4.5.1]# nohup sh bin/mqbroker -n localhost:9876 &
    [2] 2485
    [root@study-01 /usr/local/rocketmq-4.5.1]# 
    

    7、验证Broker是否启动成功,如果启动成功,能看到类似如下的日志::

    [root@study-01 /usr/local/rocketmq-4.5.1]# cat ~/logs/rocketmqlogs/broker.log |grep "boot success"
    2019-08-04 01:27:38 INFO main - The broker[study-01, 192.168.190.129:10911] boot success. serializeType=JSON and name server is localhost:9876
    [root@study-01 /usr/local/rocketmq-4.5.1]# 
    

    若想停止Name Server和Broker,则依次执行以下两条命令即可:

    [root@study-01 /usr/local/rocketmq-4.5.1]# sh bin/mqshutdown broker
    The mqbroker(2492) is running...
    Send shutdown request to mqbroker(2492) OK  # 输出该信息说明停止成功
    [root@study-01 /usr/local/rocketmq-4.5.1]# sh bin/mqshutdown namesrv
    The mqnamesrv(2454) is running...
    Send shutdown request to mqnamesrv(2454) OK  # 输出该信息说明停止成功
    [2]+  退出 143              nohup sh bin/mqbroker -n localhost:9876
    [root@study-01 /usr/local/rocketmq-4.5.1]#
    

    验证RocketMQ功能是否正常

    1、验证生产消息正常,执行如下命令:

    [root@study-01 /usr/local/rocketmq-4.5.1]# export NAMESRV_ADDR=localhost:9876
    [root@study-01 /usr/local/rocketmq-4.5.1]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
    

    正常的情况下,会看到一堆的类似于如下的输出,这是生产消息后成功的result:

    SendResult [sendStatus=SEND_OK, msgId=C0A8BE810A690D7163610FCC253B03E7, offsetMsgId=C0A8BE8100002A9F000000000002BDFE, messageQueue=MessageQueue [topic=TopicTest, brokerName=study-01, queueId=3], queueOffset=249]
    

    2、验证消费消息正常,执行如下命令:

    [root@study-01 /usr/local/rocketmq-4.5.1]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
    

    正常的情况下,会看到一堆的类似于如下的输出,这是消费的消息内容:

    ConsumeMessageThread_6 Receive New Messages: [MessageExt [queueId=3, storeSize=180, queueOffset=242, sysFlag=0, bornTimestamp=1564853837073, bornHost=/192.168.190.129:34708, storeTimestamp=1564853837074, storeHost=/192.168.190.129:10911, msgId=C0A8BE8100002A9F000000000002AA4E, commitLogOffset=174670, bodyCRC=911284903, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1564854006859, UNIQ_KEY=C0A8BE810A690D7163610FCC251103CB, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 55, 49], transactionId='null'}]]
    

    搭建RocketMQ控制台

    RocketMQ官方提供了一个基于Spring Boot开发的可视化控制台,可以方便我们查看RocketMQ的运行情况以及提升运维效率。所以本小节将介绍一下如何搭建搭建RocketMQ的控制台,由于我们使用的RocketMQ版本是4.5.1,所以需要对控制台的源码进行一些改动以适配RocketMQ的4.5.1版本。

    1、首先需要下载源码,有两种方式,一是使用git克隆代码仓库,二是直接下载rocketmq-externals的zip包,我这里使用git方式,执行如下命令:

    git clone https://github.com/apache/rocketmq-externals.git
    

    2、修改控制台代码,使用IDE打开rocketmq-console项目,如下图所示:

    2.1、修改项目中的application.properties配置文件,我这里主要是修改了监听端口和Name Server的连接地址,至于其他配置项有需要的话可按照说明自行修改:

    # console的监听端口,默认是8080
    server.port=8011
    # Name Server的连接地址;非必须,可以在启动了console后,在控制台导航栏 - 运维 - NameSvrAddrList一栏设置
    rocketmq.config.namesrvAddr=192.168.190.129:9876
    

    2.2、修改依赖,由于console项目默认使用的rocketmq版本是4.4.0,与我们这里使用的是4.5.1不完全兼容,所以需要修改一下依赖版本,找到这一行:

    <rocketmq.version>4.4.0</rocketmq.version>
    

    修改为:

    <rocketmq.version>4.5.1</rocketmq.version>
    

    2.3、修改代码,由于修改了rocketmq的版本,会导致org.apache.rocketmq.console.service.impl.MessageServiceImpl#queryMessageByTopic方法编译报错,所以需要改动一下此处代码 ,将:

    @Override
    public List<MessageView> queryMessageByTopic(String topic, final long begin, final long end) {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, null);
        ...
    

    修改为:

    @Override
    public List<MessageView> queryMessageByTopic(String topic, final long begin, final long end) {
        RPCHook rpcHook = null;
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook);
        ...
    

    3、打包构建并启动,打开idea的terminal,执行如下命令:

    # 在rocketmq-console目录下执行
    mvn clean package -DskipTests
    
    # 进入jar包存放目录
    cd target
    
    # 启动rocketmq console
    java -jar rocketmq-console-ng-1.0.1.jar
    

    4、使用浏览器访问控制台,我这里由于修改了端口,所以访问地址是:http://localhost:8011,正常的情况下能看到如下界面:


    不习惯英文的话可以在右上角切换语言:

    由于控制台是可视化界面并且支持中文,这里就不过多介绍了,可以参考官方的控制台使用说明文档:

    RocketMQ术语与概念

    官方文档:

    Spring消息编程模型 - 编写生产者

    在以上小节搭建完RocketMQ之后,我们来使用Spring的消息编程模型,编写一个简单的示例。首先需要在项目中添加相关依赖如下:

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.0.3</version>
    </dependency>
    

    在配置文件中添加rocketmq相关的配置如下:

    rocketmq:
      name-server: 192.168.190.129:9876
      producer:
        # 小坑:必须指定group
        group: test-group
    

    编写生产者的代码,这里以Controller做示例,具体代码如下:

    import lombok.Data;
    import lombok.RequiredArgsConstructor;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.Date;
    
    /**
     * 生产者
     **/
    @RestController
    @RequiredArgsConstructor
    public class TestProducerController {
    
        /**
         * 用于发送消息到 RocketMQ 的api
         */
        private final RocketMQTemplate rocketMQTemplate;
    
        @GetMapping("/test-rocketmq/sendMsg")
        public String testSendMsg() {
            String topic = "test-topic";
            // 发送消息
            rocketMQTemplate.convertAndSend(topic, MyMessage.getInstance());
    
            return "send message success";
        }
    }
    
    @Data
    class MyMessage {
        private Integer id;
        private String name;
        private String status;
        private Date createTime;
    
        static MyMessage getInstance() {
            MyMessage message = new Message();
            message.id = 1;
            message.name = "×××";
            message.status = "default";
            message.createTime = new Date();
    
            return message;
        }
    }
    

    编写完成后,启动项目,访问该接口:



    消息发送成功后,可以到RocketMQ的控制台中进行查看:



    消息体可以在消息详情中查看,如下:

    从生产者的代码来看,可以说是十分的简单了,只需要使用一个RocketMQTemplate就可以实现将对象转换成消息体并发送消息。实际上除了RocketMQ外,其他的MQ也有对应的Template,如下:

    • RocketMQ:RocketMQTemplate
    • ActiveMQ/Artemis:JmsTemplate
    • RabbitMQ:AmqpTemplate
    • Kafka:KafkaTemplate

    Spring消息编程模型 - 编写消费者

    在消费者项目中,也需要添加rocketmq的依赖:

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.0.3</version>
    </dependency>
    

    同样需要配置Name Server的连接地址:

    rocketmq:
      name-server: 192.168.190.129:9876
    

    编写消费者的代码,具体代码如下:

    import com.alibaba.fastjson.JSON;
    import lombok.Data;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Component;
    
    import java.util.Date;
    
    /**
     * 消费者监听器
     **/
    @Slf4j
    @Component
    // topic需要和生产者的topic一致,consumerGroup属性是必须指定的,内容可以随意
    @RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer-group")
    public class TestConsumerListener implements RocketMQListener<MyMessage> {
    
        /**
         * 监听到消息的时候就会调用该方法
         *
         * @param message 消息体
         */
        @Override
        public void onMessage(MyMessage message) {
            log.info("从test-topic中监听到消息");
            log.info(JSON.toJSONString(message));
        }
    }
    
    /**
     * 消息体结构需要一致
     */
    @Data
    class MyMessage {
        private Integer id;
        private String name;
        private String status;
        private Date createTime;
    }
    

    编写完成后启动项目,由于之前我们已经往队列里发送了消息,所以此时消费者项目一启动,就可以监听到消息并消费,控制台就会输出如下日志:



    RocketMQ事务消息

    众所周知RocketMQ是支持事务消息的,这也是很多人选择使用RocketMQ作为消息中间件的一大原因,也是RocketMQ的一大特定。RocketMQ事务消息的流程如下图所示:



    简单剖析一下流程:

    1、生产者向MQ Server发送半消息,半消息是一种特殊的消息,这种消息会被存储到MQ Server里,但是会标记为暂时不能投递的状态,所以此时消费者不会消费该消息
    2、当半消息发送成功后,生产者就会去执行本地事务
    3、生产者根据本地事务的执行结果,向MQ Server发送commit或rollback消息进行二次确认。如果MQ Server接收到的是commit则会将半消息标记为可投递状态,那么消费者就可以进行消费。反之,MQ Server接收到的是rollback则会将半消息丢弃掉,消费者就无法进行消费
    4、若MQ Server未接收到二次确认的消息或生产者暂停了本地事务的执行,MQ Server则会定时(默认1分钟)向生产者发送回查消息,检查生产者的本地事务状态。然后生产者会根据回查的本地事务执行结果向MQ Server再次发送commit或rollback消息

    概念术语

    • 半消息(Half Message):暂时无法消费的消息,生产者将消息发送到了MQ Server,但这个消息会被标记为“暂不能投递”状态,先存储起来;消费者不会消费这条消息
    • 消息回查(Message Status Check):网络断开或生产者重启可能导致丢失事务消息的第二次确认。当MQ Server发现消息长时间处于半消息状态时,将向消息生产者发送请求,询问该消息的最终状态(提交或回滚)

    消息三态

    Commit:提交事务消息,消费者可以消费此消息
    Rollback:回滚事务消息,broker会删除该消息,消费者不能消费
    UNKNOWN:broker需要回查确认该消息的状态

    编码实现RocketMQ事务消息

    要想实现RocketMQ事务消息的话,需要按照流程图编写一些代码。在开始编码之前,先在数据库中创建一张RocketMQ的事务日志表,用作于本地事务回查的依据,表结构如下:


    然后再建一张表,作为事务方法操作的数据表,表结构如下:



    接着开始写代码,首先定义一个service,里面有带有事务注解的方法以及发送事务消息的方法。具体代码如下:

    import com.zj.node.contentcenter.dao.content.NoticeMapper;
    import com.zj.node.contentcenter.dao.log.RocketmqTransactionLogMapper;
    import com.zj.node.contentcenter.domain.entity.content.Notice;
    import com.zj.node.contentcenter.domain.entity.log.RocketmqTransactionLog;
    import lombok.Data;
    import lombok.RequiredArgsConstructor;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.apache.rocketmq.spring.support.RocketMQHeaders;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Service;
    import org.springframework.transaction.annotation.Transactional;
    
    import java.util.Date;
    import java.util.UUID;
    
    @Service
    @RequiredArgsConstructor
    public class TestProducerService {
    
        private final RocketMQTemplate rocketMQTemplate;
        private final NoticeMapper noticeMapper;
        private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;
    
        public String testSendMsg(Notice notice) {
            // topic
            String topic = "test-topic";
            // 生产者所在的事务组
            String txProducerGroup = "tx-test-producer-group";
            // 生产事务id
            String transactionId = UUID.randomUUID().toString();
            // 发送半消息
            rocketMQTemplate.sendMessageInTransaction(
                    txProducerGroup, topic,
                    // 消息体
                    MessageBuilder.withPayload("事务消息")
                            // header是消息的头部分,可以用作传参
                            .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                            .setHeader("notice_id", notice.getId())
                            .build(),
                    // 传递到executeLocalTransaction的参数
                    notice);
    
            return "send message success";
        }
    
        @Transactional(rollbackFor = Exception.class)
        public void updateNotice(Integer noticeId, Notice notice) {
            Notice newNotice = new Notice();
            newNotice.setId(noticeId);
            newNotice.setContent(notice.getContent());
    
            noticeMapper.updateByPrimaryKeySelective(newNotice);
        }
    
        @Transactional(rollbackFor = Exception.class)
        public void updateNoticeWithRocketMQLog(Integer noticeId, Notice notice, String transactionId) {
            updateNotice(noticeId, notice);
            // 写入事务日志
            rocketmqTransactionLogMapper.insertSelective(
                    RocketmqTransactionLog.builder()
                            .transactionId(transactionId)
                            .log("updateNotice")
                            .build()
            );
        }
    }
    

    实现一个本地事务监听器,用于执行事务方法及提供本地事务状态的回查方法。具体代码如下:

    package com.zj.node.contentcenter.rocketmq;
    
    import com.zj.node.contentcenter.dao.log.RocketmqTransactionLogMapper;
    import com.zj.node.contentcenter.domain.entity.content.Notice;
    import com.zj.node.contentcenter.domain.entity.log.RocketmqTransactionLog;
    import com.zj.node.contentcenter.service.test.TestProducerService;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
    import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
    import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
    import org.apache.rocketmq.spring.support.RocketMQHeaders;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageHeaders;
    
    /**
     * 本地事务监听器
     **/
    @Slf4j
    @RequiredArgsConstructor
    // 这里的txProducerGroup需要与sendMessageInTransaction里设置的一致
    @RocketMQTransactionListener(txProducerGroup = "tx-test-producer-group")
    public class TestTransactionListener implements RocketMQLocalTransactionListener {
    
        private final TestProducerService service;
        private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;
    
        /**
         * 用于执行本地事务的方法
         */
        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            MessageHeaders headers = msg.getHeaders();
            String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
                    log.info("执行本地事务方法. 事务id: {}", transactionId);
                    // header里拿出来的都是String类型
            Integer noticeId = Integer.parseInt((String) headers.get("notice_id"));
    
            try {
                // 执行带有事务注解的方法
                service.updateNoticeWithRocketMQLog(noticeId, (Notice) arg, transactionId);
                // 正常执行,向MQ Server发送commit消息
                return RocketMQLocalTransactionState.COMMIT;
            } catch (Exception e) {
                log.error("本地事务方法发生异常,消息将被回滚", e);
                // 发生异常向MQ Server发送rollback消息
                return RocketMQLocalTransactionState.ROLLBACK;
            }
        }
    
        /**
         * 用于回查本地事务的执行结果
         */
        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            MessageHeaders headers = msg.getHeaders();
            String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
            log.warn("回查本地事务状态. 事务id: {}", transactionId);
    
            // 按事务id查询日志数据
            RocketmqTransactionLog transactionLog = rocketmqTransactionLogMapper.selectOne(
                    RocketmqTransactionLog.builder()
                            .transactionId(transactionId)
                            .build()
            );
    
            // 如果能按事务id查询出来数据表示本地事务执行成功,没有数据则表示本地事务执行失败
            if (transactionLog == null) {
                log.warn("本地事务执行失败,事务日志不存在,消息将被回滚. 事务id: {}", transactionId);
                return RocketMQLocalTransactionState.ROLLBACK;
            }
            return RocketMQLocalTransactionState.COMMIT;
        }
    }
    

    简单说明一下这些方法的执行流程:

    首先调用TestProducerService.testSendMsg向MQ Server发送半消息,从代码也可以看到该方法里不会执行本地事务方法。当MQ Server接收半消息成功后,会告诉生产者接收成功,接着就会执行本地事务监听器里的executeLocalTransaction方法,该方法里会调用TestProducerService里带有事务注解的方法updateNoticeWithRocketMQLog,并在事务方法执行完毕后返回本地事务状态给MQ Server。若executeLocalTransaction方法返回的事务状态是UNKNOWN或者该方法出于某种原因没有被执行完毕,那么MQ Server就接收不到二次确认消息,默认会在一分钟后向生产者发送回查消息,生产者接收到回查消息的话就会执行本地事务监听器里的checkLocalTransaction方法,通过事务日志记录表的数据来确认该事务状态并返回。


    RocketMQ日志相关的坑

    由于rocketmq有自己内部的日志体系,所以默认不会使用Slf4j。体现到executeLocalTransaction方法的话,就是如果该方法的执行过程中抛出了异常的话,异常信息不会被打印到控制台,而是输出到rocketmq_client.log日志文件中。相关源码:org.apache.rocketmq.client.log.ClientLogger

    如果希望rocketmq的日志输出到控制台的话,需要在启动类的main方法中增加如下代码:

    // 让rocketmq使用slf4j日志
    System.setProperty(ClientLogger.CLIENT_LOG_USESLF4J, "true");
    

    原文:https://blog.51cto.com/zero01/2426303

    相关文章

      网友评论

        本文标题:Spring Cloud Alibaba之消息中间件 - Roc

        本文链接:https://www.haomeiwen.com/subject/xeyvvctx.html