php使用redis的有序集合zset实现延迟队列
我们通过redis的有序集合zset来实现简单的延迟队列,将消息数据序列化,作为zset的基本元素,把 消息生产时间戳 + 消息处理延迟时间戳 作为score,每次通过zRangeByScore获取一条消息进行处理,后通过zRem删除集合元素:相当于移除需要消费的 Job。
优点:
- 实现简单,适合做中小型对延迟时间要求不高的业务场景。
缺点:
1.不适合延迟时间高的业务场景。延迟时间可能有几秒钟的误差。
2.不适合大型项目 ,大型项目建议使用rabbitmq的延迟i消息队列
下面是简单的实现demo
/**
* redis 通过有序集合 zset 实现延迟队列 发送定时邮件
*/
class DelayQueue
{
private $zsetQueue = "delay:email";
/**
* 链接redis 服务器
*/
public function connect():\Redis{
try {
$redis = new \Redis();
$redis->connect("127.0.0.1",6379);
$redis->auth("123456");
return $redis;
} catch (Exception $e) {
throw new Exception("connect error");
}
}
/**
* 生产者
* @param int $secreod 延迟的时间 单位:秒
* @param mixed $data 投入到队列的数据,由于集合元素具有唯一性,故在调用生产消息时,在消息体$data中加入 uuid属性,确保消息体的唯一性
* @return int|false 返回值=1的时候表示成功
*/
public function produceTask(int $secreod,/* */ $data){
$redis = $this->connect();
// 向有序集合插入一个元素,元素关联一个数值,插入成功返回1,同时集合元素不可以重复, 如果元素已经存在返回 0
return $redis->zAdd(
$this->zsetQueue, // 集合队列名称
time()+$secreod,// 相当于 score
json_encode($data,JSON_UNESCAPED_UNICODE) // 需要消费的消息
);
}
/**
* 消费者方法,在cli命令行中运行。建议最好在swoole中运行
* 纯演示代码,模拟守护进程在cli中运行
*/
public function consumeTask(){
$redis = $this->connect();
while (true) {
sheep(3);
try{
if($redis->ping() != "+PONG"){
$redis = $this->connect();
}
}catch(Throwable $e){
return null;// 模拟跳出当前函数,退出当前子进程,swoole会重新拉起新的进程
}
// 取出排序值 score=0-当前时间戳的元素,一次取出一个元素
$job = $redis->zRangeByScore($this->zsetQueue, 0, time(), ["limit" => [0, 1]]);
if(!$job || empty($job[0])) continue;// 无消费数据,返回
$json = $job[0];
$redis->zRem($this->zsetQueue,$json); // 删除集合中的元素[移除队列中的job],删除成功返回1
$std = json_decode($json);
// ...处理你的业务逻辑
// ... todo
}
}
}
$delayQueue = new DelayQueue();
// 将消息加入zset 消息队列 ,延迟30s执行
$delayQueue ->produceTask(30,["id"=>1,"msg"=>"....."]);
// 将消息加入zset 消息队列 ,延迟2小时执行
$delayQueue ->produceTask(2*3600,["id"=>2,"msg"=>"....."]);
网友评论