美文网首页
Spring boot 集成rabbitmq

Spring boot 集成rabbitmq

作者: 刘小刀tina | 来源:发表于2020-05-17 11:33 被阅读0次

1 .pom.xml

 <!-- Spring boot 集成rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.2.2.RELEASE</version>
        </dependency>

2 . 配置

spring:
  application:
    name: demo-mq



#配置rabbitmq
  rabbitmq:
    addresses: 152.136.27.48:5672
    username: guest
    password: guest
    #虚拟主机地址
    virtual-host: /
    #连接超时时间
    connection-timeout: 15000
    #publisher-confirms: true  #开启发送确认
    publisher-returns: true #开启发送失败退回
    template:
      mandatory: true
    #消费端配置
    listener:
      simple:
        #消费端
        concurrency: 10
        #最大消费端数
        max-concurrency: 20
        #自动签收auto  手动 manual
        acknowledge-mode: manual #开启ACK
        #限流
        prefetch: 50

3 .rabbitmq配置

package com.example.demo.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @program: demo-rabbitmq
 * @description
 * @author: tina.liu
 * @create: 2020-05-16 21:36
 **/
@Configuration
public class RabbitConfig {

    /**
     * 创建广播类型的exchange queue 将二者绑定
     */

    //队列名
    public static final String FANOUT_QUEUE_NAME = "test_fanout_queue";
    public static final String FANOUT_QUEUE_NAME1 = "test_fanout_queue1";
    public static final String TEST_FANOUT_EXCHANGE = "testFanoutExchange";

    //创建队列
    @Bean
    public Queue createFanoutQueue() {
        return new Queue(FANOUT_QUEUE_NAME);
    }

    //创建队列
    @Bean
    public Queue createFanoutQueue1() {
        return new Queue(FANOUT_QUEUE_NAME1);
    }

    //创建广播类型的交换机
    @Bean
    public FanoutExchange defFanoutExchange() {
        return new FanoutExchange(TEST_FANOUT_EXCHANGE);
    }

    //队列与交换机进行绑定
    @Bean
    Binding bindingFanout() {
        return BindingBuilder.bind(createFanoutQueue()).
                to(defFanoutExchange());
    }

    //队列与交换机进行绑定
    @Bean
    Binding bindingFanout1() {
        return BindingBuilder.bind(createFanoutQueue1()).
                to(defFanoutExchange());
    }

    /**
     * 创建直连的queue exchange 绑定
     * @return
     */
    public static final String DIRECT_QUEUE_NAME = "test_direct_queue"; //queue
    public static final String TEST_DIRECT_EXCHANGE = "testDirectExchange"; //exchange
    public static final String DIRECT_ROUTINGKEY = "test"; //routingKey

    /**
     * 创建直连的队列
     * @return
     */
    @Bean
    public Queue createDirectQueue() {
        return new Queue(DIRECT_QUEUE_NAME);
    }

    /**
     * 创建直连交换机
     * @return
     */
    @Bean
    DirectExchange directExchange(){
        return new DirectExchange(TEST_DIRECT_EXCHANGE);
    }

    /**
     * 将交换机和队列绑定
     * @return
     */
    @Bean
    Binding bindingDirect() {
        return BindingBuilder.bind(createDirectQueue()).
                to(directExchange()).
                with(DIRECT_ROUTINGKEY);
    }


    /**
     * 创建通配符类型的exchange queue 将二者绑定
     */
    public static final String TOPIC_QUEUE_NAME = "test_topic_queue"; //queue
    public static final String TEST_TOPIC_EXCHANGE = "testTopicExchange"; //exchange
    public static final String TOPIC_ROUTINGKEY = "test.*";//routingKey

    //创建队列
    @Bean
    public Queue createTopicQueue() {
        return new Queue(TOPIC_QUEUE_NAME);
    }

    /**
     * 创建通配符交换机
     * @return
     */
    @Bean
    TopicExchange defTopicExchange(){
        return new TopicExchange(TEST_TOPIC_EXCHANGE);
    }

    /**
     * 将交换机和队列进行绑定
     * @return
     */
    @Bean
    Binding bindingTopic() {
        return BindingBuilder.bind(createTopicQueue()).
                to(defTopicExchange()).
                with(TOPIC_ROUTINGKEY);
    }


}


4 . 生产者

package com.example.demo.rabbitmq.rabbitmq;

import com.example.demo.rabbitmq.config.RabbitConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @program: demo-rabbitmq
 * @description 消息生成者
 * @author: tina.liu
 * @create: 2020-05-16 21:42
 **/
@Component
@Slf4j
public class MsgProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 通过广播类型的交换机生成消息
     * @param massage
     */
    public void send2FanoutTestQueue(String massage){
        //exchange key msg
        rabbitTemplate.convertAndSend(RabbitConfig.TEST_FANOUT_EXCHANGE,"", massage);
        log.info("广播类型的交换机绑定消息队列,生成者发送消息成功");
    }

    /**
     * 通过直连类型的交换机生成消息
     * @param massage
     */
    public void send2DirectTestQueue(String massage){
        rabbitTemplate.convertAndSend(RabbitConfig.TEST_DIRECT_EXCHANGE, RabbitConfig.DIRECT_ROUTINGKEY, massage);
        log.info("直连类型的交换机绑定消息队列,生成者发送消息成功");
    }

    /**
     * 通过通配符类型的交换机生成消息
     * @param massage
     */
    public void send2TopicTestAQueue(String massage){
        rabbitTemplate.convertAndSend(RabbitConfig.TEST_TOPIC_EXCHANGE, "a.test.aaa", massage);
        log.info("通配符类型A的交换机绑定消息队列,生成者发送消息成功");
    }

    public void send2TopicTestBQueue(String massage){
        rabbitTemplate.convertAndSend(RabbitConfig.TEST_TOPIC_EXCHANGE, "test.bbb", massage);
        log.info("通配符类型B的交换机绑定消息队列,生成者发送消息成功");
    }

}


