1.家伙准备好
1)安装rabbitmq-c类库
rabbitmq-c是一个用于C语言的,与AMQP server进行交互的client库,AMQP协议为版本0-9-1。rabbitmq-c与server进行交互前需要首先进行login操作,在操作后,可以根据AMQP协议规范,执行一系列操作。
RUN apt install librabbitmq-dev -y
RUN wget https://github.com/alanxz/rabbitmq-c/releases/download/v0.8.0/rabbitmq-c-0.8.0.tar.gz -O rabbitmq.tar.gz \
&& mkdir -p rabbitmq \
&& tar -xf rabbitmq.tar.gz -C rabbitmq --strip-components=1 \
&& rm rabbitmq.tar.gz \
&& cd rabbitmq \
&& ./configure --prefix=/usr/local/rabbitmq-dev \
&& make \
&& make install \
&& cd .. \
&& rm -rf rabbitmq
2)安装amqp扩展
RUN wget http://pecl.php.net/get/amqp-1.9.3.tgz -O amqp.tar.gz \
&& mkdir -p amqp \
&& tar -xf amqp.tar.gz -C amqp --strip-components=1 \
&& rm amqp.tar.gz \
&& cd amqp \
&& phpize \
&& ./configure --with-php-config=/usr/local/bin/php-config --with-amqp --with-librabbitmq-dir=/usr/local/rabbitmq-dev \
&& make -j$(nproc) \
&& make install \
&& docker-php-ext-enable amqp
3)加载类库
composer require php-amqplib/php-amqplib
或者
{
"require": {
"php-amqplib/php-amqplib": ">=2.6.1"
}
}
2.万事具备 只欠东风
新建队列
image.png新建交换器
image.png备注:这里声明Exchange类型 direct 名字为 exchange_test 需要绑定 Routing key随便起个名hello,具体的Exchange类型大家可以继续探索
生产者
// 设置交换机名、路由键、队列名
$_conf = [
'exchange_name' => 'exchange_test',
'route_key' => 'hello',
'queue_name' => 'test',
];
// 连接 rabbitmq 服务
$connection = new AMQPStreamConnection('139.199.230.57', 5672, 'test', 'test');
// 获取信道
$channel = $connection->channel();
// 创建消息
$data = "{$_conf['exchange_name']}#{$_conf['route_key']}#{$_conf['queue_name']} " . date('H:i:s');
$msg = new AMQPMessage($data);
// 发送消息
$channel->basic_publish($msg , $_conf['exchange_name'], $_conf['route_key']);
// 关闭信道和连接
$channel->close();
$connection->close();
消费者
// 设置交换机名、路由键、队列名
$_conf = [
'exchange_name' => 'exchange_test',
'route_key' => 'hello',
'queue_name' => 'test',
];
// 连接 rabbitmq 服务
$connection = new AMQPStreamConnection('139.199.230.57', 5672, 'test', 'test');
// 获取信道
$channel = $connection->channel();
// 声明队列
$channel->queue_declare($_conf['queue_name']);
// 绑定队列
$channel->queue_bind($_conf['queue_name'], $_conf['exchange_name'], $_conf['route_key']);
// 定义回调函数
$callback = function ($msg) {
// 消息确认
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
if ($msg->body == 'quit') {
// 停止消费并退出
$msg->delivery_info['channel']->basic_cancel($msg->delivery_info['consumer_tag']);
} else {
echo 'Hello ', $msg->body, PHP_EOL;
}
};
// 消费者订阅队列
$channel->basic_consume($_conf['queue_name'],
'',
false,
false,
false,
false,
$callback);
// 开始消费
while (count($channel->callbacks)) {
$channel->wait();
}
// 关闭信道和连接
$channel->close();
$connection->close();
执行生产者代码可以看到积压的消息
image.png
一般后台执行一个守护进程一直去检测队列如果有消息就去执行相关的逻辑代码
image.png
3遇到问题
Exception 'PhpAmqpLib\Exception\AMQPProtocolChannelException' with message 'PRECONDITION_FAILED - parameters for queue 'test' in vhost '/' not equivalent'
image.png
注意:客户端的配置需要与RabbitMQ_Management设置的参数要一致
网友评论