美文网首页
RabbitMQ(二)、PHP服务

RabbitMQ(二)、PHP服务

作者: 上善丨若水 | 来源:发表于2020-12-29 16:03 被阅读0次

    PHP客户端

    1. 安装 php-amqplib

    composer require php-amqplib/php-amqplib
    

    2. 配置开发项目下的 .env。roadrunner环境下,需要在gitlab某个开发项目下配置此变量。配置示例如下:

      RABBITMQ_HOST=localhost
      RABBITMQ_PORT=5672
      RABBITMQ_USER=guest
      RABBITMQ_PASSWORD=guest
      RABBITMQ_VHOST=/
    

    3. 发送单个消息

         /**
         * @param string $exchange exchange名字 
         * @param string $payload 消息内容
         */
        public function publishMsg(string $exchange, string $payload)
        {
            //建议创建config/rabbitmq配置文件,通过下面代码获取配置信息,或者你也可通过env('xxx')获取配置信息
            $cnf = config('rabbitmq');
            $connection = new AMQPStreamConnection($cnf['host'], $cnf['port'], $cnf['user'], $cnf['pass'], $cnf['vhost']);
            $channel = $connection->channel();
            $channel->exchange_declare($exchange, AMQPExchangeType::FANOUT, false, false, false);
    
            $message = new AMQPMessage($payload, ['content_type' => 'text/plain']);
            $channel->basic_publish($message, $exchange);
    
            $channel->close();
            $connection->close();
        }
    

    4. 批量发送多个消息

        /**
         * @param string $exchange exchange名字
         * @param array $payloads 消息内容
         */
        public function batchPublishMsg(string $exchange, array $payloads)
        {
            $cnf = config('rabbitmq');
            $connection = new AMQPStreamConnection($cnf['host'], $cnf['port'], $cnf['user'], $cnf['pass'], $cnf['vhost']);
            $channel = $connection->channel();
            $channel->exchange_declare($exchange, AMQPExchangeType::FANOUT, false, false, false);
    
            foreach ($payloads as $msg_body) {
                $message = new AMQPMessage(json_encode($msg_body), ['content_type' => 'text/plain']);
                $channel->batch_basic_publish($message, $exchange);
            }
    
            $channel->publish_batch();
            $channel->close();
            $connection->close();
        }
    

    5. 接收消息

        /**
         * @param string $exchange exchange名字
         * @param string $queueName queue名字
         */
        public function receiveMsg(string $exchange, string $queueName)
        {
            $cnf = config('rabbitmq');
            $connection = new AMQPStreamConnection($cnf['host'], $cnf['port'], $cnf['user'], $cnf['pass'], $cnf['vhost']);
        
            $channel = $connection->channel();
            $channel->queue_declare($exchange, false, false, false, false);
        
            $callback = function ($msg) {
                echo ' [x] Received ', $msg->body, "\n";
            };
            $channel->basic_consume($queueName, '', false, true, false, false, $callback);
            while ($channel->is_consuming()) {
                $channel->wait();
            }
            $channel->close();
            $connection->close();
        }
    

    相关文章

      网友评论

          本文标题:RabbitMQ(二)、PHP服务

          本文链接:https://www.haomeiwen.com/subject/fezfoktx.html