美文网首页
Springboot集成RabbitMq

Springboot集成RabbitMq

作者: Albert_Yu | 来源:发表于2018-08-08 17:06 被阅读0次

    spring-boot-starter-amqp项目对消息各种支持。

    可以参阅官方文档
    https://docs.spring.io/spring-boot/docs/1.5.15.RELEASE/reference/htmlsingle/#boot-features-rabbitmq

    RabbitMQ是一个基于AMQP协议的轻量级,可靠,可扩展且可移植的消息代理。Spring使用RabbitMQ AMQP协议进行通信。
    abbitMQ配置由外部配置属性控制 spring.rabbitmq.*。例如,您可以在以下部分声明以下部分 application.properties:

     spring.rabbitmq.host = localhost
     spring.rabbitmq.port = 5672
     spring.rabbitmq.username = admin
     spring.rabbitmq.password = secret
    

    有关rabbitmq的属性相关配置可以参照 RabbitProperties 类中的相关源码
    默认host是localhost ,port是5672,virtualHost是"/",用户名和密码是guest

    一、 SpringBoot集成RabbitMq的最简框架的搭建
    1、在pom.xml中添加必要的依赖

           <!-- spring boot amqp包 -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
    

    当SpringBoot项目中在pom.xml中引入了上面的这个依赖,那么久已经持有了该RabbitTemplate对象了
    默认配置就是上述相关的描述了

    二、 SpringBoot集成RabbitMQ快速入门
    1、新建一个maven项目 springboot-rabbitmq


    image.png

    2、在pom.xml中引入必要的依赖

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>1.5.6.RELEASE</version>
            <relativePath/>
        </parent>
    
        <groupId>com.yubin.springboot</groupId>
        <artifactId>springboot-rabbitmq</artifactId>
        <version>1.0.0-SNAPSHOT</version>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.7</java.version>
        </properties>
    
        <dependencies>
            <!-- Spring boot web 集成包 -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <!-- spring boot 测试包 -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
            </dependency>
    
            <!-- spring boot amqp包 -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    </project>
    

    3、在application.properties中配置rabbitmq相关的属性

    # ==================== rabbitmq ===========
    # rabbitmq 主机地址
    spring.rabbitmq.host=127.0.0.1
    # rabbitmq 主机端口号
    spring.rabbitmq.port=5672
    # rabbitmq 用户名
    spring.rabbitmq.username=yubin
    # rabbitmq 密码
    spring.rabbitmq.password=yubin
    # rabbitmq 虚拟主机
    spring.rabbitmq.virtual-host=/yubin
    

    4、创建启动类

    /**
     * SpringBoot集成RabbitMq用例启动类
     *
     * @Author YUBIN
     */
    @SpringBootApplication
    public class RabbitMqDemoApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(RabbitMqDemoApplication.class, args);
        }
    }
    

    5、定义队列信息

    package com.yubin.springboot.rabbitmq.configuration;
    
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * RabbitMq的相关配置
     *
     * @Author YUBIN
     */
    @Configuration // 相当于xml配置文件
    public class RabbitMqConfiguration {
    
        @Bean
        public Queue helloQueue() {
            return new Queue("hello-queue");
        }
    
        @Bean
        public Queue userQueue() {
            return new Queue("user-queue");
        }
    }
    

    6、定义生产者

    package com.yubin.springboot.rabbitmq.producer;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    /**
     * rabbitmq生产者示例
     *
     * @Author YUBIN
     */
    @Component // 将该类交给Spring管理
    public class RabbitMqProducerDemo {
    
        private static Logger logger = LoggerFactory.getLogger(RabbitMqProducerDemo.class);
    
        @Autowired // 注入rabbitmq 模板
        private AmqpTemplate rabbitTemplate;
    
        /**
         * 发送消息的方法 hello-queue
         */
        public void sendHelloMessage() {
            // 定义消息体
            String message = "RabbitMqProducerDemo hello queue send message";
            rabbitTemplate.convertAndSend("hello-queue",message);
            logger.info("==================RabbitMqProducerDemo hello queue send message success");
        }
    
        /**
         * 发送消息的方法 user-queue
         */
        public void sendUserMessage() {
            // 定义消息体
            String message = "RabbitMqProducerDemo user queue send message";
            rabbitTemplate.convertAndSend("user-queue",message);
            logger.info("===================RabbitMqProducerDemo user queue send message success");
        }
    }
    

    7、定义消费者

    package com.yubin.springboot.rabbitmq.consumer;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * RabbitMq消费者示例
     *
     * @Author YUBIN
     * @create 2018-08-08
     */
    @Component // 交给Spring管理
    //@RabbitListener(queues = {"hello-queue","user-queue"})
    public class RabbitMqConsumerDemo {
    
        private static Logger logger = LoggerFactory.getLogger(RabbitMqConsumerDemo.class);
    
        //@RabbitHandler
        @RabbitListener(queues = "hello-queue")
        public void executeHello(String message) {
            logger.info("executeHello================接收到的消息是:" + message);
        }
    
        @RabbitListener(queues = "user-queue")
        public void executeUser(String message) {
            logger.info("executeUser=================接收到的消息是:" + message);
        }
    }
    

    8、测试类

    /**
     * SpringBoot集成RabbitMq测试类
     *
     * @Author YUBIN
     */
    @RunWith(SpringRunner.class) //SpringRunner相当于 SpringJUnit4ClassRunner的别名类
    @SpringBootTest(classes = RabbitMqDemoApplication.class)
    public class RabbitMqDemoTest {
    
        @Autowired
        private RabbitMqProducerDemo producerDemo;
    
        @Test
        public void test1() {
            producerDemo.sendHelloMessage();
            producerDemo.sendUserMessage();
        }
    }
    

    三、 SpringBoot集成RabbitMq各种模式的案例
    1、简单队列


    image.png

    如上述案例所示
    2、工作模式(默认是多劳多得的)


    image.png
    在RabbitMqConfiguration类中增加work-queue
    @Bean
    public Queue workQueue() {
        return new Queue("work-queue");
    }
    

    在RabbitMqProducerDemo类中增加一个发送消息的方法

    /**
     * 发送消息的方法 work-queue
     */
    public void sendWorkMessage(String message) {
        // 定义消息体
        rabbitTemplate.convertAndSend("work-queue",message);
        logger.info("==================RabbitMqProducerDemo work queue send success:" + message);
    }
    
    在RabbitMqConsumerDemo类中增加两个消费者方法
    @RabbitListener(queues = "work-queue")
    public void executeWork1(String message) {
        logger.info("executeWork1================接收到的消息是:" + message);
    }
    
    @RabbitListener(queues = "work-queue")
    public void executeWork2(String message) throws InterruptedException {
        Thread.sleep(100);
        logger.info("executeWork2================接收到的消息是:" + message);
    }
    

    测试类的书写

    @Test
    public void test2() throws InterruptedException {
        for (int i = 0; i < 100; i++) {
            producerDemo.sendWorkMessage(i + "");
            Thread.sleep(i*10);
        }
    }
    

    3、发布订阅模式 fanount


    image.png

    在RabbitMqConfiguration中定义队列,交换机以及将交换机和队列进行绑定,当然队列和交换机的绑定也是可以通过在客户端(浏览器上进行绑定)

    @Bean
    public Queue fanoutQueueA() {
        return new Queue("fanout-queue-A");
    }
    
    @Bean
    public Queue fanoutQueueB() {
        return new Queue("fanout-queue-B");
    }
    
    @Bean
    public Queue fanoutQueueC() {
        return new Queue("fanout-queue-C");
    }
    
    // 交换机的声明
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout-exchange");
    }
    
    /**
     * 将交换机fanoutExchange与队列fanoutQueueA进行绑定 该种模式的路由key是无效的
     * @param fanoutQueueA
     * @param fanoutExchange
     * @return
     */
    @Bean
    public Binding bindingFanoutExchangeToFanoutQueueA(Queue fanoutQueueA, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueueA).to(fanoutExchange);
    }
    
    /**
     * 将交换机fanoutExchange与队列fanoutQueueA进行绑定 该种模式的路由key是无效的
     * @param fanoutQueueB
     * @param fanoutExchange
     * @return
     */
    @Bean
    public Binding bindingFanoutExchangeToFanoutQueueB(Queue fanoutQueueB, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueueB).to(fanoutExchange);
    }
    
    /**
     * 将交换机fanoutExchange与队列fanoutQueueA进行绑定 该种模式的路由key是无效的
     * @param fanoutQueueC
     * @param fanoutExchange
     * @return
     */
    @Bean
    public Binding bindingFanoutExchangeToFanoutQueueC(Queue fanoutQueueC, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueueC).to(fanoutExchange);
    }
    

    在生产者类中RabbitMqProducerDemo添加

    /**
     * 发送消息的方法
     * @param message 消息体
     * @param exchangeName 交换机的名字
     * @param routingKey 路由key
     */
    public void sendMessage(String message,String exchangeName,String routingKey) {
        // 发生消息  这里的 routingKey 是无效的
        rabbitTemplate.convertAndSend(exchangeName,routingKey,message);
        logger.info("==================RabbitMqProducerDemo " + exchangeName + ":" + routingKey + " send success:" + message);
    }
    

    在消费者类中添加

    @RabbitListener(queues = "fanout-queue-A")
    public void fanoutQueueA(String message) throws InterruptedException {
        logger.info("fanoutQueueA================接收到的消息是:" + message);
    }
    
    @RabbitListener(queues = "fanout-queue-B")
    public void fanoutQueueB(String message) throws InterruptedException {
        logger.info("fanoutQueueB================接收到的消息是:" + message);
    }
    
    @RabbitListener(queues = "fanout-queue-C")
    public void fanoutQueueC(String message) throws InterruptedException {
        logger.info("fanoutQueueC================接收到的消息是:" + message);
    }
    

    测试

    @Test
    public void test3() throws InterruptedException {
        producerDemo.sendMessage("哈哈A","fanout-exchange","fanout-queue-A");
        producerDemo.sendMessage("哈哈B","fanout-exchange","fanout-queue-B");
        producerDemo.sendMessage("哈哈C","fanout-exchange","fanout-queue-C");
    }
    

    4、路由模式


    image.png

    在RabbitMqConfiguration中定义队列,交换机以及将交换机和队列进行绑定,当然队列和交换机的绑定也是可以通过在客户端(浏览器上进行绑定)

    // 定义队列
    @Bean
    public Queue directQueueA() {
        return new Queue("direct-queue-A");
    }
    
    @Bean
    public Queue directQueueB() {
        return new Queue("direct-queue-B");
    }
    
    @Bean
    public Queue directQueueC() {
        return new Queue("direct-queue-C");
    }
    
    // 声明交换机
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("direct-exchange");
    }
    
    // 交换机与队列进行绑定并定义routingKey
    @Bean
    public Binding bindingDirectQueueAToDirectExchangeU(Queue directQueueA, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueueA).to(directExchange).with("update");
    }
    @Bean
    public Binding bindingDirectQueueAToDirectExchangeI(Queue directQueueA, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueueA).to(directExchange).with("insert");
    }
    @Bean
    public Binding bindingDirectQueueAToDirectExchangeD(Queue directQueueA, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueueA).to(directExchange).with("delete");
    }
    @Bean
    public Binding bindingDirectQueueBToDirectExchangeD(Queue directQueueB, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueueB).to(directExchange).with("delete");
    }
    @Bean
    public Binding bindingDirectQueueCToDirectExchangeI(Queue directQueueC, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueueC).to(directExchange).with("insert");
    }
    在消费者类中添加
    @RabbitListener(queues = "direct-queue-A")
    public void directQueueA(String message) throws InterruptedException {
        logger.info("directQueueA================接收到的消息是:" + message);
    }
    
    @RabbitListener(queues = "direct-queue-B")
    public void directQueueB(String message) throws InterruptedException {
        logger.info("directQueueB================接收到的消息是:" + message);
    }
    
    @RabbitListener(queues = "direct-queue-C")
    public void directQueueC(String message) throws InterruptedException {
        logger.info("directQueueC================接收到的消息是:" + message);
    }
    

    测试类

    @Test
    public void test4() {
        producerDemo.sendMessage("哈哈-update","direct-exchange","update");
        producerDemo.sendMessage("哈哈-insert","direct-exchange","insert");
        producerDemo.sendMessage("哈哈-delete","direct-exchange","delete");
    }
    

    5、通配符模式


    在RabbitMqConfiguration中定义队列,交换机以及将交换机和队列进行绑定,当然队列和交换机的绑定也是可以通过在客户端(浏览器上进行绑定)
    // 声明通配符模式的队列

    @Bean
    public Queue topicQueueA() {
        return new Queue("topic-queue-A");
    }
    
    @Bean
    public Queue topicQueueB() {
        return new Queue("topic-queue-B");
    }
    
    // 声明通配符模式交换机
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("topic-exchange");
    }
    
    // 通配符模式下交换机与队列进行绑定并定义routingKey
    @Bean
    public Binding bindingTopicQueueAToTopicExchange(Queue topicQueueA, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueueA).to(topicExchange).with("topic.update");
    }
    @Bean
    public Binding bindingTopicQueueBToTopicExchangeI(Queue topicQueueB, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueueB).to(topicExchange).with("topic.#");
    }
    在消费者类中添加
    
    @RabbitListener(queues = "topic-queue-A")
    public void topicQueueA(String message) throws InterruptedException {
        logger.info("topicQueueA================接收到的消息是:" + message);
    }
    
    @RabbitListener(queues = "topic-queue-B")
    public void topicQueueB(String message) throws InterruptedException {
        logger.info("topicQueueB================接收到的消息是:" + message);
    }
    

    测试类

    @Test
    public void test5() {
        producerDemo.sendMessage("哈哈-update","topic-exchange","topic.update");
        producerDemo.sendMessage("哈哈-insert","topic-exchange","topic.insert");
        producerDemo.sendMessage("哈哈-delete","topic-exchange","topic.delete");
    }
    

    四、 SpringBoot集成RabbitMq扩展
    当然如果你对这种形式的配置不习惯的话,你也可以使用外部的配置文件来使用
    在启动类上使用@ImportResource注解来引入其他的xml配置文件


    image.png

    五、 SpringBoot配置多个RabbitMq
    有时候由于业务的复杂性,需要配置多个RabbitMq
    步骤
    1、在application.properties中增加一个rabbitmq配置


    image.png

    2、在配置类中增加


    image.png

    3、新建一个生产者


    image.png
    4、新建一个消费者
    image.png

    5、测试类


    image.png

    相关文章

      网友评论

          本文标题:Springboot集成RabbitMq

          本文链接:https://www.haomeiwen.com/subject/ltuxbftx.html