美文网首页
RabbitMQ学习笔记

RabbitMQ学习笔记

作者: TomDwan | 来源:发表于2018-12-10 15:11 被阅读0次

四种交换机类型

  • fanout   它会将发送到该交换机的消息路由到所有与该交互机存在binding的队列中,无视RoutingKey

  • direct  把消费发送到RoutingKey与BindingKey完全匹配的队列中。

  • topic   类似direct交换机,但是BindingKey可以是模糊匹配的规则

    * 代表任意个单词(可为零)

    # 代表只有一个单词

  • headers   性能较差,一般不使用

重名交换机或队列

生产者和消费者都可以声明一个队列。如果声明一个已存在的交换机或队列,只要声明的参数完全匹配现存的交换机或队列,则RabbitMQ什么都不做,并成功返回。否则,将会抛出异常。

声明交换机

声明交换机时有两个参数需要注意

  • durable: 是否持久化。 持久化可以讲交换机保存在磁盘中,服务器重启后不会丢失信息。

  • autoDelete: 是否自动删除。自动删除的前提是至少有一个交换机或对队列与这个交换机绑定,之后所有的交换机和队列与这个交换机解绑。

声明队列

  • durable: 是否持久化。 持久化可以讲交换机保存在磁盘中,服务器重启后不会丢失信息。

  • exclusive: 是否排他。如果一个队列被声明为排他队列,则该队列仅对首次声明它的连接可见,并且在连接断开的时候自动删除。这里需要注意三点:排他队列是基于连接(connection)可见的,同一个连接的不同信道(channel)可以访问同一连接创建的排他队列。“首次” 是指如果体格连接已经创建了一个排他队列,则其他连接不允许再创建同名的排他队列。即使该排他队列是持久化的(durable=true),一点连接关闭或客户端退出,该队列就会被删除。

  • autoDelete: 是否自动删除。自动删除的前提是:至少有一个消费者连接到该队列,之后所有的消费者与这个队列的连接都断开时,才会自动删除。

queueDeclarePassive

Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;

这个方法用来检测队列是否已经存在。如果存在则正常返回,不存在则抛异常:404 channel exception 同时channel也会被关闭。

未确认的消息

RabbitMQ不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者是否断开连接,这么设计的原因是RabbitMQ需要消费者话很久的时间来处理这条消息。

mandatory参数

当mandatory参数设置为true的时候,若交换机无法根据自己的类型和路由键找到合适的队列,那么RabbitMQ会调用basic.return命令将该消息返回给生产者。当mandatory设置为false时,该消息将会被直接丢弃。

在springboot整合RabbitMQ的starter中,对应的方法为RabbitTempalte.ReturnCallback接口的returnedMessage方法。

注意:当找不到队里时才会调用这个returnedMessage方法。当交换机找不到时,会直接到RabbitTempalte.ConfirmCallback的confirm方法

消息的过期时间

针对队列中的所有消息设置过期时间

@Bean
public Queue queue() {
    Map<String,Object> args = new HashMap<>();
    args.put("x-message-ttl","100");// 设置队列中消息的过期时间,单位毫秒
    return new Queue(QUEUE_1, true,false,false,args);
}

针对单个消息设置过期时间

public void sendDirectAck(){
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    MessageProperties messageProperties = new MessageProperties();
    // 设置消息的过期时间
    messageProperties.setExpiration("1000"); //设置每条消息具体的过期时间 单位毫秒
    Message message = new Message(user1.toString().getBytes(),messageProperties);// 不要在意字节数组的细节
    rabbitTemplate.convertAndSend(RabbitmqConfig.TOPIC_EXCHANGE,RabbitmqConfig.ROUTING_KEY1,message,correlationData);
}

死信队列

DLX, Dead-Letter-Exchange。利用DLX, 当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX。消息变成死信一向有一下几种情况:

  • 消息被拒绝(basic.reject/ basic.nack)并且requeue=false
  • 消息TTL过期
  • 队列达到最大长度

DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性,当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列,可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ 3.0以前支持的immediate参数的功能。

核心代码实现:通过在queueDeclare方法中加入“x-dead-letter-exchange”实现。

channel.exchangeDeclare("some.exchange.name", "direct");

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "some.exchange.name");
channel.queueDeclare("myqueue", false, false, false, args);

你也可以为这个DLX指定routing key,如果没有特殊指定,则使用原队列的routing key

args.put("x-dead-letter-routing-key", "some-routing-key");

延迟队列

延迟队列存储的对象肯定是对应的延迟消息,所谓”延迟消息”是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。

