PHP当消费者抛出异常,代表消费失败
使用nack(true)
或reject(true)
,可以让消息重回队列(队列头),紧接着再次消费,再失败...造成死循环,队列中其他的数据也无法消费.
消费者
$callback = function(AMQPMessage $msg){
try{
//业务逻辑
throw new \Exception('消费失败');
}catch (\Exception $e) {
echo $e->getMessage().PHP_EOL;
//重新入队
$msg->nack(true);
}
}
死循环的现象
image.png
所以需要记录重试的次数,比如重试3次,仍失败,就做特殊处理.
- 生产者发送消息时,增加字段,名字随意,这里叫
retry
,初始值0
- 消费失败时,捕获异常,判断
retry
的值,小于3次,retry++
重新入队,大于3次走其他逻辑(人工处理)
下边是PHP代码部分.网上找不到文档,只能看源码了.
生产者发送消息时,增加字段
retry
,关键是如何设置,设置在哪.
创建AMQPMessage
对象
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
两个参数的构造方法,
第一个参数String类型,消息体
第二个参数Array类型,属性
$message = new AMQPMessage('hello world',array());
最麻烦的是,第二个参数传什么值,具体怎么设置,没找到文档,查看AMQPMessage
的源码.
AMQPMessage
类的构造函数,第二个参数$properties
是一个数组.当我们传入数组时,它通过求数组交集的操作,限制了我们可以设置的内容.
其中有个
application_headers
,他的值是table_object
.就是AMQPTable
类,所以还需要一个AMQPTable
对象.该对象的构造函数可以直接传入任意数组.具体可以自己看下源码.
所以生产者设置
retry
字段应该是下边这样子,可以随便设置
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
两个参数的构造方法,
第一个参数String类型,消息体
第二个参数Array类型,属性
$message = new AMQPMessage('hello world',
[
'application_headers'=>new AMQPTable(['retry'=>0,'test'=>888]),//自定义的
'timestamp'=>time(),//内置的
'content_type'=>'text',//内置的
]
);
然后正常发送消息就行了.在管理后台取出消息,就能看到我们设置的retry
字段
最关键的,消费者判断
retry
次数,重新发送
这里仅展示回调消费者的回调函数
关键点是借助getNativeData()
方法获取信息,判断重试次数.
重试次数+1重新发送消息到原队列
$callback = function(AMQPMessage $msg){
try{
//业务逻辑
throw new \Exception('消费失败');
}catch (\Exception $e){
$exchange = $msg->getExchange();
$routingKey = $msg->getRoutingKey();
$channel = $msg->getChannel();
$body = $msg->getBody();
//headersObject 是一个AMQPTable对象
$headersObject = $msg->get_properties()['application_headers'];
//调用getNativeData()得到一个数组
$headersArray = $headersObject->getNativeData();
if($headersArray['retry'] < 3){
$headersArray['retry']++;//次数+1
echo '第'.$headersArray['retry'].'次失败,消息重新入队'.PHP_EOL;
$channel->basic_publish(
new AMQPMessage($body,['application_headers'=>new AMQPTable($headersArray)]),
$exchange,
$routingKey
);
//回复server,表示处理成功.
//实际上消费者失败了,但是我们把消息重新发送给队列
//所以这里可以认为处理成功
$msg->ack();
}else{
//TODO 超过三次,自己实现业务逻辑
echo '失败次数过多,直接丢弃,可以自己决定如何处理'.PHP_EOL;
var_dump($headersArray);
$msg->ack();
}
}
};
运行看效果
image.png
队列为空,消息确实被丢弃掉了
image.png
网友评论