我们在开发项目的时候,经常会遇到“群发短信”,“订单系统有大量的日志”,“秒杀设计”等等,服务器没法处理这种瞬间迸发的压力,这种情况要保证系统正常有效的使用,就需要“消息队列”的帮助,而我们熟知的消息队列场景有
1、解耦(订单+配送系统)
2、流量削峰(秒杀)
3、RabbitMQ(更专业的消息系统实现)
今天我们着重讲一下RabbitMQ的按照使用。
一、安装RabbitMQ
// 按照rabbitmq
brew install rabbitmq
安装rabbitmq
二、启动rabbitmq服务
rabbitmq安装成功后路径为/usr/local/Cellar/rabbitmq/3.7.14 (版本根据安装版本确定)
进入该目录执行sbin/rabbitmq-server命令,提示rabbitmq启动成功
rabbitmq-server
三、浏览器登录
浏览器输入http://localhost:15672即可进入rabbitmq控制终端登录页面,默认用户名和密码为 guest/guest
login rabbitmq
以上说明安装rabbitmq 成功,下面我们开始安装php扩展
四、安装PHP扩展
1)、安装rabbitmq-c
php中的rabbitmq 扩展是amqp ,而amqp依赖于rabbitmq-c ,首先需要安装rabbitmq-c
安装成功后,我们可以再/usr/local/Cellar下看到rabbitmq-c
安装rabbitmq-c
2)、安装amqp
使用pecl 安装amqp,直至出现Set the path to librabbitmq install prefix [autodetect] ,这里我们将rabbitmq-c的路径复制过去,然后回车(我这里路径为/usr/local/Cellar/rabbitmq-c/0.9.0)
3)、添加扩展到php.ini
安装成功后我们就能在php的扩展中看到amqp.so文件,我们打开php.ini配置文件,加上extension=amqp.so文件,重启apache,打开phpinfo文件便能看到已经成功安装amqp扩展
amqp
到此我们的RabbitMQ都已经安装完成,我们现在写一段测试代码;
1、首先启动服务
2、消费者代码consume.php
<?php
//声明连接参数
$config = array(
'host' => '127.0.0.1',
'vhost' => '/',
'port' => 5672,
'login' => 'guest',
'password' => 'guest'
);
//连接broker
$cnn = new AMQPConnection($config);
if (!$cnn->connect()) {
echo "Cannot connect to the broker";
exit();
}
//在连接内创建一个通道
$ch = new AMQPChannel($cnn);
//创建一个交换机
$ex = new AMQPExchange($ch);
//声明路由键
$routingKey = 'key_1';
//声明交换机名称
$exchangeName = 'exchange_1';
//设置交换机名称
$ex->setName($exchangeName);
//设置交换机类型
//AMQP_EX_TYPE_DIRECT:直连交换机
//AMQP_EX_TYPE_FANOUT:扇形交换机
//AMQP_EX_TYPE_HEADERS:头交换机
//AMQP_EX_TYPE_TOPIC:主题交换机
$ex->setType(AMQP_EX_TYPE_DIRECT);
//设置交换机持久
$ex->setFlags(AMQP_DURABLE);
//声明交换机
$ex->declareExchange();
//创建一个消息队列
$q = new AMQPQueue($ch);
//设置队列名称
$q->setName('queue_1');
//设置队列持久
$q->setFlags(AMQP_DURABLE);
//声明消息队列
$q->declareQueue();
//交换机和队列通过$routingKey进行绑定
$q->bind($ex->getName(), $routingKey);
//接收消息并进行处理的回调方法
function receive($envelope, $queue) {
//休眠两秒,
sleep(2);
//echo消息内容
echo $envelope->getBody()."\n";
//显式确认,队列收到消费者显式确认后,会删除该消息
$queue->ack($envelope->getDeliveryTag());
}
//设置消息队列消费者回调方法,并进行阻塞
$q->consume("receive");
//$q->consume("receive", AMQP_AUTOACK);//隐式确认,不推荐
以上是消费者代码,打开两个命令行/终端
image.png此时再看RabbitMQ管理界面:
Connections 出现两个连接,这两个就是消费者,因为他们在阻塞着等待消息 Channels 消费者在各自的连接里都打开了一个通道 Exchanges: 其中一个消费者创建了一个持久的直连交换机 Queues: 消息队列已经创建,但消息数是0,因为此时还没有生产者生产者代码publisher.php
<?php
$config = array(
'host' => '127.0.0.1',
'vhost' => '/',
'port' => 5672,
'login' => 'guest',
'password' => 'guest'
);
$cnn = new AMQPConnection($config);
if (!$cnn->connect()) {
echo "Cannot connect to the broker";
exit();
}
$ch = new AMQPChannel($cnn);
$ex = new AMQPExchange($ch);
//消息的路由键,一定要和消费者端一致
$routingKey = 'key_1';
//交换机名称,一定要和消费者端一致,
$exchangeName = 'exchange_1';
$ex->setName($exchangeName);
$ex->setType(AMQP_EX_TYPE_DIRECT);
$ex->setFlags(AMQP_DURABLE);
$ex->declareExchange();
//创建10个消息
for ($i=1;$i<=10;$i++){
//消息内容
$msg = array(
'data' => 'message_'.$I,
'hello' => 'world',
);
//发送消息到交换机,并返回发送结果
//delivery_mode:2声明消息持久,持久的队列+持久的消息在RabbitMQ重启后才不会丢失
echo "Send Message:".$ex->publish(json_encode($msg), $routingKey, AMQP_NOPARAM, array('delivery_mode' => 2))."\n";
//代码执行完毕后进程会自动退出
}
在执行之前,先关掉前面的两个消费者,打开一个命令行/终端,输入php publisher.php,由于生产者不需要阻塞,执行完进程便退出,所以现在RabbitMQ管理界面中既没有Connections也没有Channels,但是Queues已经被Exchanges投递过去了10条消息
image.png image.png
因为我们执行生产者之前已经关掉了全部消费者,所以此时消息在队列中等待获取;
因为在发送消息时设置了delivery_mode:2来声明消息持久化,此时如果重启RabbitMQ,消息还会恢复;此时重新执行消费者,假设还是两个,打开两个命令行/终端,输入php consumer.php,我们可以看到消息被消费,如下图:
提醒:生产者在生产消息时,如果不存在指定队列,并且没有创建队列,或者队列存在但消息路由键和交换机与队列绑定的键(路由规则)不一致(直连交换机必须一致),则消息会被交换机丢弃。
这里我们简单的了解了RabbitMQ,消息队列一直是各个大项目中必备的技术,希望大家能够多去了解,一起学习进步!
网友评论