说明:以数据库为驱动的消息队列,框架是tp5.1,我是测试将一张表数据插入到另一张表中。
1.安装thinkphp-queue扩展,这里版本我下载的2.0,根据自身框架版本下载对应扩展版本
composer require topthink/think-queue:2.*
2.配置queue文件和database文件,在项目根目录config下面,如下图:

1612163792(1).jpg

1612164144(1).jpg
3.创建数据表
(1)创建一张测试表,里面插入一些数据(随便创建的)
CREATE TABLE `yks_oms_orders` (
`oid` int(25) unsigned NOT NULL AUTO_INCREMENT COMMENT '订单流水号',
`order_id` varchar(50) NOT NULL DEFAULT '' COMMENT '订单ID',
`address` varchar(500) DEFAULT NULL COMMENT '收货人地址',
`phone` varchar(30) DEFAULT '' COMMENT '收货人手机号',
`name` varchar(200) DEFAULT '' COMMENT '收货人姓名',
`weight` double(12,3) DEFAULT '0.000' COMMENT '订单重量kg',
PRIMARY KEY (`oid`),
) ENGINE=InnoDB AUTO_INCREMENT=573047 DEFAULT CHARSET=utf8;
(2)创建一张表接受数据
CREATE TABLE `yks_oms_orders_bak` (
`oid` int(25) unsigned NOT NULL AUTO_INCREMENT COMMENT '订单流水号',
`order_id` varchar(50) NOT NULL DEFAULT '' COMMENT '订单ID',
`address` varchar(500) DEFAULT NULL COMMENT '收货人地址',
`phone` varchar(30) DEFAULT '' COMMENT '收货人手机号',
`name` varchar(200) DEFAULT '' COMMENT '收货人姓名',
`weight` double(12,3) DEFAULT '0.000' COMMENT '订单重量kg',
PRIMARY KEY (`oid`),
) ENGINE=InnoDB AUTO_INCREMENT=573047 DEFAULT CHARSET=utf8;
(3)创建一张用来存放队列的表
CREATE TABLE `yks_jobs` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`queue` varchar(255) NOT NULL,
`payload` longtext NOT NULL,
`attempts` tinyint(3) unsigned NOT NULL,
`reserved` tinyint(3) unsigned NOT NULL,
`reserved_at` int(10) unsigned DEFAULT NULL,
`available_at` int(10) unsigned NOT NULL,
`created_at` int(10) unsigned NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3001 DEFAULT CHARSET=utf8;
4.创建一个方法,将表中数据推送到队列
<?php
namespace app\index\controller;
use think\Queue;
use think\Controller;
use think\Db;
class Jobtest extends Controller
{
public function test()
{
$data = Db::table('yks_oms_orders')->select();
foreach($data as $v){
//将该任务推送到消息队列,等待对应的消费者去执行
//参数1:当前任务将由哪个类来负责处理,当轮到该任务时,系统将生成一个该类的实例,并调用其 fire 方法;
//参数2:一维数组(表中每一行数据)
//参数3:当前任务归属的队列名称,如果为新队列,会自动创建
$isPushed = Queue::push("app\admin\job\JobTest", $v, $queue = 'baidu');
}
dump($isPushed);
}
}
5.消费消息
<?php
namespace app\admin\job;
use think\queue\Job;
use think\Db;
class JobTest
{
/**
* fire方法是消息队列默认调用的方法
* @param Job $job 当前的任务对象
* @param array|mixed $data 发布任务时自定义的数据
*/
public function fire(Job $job, $data)
{
// 插入备份表
$isJobDone = Db::table('yks_oms_orders_bak')->insert($data);
if ($isJobDone) {
// 如果任务执行成功,记得删除任务
$job->delete();
} else {
// 通过这个方法可以检查这个任务已经重试了几次了
if ($job->attempts() > 3) {
$job->delete();
// 也可以重新发布这个任务
//$job->release(2); // $delay为延迟时间,表示该任务延迟2秒后再执行
}
}
}
}
最后执行命令就会看到效果:php think queue:listen --queue baidu
网友评论