PHP RabbitMQ路由模式(Direct模式),使用的交换机类型为direct,跟发布订阅模式的区别就是Direct交换机将消息投递到路由参数完全匹配的队列中,架构如下图
image.png提示:无论使用RabbitMQ那种工作模式,区别就是使用的交换机(Exchange)类型和路由参数不一样。
1.前置教程
请先阅读下面章节,了解相关知识
2.定义Direct交换机
// 声明交换机
$channel->exchange_declare(
'tizi365.direct', // 交换机名,需要唯一,不能重复
'direct', // 交换机类型
false,
false, // 是否持久化
false
);
提示: 无论是消息生产者还是消费者都需要交换机。
3.发送消息
我们将消息发送给交换机,由交换机根据路由规则投递消息到对应的队列。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// 创建rabbitmq连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// 创建Channel
$channel = $connection->channel();
// 声明交换机
$channel->exchange_declare(
'tizi365.direct', // 交换机名,需要唯一,不能重复
'direct', // 交换机类型
false,
false, // 是否持久化
false
);
// 消息对象,参数是消息内容
$msg = new AMQPMessage("hello tizi365.com");
// 注意第三个参数,路由参数
$channel->basic_publish(
$msg, // 消息对象
'tizi365.direct', // 交换机名字
"tizi365" // 路由参数,可以根据需求,任意定义。
);
echo ' [x] Sent ', $msg->getBody(), "\n";
// 释放资源
$channel->close();
$connection->close();
提示:basic_publish方法中的第三个参数,这是个关键参数。
4.接收消息
4.1.定义队列&绑定交换机
要想消费队列消息,需要先定义一个队列,然后将队列绑定到目标交换机上。
// 声明一个匿名队列
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
// 队列绑定指定交换机
$channel->queue_bind(
$queue_name, // 队列名
'tizi365.fanout', // 交换机名字
"tizi365" // 绑定路由参数,这里绑定tizi365
);
提示:根据direct交换机规则,发送消息时携带的路由参数,如果匹配队列绑定交换机的时候设置的路由参数,则将消息投递到这个队列。
4.2.完整的接收消息代码
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
// 创建rabbitmq连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// 创建Channel
$channel = $connection->channel();
// 声明交换机
$channel->exchange_declare(
'tizi365.direct', // 交换机名,需要唯一,不能重复
'direct', // 交换机类型
false,
false, // 是否持久化
false
);
// 声明一个匿名队列
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
// 队列绑定指定交换机
$channel->queue_bind(
$queue_name, // 队列名
'tizi365.fanout', // 交换机名字
"tizi365" // 绑定路由参数,这里绑定tizi365
);
echo " [*] Waiting for message. To exit press CTRL+C\n";
// 定义消息处理函数(这里使用匿名函数)
$callback = function ($msg) {
// 消息处理逻辑
echo ' [x] ', $msg->body, "\n";
};
// 创建消费者
$channel->basic_consume(
$queue_name, // 队列名,需要消费的队列名
'', // 消费者名,忽略,则自动生成一个唯一ID
false,
true, // 是否自动提交消息,即自动告诉rabbitmq消息已经处理成功。
false,
false,
$callback // 消息处理函数
);
// 如果信道没有关闭,则一直阻塞进程,避免进程退出
while ($channel->is_open()) {
$channel->wait();
}
// 释放资源
$channel->close();
$connection->close();
网友评论