美文网首页
RabbitMQ PHP 实例 Demo

RabbitMQ PHP 实例 Demo

作者: 街头民工 | 来源:发表于2022-04-11 08:43 被阅读0次

参考文档-demo1
参考文档-demo2
参考文档-工作流程

  • 我理解的工作流程是,生产者(发送消息的一方)> 交换机 > 队列 > 消费者(处理消息的一方)

  • 上面我们安装完rabbitmq+php扩展后 搜了一些 demo去实际操作了一下以方便理解,第一个参考文档感觉最容易理解。

# 创建表 mysql
CREATE TABLE `rabbitmq_table` (
`id` int(11) NOT NULL AUTO_INCREMENT,
  `sJson` varchar(255) NOT NULL DEFAULT '',
  PRIMARY KEY (`id`)
) ENGINE=MyISAM DEFAULT CHARSET=utf8mb4;
  • 创建消费者文件 consumer1.php
<?php

$conn_args = array(
    'host' => '127.0.0.1',
    'port' => '5672',
    'login' => 'guest',
    'password' => 'guest',
    'vhost'=>'/'
);

$e_name = 'e_linvo'; //交换机名
$q_name = 'q_linvo'; //队列名
$k_route = 'key_2'; //路由key

//创建连接和channel
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
    die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);

//创建交换机
$ex = new AMQPExchange($channel);
$ex->setName($e_name);
$ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型
$ex->setFlags(AMQP_DURABLE); //持久化
//$ex->declare();
$ex->declareExchange();

//创建队列
$q = new AMQPQueue($channel);
$q->setName($q_name);
$q->setFlags(AMQP_DURABLE); //持久化
//$q->declare();     //最好队列object在这里declare()下,否则如果是新的queue会报错
$q->declareQueue();     //最好队列object在这里declare()下,否则如果是新的queue会报错

//绑定交换机与队列,并指定路由键,可以多个路由键
$q->bind($e_name, 'key_1');

echo "Message:".PHP_EOL;
//阻塞模式接收消息
$q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答

//非阻塞模式接收消息
/*while(True){
    //消息获取
    $arr = $q->get();
    $res = $q->ack($arr->getDeliveryTag());     //手动发送ACK应答
    $msg = $arr->getBody();
    mysql_insert($msg);
    log_insert($msg);
}*/

$conn->disconnect();

/**
 * 消费回调函数
 * 处理消息
 */
function processMessage($envelope, $queue) {
    var_dump($envelope->getRoutingKey());
    $msg = $envelope->getBody();
    mysql_insert($msg);
    log_insert($msg);
}

//插入日志
function log_insert($json){
    file_put_contents('./consumer1.txt',$json.PHP_EOL,FILE_APPEND );
}
//消息入库
function mysql_insert($json){
    //连接MySQL数据库
    $pdo = new PDO("mysql:host=localhost;dbname=test","root","168168" );
    $pdo->query('SET NAMES UTF8MB4');//设置UTF8字符编码
//    $pdo->query('SET NAMES UTF8');
    $sql = "insert into `rabbitmq_table` (sJson) values ('{$json}')";
    echo $sql.PHP_EOL;
    if ($pdo->exec($sql)){
        echo "mysql insert success".PHP_EOL;
    } else {
        echo "mysql insert fail".PHP_EOL;
    }
}
  • 创建生产者文件 publisher1.php
$conn_args = array(
    'host' => '127.0.0.1',
    'port' => '5672',
    'login' => 'guest',
    'password' => 'guest',
    'vhost'=>'/'
);

//创建连接和channel
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
    die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);

//创建交换机
$e_name = 'e_linvo'; //交换机名
$ex = new AMQPExchange($channel);
$ex->setName($e_name);
$ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型
$ex->setFlags(AMQP_DURABLE); //持久化
echo "Exchange Status:".$ex->declareExchange().PHP_EOL;


echo "Send Message:".$ex->publish("rabbitmq消息测试,key_1" . date('H:i:s', time()), 'key_1').PHP_EOL;
//echo "Send Message:".$ex->publish("rabbitmq消息测试,key_2 by xust" . date('H:i:s', time()), 'key_2').PHP_EOL;
  • 使用方法:先运行consumer2.php,再运行publisher2.php

  • 第二个demo

  • 创建文件 consumer2.php

<?php
//配置信息
$conn_args = array(
    'host' => '127.0.0.1',
    'port' => '5672',
    'login' => 'guest',
    'password' => 'guest',
    'vhost'=>'/'
);
$e_name = 'e_linvo'; //交换机名
$q_name = 'q_linvo'; //队列名
$k_route = 'key_1'; //路由key

//创建连接和channel
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
    die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);

//创建交换机
$ex = new AMQPExchange($channel);
$ex->setName($e_name);
$ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型
$ex->setFlags(AMQP_DURABLE); //持久化
echo "Exchange Status:".$ex->declareExchange()."\n";

//创建队列
$q = new AMQPQueue($channel);
$q->setName($q_name);
$q->setFlags(AMQP_DURABLE); //持久化
echo "Message Total:".$q->declareQueue()."\n";

//绑定交换机与队列,并指定路由键
echo 'Queue Bind: '.$q->bind($e_name, $k_route)."\n";

//阻塞模式接收消息
echo "Message:".PHP_EOL;
while(True){
    $q->consume('processMessage');
    //$q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答
}
$conn->disconnect();

/**
 * 消费回调函数
 * 处理消息
 */
function processMessage($envelope, $queue) {
    $msg = $envelope->getBody();
    echo $msg.PHP_EOL; //处理消息
    $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答
}
  • 创建 publisher2.php
<?php
$conn_args = array(
    'host' => '127.0.0.1',
    'port' => '5672',
    'login' => 'guest',
    'password' => 'guest',
    'vhost'=>'/'
);
$e_name = 'e_linvo'; //交换机名
//$q_name = 'q_linvo'; //无需队列名
$k_route = 'key_1'; //路由key

//创建连接和channel
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
    die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);

//消息内容
$message = json_encode(['msg'=>"RabbitMQ消息发送成功~~",'order_id'=>time()],JSON_UNESCAPED_UNICODE);

//创建交换机对象
$ex = new AMQPExchange($channel);
$ex->setName($e_name);

//发送消息
//$channel->startTransaction(); //开始事务
for($i=1; $i<=5; ++$i){
    echo "Send Message:".$ex->publish($message.$i, $k_route).PHP_EOL;
    sleep(1);
}
//$channel->commitTransaction(); //提交事务

$conn->disconnect();
  • 使用方法:先运行consumer2.php,再运行publisher2.php

相关文章

网友评论

      本文标题:RabbitMQ PHP 实例 Demo

      本文链接:https://www.haomeiwen.com/subject/rqbtjrtx.html