美文网首页
rabbitmq 工作队列

rabbitmq 工作队列

作者: 该死的金箍 | 来源:发表于2024-04-18 13:57 被阅读0次

    //生产者 代码【在构造函数中进行了连接操作 析构函数使用关闭连接操作】

    public function taskQueue()
    {
        //durable  true 开启队列持久化 第三个参数
        $this->channel->queue_declare('task_queue', false, true, false, false);
        $argv = ['a', 'b', 'c', 'd'];
        $data =implode(' ', array_slice($argv, 1));
        if (empty($data)) {
            $data ="Hello World!";
        }
        $msg =new AMQPMessage(
            $data,
            array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
        );
        $this->channel->basic_publish($msg, '', 'task_queue');
        echo ' [x] Sent ', $data, "\n";
    }

    //消费者代码  我这里使用的TP6框架的console

    protected function execute(Input$input, Output$output)
    {
        $this->channel->queue_declare('task_queue', false, true, false, false); //durable  true 开启队列持久化 第三个参数
        echo " [*] Waiting for messages. To exit press CTRL+C\n";
        $callback =function ($msg) {
            echo ' [x] Received ', $msg->getBody(), "\n";
            //这里用延迟执行 表示程序正在执行中
            sleep(substr_count($msg->getBody(), ' '));
            echo " [x] Done\n";
            $msg->ack();//手动确认ack
        };
        //限制RabbitMQ只发不超过1条的消息给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送。
        $this->channel->basic_qos(null, 1, null);
        $this->channel->basic_consume('task_queue', '', false, false, false, false, $callback);//第四个参数为false 手动ack  true 自动ack
        try {
            $this->channel->consume();
        }catch (\Throwable$exception) {
            echo $exception->getMessage();
        }
        $this->channel->close();
        $this->connection->close();
    }

    这里消费者投递多个队列

    可以使用多个窗口进行生产 发现每个窗口都会拿到 消息
    通过sleep()延时 能清晰看到区别
    加了下面这句代码 和没有加上这句代码的区别
     $this->channel->basic_qos(null, 1, null);

    消息持久化

       $this->channel->queue_declare('task_queue', false, true, false, false); //durable  true 开启队列持久化 第三个参数
        并且 $msg =new AMQPMessage($data,        array('delivery_mode' => 2)    );消息持久化

    当生产者投递了消息  然后没有生产者进行消费 进行关闭rabbitmq服务
    重启后 会发现消息依旧存在  

    手动ACK机制

     $this->channel->basic_consume('task_queue', '', false, false, false, false, $callback);//第四个参数为false 手动ack  true 自动ack
    当生产者正在处理队列中的消息 中途突然断开连接  然后并没有执行到
     $msg->ack();//手动确认ack
    未处理的消息会依旧存在   【正常流程取出就会删除掉】

    相关文章

      网友评论

          本文标题:rabbitmq 工作队列

          本文链接:https://www.haomeiwen.com/subject/shbsxjtx.html