书中以java语言作为示例代码,我以go语言来实现有必要记录的示例
一、设置消息过期时间
- 通过声明队列时设置x-message-ttl(毫秒单位),
设置为0必须立即投递给消费者,不然就会被立即丢弃
.即给queueDeclare的args添加参数(消息过期,立即从队列抹去,因为过期的消息肯定在队列的头部,所以很好判断
) - 通过调用amqp.Publishing时设置Expiration参数,(
消息过期不会立即从队列抹去,因为每条消息是否过去是在即将投递给消费者之前判断的,每条消息的过期时间不同,要删除所有过期消息势必要扫描整个队列,所以不如等到被消费时在做判断
)
二、设置度列的TTL
x-expires
:毫秒单位,不能设置为0,设置为1000即在一秒中内未被使用
则会被删除.
三、死信队列(DLX):
也可以被当做延迟队列
使用.
消息变成私信:
- 消息被拒绝,(reject/nack),并设置requeue参数为false.
- 消息过期.
- 队列达到最大长度.
queueDeclare设置x-dead-letter-exchange参数
func (r *RabbitMQ) DlxQueue() {
//1.尝试创建交换机
r.channel.ExchangeDeclare("exchange.dlx", "direct", true, false, false, false, nil, )
r.channel.ExchangeDeclare("exchange.normal", "fanout", true, false, false, false, nil, )
args := make(amqp.Table)
args["x-message-ttl"] = 10000
args["x-dead-letter-routing-key"] = "routingKey"
args["x-dead-letter-exchange"] = "exchange.dlx"
r.channel.QueueDeclare("queue.normal", true, false, false, false, args)
r.channel.QueueBind("queue.normal", "", "exchange.normal", false, nil)
r.channel.QueueDeclare("queue.dlx", true, false, false, false, nil)
r.channel.QueueBind("queue.dlx", "routingKey", "exchange.dlx", false, nil)
r.channel.Publish("exchange.normal", "rk", false, false, []byte("dlx"))
}
被投递到queue.normal的消息过了10s未被消费,会被交给exchange.dlx,这是exchange.dlx与queue.dlx匹配,消息最终被投递到了queue.dlx队列.
四、优先级队列
x-max-priority
:queueDeclare的args参数设置。另外在amqp.Publishing中也要设置Priority这个参数。如果消费者速度大于生产者速度,设置优先级没有意义
网友评论