需要redis5.0 ,借鉴kafka,消息可持久化
<?php
//连接reids
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
//xadd:追加消息
//xdel:删除消息,删除标志位,不影响消息总长度
//xrange:消息列表,过滤已删除的消息
//xlen:消息长度
//del: 删除所有消息
$redis->rawCommand('del','codehole');
// 星号表示自动生成id,后面参数key,value
$redis->rawCommand('xadd','codehole','*','name','user1','age','20');
$redis->rawCommand('xadd','codehole','*','name','user2','age','18');
$redis->rawCommand('xadd','codehole','*','name','user3','age','19');
$redis->rawCommand('xadd','codehole','*','name','user4','age','19');
//maxlen 定长长度,将老消息干掉,确保链表不会超过指定长度
$redis->rawCommand('xadd','codehole','maxlen','3','*','name','user5','age','19');
//XDEL codehole id
//$redis->rawCommand('xdel','codehole','1538561700640-0');
$res = $redis->rawCommand('xlen','codehole');
echo "<pre>";
var_dump($res);
echo '<br />';
// -最小值 +最大值
$res = $redis->rawCommand('xrange','codehole','-','+');
print_r($res);
echo '<br />';
$id = $res[1][0];
// 指定最小消息列表
$res = $redis->rawCommand('xrange','codehole',$id,'+');
// var_dump($res);
// echo '<br />';
// 指定最大消息列表
$res = $redis->rawCommand('xrange','codehole','-',$id);
// var_dump($res);
// echo '<br />';
// 指定最大消息列表
$res = $redis->rawCommand('xrange','codehole','-',$id);
// var_dump($res);
// echo '<br />';
/************************独立消费************************/
//从stream中头部读取两条消息
$res = $redis->rawCommand('xread','count','2','streams','codehole','0-0');
// var_dump($res);
// echo '<br />';
//从尾部读取一条消息,这里不会返回任何消息
$res = $redis->rawCommand('xread','count','1','streams','codehole','$');
// var_dump($res);
// echo '<br />';
//block 0 表示永久阻塞,直到消息到来,block 1000表示阻塞1秒,如果1秒没新消息,返回null
//从尾部阻塞等待消息到来,然后新开一个窗口塞消息,这时候阻塞解除返回新消息内容
// $res = $redis->rawCommand('xread','block','0','count','1','streams','codehole','$');
// var_dump($res);
// echo '<br />';
/************************消费组************************/
// 星号表示自动生成id,后面参数key,value
$redis->rawCommand('xadd','mq','*','msg','1');
$redis->rawCommand('xadd','mq','*','msg','2');
$redis->rawCommand('xadd','mq','*','msg','3');
$redis->rawCommand('xadd','mq','*','msg','4');
$redis->rawCommand('xadd','mq','*','msg','5');
//创建消费组mqGroup 为消息队列 mq 从第一条开始消费
$redis->rawCommand('xgroup','create','mq','mqGroup','0');
//从从尾部开始消费
//$redis->rawCommand('xgroup','create','mq','mqGroup','$');
//消费者A,消费第1条
$redis->rawCommand('xreadgroup','group', 'mqGroup', 'consumerA', 'count', '1' ,'streams', 'mq', '>');
//消费者A,消费第2条
$redis->rawCommand('xreadgroup','group', 'mqGroup', 'consumerA', 'count', '1' ,'streams', 'mq', '>');
//消费者B,消费第3条
$redis->rawCommand('xreadgroup','group', 'mqGroup', 'consumerB', 'count', '1' ,'streams', 'mq', '>');
//消费者A,消费第4条
$redis->rawCommand('xreadgroup','group', 'mqGroup', 'consumerA', 'count', '1' ,'streams', 'mq', '>');
//消费者c,消费第5条
$res = $redis->rawCommand('xreadgroup','group', 'mqGroup', 'consumerC', 'count', '1' ,'streams', 'mq', '>');
//获取strarm信息
$res = $redis->rawCommand('xinfo','stream','mq');
echo "<pre>";
print_r($res);
echo '<br />';
//获取strarm消费组信息
$res = $redis->rawCommand('xinfo','groups','mq');
print_r($res);
echo '<br />';
//同一个消费组有多个消费者,观察每个消费者的状态
$res = $redis->rawCommand('xinfo','consumers','mq','mqGroup');
print_r($res);
echo '<br />';
//mpGroup的Pending等待列表情况 + 0 10
//使用 -:start +:end 10:count 选项可以获取详细信息
//$res = $redis->rawCommand('xpending','mq','mqGroup');
//$res = $redis->rawCommand('xpending','mq','mqGroup','-','+','10');
$res = $redis->rawCommand('xpending','mq','mqGroup','-','+','10','consumerA');
print_r($res);
echo '<br />';
//通知消息处理结束,用消息ID标识
$msg_id = $res[0][0];
$res = $redis->rawCommand('xack','mq','mqGroup',$msg_id);
print_r($res);
echo '<br />';
//再次查看Pending列表
$res = $redis->rawCommand('xpending','mq','mqGroup','-','+','10','consumerA');
print_r($res);
echo '<br />';
//转移超过36s的消息id到消费者B的Pending列表
$redis->rawCommand('xclaim','mq','mqGroup','consumerB','36000',$msg_id);
网友评论