美文网首页
php RabbitMQ重试

php RabbitMQ重试

作者: 江河湖海琴瑟琵琶 | 来源:发表于2020-11-25 14:48 被阅读0次

    PHP当消费者抛出异常,代表消费失败
    使用nack(true)reject(true),可以让消息重回队列(队列头),紧接着再次消费,再失败...造成死循环,队列中其他的数据也无法消费.

    消费者
    $callback = function(AMQPMessage $msg){
            try{
                //业务逻辑
                throw new \Exception('消费失败');
            }catch (\Exception $e) {
                echo $e->getMessage().PHP_EOL;
                //重新入队
                $msg->nack(true);
            }
        }
    
    死循环的现象 image.png

    所以需要记录重试的次数,比如重试3次,仍失败,就做特殊处理.

    1. 生产者发送消息时,增加字段,名字随意,这里叫retry,初始值0
    2. 消费失败时,捕获异常,判断retry的值,小于3次,retry++重新入队,大于3次走其他逻辑(人工处理)

    下边是PHP代码部分.网上找不到文档,只能看源码了.

    生产者发送消息时,增加字段retry,关键是如何设置,设置在哪.

    创建AMQPMessage对象

    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;
    use PhpAmqpLib\Wire\AMQPTable;
    
    两个参数的构造方法,
    第一个参数String类型,消息体
    第二个参数Array类型,属性
    $message = new AMQPMessage('hello world',array());
    

    最麻烦的是,第二个参数传什么值,具体怎么设置,没找到文档,查看AMQPMessage的源码.

    AMQPMessage.png
    AMQPMessage类的构造函数,第二个参数$properties是一个数组.
    当我们传入数组时,它通过求数组交集的操作,限制了我们可以设置的内容.
    其中有个application_headers,他的值是table_object.就是AMQPTable类,所以还需要一个AMQPTable对象.
    该对象的构造函数可以直接传入任意数组.具体可以自己看下源码.
    所以生产者设置retry字段应该是下边这样子,可以随便设置
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;
    use PhpAmqpLib\Wire\AMQPTable;
    
    两个参数的构造方法,
    第一个参数String类型,消息体
    第二个参数Array类型,属性
    $message = new AMQPMessage('hello world',
            [
                'application_headers'=>new AMQPTable(['retry'=>0,'test'=>888]),//自定义的
                'timestamp'=>time(),//内置的
                'content_type'=>'text',//内置的
            ]
        );
    

    然后正常发送消息就行了.在管理后台取出消息,就能看到我们设置的retry字段

    image.png

    最关键的,消费者判断retry次数,重新发送

    这里仅展示回调消费者的回调函数
    关键点是借助getNativeData()方法获取信息,判断重试次数.
    重试次数+1重新发送消息到原队列

    $callback = function(AMQPMessage $msg){
            try{
                //业务逻辑
                throw new \Exception('消费失败');
            }catch (\Exception $e){
    
                $exchange      = $msg->getExchange();
                $routingKey    = $msg->getRoutingKey();
                $channel       = $msg->getChannel();
                $body          = $msg->getBody();
                //headersObject 是一个AMQPTable对象
                $headersObject = $msg->get_properties()['application_headers'];
                //调用getNativeData()得到一个数组
                $headersArray  = $headersObject->getNativeData();
    
                if($headersArray['retry'] < 3){
                    $headersArray['retry']++;//次数+1
                    echo '第'.$headersArray['retry'].'次失败,消息重新入队'.PHP_EOL;
                    $channel->basic_publish(
                        new AMQPMessage($body,['application_headers'=>new AMQPTable($headersArray)]),
                        $exchange,
                        $routingKey
                    );
    
                    //回复server,表示处理成功.
                    //实际上消费者失败了,但是我们把消息重新发送给队列
                    //所以这里可以认为处理成功
                    $msg->ack();
    
                }else{
                    //TODO 超过三次,自己实现业务逻辑
                    echo '失败次数过多,直接丢弃,可以自己决定如何处理'.PHP_EOL;
                    var_dump($headersArray);
                    $msg->ack();
                }
            }
        };
    
    
    运行看效果 image.png 队列为空,消息确实被丢弃掉了 image.png

    相关文章

      网友评论

          本文标题:php RabbitMQ重试

          本文链接:https://www.haomeiwen.com/subject/laogiktx.html