MGET源码
public RedisFuture<List<KeyValue<K, V>>> mget(Iterable<K> keys) {
//获取分区slot和key的映射关系
Map<Integer, List<K>> partitioned = SlotHash.partition(codec, keys);
//如果分区数小于2也就是只有一个分区即所有key都落在一个分区就直接获取
if (partitioned.size() < 2) {
return super.mget(keys);
}
//每个key与slot映射关系
Map<K, Integer> slots = SlotHash.getSlots(partitioned);
Map<Integer, RedisFuture<List<KeyValue<K, V>>>> executions = new HashMap<>();
//遍历分片信息,逐个发送
for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
RedisFuture<List<KeyValue<K, V>>> mget = super.mget(entry.getValue());
executions.put(entry.getKey(), mget);
}
// restore order of key 恢复key的顺序
return new PipelinedRedisFuture<>(executions, objectPipelinedRedisFuture -> {
List<KeyValue<K, V>> result = new ArrayList<>();
for (K opKey : keys) {
int slot = slots.get(opKey);
int position = partitioned.get(slot).indexOf(opKey);
RedisFuture<List<KeyValue<K, V>>> listRedisFuture = executions.get(slot);
result.add(MultiNodeExecution.execute(() -> listRedisFuture.get().get(position)));
}
return result;
});
}
整个mget操作其实分为了以下几步:
1、 获取分区slot和key的映射关系,遍历出所需key对应的每个分区slot。
2、 判定,slot个数是不是小于2,也就是是否所有的key都在同一分区,如果是,发起一次mget命令,直接获取。
3、 如果分区数量大于2,keys对应多个分区,那么遍历所有分区,分别向redis发起mget请求获取数据。
4、 等待所有请求执行结束,重新组装结果数据,保持跟入参key的顺序一致,然后返回结果。
可以看到,当使用mget方法获取多个key,并且这些key还存在于不同的slot分区中,那么一次mget操作其实会对redis发起多次mget命令的请求,有多少个slot,就发起多少次,然后在所有请求执行完毕之后,整个mget方法才会能够继续执行下去。看似一次mget方法调用,其实底层对应的是多次redis调用和多次io交互。
解决办法
方案1 - hashtag
hashtag 强制将key放在一个redis node上。这个方案,相当于将redis集群退化成了单机redis,系统的高可用,容灾能力就大打折扣了,只能尝试使用主从,哨兵等其他分布式架构来缓减,但是,既然选择了集群,肯定集群模式是相比于其他模式是最符合当前系统架构现状的,使用这种方案,可能会引发更大的问题。不推荐。
方案2 -并发调用
我们从上面的代码中可以看到,for循环内多次串行的redis调用,是导致执行rt上涨的原因,那么,自然而然可以想到,是否可以用并行替代底层串行的逻辑。也就是将mget中的keys,根据slot分片规则,先groupBy一下,然后用多线程的方式并行执行。
首先,这个方案中,用于并发调用提交redis mget任务的线程池的设计非常重要,各种参数的调校,势必需要非常充分的压测,这本身难度就比较大。其次,我们在日常使用中,一次mget的key基本上在几十到100,相比于redis 16384的固定槽位数量,是数量级上的差距,所以,我们一次请求的这些key,基本上是分布在不同的slot中的,换句话讲,如果按照这么拆分keys,大概率是相当于拆出了等于key数量的get请求。。也就丧失了mget的意义。
实践
func test16() {
_, err := redisCli.Do("SET", "aa", "wd")
if err != nil {
fmt.Println("redis set error:", err)
return
}
var keys []interface{}
keys = append(keys, "dd", "aa")
redisData, err := redis.Strings(redisCli.Do("MGET", keys...))
if err != nil {
fmt.Println(err)
return
}
fmt.Println(len(redisData))
for _, info := range redisData {
if info == "" {
fmt.Println("空字符串")
continue
}
fmt.Printf("info:%+v\n", info)
}
}
1、set aa=wd
2、mget dd和aa
3、打印结果,发现结果数组和key的个数一致,也就是每个key的结果一定会返回
会不会有部分成功,部分失败的情况?
func Strings(reply interface{}, err error) ([]string, error) {
var result []string
err = sliceHelper(reply, err, "Strings", func(n int) { result = make([]string, n) }, func(i int, v interface{}) error {
switch v := v.(type) {
case string:
result[i] = v
return nil
case []byte:
result[i] = string(v)
return nil
default:
return fmt.Errorf("redigo: unexpected element type for Strings, got type %T", v)
}
})
return result, err
}
将所有返回结果转成字符串数组,如果redis返回的结果为空,则数组中默认为空字符串,若其中一个失败,则整个函数返回失败,可认为是全部成功或者全部失败
网友评论