美文网首页
SpringBoot 整合RabbitMQ

SpringBoot 整合RabbitMQ

作者: 月弦 | 来源:发表于2019-11-25 09:49 被阅读0次

    前言

    • 消息队列(Message Queue),是分布式系统中重要的组件,其通用的使用场景可以简单地描述为:当不需要立即获得结果,但是并发量又需要进行控制的时候,差不多就是需要使用消息队列的时候。消息队列主要解决了应用耦合、异步处理、流量削锋等问题。
    • RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

    一、RabbitMQ安装

    由于RabbitMQ是基于erlang的,所以,在正式安装RabbitMQ之前,需要先安装一下erlang。具体安装过程可参考windows10环境下的RabbitMQ安装步骤(图文)

    二、pom.xml文件

    这里是通过idea直接勾选得到的。

        <properties>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-devtools</artifactId>
                <scope>runtime</scope>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
                <exclusions>
                    <exclusion>
                        <groupId>org.junit.vintage</groupId>
                        <artifactId>junit-vintage-engine</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-rabbit-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    

    三、配置文件

    配置rabbitMQ的地址。

    server:
      port: 8080
    spring:
      rabbitmq:
        host: localhost
        port: 5672
        username: guest
        password: guest
    

    四、使用RabbitMQ

    • 简单使用

    1. 队列配置
    @Configuration
    public class RabbitConfig {
    
        @Bean
        public Queue helloQueue() {
            return new Queue("hello");
        }
    }
    
    1. 发送者
    @Component
    public class HelloSender {
    
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        public void send() {
            String context = "hello " + new Date();
            System.out.println("Sender : " + context);
            this.rabbitTemplate.convertAndSend("hello", context);
        }
    }
    
    1. 接收者
    @Component
    @RabbitListener(queues = "hello")
    public class HelloReceiver {
    
        @RabbitHandler
        public void process(String hello) {
            System.out.println("Receiver  : " + hello);
        }
    }
    
    1. controller类
    @RestController
    public class TestController {
    
        @Autowired
        private HelloSender helloSender;
    
        @GetMapping("hello")
        public String helloTest() {
            helloSender.send();
            return "success";
        }
    }
    
    • 一对多

    1. 队列配置
    @Configuration
    public class RabbitConfig {
    
        @Bean
        public Queue oneQueue() {
            return new Queue("oneQueue");
        }
    }
    
    1. 发送者
    @Component
    public class OneSender {
    
        @Autowired
        private AmqpTemplate amqpTemplate;
    
        public void send(int i) {
            String context = "spirng boot oneQueue queue"+" ****** "+i;
            System.out.println("Sender1" + context);
            this.amqpTemplate.convertAndSend("oneQueue", context);
        }
    }
    
    1. 接收者
      两个接收者。
    @Component
    @RabbitListener(queues = "oneQueue")
    public class OneReceiver1 {
    
        @RabbitHandler
        public void process(String Str) {
            System.out.println("Receiver1:" + Str);
        }
    }
    
    @Component
    @RabbitListener(queues = "oneQueue")
    public class OneReceiver2 {
    
        @RabbitHandler
        public void process(String Str) {
            System.out.println("Receiver2:" + Str);
        }
    }
    
    1. controller类
    @RestController
    public class TestController {
        @Autowired
        private OneSender oneSender;
    
        @GetMapping("oneToMany")
        public String oneToManyTest() {
            for (int i = 0; i < 10; i++) {
                oneSender.send(i);
            }
            return "success";
        }
    }
    
    1. 总结
      一个发送者,N个接受者,经过测试会均匀的将消息发送到N个接收者中。
    • 多对多

    1. 队列配置
    @Configuration
    public class RabbitConfig {
        @Bean
        public Queue manyQueue() {
            return new Queue("manyQueue");
        }
    }
    
    1. 发送者
    @Component
    public class ManySender1 {
    
        @Autowired
        private AmqpTemplate amqpTemplate;
    
        public void send(int i) {
            String context = i + "";
            System.out.println("Sender1: " + context + "--send:");
            amqpTemplate.convertAndSend("manyQueue", context);
        }
    }
    
    @Component
    public class ManySender2 {
    
        @Autowired
        private AmqpTemplate amqpTemplate;
    
        public void send(int i) {
            String context = i + "";
            System.out.println("Sender1: " + context + "--send:");
            amqpTemplate.convertAndSend("manyQueue", context);
        }
    }
    
    1. 接收者
    @Component
    @RabbitListener(queues = "manyQueue")
    public class ManyReceiver1 {
    
        @RabbitHandler
        public void process(String Str) {
            System.out.println("Receiver1:" + Str);
        }
    }
    
    @Component
    @RabbitListener(queues = "manyQueue")
    public class ManyReceiver2 {
    
        @RabbitHandler
        public void process(String Str) {
            System.out.println("Receiver2:" + Str);
        }
    }
    
    1. controller类
    @RestController
    public class TestController {
        @Autowired
        private ManySender1 manySender1;
        @Autowired
        private ManySender2 manySender2;
    
        @GetMapping("manyToMany")
        public String manyToManyTest() {
            for (int i = 0; i < 10; i++) {
                manySender1.send(i);
                manySender2.send(i);
            }
            return "success";
        }
    }
    
    1. 总结
      和一对多一样,接收端仍然会均匀接收到消息。
    • 对象的支持

    1. 队列配置
    @Configuration
    public class RabbitConfig {
        @Bean
        public Queue queue3() {
            return new Queue("object_queue");
        }
    }
    
    1. 实体类
    public class User implements Serializable {
        private String username;
        private String password;
    
        public User(String username, String password) {
            this.username = username;
            this.password = password;
        }
    
        public String getUsername() {
            return username;
        }
    
        public void setUsername(String username) {
            this.username = username;
        }
    
        public String getPassword() {
            return password;
        }
    
        public void setPassword(String password) {
            this.password = password;
        }
    
        @Override
        public String toString() {
            return "User{" +
                    "username='" + username + '\'' +
                    ", password='" + password + '\'' +
                    '}';
        }
    }
    
    1. 发送者
    @Component
    public class ObjectSender {
    
        @Autowired
        AmqpTemplate amqpTemplate;
    
        public void sendUser(User user) {
            System.out.println("Send object:" + user.toString());
            this.amqpTemplate.convertAndSend("object_queue", user);
        }
    }
    
    1. 接收者
    @Component
    @RabbitListener(queues = "object_queue")
    public class ObjectReceiver {
    
        @RabbitHandler
        public void objectReceiver(User user) {
    
            System.out.println("Receiver object:" + user.toString());
        }
    }
    
    1. controller类
    @RestController
    public class TestController {
        @Autowired
        private ObjectSender objectSender;
    
        @GetMapping("objectSender")
        public String objectSenderTest() {
            User user = new User("admin", "123456");
            objectSender.sendUser(user);
            return "success";
        }
    }
    
    • Topic Exchange

    1. 队列配置
    @Configuration
    public class TopicRabbitConfig {
    
        final static String message = "topic.message";
        final static String messages = "topic.messages";
    
        //创建两个 Queue
        @Bean
        public Queue queueMessage() {
            return new Queue(TopicRabbitConfig.message);
        }
    
        @Bean
        public Queue queueMessages() {
            return new Queue(TopicRabbitConfig.messages);
        }
    
        //配置 TopicExchange,指定名称为 topicExchange
        @Bean
        TopicExchange exchange() {
            return new TopicExchange("topicExchange");
        }
    
        //给队列绑定 exchange 和 routing_key
        @Bean
        Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
            return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
        }
    
        @Bean
        Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
            return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
        }
    }
    
    1. 发送者
    @Component
    public class TopicSender {
    
        @Autowired
        AmqpTemplate amqpTemplate;
    
        public void send1() {
            String context = "hi, i am message 1";
            System.out.println("Sender : " + context);
            amqpTemplate.convertAndSend("topicExchange", "topic.message", context);
        }
    
        public void send2() {
            String context = "hi, i am messages 2";
            System.out.println("Sender : " + context);
            amqpTemplate.convertAndSend("topicExchange", "topic.messages", context);
        }
    }
    
    1. 接收者
    @Component
    @RabbitListener(queues = "topic.message")
    public class TopicReceiver1 {
    
        @RabbitHandler
        public void process(String message){
            System.out.println("Receiver topic.message :"+ message);
        }
    }
    
    @Component
    @RabbitListener(queues = "topic.messages")
    public class TopicReceiver2 {
    
        @RabbitHandler
        public void process(String message){
            System.out.println("Receiver topic.messages: "+ message);
        }
    }
    
    1. controller类
    @RestController
    public class TestController {
        @Autowired
        private TopicSender topicSender;
    
        @GetMapping("topicSender")
        public String topicSenderTest() {
            topicSender.send1();
    //        topicSender.send2();
            return "success";
        }
    }
    
    1. 总结
      使用 queueMessages 同时匹配两个队列,queueMessage 只匹配 “topic.message” 队列。
    • Fanout Exchange

    1. 队列配置
    @Configuration
    public class FanoutRabbitConfig {
    
        //创建三个队列
        @Bean
        public Queue AMessage() {
            return new Queue("fanout.A");
        }
    
        @Bean
        public Queue BMessage() {
            return new Queue("fanout.B");
        }
    
        @Bean
        public Queue CMessage() {
            return new Queue("fanout.C");
        }
    
        //创建exchange,指定交换策略
        @Bean
        public FanoutExchange fanoutExchange() {
            return new FanoutExchange("fanoutExchange");
        }
    
        //分别给三个队列指定exchange,这里使用了A、B、C三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略:
        @Bean
        public Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(AMessage).to(fanoutExchange);
        }
    
        @Bean
        public Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(BMessage).to(fanoutExchange);
        }
    
        @Bean
        Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(CMessage).to(fanoutExchange);
        }
    }
    
    1. 发送者
    @Component
    public class FanoutSender {
    
        @Autowired
        AmqpTemplate amqpTemplate;
    
        public void send(){
            String context = "hi, fanout msg ";
            System.out.println("Sender : " + context);
            //这里使用了A、B、C三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略:
            amqpTemplate.convertAndSend("fanoutExchange","", context);
        }
    }
    
    1. 接收者
    @Component
    @RabbitListener(queues = "fanout.A")
    public class FanoutReceiverA {
    
    
        @RabbitHandler
        public void process(String message){
    
            System.out.println("Receiver form fanout.A: "+message);
    
        }
    
    }
    
    @Component
    @RabbitListener(queues = "fanout.B")
    public class FanoutReceiverB {
    
    
        @RabbitHandler
        public void process(String message){
    
            System.out.println("Receiver form fanout.B: "+message);
    
        }
    
    }
    
    @Component
    @RabbitListener(queues = "fanout.C")
    public class FanoutReceiverC {
    
    
        @RabbitHandler
        public void process(String message){
    
            System.out.println("Receiver form fanout.C: "+message);
    
        }
    
    }
    
    1. controller类
    @RestController
    public class TestController {
        @Autowired
        private FanoutSender fanoutSender;
    
        @GetMapping("fanoutSender")
        public String fanoutSenderTest() {
            fanoutSender.send();
            return "success";
        }
    }
    
    1. 总结
      绑定到 fanout 交换机上面的队列都收到了消息。

    五、参考内容

    什么是消息队列?
    消息队列之 RabbitMQ
    消息队列及常见消息队列介绍

    相关文章

      网友评论

          本文标题:SpringBoot 整合RabbitMQ

          本文链接:https://www.haomeiwen.com/subject/oaqoictx.html