美文网首页
MQ Springboot整合ActiveMQ

MQ Springboot整合ActiveMQ

作者: 小P聊技术 | 来源:发表于2021-04-03 09:41 被阅读0次

    1 资源

    资源信息 版本号 备注
    activeMQ 5.16.1 IP: 192.168.51.4
    springboot 2.1.5.RELEASE
    prettyZoo 2.0 zookeeper可视化工具

    springboot-activemq-demo 源码 下载

    2 ActiveMQ安装

    需要安装ActiveMQ,如果未安装,可参考博文:

    MQ ActiveMQ安装部署和配置

    3 springboot整合

    3.1 pom文件

    <?xml version="1.0" encoding="UTF-8"?>
    <!-- Maven build descriptor for the springboot-activemq-demo project. -->
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <!-- Inherit from the Spring Boot parent POM. The dependencies below declare no
             version of their own; presumably the parent manages them. -->
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.5.RELEASE</version>
            <!-- Alternative parent version, left disabled by the author: -->
            <!--        <version>2.3.2.RELEASE</version>-->
            <!-- Empty relativePath: do not look for the parent POM in the parent directory. -->
            <relativePath />
        </parent>
    
        <!-- Coordinates of this demo project. -->
        <groupId>com.auskat.demo</groupId>
        <artifactId>springboot-activemq-demo</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <!-- Compile with Java 8 as both source and target level. -->
        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
        </properties>
    
        <dependencies>
            <!-- Core Spring Boot starter. -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
            <!-- Web starter. -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <!-- Test starter; only on the test classpath. -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
    
            <!-- ActiveMQ starter: the messaging integration this demo is about. -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-activemq</artifactId>
            </dependency>
    
        </dependencies>
    
    </project>
    

    3.2 配置信息

    3.2.1 application.yml

    # Spring Boot settings for connecting to the ActiveMQ broker.
    spring:
      activemq:
        # Address of the ActiveMQ broker
        broker-url: tcp://192.168.51.4:61616
        # Broker login user name
        user: auskat
        # Broker login password
        password: 123456
        # Whether to use in-memory mode (no separately installed MQ; an MQ instance
        # is started together with the application)
        in-memory: false
        packages:
          # Trust all packages
          trust-all: true
        pool:
          # Whether to replace the default connection pool; using the ActiveMQ
          # connection pool requires an additional dependency
          enabled: false
    

    3.2.2 配置类

    package com.auskat.demo.activemq.config;
    
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import org.apache.activemq.command.ActiveMQQueue;
    
    import javax.jms.Queue;
    
    
    /**
     * Spring configuration for the ActiveMQ demo.
     * <p>
     * Declares the JMS destination bean that other components can inject as a
     * {@link Queue}.
     * <p>
     * Author: AusKa_T, created 2021/3/25 13:46.
     */
    @Configuration
    public class ActiveMQConfig {

        /** Name of the queue exposed by {@link #queue()}. */
        private static final String QUEUE_NAME = "testQueue";

        /**
         * Creates the queue destination bean.
         *
         * @return an {@link ActiveMQQueue} named {@code testQueue}
         */
        @Bean
        public Queue queue() {
            Queue destination = new ActiveMQQueue(QUEUE_NAME);
            return destination;
        }
    }
    

    3.3 功能实现

    3.3.1 message对象

    package com.auskat.demo.activemq.message;
    
    import java.io.Serializable;
    import java.util.Date;
    
    /**
     * Serializable payload sent through the message queue in this demo.
     * <p>
     * Carries the target queue name, the text content and the send date.
     * {@link Date} is mutable, so every date entering or leaving this class is
     * copied; later changes to a caller's {@code Date} instance never alter the
     * message, and vice versa.
     * <p>
     * Author: AusKa_T, created 2021/3/25 14:29.
     */
    public class Message implements Serializable {

        private static final long serialVersionUID = -3368242239336448470L;

        /** Name of the queue the message is meant for; may be {@code null}. */
        private String queue;

        /** Text content of the message. */
        private String content;

        /** Send date, held as a private copy; may be {@code null}. */
        private Date sendDate;

        /** Creates an empty message with all fields {@code null}. */
        public Message() {
        }

        /**
         * Creates a message without a queue name, stamped with the current time.
         *
         * @param content text content
         */
        public Message(String content) {
            this(null, content, new Date());
        }

        /**
         * Creates a message without a queue name.
         *
         * @param content  text content
         * @param sendDate send date, copied; may be {@code null}
         */
        public Message(String content, Date sendDate) {
            this(null, content, sendDate);
        }

        /**
         * Creates a fully specified message.
         *
         * @param queue    target queue name
         * @param content  text content
         * @param sendDate send date, copied; may be {@code null}
         */
        public Message(String queue, String content, Date sendDate) {
            this.queue = queue;
            this.content = content;
            this.sendDate = copyOf(sendDate);
        }

        /**
         * Creates a message for the given queue, stamped with the current time.
         *
         * @param queue   target queue name
         * @param content text content
         */
        public Message(String queue, String content) {
            this(queue, content, new Date());
        }

        /** Returns an independent copy of {@code date}, or {@code null} for {@code null}. */
        private static Date copyOf(Date date) {
            return date == null ? null : new Date(date.getTime());
        }

        @Override
        public String toString() {
            return "Message{" +
                    "queue='" + queue + '\'' +
                    ", content='" + content + '\'' +
                    ", sendDate=" + sendDate +
                    '}';
        }

        public String getQueue() {
            return queue;
        }

        public void setQueue(String queue) {
            this.queue = queue;
        }

        public String getContent() {
            return content;
        }

        public void setContent(String content) {
            this.content = content;
        }

        /**
         * @return a copy of the send date, or {@code null} if none is set
         */
        public Date getSendDate() {
            return copyOf(sendDate);
        }

        /**
         * @param sendDate new send date, copied; may be {@code null}
         */
        public void setSendDate(Date sendDate) {
            this.sendDate = copyOf(sendDate);
        }
    }
    
    

    3.3.2 producer服务

    package com.auskat.demo.activemq.producer;
    
    import com.auskat.demo.activemq.message.Message;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jms.core.JmsMessagingTemplate;
    import org.springframework.stereotype.Service;
    
    import javax.jms.JMSException;
    import javax.jms.Queue;
    
    
    /**
     * Producer side of the demo: puts {@link Message} objects onto a queue through
     * {@link JmsMessagingTemplate}.
     * <p>
     * Author: AusKa_T, created 2021/3/25 13:55.
     */
    @Service
    public class ProducerService {

        // One shared, immutable logger per class, matching ConsumerService.
        private static final Logger logger = LoggerFactory.getLogger(ProducerService.class);

        /**
         * The queue bean defined in the configuration class.
         */
        @Autowired
        private Queue queue;

        /**
         * Messaging template provided by the Spring Boot integration.
         */
        @Autowired
        private JmsMessagingTemplate jmsMessagingTemplate;

        /**
         * Sends a message to the injected queue.
         *
         * @param message message to send
         * @throws JMSException declared because {@link Queue#getQueueName()} may throw it
         */
        public void sendMessage(Message message) throws JMSException {
            // Log first: the text announces the start of the send, not its completion.
            logger.info("开始发送消息到队列: {}, 消息内容: {}", queue.getQueueName(), message);
            jmsMessagingTemplate.convertAndSend(queue, message);
        }

        /**
         * Sends a message to the queue with the given name.
         *
         * @param queue   queue name; per the original author, the system creates the
         *                queue if it does not exist yet
         * @param message message to send
         */
        public void sendMessage(String queue, Message message) {
            // Log first: the text announces the start of the send, not its completion.
            logger.info("开始发送消息到队列: {}, 消息内容: {}", queue, message);
            jmsMessagingTemplate.convertAndSend(queue, message);
        }

    }
    

    3.3.3 consumer服务

    package com.auskat.demo.activemq.consumer;
    
    import com.auskat.demo.activemq.message.Message;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.jms.core.JmsMessagingTemplate;
    import org.springframework.messaging.handler.annotation.SendTo;
    import org.springframework.stereotype.Service;
    
    /**
     * Consumer side of the demo: receives {@link Message} objects either through a
     * listener or by an explicit receive call.
     * <p>
     * Author: AusKa_T, created 2021/3/25 14:24.
     */
    @Service
    public class ConsumerService {

        private static final Logger logger = LoggerFactory.getLogger(ConsumerService.class);

        @Autowired
        private JmsMessagingTemplate jmsMessagingTemplate;

        /**
         * Listener for {@code testQueue}: logs each incoming message and returns it.
         * <p>
         * {@code @SendTo} forwards the value this method returns to
         * {@code testQueue2}. The method therefore has to return the message; with
         * a {@code void} return type there is nothing to forward.
         *
         * @param message the received message
         * @return the same message, to be forwarded to {@code testQueue2}
         */
        @JmsListener(destination = "testQueue")
        @SendTo("testQueue2")
        public Message receiveMessage(Message message) {
            logger.info("接收到消息, {}", message);
            return message;
        }

        /**
         * Receives one message from the named queue and logs it.
         * <p>
         * NOTE(review): how long this waits when the queue is empty depends on the
         * template's receive timeout, which is not configured in this class. Confirm.
         *
         * @param queue name of the queue to receive from
         */
        public void receiveMessage(String queue) {
            Message message = jmsMessagingTemplate.receiveAndConvert(queue, Message.class);
            logger.info("接收到消息, {}", message);
        }

    }
    
    

    4 功能测试

    4.1 Idea调试

    package com.auskat.demo.activemq;
    
    import com.auskat.demo.activemq.consumer.ConsumerService;
    import com.auskat.demo.activemq.message.Message;
    import com.auskat.demo.activemq.producer.ProducerService;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    import javax.jms.JMSException;
    
    /**
     * Integration test for the ActiveMQ demo: sends a message and receives it back.
     * <p>
     * Author: AusKa_T, created 2021/3/25 14:43.
     */
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class ApplicationTest {

        @Autowired
        private ProducerService producerService;

        @Autowired
        private ConsumerService consumerService;

        /**
         * Sends a message and then receives it from the same queue.
         * <p>
         * The message goes to the queue named inside the message itself, because
         * that is the queue the receive call reads. Sending to the producer's
         * injected default queue would leave the receive with nothing to read.
         *
         * @throws JMSException kept in the signature for compatibility
         */
        @Test
        public void testSendAndReceiveMessage() throws JMSException {
            Message message = new Message("ActiveMQ-test", "activeMQ-test");
            // Send and receive must target the same queue.
            producerService.sendMessage(message.getQueue(), message);
            consumerService.receiveMessage(message.getQueue());
        }
    }
    

    4.2 管理后台查看

    访问地址:http://192.168.51.4:8161/admin/

    用户名: auskat

    密码: 123456

    image-20210325151618325.png

    5 相关信息

    • 博文不易,辛苦各位猿友点个关注和赞,感谢

    相关文章

      网友评论

          本文标题:MQ Springboot整合ActiveMQ

          本文链接:https://www.haomeiwen.com/subject/abnckltx.html