//生产者 代码【在构造函数中进行了连接操作 析构函数使用关闭连接操作】
public function taskQueue()
{
//durable true 开启队列持久化 第三个参数
$this->channel->queue_declare('task_queue', false, true, false, false);
$argv = ['a', 'b', 'c', 'd'];
$data =implode(' ', array_slice($argv, 1));
if (empty($data)) {
$data ="Hello World!";
}
$msg =new AMQPMessage(
$data,
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
$this->channel->basic_publish($msg, '', 'task_queue');
echo ' [x] Sent ', $data, "\n";
}
//消费者代码 我这里使用的TP6框架的console
protected function execute(Input$input, Output$output)
{
$this->channel->queue_declare('task_queue', false, true, false, false); //durable true 开启队列持久化 第三个参数
echo " [*] Waiting for messages. To exit press CTRL+C\n";
$callback =function ($msg) {
echo ' [x] Received ', $msg->getBody(), "\n";
//这里用延迟执行 表示程序正在执行中
sleep(substr_count($msg->getBody(), ' '));
echo " [x] Done\n";
$msg->ack();//手动确认ack
};
//限制RabbitMQ只发不超过1条的消息给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送。
$this->channel->basic_qos(null, 1, null);
$this->channel->basic_consume('task_queue', '', false, false, false, false, $callback);//第四个参数为false 手动ack true 自动ack
try {
$this->channel->consume();
}catch (\Throwable$exception) {
echo $exception->getMessage();
}
$this->channel->close();
$this->connection->close();
}
这里消费者投递多个队列
可以使用多个窗口进行生产 发现每个窗口都会拿到 消息
通过sleep()延时 能清晰看到区别
加了下面这句代码 和没有加上这句代码的区别
$this->channel->basic_qos(null, 1, null);
消息持久化
$this->channel->queue_declare('task_queue', false, true, false, false); //durable true 开启队列持久化 第三个参数
并且 $msg =new AMQPMessage($data, array('delivery_mode' => 2) );消息持久化当生产者投递了消息 然后没有生产者进行消费 进行关闭rabbitmq服务
重启后 会发现消息依旧存在
手动ACK机制
$this->channel->basic_consume('task_queue', '', false, false, false, false, $callback);//第四个参数为false 手动ack true 自动ack
当生产者正在处理队列中的消息 中途突然断开连接 然后并没有执行到
$msg->ack();//手动确认ack
未处理的消息会依旧存在 【正常流程取出就会删除掉】
网友评论