美文网首页
1-4 RabbitMQ-Boot整合

1-4 RabbitMQ-Boot整合

作者: Finlay_Li | 来源:发表于2020-07-08 18:38 被阅读0次

    概念介绍:队列、绑定、虚拟主机、消息

    Binding基本概念

    • Exchange和Exchange、Queue之间的绑定关系。
    • Binding中可以包含Routing Key或者参数。

    Queue基本概念

    • 消息队列,实际存储消息数据。
    • Durability:是否持久化。Durable:是,Transient:否。
    • Auto Delete:如果选true,代表当最后一个监听被移除之后,该Queue会自动删除。

    Message基本概念

    • Message是服务器和应用程序之前传送的数据。
    • Message本质上就是一段数据,由Properties和Payload(Body)组成。

    Message的属性

    private String contentType; 
    private String contentEncoding;
    private Map<String,Object> headers; // 自定义属性
    private Integer deliveryMode;   // 消息的送达模式
    private Integer priority;   // 消息的优先级
    private String correlationId;   // 消息的唯一ID
    private String replyTo; // 消息失败返回的队列
    private String expiration;  // 消息的过期时间
    private String messageId;   // 消息的ID
    private Date timestamp; // 时间戳
    private String type; 
    private String userId;
    private String appId;
    private String clusterId;
    

    依赖

    BOOT 2.0.2RELEASE

    <!--spring-rabbit和amqp-client不能同时存在,否则会出现class引用错误-->
    <!--<dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>3.7.12</version>
    </dependency>-->
    
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    配置连接

    #连接工厂
    spring.rabbitmq.addresses=zzz:5677,xxxx:5678
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=qwg-rabbitmq@guest
    #虚拟机
    spring.rabbitmq.virtual-host=qwg-app-dev
    #------------生产端
    # 开启:消息确认
    spring.rabbitmq.publisherConfirms= true
    # 开启:路由不可达的消息返回
    spring.rabbitmq.publisher-returns= true
    # 设置true 监听器会收到:路由不可达的消息,从而可对路由不可达的消息进行处理,保证消息的路由成功
    spring.rabbitmq.template.mandatory= true
    #------------消费端
    # 手工签收
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    

    声明队列

    public interface MqQueue {
        String HELLO_MQ = "hello_mq";
    }
    
    @Configuration
    public class MqQueueConfiguration {
    
        @Bean
        public Queue helloQueue() {
            //持久化队列
            return new Queue(MqQueue.HELLO_MQ, true);
        }
    }
    

    生产者

    @Component
    public class MqSender {
    
    //使用spring amqp提供的模板,完成消息发送
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        public void send() {
            String msg = "Hello RabbitMq";
            rabbitTemplate.convertAndSend(MqQueue.HELLO_MQ, msg);
        }
    }
    

    消费者

    //自动监听
    @Component
    public class MqReceiver {
    
     /*@RabbitListener
        声明监听者
    
        @RabbitHandler 
        声明消费消息的方法*/
    
        @RabbitHandler
        @RabbitListener(queues = MqQueue.HELLO_MQ)
        public void receiver(String msg, @Header(AmqpHeaders.DELIVERY_TAG) long tag, Channel channel) throws IOException {
            System.out.println("Receiver ----------------- 消费消息: " + msg);
            //手动确认消费完成
            channel.basicAck(tag, false);
        }
    }
    

    AMQP default

    为什么我们没有声明交换机,却消费成功?RabbitMQ是必须要交换机的呀~

    image.png

    相关文章

      网友评论

          本文标题:1-4 RabbitMQ-Boot整合

          本文链接:https://www.haomeiwen.com/subject/doaccktx.html