美文网首页
RabbitMQ(二)、PHP服务

RabbitMQ(二)、PHP服务

作者: 上善丨若水 | 来源:发表于2020-12-29 16:03 被阅读0次

PHP客户端

1. 安装 php-amqplib

composer require php-amqplib/php-amqplib

2. 配置开发项目下的 .env。roadrunner环境下,需要在gitlab某个开发项目下配置此变量。配置示例如下:

  RABBITMQ_HOST=localhost
  RABBITMQ_PORT=5672
  RABBITMQ_USER=guest
  RABBITMQ_PASSWORD=guest
  RABBITMQ_VHOST=/

3. 发送单个消息

     /**
     * @param string $exchange exchange名字 
     * @param string $payload 消息内容
     */
    public function publishMsg(string $exchange, string $payload)
    {
        //建议创建config/rabbitmq配置文件,通过下面代码获取配置信息,或者你也可通过env('xxx')获取配置信息
        $cnf = config('rabbitmq');
        $connection = new AMQPStreamConnection($cnf['host'], $cnf['port'], $cnf['user'], $cnf['pass'], $cnf['vhost']);
        $channel = $connection->channel();
        $channel->exchange_declare($exchange, AMQPExchangeType::FANOUT, false, false, false);

        $message = new AMQPMessage($payload, ['content_type' => 'text/plain']);
        $channel->basic_publish($message, $exchange);

        $channel->close();
        $connection->close();
    }

4. 批量发送多个消息

    /**
     * @param string $exchange exchange名字
     * @param array $payloads 消息内容
     */
    public function batchPublishMsg(string $exchange, array $payloads)
    {
        $cnf = config('rabbitmq');
        $connection = new AMQPStreamConnection($cnf['host'], $cnf['port'], $cnf['user'], $cnf['pass'], $cnf['vhost']);
        $channel = $connection->channel();
        $channel->exchange_declare($exchange, AMQPExchangeType::FANOUT, false, false, false);

        foreach ($payloads as $msg_body) {
            $message = new AMQPMessage(json_encode($msg_body), ['content_type' => 'text/plain']);
            $channel->batch_basic_publish($message, $exchange);
        }

        $channel->publish_batch();
        $channel->close();
        $connection->close();
    }

5. 接收消息

    /**
     * @param string $exchange exchange名字
     * @param string $queueName queue名字
     */
    public function receiveMsg(string $exchange, string $queueName)
    {
        $cnf = config('rabbitmq');
        $connection = new AMQPStreamConnection($cnf['host'], $cnf['port'], $cnf['user'], $cnf['pass'], $cnf['vhost']);
    
        $channel = $connection->channel();
        $channel->queue_declare($exchange, false, false, false, false);
    
        $callback = function ($msg) {
            echo ' [x] Received ', $msg->body, "\n";
        };
        $channel->basic_consume($queueName, '', false, true, false, false, $callback);
        while ($channel->is_consuming()) {
            $channel->wait();
        }
        $channel->close();
        $connection->close();
    }

相关文章

网友评论

      本文标题:RabbitMQ(二)、PHP服务

      本文链接:https://www.haomeiwen.com/subject/fezfoktx.html