前言:
中间件系统的定义是两个独立的不同的系统在中间构建起传递消息的工具。但是同一个系统也可以通过中间件来榨取性能,大家肯定项目中遇到过性能瓶颈,比如发送邮件,发送短信,转换视频格式等等。这些业务都是比较耗性能,又对先后顺序不敏感的业务。这种业务就非常适合使用消息队列系统来异步处理,使性能提升。
说起消息队列,比较著名的RabbitMQ,下图是阿里云的产品介绍。

看看他的介绍,就两个字牛逼。一般的中小型系统往往用不了这个量级的中间件。只需要一个消息队列来优化下性能。所以今天就搞个简单的轻量队列系统。主要介绍一个大家都容易遇到的场景那就是发邮件
工具介绍:
这次用到的工具有:
Redis(redis本身其实支持消息队列,订阅分发之类的)
PHP7.3
ThinkPHP框架
composer包 think-queue(tp官方团队产品)
安装工具
首先我的系统已经装好php,composer,Apache(其实今天的文章和非必要),redis等东西。这里就不介绍了。直接上框架和工具介绍
先装个框架,composer安装会拉一个tp5.1的框架。tp5.1之后安装就只有composer安装方式,我认为这是一个非常大的进步。
composer create-project topthink/think tp5-queue
再引入队列工具的包
composer require topthink/think-queue
好了,工具就全部安装完毕了。
下面来配置下项目

这里我推荐使用redis。既然用队列了。当然选择性能强的。
根据自己的情况。配置好之后,就需要来写一个消费类了。
这里的消费类是我的一个比较习惯的叫法,我一般将制造数据,写队列的类或对象称为生产者。而监听队列,接收数据,处理数据的类或对象称为消费者
首先我们在application 下面建立一个job文件夹,再创建一个Job1的类,这也是官方比较推荐的规范

下面来到第一个场景。发邮件。我们模拟下发送的过程,sleep 5秒。总之很慢就对了
/**
* 消费队列程序(这里的方法名和传入参数都是规定好了的)
* @param Job $job 注入对象,队列对象
* @param string $data 约定队列传递的数据。String类型
*/
public function fire(Job $job, $data){
//将data里面的数据写到文件缓存里面去
try{
if ($job->attempts()>3) {
echo "该任务大于3次重试了";
$job->delete();
//失败3次后,放弃消费。这里往往会加入日志
}
echo "开始发送邮件id:".$data;
sleep(5);
echo "发送邮件成功";
$job->delete();
}catch (Exception $e){
//出错将当前消息,从新推向队列中
echo $e->getMessage();
$job->release();
}
}
有了消费类。我就需要有个生产类。既然是发邮件。那么抛入队列的消息就应该是标题内容收件人等等,业内比较通用的是传递一个json格式。由于我只是演示,就传递个id代表邮件的数据好了
创建一个控制器方法,通过访问这个方法来推动消息
public function sendJob()
{
//消费类命名空间
$jobClass = 'app\job\Job1';
//队列名称
$queueName = "first_queue";
//将消息推送到队列中
$result=Queue::push($jobClass, $data = rand(0, 1000), $queueName);
if ($result) {
echo "push MQ SUCCESS";
}
}
现在我们就可以访问这个地址

推送成功,虽然我们这时候已经写好了消费者,但是我们并没有开始消费。但是推送消息依然是成功的。这个就是中间件的优势。他连接两个系统,但是又不会互相耦合,生产者并不会因为消费者的异常而影响到自己。
消息推送成功之后,如果没有消费者,消息会堆积在队列中。不过别怕,消息堆积很正常,并且一般的中间件堆积能力是非常强的。比如阿里就宣传自己mq可以堆积上亿条数据。非常可怕
那我们现在来启动消费者来进行消费。由于php的特性。在web应用中,php-fpm在收到web服务器转发的请求后,会执行.php文件中的脚本,在执行结束后,自动释放内存和进程。
这样的话,就不适合来做监听队列的工作。但是php有另外一种模式,那就是cli。命令行模式,命令行模式可以常驻内存不停的执行php代码。这样就可以达到类似于静态语言的java的效果。
当然具体怎么监听,框架已经帮我们写好了

命令在官方文档里面写了的。但是我们还是看看帮助文档,看看这个怎么用。由图中我们看到我只需要传递一个--queue 队列名称 就可以开始搞了
php think queue:listen --queue first_queue
一开始监听队列。刚刚在队列中堆积的消息立刻就被获取到,开始执行了代码。中途有5秒钟的等待,最后执行完成,删除了消息

这时候,我们可以再多搞点数据进入我们的队列,看我们的队列是如何从容处理的

我们搞个2000条数据来玩。

非常迅速的完成2000条数据的推送,可见性能还是非常nice。那么我们来看看消费这边呢

消费者这边也是非常从容的处理着数据。由此可见,中间件的确可以帮助我们大大的提升性能。让我们的项目更上一层楼。
当然这里的消费者,如果是在生产环境,则推荐使用进程守护来监控进程。同时,在配置消费者的时候,注意有一个参数代表最大使用内存,如果内存溢出则队列会停止消费。所以大家如果消费队列需要使用很大的内存的话,则可以把参数调大一些
注意的坑
-
在执行消费者的程序的时候,有可能会抛出一个错误。内容忘了截图,总之原因是因为,listen命令需要使用函数proc_open(‘php think queue:work’) 来创建work进程来消费队列。但是往往php.ini中都是对这类函数禁用了的。解决方法就是进入php.ini 将 disable_functions 下面的proc_open和proc_get_status这两个删除。就可以了。
-
容易出现bug的地方。在消费程序中,切记处理完了数据之后$job->delete();来删除队列中的消息。如果不删除就非常容易出现bug
-
在消费程序中使用exit() , die()这类函数会导致进程退出
在work --daemon 单进程循环消费的时候,改了代码是不会生效的。这时脚本语言有点类似于静态语言在执行。所以需要我们用 queue:restart 重启 work 进程
好了,今天的介绍就写到这里。其实哪怕是tp自带的队列很弱小,都有很多值得学习和使用的价值。工具简单或者技术low都应该是我们拒绝使用的理由。毕竟真正厉害的工具是我们解决问题的思路。技术只是负责去完成我们思路的工具而已。如果有大神发现有什么问题的话,欢迎指点。感激不尽
已上
网友评论