美文网首页
php 消息队列rabbitMQ

php 消息队列rabbitMQ

作者: 王宣成 | 来源:发表于2020-08-22 22:12 被阅读0次

    一、安装

    使用docker安装rabbitMQ
    docker pull rabbitmq:3.7.7-management
    
    docker images
    
    docker run -d --name rabbitmq3.7.7 -p 5672:5672 -p 15672:15672 -v `pwd`/data:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin 镜像id
    
    
    打开浏览器访问 账号: admin 密码: admin
    http://127.0.0.1:15672
    
    windows安装扩展,根据自己的php选择下载
    https://pecl.php.net/package/amqp/
    
    php_amqp.dll 文件放到php目录的ext文件夹下
    php.ini里面添加
    extension=php_amqp.dll
    
    rabbitmq.4.dll 放到php根目录下
    重启查看phpinfo 是否成功显示amqp

    二、使用

    新建文件消费者 consumer.php
    vhost 查看
    image.png
    <?php
    
    //声明连接参数
    $config = array(
        'host' => '127.0.0.1',
        'vhost' => 'my_vhost', 
        'port' => 5672,
        'login' => 'admin',
        'password' => 'admin'
    );
    
    //连接broker
    $cnn = new AMQPConnection($config);
    if (!$cnn->connect()) {
        echo "Cannot connect to the broker";
        exit();
    }
    
    //在连接内创建一个通道
    $ch = new AMQPChannel($cnn);
    
    //创建一个交换机
    $ex = new AMQPExchange($ch);
    
    //声明路由键
    $routingKey = 'key_1';
    
    //声明交换机名称
    $exchangeName = 'exchange_1';
    
    //声明队列名称
    $queueName = 'queue_1';
    
    //设置交换机名称
    $ex->setName($exchangeName);
    
    //设置交换机类型
    //AMQP_EX_TYPE_DIRECT:直连交换机
    //AMQP_EX_TYPE_FANOUT:扇形交换机
    //AMQP_EX_TYPE_HEADERS:头交换机
    //AMQP_EX_TYPE_TOPIC:主题交换机
    $ex->setType(AMQP_EX_TYPE_DIRECT);
    
    //设置交换机持久
    $ex->setFlags(AMQP_DURABLE);
    
    //声明交换机
    $ex->declareExchange();
    
    //创建一个消息队列
    $q = new AMQPQueue($ch);
    
    //设置队列名称
    $q->setName($queueName);
    
    //设置队列持久
    $q->setFlags(AMQP_DURABLE);
    
    //声明消息队列
    $q->declareQueue();
    
    //交换机和队列通过$routingKey进行绑定
    $q->bind($ex->getName(), $routingKey);
    
    //接收消息并进行处理的回调方法
    function receive($envelope, $queue)
    {
        //休眠两秒,
        sleep(2);
        //echo消息内容
        echo $envelope->getBody() . "\n";
        //显式确认,队列收到消费者显式确认后,会删除该消息
        $queue->ack($envelope->getDeliveryTag());
    }
    
    //设置消息队列消费者回调方法,并进行阻塞
    $q->consume("receive");
    //$q->consume("receive", AMQP_AUTOACK);//隐式确认,不推荐
    
    新建文件生产者 send.php
    <?php
    
    $config = array(
        'host' => 'localhost',
        'vhost' => 'my_vhost',
        'port' => 5672,
        'login' => 'admin',
        'password' => 'admin'
    );
    
    $cnn = new AMQPConnection($config);
    if (!$cnn->connect()) {
        echo "Cannot connect to the broker";
        exit();
    }
    
    $ch = new AMQPChannel($cnn);
    $ex = new AMQPExchange($ch);
    
    //消息的路由键,一定要和消费者端一致
    $routingKey = 'key_1';
    
    //交换机名称,一定要和消费者端一致,
    $exchangeName = 'exchange_1';
    
    $ex->setName($exchangeName);
    $ex->setType(AMQP_EX_TYPE_DIRECT);
    $ex->setFlags(AMQP_DURABLE);
    $ex->declareExchange();
    
    //创建10个消息
    for ($i = 1; $i <= 10; $i++) {
    
        //消息内容
        $msg = array(
            'data'  => 'message_' . $i,
            'hello' => 'world',
        );
    
        //发送消息到交换机,并返回发送结果
        //delivery_mode:2声明消息持久,持久的队列+持久的消息在RabbitMQ重启后才不会丢失
        echo "<pre>";
        print_r("Send Message:" . $ex->publish(json_encode($msg), $routingKey, AMQP_NOPARAM, array('delivery_mode' => 2)) . "\n");
    
    }
    
    
    服务器上命令模式下执行 consumer.php 消费测试
    php consumer.php
    
    推荐使用supervisor守护进程工具执行 consumer.php
    客户端访问 send.php 生产

    http://localhost/rabbitmq/send.php


    php代码可以使用 php-amqplib 库

    https://github.com/php-amqplib/php-amqplib

    新建composer.json
    {
        "require": {
            "php-amqplib/php-amqplib":"2.8.*"
        }
    }
    
    composer安装
    composer install 
    
    send.php
    <?php
    require_once __DIR__ . '/vendor/autoload.php';
    
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;
    
    $connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin','my_vhost');
    
    $channel = $connection->channel();
    $channel->queue_declare('hello', false, false, false, false);
    $msg = new AMQPMessage('Hello World!');
    $channel->basic_publish($msg, '', 'hello');
    echo " [x] Sent 'Hello World!'\n";
    
    $channel->close();
    $connection->close();
    
    
    consumer.php
    <?php
    require_once __DIR__ . '/vendor/autoload.php';
    
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    
    $connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin', 'my_vhost');
    $channel = $connection->channel();
    $channel->queue_declare('hello', false, false, false, false);
    
    echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
    
    $callback = function ($msg) {
        echo " [x] Received ", $msg->body, "\n";
    };
    
    $channel->basic_consume('hello', '', false, true, false, false, $callback);
    
    while (count($channel->callbacks)) {
        $channel->wait();
    }
    
    $channel->close();
    $connection->close();
    
    

    相关文章

      网友评论

          本文标题:php 消息队列rabbitMQ

          本文链接:https://www.haomeiwen.com/subject/ezxbjktx.html