美文网首页
PHP 操作 Redis Stream 消息队列

PHP 操作 Redis Stream 消息队列

作者: 王宣成 | 来源:发表于2020-07-11 01:18 被阅读0次

需要redis5.0 ,借鉴kafka,消息可持久化

<?php 

    //连接reids
    $redis = new Redis();
    $redis->connect('127.0.0.1', 6379);

    //xadd:追加消息
    //xdel:删除消息,删除标志位,不影响消息总长度
    //xrange:消息列表,过滤已删除的消息
    //xlen:消息长度
    //del: 删除所有消息

    $redis->rawCommand('del','codehole');

    // 星号表示自动生成id,后面参数key,value
    $redis->rawCommand('xadd','codehole','*','name','user1','age','20');
    $redis->rawCommand('xadd','codehole','*','name','user2','age','18');
    $redis->rawCommand('xadd','codehole','*','name','user3','age','19');
    $redis->rawCommand('xadd','codehole','*','name','user4','age','19');

    //maxlen 定长长度,将老消息干掉,确保链表不会超过指定长度
    $redis->rawCommand('xadd','codehole','maxlen','3','*','name','user5','age','19');

    //XDEL codehole  id
    //$redis->rawCommand('xdel','codehole','1538561700640-0');

    $res = $redis->rawCommand('xlen','codehole');
    echo "<pre>";
    var_dump($res);
    echo '<br />';

    // -最小值 +最大值
    $res = $redis->rawCommand('xrange','codehole','-','+');
    print_r($res);
    echo '<br />';

    $id = $res[1][0];

    // 指定最小消息列表
    $res = $redis->rawCommand('xrange','codehole',$id,'+');
    // var_dump($res);
    // echo '<br />';

    // 指定最大消息列表
    $res = $redis->rawCommand('xrange','codehole','-',$id);
    // var_dump($res);
    // echo '<br />';
    
    // 指定最大消息列表
    $res = $redis->rawCommand('xrange','codehole','-',$id);
    // var_dump($res);
    // echo '<br />';

    /************************独立消费************************/

    //从stream中头部读取两条消息
    $res = $redis->rawCommand('xread','count','2','streams','codehole','0-0');
    // var_dump($res);
    // echo '<br />';

    //从尾部读取一条消息,这里不会返回任何消息
    $res = $redis->rawCommand('xread','count','1','streams','codehole','$');
    // var_dump($res);
    // echo '<br />';

    //block 0 表示永久阻塞,直到消息到来,block 1000表示阻塞1秒,如果1秒没新消息,返回null
    //从尾部阻塞等待消息到来,然后新开一个窗口塞消息,这时候阻塞解除返回新消息内容
    // $res = $redis->rawCommand('xread','block','0','count','1','streams','codehole','$');
    // var_dump($res);
    // echo '<br />';


    /************************消费组************************/


  // 星号表示自动生成id,后面参数key,value
    $redis->rawCommand('xadd','mq','*','msg','1');
    $redis->rawCommand('xadd','mq','*','msg','2');
    $redis->rawCommand('xadd','mq','*','msg','3');
    $redis->rawCommand('xadd','mq','*','msg','4');
    $redis->rawCommand('xadd','mq','*','msg','5');
    

    //创建消费组mqGroup  为消息队列 mq 从第一条开始消费
    $redis->rawCommand('xgroup','create','mq','mqGroup','0');

    //从从尾部开始消费
    //$redis->rawCommand('xgroup','create','mq','mqGroup','$');

    //消费者A,消费第1条
    $redis->rawCommand('xreadgroup','group', 'mqGroup', 'consumerA', 'count', '1' ,'streams', 'mq', '>');

    //消费者A,消费第2条
    $redis->rawCommand('xreadgroup','group', 'mqGroup', 'consumerA', 'count', '1' ,'streams', 'mq', '>');

    //消费者B,消费第3条
    $redis->rawCommand('xreadgroup','group', 'mqGroup', 'consumerB', 'count', '1' ,'streams', 'mq', '>');

    //消费者A,消费第4条
    $redis->rawCommand('xreadgroup','group', 'mqGroup', 'consumerA', 'count', '1' ,'streams', 'mq', '>');

    //消费者c,消费第5条
    $res = $redis->rawCommand('xreadgroup','group', 'mqGroup', 'consumerC', 'count', '1' ,'streams', 'mq', '>');

    //获取strarm信息
    $res = $redis->rawCommand('xinfo','stream','mq');
    echo "<pre>";
    print_r($res);
    echo '<br />';

    //获取strarm消费组信息
    $res = $redis->rawCommand('xinfo','groups','mq');
    print_r($res);
    echo '<br />';

    //同一个消费组有多个消费者,观察每个消费者的状态
    $res = $redis->rawCommand('xinfo','consumers','mq','mqGroup');
    print_r($res);
    echo '<br />';

    //mpGroup的Pending等待列表情况  + 0 10
    //使用 -:start +:end 10:count 选项可以获取详细信息
    //$res = $redis->rawCommand('xpending','mq','mqGroup');
    //$res = $redis->rawCommand('xpending','mq','mqGroup','-','+','10');
    $res = $redis->rawCommand('xpending','mq','mqGroup','-','+','10','consumerA');
    print_r($res);
    echo '<br />';

    //通知消息处理结束,用消息ID标识
    $msg_id = $res[0][0];
    $res = $redis->rawCommand('xack','mq','mqGroup',$msg_id);
    print_r($res);
    echo '<br />';

    //再次查看Pending列表
    $res = $redis->rawCommand('xpending','mq','mqGroup','-','+','10','consumerA');
    print_r($res);
    echo '<br />';

    //转移超过36s的消息id到消费者B的Pending列表
    $redis->rawCommand('xclaim','mq','mqGroup','consumerB','36000',$msg_id);


       

相关文章

网友评论

      本文标题:PHP 操作 Redis Stream 消息队列

      本文链接:https://www.haomeiwen.com/subject/bgcbcktx.html