MGET

作者: 彳亍口巴 | 来源:发表于2022-11-21 19:15 被阅读0次

    MGET源码

    public RedisFuture<List<KeyValue<K, V>>> mget(Iterable<K> keys) {
            //获取分区slot和key的映射关系
            Map<Integer, List<K>> partitioned = SlotHash.partition(codec, keys);
     
            //如果分区数小于2也就是只有一个分区即所有key都落在一个分区就直接获取
            if (partitioned.size() < 2) {
                return super.mget(keys);
            }
     
            //每个key与slot映射关系
            Map<K, Integer> slots = SlotHash.getSlots(partitioned);
            Map<Integer, RedisFuture<List<KeyValue<K, V>>>> executions = new HashMap<>();
     
            //遍历分片信息,逐个发送
            for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
                RedisFuture<List<KeyValue<K, V>>> mget = super.mget(entry.getValue());
                executions.put(entry.getKey(), mget);
            }
     
            // restore order of key 恢复key的顺序
            return new PipelinedRedisFuture<>(executions, objectPipelinedRedisFuture -> {
                List<KeyValue<K, V>> result = new ArrayList<>();
                for (K opKey : keys) {
                    int slot = slots.get(opKey);
     
                    int position = partitioned.get(slot).indexOf(opKey);
                    RedisFuture<List<KeyValue<K, V>>> listRedisFuture = executions.get(slot);
                    result.add(MultiNodeExecution.execute(() -> listRedisFuture.get().get(position)));
                }
     
                return result;
            });
    }
    

    整个mget操作其实分为了以下几步:

    1、 获取分区slot和key的映射关系,遍历出所需key对应的每个分区slot。

    2、 判定,slot个数是不是小于2,也就是是否所有的key都在同一分区,如果是,发起一次mget命令,直接获取。

    3、 如果分区数量大于2,keys对应多个分区,那么遍历所有分区,分别向redis发起mget请求获取数据。

    4、 等待所有请求执行结束,重新组装结果数据,保持跟入参key的顺序一致,然后返回结果。

    可以看到,当使用mget方法获取多个key,并且这些key还存在于不同的slot分区中,那么一次mget操作其实会对redis发起多次mget命令的请求,有多少个slot,就发起多少次,然后在所有请求执行完毕之后,整个mget方法才会能够继续执行下去。看似一次mget方法调用,其实底层对应的是多次redis调用和多次io交互。

    解决办法

    方案1 - hashtag
    hashtag 强制将key放在一个redis node上。这个方案,相当于将redis集群退化成了单机redis,系统的高可用,容灾能力就大打折扣了,只能尝试使用主从,哨兵等其他分布式架构来缓减,但是,既然选择了集群,肯定集群模式是相比于其他模式是最符合当前系统架构现状的,使用这种方案,可能会引发更大的问题。不推荐。

    方案2 -并发调用
    我们从上面的代码中可以看到,for循环内多次串行的redis调用,是导致执行rt上涨的原因,那么,自然而然可以想到,是否可以用并行替代底层串行的逻辑。也就是将mget中的keys,根据slot分片规则,先groupBy一下,然后用多线程的方式并行执行。
    首先,这个方案中,用于并发调用提交redis mget任务的线程池的设计非常重要,各种参数的调校,势必需要非常充分的压测,这本身难度就比较大。其次,我们在日常使用中,一次mget的key基本上在几十到100,相比于redis 16384的固定槽位数量,是数量级上的差距,所以,我们一次请求的这些key,基本上是分布在不同的slot中的,换句话讲,如果按照这么拆分keys,大概率是相当于拆出了等于key数量的get请求。。也就丧失了mget的意义。

    实践

    func test16() {
        _, err := redisCli.Do("SET", "aa", "wd")
        if err != nil {
            fmt.Println("redis set error:", err)
            return
        }
        var keys []interface{}
        keys = append(keys, "dd", "aa")
        redisData, err := redis.Strings(redisCli.Do("MGET", keys...))
        if err != nil {
            fmt.Println(err)
            return
        }
        fmt.Println(len(redisData))
        for _, info := range redisData {
            if info == "" {
                fmt.Println("空字符串")
                continue
            }
            fmt.Printf("info:%+v\n", info)
        }
    }
    

    1、set aa=wd
    2、mget dd和aa
    3、打印结果,发现结果数组和key的个数一致,也就是每个key的结果一定会返回

    会不会有部分成功,部分失败的情况?
    func Strings(reply interface{}, err error) ([]string, error) {
        var result []string
        err = sliceHelper(reply, err, "Strings", func(n int) { result = make([]string, n) }, func(i int, v interface{}) error {
            switch v := v.(type) {
            case string:
                result[i] = v
                return nil
            case []byte:
                result[i] = string(v)
                return nil
            default:
                return fmt.Errorf("redigo: unexpected element type for Strings, got type %T", v)
            }
        })
        return result, err
    }
    

    将所有返回结果转成字符串数组,如果redis返回的结果为空,则数组中默认为空字符串,若其中一个失败,则整个函数返回失败,可认为是全部成功或者全部失败

    相关文章

      网友评论

          本文标题:MGET

          本文链接:https://www.haomeiwen.com/subject/dpunxdtx.html