PHP客户端
1. 安装 php-amqplib
composer require php-amqplib/php-amqplib
2. 配置开发项目下的 .env 文件。在 RoadRunner 环境下,需要在 GitLab 中对应的开发项目下配置这些变量。配置示例如下:
RABBITMQ_HOST=localhost
RABBITMQ_PORT=5672
RABBITMQ_USER=guest
RABBITMQ_PASSWORD=guest
RABBITMQ_VHOST=/
3. 发送单个消息
/**
 * Publish a single message to a fanout exchange.
 *
 * Opens a new connection for the call, declares the exchange, publishes the
 * payload as a text/plain message, and closes the connection again.
 *
 * @param string $exchange Name of the exchange to publish to.
 * @param string $payload  Message body, sent as-is.
 */
public function publishMsg(string $exchange, string $payload)
{
    // Recommended: create a config/rabbitmq config file and read it as below;
    // alternatively the individual values can be read with env('xxx').
    $cnf = config('rabbitmq');
    $connection = new AMQPStreamConnection($cnf['host'], $cnf['port'], $cnf['user'], $cnf['pass'], $cnf['vhost']);

    try {
        $channel = $connection->channel();
        // Arguments after the type: passive=false, durable=false, auto_delete=false.
        $channel->exchange_declare($exchange, AMQPExchangeType::FANOUT, false, false, false);

        $message = new AMQPMessage($payload, ['content_type' => 'text/plain']);
        $channel->basic_publish($message, $exchange);

        $channel->close();
    } finally {
        // Always release the connection, even when declaring or publishing throws.
        $connection->close();
    }
}
4. 批量发送多个消息
/**
 * Publish several messages to a fanout exchange in one batch.
 *
 * Every payload is JSON-encoded before anything is sent, so an unencodable
 * payload aborts the whole call without publishing a partial batch.
 *
 * @param string $exchange Name of the exchange to publish to.
 * @param array  $payloads Message contents; each element is JSON-encoded into one message body.
 *
 * @throws \InvalidArgumentException When a payload cannot be JSON-encoded.
 */
public function batchPublishMsg(string $exchange, array $payloads)
{
    // Encode up front: json_encode() returns false on failure, and that value
    // must never be used as a message body.
    $bodies = [];
    foreach ($payloads as $index => $msg_body) {
        $encoded = json_encode($msg_body);
        if ($encoded === false) {
            throw new \InvalidArgumentException(
                sprintf('Payload at index %s cannot be JSON-encoded: %s', $index, json_last_error_msg())
            );
        }
        $bodies[] = $encoded;
    }

    $cnf = config('rabbitmq');
    $connection = new AMQPStreamConnection($cnf['host'], $cnf['port'], $cnf['user'], $cnf['pass'], $cnf['vhost']);

    try {
        $channel = $connection->channel();
        // Arguments after the type: passive=false, durable=false, auto_delete=false.
        $channel->exchange_declare($exchange, AMQPExchangeType::FANOUT, false, false, false);

        // Queue the messages locally, then flush them to the broker in one go.
        foreach ($bodies as $body) {
            $message = new AMQPMessage($body, ['content_type' => 'text/plain']);
            $channel->batch_basic_publish($message, $exchange);
        }
        $channel->publish_batch();

        $channel->close();
    } finally {
        // Always release the connection, even when declaring or publishing throws.
        $connection->close();
    }
}
5. 接收消息
/**
 * Consume messages from a queue bound to a fanout exchange and echo each body.
 *
 * Blocks until the channel stops consuming.
 *
 * @param string $exchange  Name of the exchange the queue is bound to.
 * @param string $queueName Name of the queue to declare and consume from.
 */
public function receiveMsg(string $exchange, string $queueName)
{
    $cnf = config('rabbitmq');
    $connection = new AMQPStreamConnection($cnf['host'], $cnf['port'], $cnf['user'], $cnf['pass'], $cnf['vhost']);

    try {
        $channel = $connection->channel();

        // Declare the exchange with the same settings the publishers use, so
        // the bind below works regardless of which side starts first.
        $channel->exchange_declare($exchange, AMQPExchangeType::FANOUT, false, false, false);

        // Declare the queue that is actually consumed (the queue name, not the
        // exchange name) and bind it; an unbound queue receives nothing from
        // a fanout exchange.
        $channel->queue_declare($queueName, false, false, false, false);
        $channel->queue_bind($queueName, $exchange);

        $onMessage = static function ($msg) {
            echo ' [x] Received ', $msg->body, "\n";
        };

        // Fourth argument (no_ack) is true: messages are not acknowledged manually.
        $channel->basic_consume($queueName, '', false, true, false, false, $onMessage);

        while ($channel->is_consuming()) {
            $channel->wait();
        }

        $channel->close();
    } finally {
        // Always release the connection, even when setup or consuming throws.
        $connection->close();
    }
}
网友评论