//生产者 代码【在构造函数中进行了连接操作 析构函数使用关闭连接操作】
public function jianDan()
{
$this->channel->queue_declare('hello', false, false, false, false);
$data = [
'id' =>100
];
$msg =new AMQPMessage(json_encode($data));
$this->channel->basic_publish($msg, '', 'hello');
echo " [x] Sent 'Hello World!'\n";
}
//消费者代码 我这里使用的TP6框架的console
protected function execute(Input$input, Output$output)
{
$this->connection =new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$this->channel =$this->connection->channel();
$this->channel->queue_declare('hello', false, false, false, false);echo " [*] Waiting for messages. To exit press CTRL+C\n";
$callback =function ($msg) {
echo date("Y-m-d H:i:s");
$body =json_decode($msg->getBody(), true);
$id =$body['id'];
print_r(app(MqTestController::class)->userInfo($id));
};
$this->channel->basic_consume('hello', '', false, true, false, false, $callback);//自动ack
try {
$this->channel->consume();
}catch (\Throwable$exception) {
echo $exception->getMessage();
}
$this->channel->close();
$this->connection->close();
}
每次生产者进行消息投递
如果消费者未开启
那么可以通过rabbitMq的管理面板进行查看队列情况

通过命令 php think 查看console 配置是否成功

然后执行 php think TestMq 然后消费者会一直挂起 只要有消息投递 就会接收到消息

网友评论