场景一:在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行一场处理。这是就可以使用延迟队列将订单信息发送到延迟队列。

场景二:用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时候就可以将用户指令发送到延迟队列,当指令设定的时间到了再将指令推送到只能设备。


延迟队列

消息的持久化

// 设置消息持久化 (默认是持久化的)    
messageProperties.setDeliveryMode(MessageProperties.DEFAULT_DELIVERY_MODE);

如果队列没有设置持久化,那么及时消息设置了持久化,重启后消息依旧会消失

持久化会严重影响rabbitmq的性能

消息的推拉模式

在rabbitmq中支持两种消息处理的模式,一种是订阅模式(也叫push模式),由broker主动将消息推送给订阅队列的消费者;另一种是检索模式(也叫pull模式),需要消费者调用channel.basicGet方法,主动从队列中拉取消息。

订阅模式(push)

订阅模式接收消息是最有效的一种消息处理方式,当消息到达broker时,broker会自动将消息投递给匹配的消费者,而不需要消费端手动去拉取。在同一个通道channel中,每个消费者Consumer都有着不同的consumer-tag标识,这个标识可以是客户端指定,也可以由broker服务端自动生成(如果客户端手动指定了,则以客户端的为准,如果没有指定则由服务端自动生成)。

检索模式(pull)

通过使用Channel.basicGet显示拉取消息,返回的数据类型是GetResponse实例。

消息分发

多个消费者订阅同一个队列,这时候队列中的消息会采用轮询(Round-Robin)的方式发送给消费者,即每个消息只会有一些消费者来处理,避免消息的重复消费。

但这种模式有一个潜在的问题,就说如果消费者A处理消息的速度很快,而B处理得很慢。采用轮询分发的时候有可能出现A消费者处理空闲,而B消费者却出现消息堆积的问题。

此时可以在消费者端调用channel.basicQos(...)方法指定每个消费者未确认的消息的数量(只在推(push)模式下有效

此方法有三个重载

void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

void basicQos(int prefetchCount, boolean global) throws IOException;

void basicQos(int prefetchCount) throws IOException;

关于 global和prefetch的含义,参考官方文档Consumer Prefetch

Unfortunately the channel is not the ideal scope for this - since a single channel may consume from multiple queues, the channel and the queue(s) need to coordinate with each other for every message sent to ensure they don't go over the limit. This is slow on a single machine, and very slow when consuming across a cluster.

Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10); // Per consumer limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);

当单个channel有多个消费者时,协调两个消费者不超过limit会严重影响性能。

Therefore RabbitMQ redefines the meaning of the global flag in the basic.qos method:

global Meaning of prefetch_count in AMQP 0-9-1 Meaning of prefetch_count in RabbitMQ
false shared across all consumers on the channel applied separately to each new consumer on the channel
true shared across all consumers on the connection shared across all consumers on the channel

可以设置global=false,是每个消费者独享消息数量的限制(默认即为false)

相关文章

  • RabbitMQ 学习笔记:安装

    RabbitMQ 学习笔记系列 上一系列:MySQL 学习实践 RabbitMQ 学习笔记:安装 RabbitMQ...

  • RabbitMQ 问答式总结

    RabbitMQ的学习笔记 关于RabbitMQ的几个角色如下: 关于名词的通俗解析: 首先我们肯定知道Rabbi...

  • 统一配置

    spring cloud学习笔记 上文 学习使用Docker下文 RabbitMQ的基本使用 由于 spring ...

  • RabbitMQ学习笔记

    RabbitMQ 简介 MQ 消息队列,上承生产者,下接消费者。从生产者侧获取消息,然后将消息转发给消费者。由此可...

  • RabbitMQ 学习笔记

    RabbitMQ 是一个支持多种消息传递协议的消息代理, 支持 AMQP(一个具有强大路由功能的开放式连接协议),...

  • rabbitmq学习笔记

    在Window7下安装rabbitmq 下载 otp_win64_17.3.exe 和 rabbitmq-serv...

  • Rabbitmq学习笔记

    什么叫消息队列 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂...

  • RabbitMQ学习笔记

    本文作者:陈进坚个人博客:https://jian1098.github.io[https://jian1098....

  • RabbitMQ学习笔记

    安装 追加内容export PATH=$PATH:/data/install/erlang/bin

  • RabbitMQ学习笔记

    RabbitMQ简介 RabbitMQ是一个由erlang开发的AMQP(Advanved Mesaage Que...

网友评论

      本文标题:RabbitMQ学习笔记

      本文链接:https://www.haomeiwen.com/subject/nworhqtx.html