5 .消费者

package com.example.demo.rabbitmq.rabbitmq;

import com.example.demo.rabbitmq.config.RabbitConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

/**
 * @program: demo-rabbitmq
 * @description 消息消费者
 * @author: tina.liu
 * @create: 2020-05-16 21:42
 **/
@Component
@Slf4j
public class MsgConsumer {

    /**
     * 从跟广播类型的交换机绑定的队列中消费消息
     * @param massage
     */
    @RabbitListener(
            bindings =
                    {
                            @QueueBinding(value = @Queue(value = RabbitConfig.FANOUT_QUEUE_NAME, durable = "true"),
                                    exchange = @Exchange(value = RabbitConfig.TEST_FANOUT_EXCHANGE, type = "fanout"))
                    })
    @RabbitHandler
    public void processFanoutMsg(Message massage) {
        String msg = new String(massage.getBody(), StandardCharsets.UTF_8);
        log.info("从广播类型的交换机绑定的队列中消费消息success, message : {}" + msg);
    }


    /**
     * 从跟广播类型的交换机绑定的队列中消费消息
     * @param massage
     */
    @RabbitListener(
            bindings =
                    {
                            @QueueBinding(value = @Queue(value = RabbitConfig.FANOUT_QUEUE_NAME1, durable = "true"),
                                    exchange = @Exchange(value = RabbitConfig.TEST_FANOUT_EXCHANGE, type = "fanout"))

                    })
    @RabbitHandler
    public void processFanout1Msg(Message massage) {
        String msg = new String(massage.getBody(), StandardCharsets.UTF_8);
        log.info("从广播类型的交换机绑定的队列中消费消息success, message : {}" , msg);
    }

    /**
     * 通过直连类型的交换机绑定的消息队列中消费消息
     * @param massage
     */
    @RabbitListener(
            bindings =
                    {
                            @QueueBinding(value = @Queue(value = RabbitConfig.DIRECT_QUEUE_NAME, durable = "true"),
                                    exchange = @Exchange(value = RabbitConfig.TEST_DIRECT_EXCHANGE),
                                    key = RabbitConfig.DIRECT_ROUTINGKEY)
                    })
    @RabbitHandler
    public void processDirectMsg(Message massage) {
        String msg = new String(massage.getBody(), StandardCharsets.UTF_8);
        log.info("从直连类型的交换机绑定的消息队列中消费消息success, message : {}" , msg);
    }


    /**
     * 通过通配符类型的交换机绑定的消息队列中消费消息
     * @param massage
     */
    @RabbitListener(
            bindings =
                    {
                            @QueueBinding(value = @Queue(value = RabbitConfig.TOPIC_QUEUE_NAME, durable = "true"),
                                    exchange = @Exchange(value = RabbitConfig.TEST_TOPIC_EXCHANGE, type = "topic"),
                                    key = RabbitConfig.TOPIC_ROUTINGKEY)
                    })
    @RabbitHandler
    public void processTopicMsg(Message massage) {
        String msg = new String(massage.getBody(), StandardCharsets.UTF_8);
        log.info("从通配符类型的交换机绑定的消息队列中消费消息success, message : {}" + msg);
    }

}


6 .controller

package com.example.demo.rabbitmq.controller;

import com.example.demo.rabbitmq.config.RabbitConfig;
import com.example.demo.rabbitmq.rabbitmq.MsgProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @program: demo-rabbitmq
 * @description
 * @author: tina.liu
 * @create: 2020-05-16 21:49
 **/
@RestController
public class TestController {

    //注入消息生成者
    @Autowired
    private MsgProducer msgProducer;

    /**
     * 测试广播类型的交换器 消息队列
     */
    @GetMapping("/test/fanout")
    public String Test1(){
        msgProducer.send2FanoutTestQueue("借助广播类型的交换机 队列传递信息");
        return "利用广播类型的交换机绑定队列发送消息";
    }

    /**
     * 测试直连类型的交换机 消息队列
     */
    @GetMapping("/test/direct")
    public String Test2(){
        msgProducer.send2DirectTestQueue("借助直连类型的交换机 队列传递信息");
        return "利用直连类型的交换机绑定队列发送消息";
    }

    /**
     * 测试通配符交换机 消息队列
     */
    @GetMapping(value = "/test/topic/A")
    private  String test3(){
        msgProducer.send2TopicTestAQueue("借助通配符类型的交换机A,队列传递信息");
        return "利用通配符类型的交换机绑定队列发送消息A";
    }

    @GetMapping(value = "/test/topic/B")
    private  String test4(){
        msgProducer.send2TopicTestBQueue("借助通配符类型的交换机B,队列传递信息");
        return "利用通配符类型的交换机绑定队列发送消息B";
    }

}



相关文章

网友评论

      本文标题:Spring boot 集成rabbitmq

      本文链接:https://www.haomeiwen.com/subject/qtwjohtx.html