美文网首页
Redis 多消费者模式如何保证消息顺序执行

Redis 多消费者模式如何保证消息顺序执行

作者: 567f84810acc | 来源:发表于2020-01-13 10:31 被阅读0次

    多消费者模式如何保证消息顺序执行

    • 应用场景 : [用户订单更新 创建 ->更新 ->删除]
    • 假设 有3条数据 data1[create] ,data2[update] ,data3[delete] 需要顺序执行
    • 假设有 3 个消费者并发消费，消息的实际处理顺序可能被打乱，例如 data1 -> data3 -> data2，从而造成数据错乱
    • 示例代码 PHP

    测试数据

    // 分别代表 data 1-3

    {
      "reqId":"0a7c458c-d619-af31-3ffb-f499995eacd5",
      "user_id":1002, 
      "order_id":2302393013,
      "data":{"status":1},
      "q_time":1563978617
    }
    
    {
      "reqId":"6be0c2e3-3514-7d24-0ab6-1e61949a7833",
      "user_id":1002, 
      "order_id":2302393013,
      "data":{"status":2},
      "q_time":1563978618
    }
    
    {
      "reqId":"6843cb1b-ebce-ab10-991c-88a55cd7112d",
      "user_id":1002, 
      "order_id":2302393013,
      "data":{"status":3},
      "q_time":1563978619
    }
    

    分析 :

    • user_id + order_id:该条消息所属的用户和订单，二者组合作为消息分组的唯一键
    • reqId 消息唯一ID
    • q_time 消息时间

    多消费 或者 API-> 组装消息

    • 为了保证消息的顺序执行 和 处理消息的吞吐量
     // Routing fields carried by the incoming message.
     $reqId    = $data['reqId'];
     $order_id = $data['order_id'];
     $user_id  = $data['user_id'];
     $q_time   = $data['q_time'];

     // One hash (payloads) and one sorted set (ordering) per user/order pair.
     $job_name       = "xxx_job";
     $message_key    = "{$user_id}_{$order_id}";
     $hash_order_key = "{$job_name}:{$message_key}";
     $sorted_set_key = $hash_order_key.'_s';
     $allow_key      = "{$job_name}:allow:{$message_key}";

     // Store the payload under its message id, then index the id by timestamp.
     $redis->hset($hash_order_key, $reqId, json_encode($data));
     $redis->zadd($sorted_set_key, $q_time, $reqId);

     // SETNX only succeeds while the "allow" key is absent, so the group key is
     // handed to the downstream queue once per pending batch.
     if ($redis->setnx($allow_key, 1)) {
         // Push $message_key to the downstream queue.
         $service->xxxJobDistribution($message_key);
     }
     // A separate process may also scan the xxx_job:* keys and dispatch them in batches.
    
    
    • zadd 使用时间戳作为权重值 保证消息的顺序
    • 同个账户的同一个订单 顺序明确
    • 使用 setnx：只有 key 不存在时才会设置成功，保证同一分组在下级的多个消费者中只被投递一次

    下级队列 xxxjob 消费

     // $message_key is expected to be set by the caller (presumably the value
     // the producer pushed to the downstream queue — confirm against the caller).
     $job_name       = "xxx_job";
     $hash_order_key = "{$job_name}:{$message_key}";
     $sorted_set_key = $hash_order_key.'_s';

     // Message ids in ascending score order, then their payloads in that same order.
     $reqIds   = $redis->zrange($sorted_set_key, 0, -1);
     $MsgLists = $redis->hmget($hash_order_key, $reqIds);

     foreach ($MsgLists as $index => $val) {
         // do any ...

         // Drop the handled message from both the hash and the ordering set.
         $reqId = $reqIds[$index];
         $redis->hdel($hash_order_key, $reqId); // true or false
         $redis->zrem($sorted_set_key, $reqId);
     }
     // Remove the "allow" marker so the producer can dispatch this group again.
     $redis->del(["{$job_name}:allow:{$message_key}"]);
    
    • 消费完成后删除锁
    • 业务注意 try catch 数据回滚

    DEMO

    <?php
    /**
     * Created by PhpStorm.
     * User: alonexy
     * Date: 19/7/25
     * Time: 18:23
     */
    
    namespace Services;
    
    
    use App\Common\Functions;
    
    /**
     * Groups queued job messages by a business key and replays each group in
     * score order, so several consumers can work in parallel without reordering
     * the messages that belong to the same group.
     *
     * Redis layout (every key starts with $prifix):
     *   {prefix}{job}:{messageKey}        hash    reqId => JSON payload
     *   {prefix}{job}:{messageKey}_s      zset    reqId scored by message weight
     *   {prefix}{job}:lock:{messageKey}   string  lock expiry as a unix timestamp
     *
     * NOTE(review): the option-array form of transaction() and the explicit
     * $tx->multi() calls look like the Predis API — confirm which client
     * \RedisDB::connection() actually returns.
     */
    class JobSequenceService
    {
        /** @var string Key prefix. The property name (including its spelling) is public API. */
        public  $prifix;
        /** @var mixed Redis client used for every command. */
        private $redis;
        /** @var self|null Lazily created shared instance. */
        private static $_instance ;
        /** @var callable|null Handler invoked once per message by GroupDatasHandle(). */
        private $HandleFunc = null;

        public function __construct()
        {
            ## Swap in your own Redis client here.
            $this->redis  = \RedisDB::connection('default');
            $this->prifix = 'job_sequence:';

        }

        /**
         * Return the shared instance, creating it on first use.
         * @return self
         */
        public static function getInstance()
        {
            if(self::$_instance instanceof self)
            {
                return self::$_instance;
            }
            return self::$_instance = new  self();
        }

        /**
         * Store one message in its group.
         * @param array $jobData   message payload
         * @param $reqIdKey        key in $jobData holding the unique message id
         * @param $scoreKey        key in $jobData holding the ordering weight
         * @param array $groupKeys keys in $jobData whose values, joined by '_', form the group key
         * @param $jobName         job the group belongs to
         * @return array [true, messageKey] on success, [false, reason] on failure
         */
        public function DataGroupJobSplicing(array $jobData, $reqIdKey, $scoreKey, array $groupKeys, $jobName)
        {
            // Reject incomplete messages up front instead of writing an empty
            // id, score or group key into Redis.
            foreach (array_merge([$reqIdKey, $scoreKey], $groupKeys) as $field) {
                if (!isset($jobData[$field])) {
                    return [false, "missing job field: {$field}"];
                }
            }
            try {
                $reqId      = $jobData[$reqIdKey];
                $score      = $jobData[$scoreKey];
                $unqiueArrs = [];
                foreach ($groupKeys as $gk) {
                    $unqiueArrs[] = $jobData[$gk];
                }
                $message_key    = implode('_', $unqiueArrs);
                $hash_order_key = $this->prifix."{$jobName}:{$message_key}";
                $this->addJobData($hash_order_key, $reqId, $jobData, $score);
                return [true, $message_key];
            }
            // NOTE(review): \RedisException is the phpredis exception class; if the
            // client is Predis its errors will not be caught here — confirm.
            catch (\RedisException $e) {
                return [false, $e->getMessage()];
            }
        }

        /**
         * Write the payload hash entry and its ordering entry in one MULTI/EXEC.
         * @param $hash_order_key
         * @param $reqId
         * @param $jobData
         * @param $score
         * @return mixed result of ZCARD on the ordering set after the write
         */
        private function addJobData($hash_order_key, $reqId, $jobData, $score)
        {
            $options = array(
                'cas' => true,
                'retry' => 2,
            );
            $this->redis->transaction(
                $options, function ($tx) use ($hash_order_key, $reqId, $jobData, $score) {
                $tx->multi();   // With CAS, MULTI *must* be explicitly invoked.
                $tx->hset($hash_order_key, $reqId, json_encode($jobData));
                $tx->zadd($hash_order_key . '_s', $score, $reqId);
            });
            return $this->redis->zcard($hash_order_key . '_s');
        }

        /**
         * Remove and return the lowest-scored members of a sorted set.
         *
         * The members are read while the transaction is still in its
         * check-and-set phase; once multi() is called, commands are queued and
         * no longer return data. The key is watched, so the removal is aborted
         * and retried if the set changes between the read and EXEC.
         *
         * @param $key sorted-set key
         * @param int $num number of members to pop (values below 1 still pop one)
         * @return array popped members, lowest score first
         */
        public function zpop($key,$num=1)
        {
            $options = array(
                'cas' => true,
                'watch' => $key,
                'retry' => 2,
            );
            $limit  = max(0,($num-1));
            $arr = [];
            $this->redis->transaction(
                $options, function ($tx) use ($key,&$arr,$limit) {
                $arr = $tx->zrange($key,0,$limit);

                if(!empty($arr)){
                    $tx->multi();   // With CAS, MULTI *must* be explicitly invoked.
                    $tx->zrem($key,$arr);
                }
            });
            return $arr;
        }

        /**
         * Delete one message from both the payload hash and the ordering set.
         * @param $hash_order_key
         * @param $value message id
         * @return mixed result of the transaction
         */
        private function delJobData($hash_order_key, $value)
        {
            $options = array(
                'cas' => true,
                'retry' => 2,
            );
            $res     = $this->redis->transaction(
                $options, function ($tx) use ($hash_order_key, $value) {
                $tx->multi();   // With CAS, MULTI *must* be explicitly invoked.
                $tx->hdel($hash_order_key, $value);
                $tx->zrem("{$hash_order_key}_s", $value);
            });
            return $res;
        }

        /**
         * Register the handler that processes each message payload.
         * @param $function callable receiving the stored JSON payload
         */
        public function SetGroupDataHandleFun($function)
        {
            $this->HandleFunc = $function;
        }

        /**
         * Run the registered handler over every message of a group, in score order.
         *
         * Each message is deleted right after its handler returns. If the handler
         * throws, that message and all later ones stay queued and the exception
         * propagates unchanged. The group lock is released on every exit path so
         * a failed run cannot block the group until the lock expires.
         *
         * @param $jobName
         * @param $messageKey
         * @return array ids of the messages handled in this call
         * @throws \Exception when no handler is registered, or whatever the handler throws
         */
        public function GroupDatasHandle($jobName, $messageKey)
        {
            $hash_order_key = $this->prifix."{$jobName}:{$messageKey}";
            $handled        = [];

            try {
                $reqIds = $this->redis->zrange($hash_order_key . '_s', 0, -1);
                if (empty($reqIds)) {
                    return [];
                }
                if (is_null($this->HandleFunc)) {
                    throw new \Exception("SetHandleFun is nil");
                }
                $MsgLists = $this->redis->hmget($hash_order_key, $reqIds);
                foreach ($MsgLists as $k => $val) {
                    // Passed by reference so handlers declared with &$data keep working.
                    call_user_func_array($this->HandleFunc, array(&$val));
                    $this->delJobData($hash_order_key, $reqIds[$k]);
                    $handled[] = $reqIds[$k];
                }
                return $handled;
            }
            finally {
                $this->unlock($jobName, $messageKey);
            }
        }

        /**
         * List the group keys of a job that have pending messages and no live lock.
         *
         * Only keys of the form {prefix}{job}:{messageKey}_s are considered. The
         * pattern is anchored and the prefix quoted, so keys that merely contain
         * "_s" are not mistaken for ordering sets. A group key that itself ends in
         * "_s" remains ambiguous under this key scheme.
         *
         * NOTE(review): KEYS scans the whole keyspace; consider SCAN on large
         * production datasets.
         *
         * @param $jobName
         * @return array
         */
        public function getJobGroupKeys($jobName)
        {
            $base    = $this->prifix.$jobName.':';
            $ks      = $this->redis->keys($base.'*');
            $pattern = '/^' . preg_quote($base, '/') . '(.*)_s$/';

            $msssageKeys = [];
            foreach ($ks as $k) {
                if (preg_match($pattern, $k, $ma) !== 1) {
                    continue;
                }
                $mKey = $ma[1];
                if (!$this->is_lock($jobName, $mKey)) {
                    $msssageKeys[] = $mKey;
                }
            }
            return $msssageKeys;
        }

        /**
         * Tell whether a live lock exists for the group.
         *
         * A lock whose stored expiry timestamp is in the past counts as not held,
         * so a group abandoned by a crashed consumer becomes visible again.
         *
         * @param $jobName
         * @param $messageKey
         * @return mixed stored expiry while the lock is live, a falsy value otherwise
         */
        public function is_lock($jobName, $messageKey)
        {
            $value = $this->redis->get($this->prifix."{$jobName}:lock:{$messageKey}");
            if ($value && $value < time()) {
                return false;
            }
            return $value;
        }

        /**
         * Try to take the processing lock for a group.
         *
         * The lock value is its own expiry timestamp. An expired lock is taken
         * over with GETSET rather than DEL + SETNX: with DEL, two contenders can
         * both delete and both "win". With GETSET only the caller that reads back
         * a still-expired (or missing) value owns the lock.
         *
         * @param $jobName
         * @param $messageKey
         * @param int $expTime seconds until the lock is considered stale
         * @return bool true when the lock was acquired
         */
        public function lock($jobName, $messageKey,$expTime=3000)
        {
            $key      = $this->prifix."{$jobName}:lock:{$messageKey}";
            $now      = time();
            $deadline = $now + $expTime;

            if ($this->redis->setnx($key, $deadline)) {
                return true;
            }

            $current = $this->redis->get($key);
            if (!$current) {
                // The holder released the lock between our SETNX and GET.
                return (bool) $this->redis->setnx($key, $deadline);
            }
            if ($current >= $now) {
                // Lock is held and still valid.
                return false;
            }

            // Stale lock: swap in our deadline and inspect what was there before.
            $previous = $this->redis->getset($key, $deadline);
            return !$previous || $previous < $now;
        }

        /**
         * Remove the group lock after processing.
         * Deletes the key unconditionally; it does not check who set it.
         * @param $jobName
         * @param $messageKey
         * @return mixed
         */
        public function unlock($jobName, $messageKey)
        {
             return $this->redis->del([$this->prifix."{$jobName}:lock:{$messageKey}"]);
        }

        /**
         * Build a request id from a per-day Redis counter and Functions::uuids().
         * @return string
         */
        public function getReqId()
        {
            $date = date('Y-m-d');
            return $this->redis->incr($this->prifix."job_req_id:{$date}") . '_' . Functions::uuids();
        }
    }
    

    使用

           // Three updates for the same user/order; reqTime defines their order.
           $jobData1 = [
               'reqId'=>'0a7c458c-d619-af31-3ffb-f499995eacd5',
               'user_id'=>'1002',
               'order_id'=>'232323',
               'data'=>[
                   'status'=>1
               ],
               'reqTime'=>1563978617
           ];
           $jobData2 = [
               'reqId'=>'000001-d619-af31-3ffb-f499995eacd5',
               'user_id'=>'1002',
               'order_id'=>'232323',
               'data'=>[
                   'status'=>2
               ],
               'reqTime'=>1563978618
           ];
           $jobData3 = [
               'reqId'=>'000002-d619-af31-3ffb-f499995eacd5',
               'user_id'=>'1002',
               'order_id'=>'232323',
               'data'=>[
                   'status'=>3
               ],
               'reqTime'=>1563978619
           ];

           $service = JobSequenceService::getInstance();
           // Insert the messages, deliberately out of order (1, 3, 2).
           $service->DataGroupJobSplicing($jobData1,'reqId','reqTime',['user_id','order_id'],'xxxjob');
           $service->DataGroupJobSplicing($jobData3,'reqId','reqTime',['user_id','order_id'],'xxxjob');
           $service->DataGroupJobSplicing($jobData2,'reqId','reqTime',['user_id','order_id'],'xxxjob');

           // Consume the group.
           $service->SetGroupDataHandleFun(function($data){
               try{
                   $orderData = \GuzzleHttp\json_decode($data,1);
                   unset($orderData['reqId']);
                   unset($orderData['reqTime']);
                   dump($orderData);
                   sleep(10);
                   // To simulate a failing handler: throw new \Exception("test");
               }catch (\Exception $e){
                   // Roll back business data here, then rethrow with the cause attached.
                   throw new \Exception($e->getMessage(), 0, $e);
               }
           });
           // Only process the group when the lock was actually acquired; otherwise
           // another consumer is handling it and running here would break ordering.
           if ($service->lock('xxxjob','1002_232323')) {
               dump($service->GroupDatasHandle('xxxjob','1002_232323'));
           }

           dd($service->getJobGroupKeys('xxxjob'));
    

    来源 :www.alonexy.com

    相关文章

      网友评论

          本文标题:Redis 多消费者模式如何保证消息顺序执行

          本文链接:https://www.haomeiwen.com/subject/jsegactx.html