RabbitMQ :Spring AMQP 快速入门

作者: 聪明的奇瑞 | 来源:发表于2018-02-05 10:49 被阅读254次
    • Spring Boot 对 AMQP 协议的消息队列产品提供了良好的支持,spring-amqp 模块对 AMQP 协议的一个抽象和封装,目的是为了简化 AMQP 协议的消息队列框架的使用

    确保 RabbitMQ 正常运行

    启动 RabbitMQ

    /sbin/rabbitmq-server -detached
    

    添加两个新用户

    /sbin/rabbitmqctl add_user producer 123456
    /sbin/rabbitmqctl add_user consumer 123456
    

    为用户赋予权限(否则会出现401权限错误)

    /sbin/rabbitmqctl set_permissions producer '.*' '.*' '.*'
    /sbin/rabbitmqctl set_permissions consumer '.*' '.*' '.*'
    

    结合 Spring Boot

    引入 spring-boot-starter-amqp 依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    消息生产者

    添加 RabbitMQ 配置属性

    server:
      port: 8003
    spring:
      rabbitmq:
        host: localhost
        username: producer
        password: 123456
        port: 5672
      application:
        name: producer       
    

    RabbitMQ 配置类

    @Configuration
    public class RabbitMQConfig {
        //声明队列
        @Bean
        public Queue queue1() {
            return new Queue("hello.queue1", true);     // true表示持久化该队列
        }
    
        @Bean
        public Queue queue2() {
            return new Queue("hello.queue2", true);
        }
    
        //声明交互器
        @Bean
        public TopicExchange topicExchange() {
            return new TopicExchange("topicExchange");
        }
    
        //绑定
        @Bean
        public Binding binding1() {
            return BindingBuilder.bind(queue1()).to(topicExchange()).with("key.1");
        }
    
        @Bean
        public Binding binding2() {
            return BindingBuilder.bind(queue2()).to(topicExchange()).with("key.#");
        }
    }
    

    消息生产者类

    @Component
    public class Sender {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
        
        //发送消息
        public void send(String msg){
            CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
            System.out.println("开始发送消息 : " + msg.toLowerCase());
            rabbitTemplate.convertSendAndReceive("topicExchange", "key.1", msg, correlationId);
            System.out.println("结束发送消息 : " + msg.toLowerCase());
        }
    }
    

    测试发送消息

    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = ProducerApplication.class)
    public class SenderTest {
        @Autowired
        private Sender sender;
    
        @Test
        public void sender(){
            sender.send("hello");
        }
    }
    

    RabbitMQ 查看队列与消息

    • 访问 RabbitMQ 图形管理界面,可看见已创建两个持久的队列 hello.queue1、hello.queue2
    uhTI2.png
    • 点击进 hello.queue1、hello.queue2 队列,通过 Get messages 获取消息
    uhm9a.png

    消息消费者

    添加 RabbitMQ 配置属性

    server:
      port: 8004
    spring:
      rabbitmq:
        host: localhost
        username: consumer
        password: 123456
        port: 5672
        listener:
          simple:
            concurrency: 2        #最小消息监听线程数
            max-concurrency: 2    #最大消息监听线程数
    

    消息监听类,通过 @RabbitListener 注解监听队列

    @Component
    public class Receiver {
    
        @RabbitListener(queues = "hello.queue1")
        public String processMessage1(String msg) {
            System.out.println(Thread.currentThread().getName() + " 接收到来自hello.queue1队列的消息:" + msg);
            return msg.toUpperCase();
        }
    
        @RabbitListener(queues = "hello.queue2")
        public void processMessage2(String msg) {
            System.out.println(Thread.currentThread().getName() + " 接收到来自hello.queue2队列的消息:" + msg);
        }
    
    }
    
    

    测试

    • 启动消息消费者服务
    • 通过消息生产者生产消息
    • RabbitMQ 会将消息分发给消息消费者,可在消息消费者服务控制台看到消息

    相关文章

      网友评论

        本文标题:RabbitMQ :Spring AMQP 快速入门

        本文链接:https://www.haomeiwen.com/subject/pmbezxtx.html