美文网首页
RabbitMQ 死信队列

RabbitMQ 死信队列

作者: 567f84810acc | 来源:发表于2020-01-13 10:28 被阅读0次

    死信队列

    "死信"模式 指的是,当消费者不能处理接收到的消息时,将这个消息重新发布到另外一个队列中,等待重试或者人工干预。

    • 消息被拒绝 (basic.reject or basic.nack) 且带 requeue=false 参数
    • 消息的TTL-存活时间已经过期
    • 队列长度限制被超越(队列满)

    x-max-length :限制队列的最大长度
    x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange
    x-dead-letter-routing-key:出现dead letter之后将dead letter重新按照指定的routing-key发送

    延迟消息使用

    通过死信队列实现

    形成条件:

    • 消息被丢弃 ? requeue=false (reject 并且不删除消息)
    • 消息的ttl 到期
    • 超过队列设置长度

    设置队列参数

    Optional name of an exchange to which messages will be republished if they are rejected or expire.
    实际上,官方用词 "rejected" 应该被翻译成 : "丢弃"或者"抛弃",而不是"拒绝",也就是说,这里的 rejected 和 expire 是队列里面的消息的状态.而不是队列的动作.
    
    • 🌟 - x-overflow:String 支持参数 drop-head 默认丢弃 /reject-publish 拒绝消息 请使用 默认参数

    • x-dead-letter-exchange --> 重新发送到指定exchange

    • x-dead-letter-routing-key --> 按照指定的routing-key发送

    发送消息

    • x-message-ttl:Number ,发布的消息在队列中存在多长时间后被取消(单位毫秒)此配置项针对队列使用

    • Properties 针对消息使用

    content_type
    content_encoding
    priority
    correlation_id
    reply_to
    expiration //过期时间 毫秒 5000 --->> 5s
    message_id
    timestamp
    type
    user_id
    app_id
    cluster_id
    
    # 使用 bschmitt/laravel-amqp
    
    \Amqp::publish('routing-key', 'message' , [
          'vhost'=>'/test',
        'exchange_type' => 'direct',
        'exchange' => 'test.direct',
        'exchange_durable'=>true, //交换机 持久化
        'exchange_internal'=>false, //内部使用
        'exchange_auto_delete'  => false,
        'exchange_properties'=>[
    
        ],
        'queue_force_declare' => true, //强制持久化 队列持久 的话 交换机也要持久化 否则无法连接
        'queue_exclusive' => true, // 排他性 只对当前连接可见
        'queue_auto_delete'     => false,
        'queue' => 'test_queue_name',
        'queue_properties'=>[
            'x-message-ttl' => ['I',30000], //I 看上一片下面类型介绍 30s 过期
            'x-dead-letter-exchange'=>'ttt.exchange',
            'x-dead-letter-routing-key'=>'ttt_bk'
        ]
    ]);
    

    接收消息

    ACK && REJECT
        /**
         * Acknowledges a message
         *
         * @param AMQPMessage $message
         */
        public function acknowledge(AMQPMessage $message)
        {
            $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
            if ($message->body === 'quit') {
                $message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']);
            }
        }
        /**
         * Rejects a message and requeues it if wanted (default: false)
         *
         * @param AMQPMessage $message
         * @param bool    $requeue
         */
        public function reject(AMQPMessage $message, $requeue = false)
        {
            $message->delivery_info['channel']->basic_reject($message->delivery_info['delivery_tag'], $requeue);
        }
    
    -------------------------------------------------
    \Amqp::consume('ttt_bk', function ($message, $resolver) {
        var_dump($message->body);
        $resolver->acknowledge($message); //ack
        # $resolver->reject($message,true|false); // Rejects a message and requeues it if wanted (default: false)  拒绝消息
    }, [
        'exchange' => 'ttt.exchange',
        'exchange_type' => 'fanout',
        'queue_force_declare' => true,
         .... 参数和发布的一样 省略。。。。
        'persistent' => true
    ]);
    

    然后 消费者 指定 x-dead-letter-exchange 和 routing-key 进行消费处理

    使用注意

    rabbitmq 的延迟队列是针对队列的延迟  单条消息的过期时间 是按照顺序执行的 无法保证消息不同粒度的过期时间同时过期
    

    redis 实现简单延迟队列

    方案一:
    zset 有序集合 socre 对应执行时间戳
    
    每s 扫描出 小于当前的的任务 
    
    缺点 : 效率较慢  针对数据量比较大的情况 延迟高 
    ------------------------------------------------
    方案二:
        监听redis 的key过期删除事件
        redis 文档表示 key的过期时间 ttl 为0 不代表立即删除 
    
    

    来源 : www.alonexy.com

    相关文章

      网友评论

          本文标题:RabbitMQ 死信队列

          本文链接:https://www.haomeiwen.com/subject/ehegactx.html