[TOC]
redis 双写实现策略 && hash取模
需求场景
背景
对于redis集群而言,一般业务方使用的时候,会在服务端对key做hash策略,hash算法一般可以分为:一致性hash、hash取模等,当然还有其他常用算法。一致性hash在扩缩容的时候比较麻烦,因此公司层面要求都要使用hash取模,然而,如果当前线上已经是一致性hash,那么要更改hash算法为hash取模,那么我们该如何做?
可能的解决方案
我们的解决方案要能够平滑过渡,不能影响业务正常运行,因此,我们可以通过双写策略来实现,正如我前面的文章《线上redis迁移思路》里面说的一样,双写是万能的。 基于此,我们可以通过先双写,再去掉一致性hash的方案来解决
实现方案
- redis的配置,先使用两套, 一套是原有的一致性hash算法Ketema, 一套是新增的Compat.
- 业务层上做双向方案
从一致性hash过渡为hash取模方式的双写方案
举例说明,假如目前是2个一致性hash节点(实例),那么要调整为2个取模方式节点的步骤大致如下
-
业务上双写一致性hash的2个节点和取模2个节点,此时,取模节点里面的数据是新写的数据,只写不读
-
通过写迁移工具,扫描所有一致性hash的节点的列表(key列表),从一致性hash节点get数据,然后set到取模节点。这种情况理论上会出现瞬间的并发问题(比如get后有新数据,最终set进去老数据,不过只是在瞬间会产生),不过没关系,即便有脏数据(数据不一致),也会再下一步的check工具里面处理好。
-
数据验证和check工具修复
- 这个check的时候,不会有问题,因为check只是check旧的数据,对于新写入的数据都是最新的,因为新旧节点都是双写的
- 曾经刚开始的时候有想过,如果 check的时候产生了新数据怎么办,但是其实是多余的,这个情况是OK的。
-
业务切换读到新的取模节点
- 这个最终都是需要业务层调整代码,使用新的集群或者方案
从一致性hash过渡为hash取模方式的具体实现
如下代码来源于闪聊项目,也是闪聊实际经历过切换方案
-
配置里面, 针对需要进行调整的redis实例,增加新的redis实例配置(取模相关),如下
[redis.gunread_new] shard = "compat" servers = ["192.168.xxx.xxx:6380;;1", "192.168.xxx.xxx:6381;;1"]
-
setupRedis里面增加新的redis实例配置
// 遍历所有redis pool也就是所有redis类别实例 for _, name := range conf.RedisPoolNames { func(instance string) { // 原有的redis实例 clusterConfig.Configs = conf.Redis[instance] if len(clusterConfig.Configs) == 0 { logger.Errorf(nil, "get redis config for %s failed", instance) return } currentCluster := newRedisCluster(instance, clusterConfig) // 同时加载新的redis实例,并通过SetDualWrite赋值给dualWrite. dualInstance := instance + "_new" clusterConfig.Configs = conf.Redis[dualInstance] if len(clusterConfig.Configs) > 0 { dualWriteCluster := newRedisCluster(dualInstance, clusterConfig) currentCluster.SetDualWrite(dualWriteCluster) logger.Infof(nil, "set redis dual write to %v", instance) } redisClusterMap[instance] = currentCluster }(name) }
-
增加开关控制,默认打开双写开关
这点需要重点说明一下,在实际工程应用中,我们的项目可能有部分功能需要再某个版本启用,某个版本弃用;或者某个新增的功能,为了防止异常需要能够有个开关配置,随时可以开启这个功能或者关闭这个功能;或者在流量高峰,我们需要关闭掉或者降级某个功能。诸如这类型的需求,一个比较推荐的做法就是增加开关配置,全局的开关,抽象出一个开关模型出来。如:
type Switch struct { Name string On bool listeners []ChangeListener } func (s *Switch) TurnOn() { s.On = true s.notifyListeners() } func (s *Switch) TurnOff() { s.On = false s.notifyListeners() } var AsyncProcedure = &Switch{Name: "demo.msg.procedure.async", On: true} 当我们打开开关的时候执行 if switches.AsyncProcedure.IsOn() { }
-
client操作的时候redis实例的时候,如写数据的时候,对每一个操作都进行双写处理
func (r *Cluster) ZAdd(key string, scoremembers ...interface{}) (int, error) { if len(scoremembers)%2 != 0 { return 0, fmt.Errorf("zadd for %v expects even number of score members", key) } // 如果双写开关打开,并且有双写的实例,就异步写这个新的实例 if r.dualWrite != nil && r.writeDual { go r.dualWrite.ZAdd(key, scoremembers...) } args := append([]interface{}{key}, scoremembers...) return redis.Int(r.doWrite(r.getClient(key), "ZADD", args...)) }
这样之后就开始了双写,然后需要做的就是check数据
-
做一个check工具
这个要分为两步走,首先,同步老的数据到新的集群里面;同步完之前,要 通过check 工具校验所有数据是否相等,并进行相关补偿调整
-
所有这些步骤搞定后,当check完数据后,我们就可以再在配置里面去掉老一致性hash的配置,只保留新的hash取模的配置
如
[redis.gunread] // 把原有的配置的server地址换为_new的地址 shard = "compat" servers = ["192.168.xxx.xxx:6378;;1", "192.168.xxx.xxx:6379;;1"] [redis.gunread_new] // 去掉这个_new的配置 shard = "compat" servers = ["192.168.xxx.xxx:6380;;1", "192.168.xxx.xxx:6381;;1"]
网友评论