美文网首页程序员
Redis 消息队列的两种实现方式

Redis 消息队列的两种实现方式

作者: Bing的天涯路 | 来源:发表于2019-12-31 13:55 被阅读0次

    索引:
    基于list的实现方式
    基于publish/subscribe
    实战

    消息队列简介

    消息队列:是消息的顺序集合。
    比如网站的PV统计和查看,传统方式就是每个页面发一个AJAX然后mysql给PV+1。用户量非常大的时候,没有办法实时插入PV。
    结合Redis消息队列的实现,也是每个用户访问的时候发送ajax到控制器,这个时候redis每次rpush pvlog,相当于直接往数组后面插入一万个行为,接下来用一个脚本运输处理pvlog,set pv查看时候get pv,如果想处理用户请求时间等等,同样可以这样异步处理。

    常见场景和解决的问题:

    应对流量峰值
    异步消费(不定速的插入,生产和匀速的处理,消费)
    解耦应用(不同来源的生产和同步去向的消费,基于publish/subscribe实现),即消息队列作为消息池,同时往里面写入的可能有多种数据,根据不同的场景来进行消费。
    redis实现消息队列原理
    使用redis实现的最主要优势是简单快捷,性能没有kafka高,但是安装简便,kafka性能高但是比较重,如果消息队列不是很多,比如说一个博客计算pv,那么kafka可能比整个项目还要大。

    实现方式:

    方法一: 基于list的实现方式

    Screen-Shot-2019-04-06-at-7.01.54-PM.png

    核心代码
    没有用消息队列的方式,使用incrBy大概上限在1000万

    <?php
    $redis = new Redis();
    $redis->connect('127.0.0.1', 6380);
    $res->select(0);
    $key = 'pv:index';
    //  看一下是不是没有,如果没有的话就设置成0
    if(false === $redis->get($key))
    {
        $redis->set($key, 0);
    }
    // 如果有了就增加1 注意incrBy的上线大概在1000万
    $redis->incrBy($key,1);
    

    用list来实现

    // 用list消息队列实现
    $key = 'listpv:index';
    $redis->rPush($key, 1);
     
     
    // 后台的cron来实现
    // 先连接redis
    $redis = new Redis();
    $redis->connect('127.0.0.1', 6380);
    $res->select(0);
    $key = 'pv:index';
    // 用一个死循环
    while(true)
    {
        if(false !== $redis->lPop($key))
        {
            $redis->incrBy('pv:index');
        }
    }
     
    // 然后在terminal中无线循环这个脚本
    

    基于list来实现消息队列的特点:

    与水库相似的地方:

    水库的容量决定承载能力 — redis的容量决定业务的承载能力
    每一滴随只可能经过一个闸门 — 每条消息只能被一个消费者消费
    与水库不同的地方:

    水库用于蓄水 — 一般要把消息全部消费掉
    不要的随扔掉 — 处理失败的消息要做容错

    方法二:基于publish/subscribe

    Screen-Shot-2019-04-06-at-7.53.22-PM.png

    频道固定,生产者和消费者不固定,可能一对多,也可能多对一,也可能多对多。
    命令行的实现方式:

    首先起两个redis cli,一个作为订阅,一个作为发布

    订阅者:SUBSCRIBE channel1 channel2

    发布者:
    PUBLISH channel1 helloChannel
    PUBLISH channel2 helloChannel2

    • 如果这个时候发布到一个没有被订阅的channel,那么这条消息就会丢失。
    • 如果有多个订阅了同一个channel,但有信息发布到同一个channel的时候,他们都会受到
      代码实现:

    发布者:

    <?php
    // 发布者
    $redis = new Redis();
    $redis->connect('127.0.0.1', 6380);
     
    $res = $redis->publish('c1','hello c1');
    echo "clients reading c1:{$res}\n";
     
    $res = $redis->publish('c2','hello c2');
    echo "clients reading c2:{$res}\n";
     
    $res = $redis->publish('c3','hello c3');
    echo "clients reading c3:{$res}\n";
    

    监听者:

    <?php
    $redis = new Redis();
    $redis->connect('127.0.0.1', 6380);
    // 超时控制
    $redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
    // 订阅
    $redis->subscribe(['c1','c2'],function(Redis $instance, $channel, $message){
        echo "received message form {$channel} : {$message}\n";
    })
    ```php
    
    ### 实战部分-生成内容页的质量分:
    实现三个功能:
    - 统计首页、列表页、内容页的PV
    - 统计浏览时间超过5s的内容页
    - 内容页的PV+1分,浏览时间超过5s+5分,不超过5秒-1分,生成内容页的质量分
    前端部分:
    ```javascript
    <script>
    // ajax 访问ajax.php,给内容页增加PV
    $get('ajax.php?action=pv&from=article&aid=<?=$aid?>');
     
    // 如果页面打开时间超过5秒,则发出统计
    setTimeout(function(){
        $.get('ajax.php?action=get5&aid=<?=$aid?>');
    },5000);
     
    </script>
    

    后端部分:
    发布的实现:

    <?php
    $action = $_GET['action'];
    $redis = new Redis();
    $redis->connect('127.0.0.1', 6380);
     
    // 首页的PV
    $channelPvIndex = 'pv:index'; 
    //内容页的pv
    $channelPvList ='pv:list'; 
    // 内容页的PV
    $channelPvArticle = 'pv:article'; 
    // 内容页浏览超过5秒
    $channelGT5 = 'gt5:article';
     
     
    if('pv' === $action)
    {
        $from = $_GET['from'];
     
        if('index' === $from)
        {
            $redis->publish($channelPVIndex, 1);
        }
        else if('list' === $from)
        {
            $tid = intval($_GET['tid']);
            $redis->publish($channelPvList, $tid);
        }
        else if('articel' === $_GET['aid'])
        {
            $aid = intval($_GET['aid']);
            $redis->publish($channelPvArticle, $aid);
        }
     
    }
     
    else if('gt5' == $action)
    {
        $aid = intval($GET['aid'])
        {
            $redis->publish($channelGT5, $aid);
        }
    }
     
    else
    {
        // unknown action
    }
    

    订阅的实现:

    <?php
    // 订阅的实现
    $redis = new Redis();
    $redis->connect('127.0.0.1', 6380);
     
    // 首页的PV
    $channelPvIndex = 'pv:index'; 
    //内容页的pv
    $channelPvList ='pv:list'; 
    // 内容页的PV
    $channelPvArticle = 'pv:article'; 
    // 内容页浏览超过5秒
    $channelGT5 = 'gt5:article';
     
    // 频道和PV的key的映射
    $keyMap = [
        $channelPVIndex => 'realtimepv:index',
        $channelPvList => 'realtimepv:list',
        $channelPvArticle => 'realtimepv:article'
    ];
     
    $redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
    $redis->subscribe(
        [$channelPVIndex,$channelPvList,$channelPvArticle,$channelGT5],
        function(Redis $instance, $channel, $message)
    {
        // 注意在subscribe的回调中只能够执行订阅、取消订阅、模式订阅、模式取消订阅,无法执行incrBy
        // 尝试取消订阅命令(证明上面一句话)
        // $instance->unsubscribe([$channelName]);
     
        // 因此要想incryBy只能重新实例化一个redis
        $redis2 = new Redis();
        $redis2->connect('127.0.0.1',6380);
        global $keyMap; //这里可以使用闭包实现
        if(!isset($$keyMap[$channelName]))
        {
            $realTimePvKey = $keyMap[$channelName]; // 映射过来
            $redis2->incrBy($realTimePvKey, 1);
     
        }
    }   
    )
    

    注意在subscribe的回调中只能够执行订阅、取消订阅、模式订阅、模式取消订阅,无法执行incrBy,因此要想incryBy只能重新实例化一个redis

    计算质量分:

    <?php
     
    // 订阅的实现
    $redis = new Redis();
    $redis->connect('127.0.0.1', 6380);
     
    // 内容页的PV
    $channelPvArticle = 'pv:article'; 
    // 内容页浏览超过5秒
    $channelGT5 = 'gt5:article';
     
    $redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
    $redis->subscribe([$channelPVIndex,$channelPvList,$channelPvArticle,$channelGT5],
        function(Redis $instance, $channel, $message){
            /**
             * 使用pv和gt5的数量计算文章的质量分
             * 1. 如何计算? = gt5*6
             * 2. 以什么形式保存? HASH
             * gt5: int
             * score: int
             */
            $redis2 = new Redis();
            $redis2->connect('127.0.0.1',6380);
            global $keyMap; //这里可以使用闭包实现
            if('gt5:article' === $channelName)
            {
                echo "${channelName}\n";
                $key = 'realtimescore:'.intval($message);
                $res = $redis2->hIncrBy($keym 'gt5', 1);
                echo "${channelName}\n";
                if($res)
                {
                    $score = $res * 6;
                    $redis2->hSet($key, 'score', $score);
                    echo "{$score}\n";
                }
                else
                {
                    // 报警
                }
            }
        }
    )
    

    本文作者熊冰,个人网站Bing的天涯路,转载请注明出处。

    相关文章

      网友评论

        本文标题:Redis 消息队列的两种实现方式

        本文链接:https://www.haomeiwen.com/subject/hbmboctx.html