在大数据时代,互联网应用系统会产生海量的数据。由于我们无法只用一台机器缓存大量的数据,所以需要采取分布式缓存的方式。那么我们又该怎样决定哪些数据缓存到哪台机器上呢?常见的分配策略有以下几种:
策略 | 规则 | 缺点 |
---|---|---|
随机访问策略 | 随机访问 | 负载压力不均衡 |
轮询策略 | 轮流访问 | 忽视节点性能差异 |
权重轮询策略 | 根据权重值轮流访问 | 静态配置权重值,无法动态调节 |
Hash 取模策略 | 计算数据的哈希值,用节点个数对哈希值取模 | 增加和缩减节点,会影响到大量数据 |
一致性 Hash 算法
上述的分配策略都有各自的缺点,在实际工作中,我们用得最多的是 Hash 取模策略。Hash 取模策略最大的问题是,当我们需要增加(数据增多)或缩减节点(机器故障)时,数据的存储位置要通过重新计算数据的哈希值取模系统机器个数得出,然后将数据移动到新的节点上面。在移动的过程中,缓存系统的可用性会大大下降,还会对数据库突然造成很大的访问压力。
为了解决 Hash 取模策略的缺点,后来又出现了一致性 Hash 算法。一致性 Hash 算法也是使用取模的方法,但不同于 Hash 取模策略对机器个数进行取模,它是对整个哈希值空间取模。
算法步骤
一致性 Hash 算法将整个哈希值空间组织成一个虚拟的圆环,如假设某哈希函数H的值空间为 0 ~ 2^32-1(即哈希值是一个32位无符号整型)。 一致性 Hash 算法的步骤:
- 求出(节点)的哈希值,并将其配置到 0 ~ 2^32-1 的虚拟圆上。
- 采用同样的方法求出存储数据的键的哈希值,并映射到相同的虚拟圆上。
- 从数据映射到的位置开始顺时针查找,将数据保存到找到的第一个节点上。如果超过 2^32 仍然找不到,就会保存到第一台机器上。
![](https://img.haomeiwen.com/i6640927/76c59998b5312207.png)
如下图添加一台机器(node5)时,并没有影响到所有的数据,只有node4之前的节点node2到node5之间的数据受影响。
![](https://img.haomeiwen.com/i6640927/6d0db8e958a5b427.png)
数据倾斜
一致性 Hash 算法在节点较少时,容易因为节点分布不均匀而造成数据倾斜问题(节点集中分布左半边或右半边圆环上)。
为了解决上面出现的数据倾斜问题,一致性 Hash 算法引入了虚拟节点机制,即对每一个节点计算多个哈希,每个计算结果位置都放置一个此节点,称为虚拟节点。
![](https://img.haomeiwen.com/i6640927/10da852912bc738e.png)
Go 语言实现一致性 Hash 算法
import (
"errors"
"hash/crc32"
"sort"
"strconv"
"sync"
)
//声明新切片类型
type units []uint32
//返回切片长度
func (x units) Len() int {
return len(x)
}
//比对两个数大小
func (x units) Less(i, j int) bool {
return x[i] < x[j]
}
//切片中两个值的交换
func (x units) Swap(i, j int) {
x[i], x[j] = x[j], x[i]
}
//当hash环上没有数据时,提示错误
var errEmpty = errors.New("Hash 环没有数据")
//创建结构体保存一致性hash信息
type Consistent struct {
//hash环,key为哈希值,值存放节点的信息
circle map[uint32]string
//已经排序的节点hash切片
sortedHashes units
//虚拟节点个数,用来增加hash的平衡性
VirtualNode int
//map 读写锁
sync.RWMutex
}
//创建一致性hash算法结构体,设置默认节点数量
func NewConsistent() *Consistent {
return &Consistent{
//初始化变量
circle: make(map[uint32]string),
//设置虚拟节点个数
VirtualNode: 20,
}
}
//自动生成key值
func (c *Consistent) generateKey(element string, index int) string {
//副本key生成逻辑
return element + strconv.Itoa(index)
}
//获取hash位置
func (c *Consistent) hashkey(key string) uint32 {
if len(key) < 64 {
//声明一个数组长度为64
var srcatch [64]byte
//拷贝数据到数组中
copy(srcatch[:], key)
//使用IEEE 多项式返回数据的CRC-32校验和
return crc32.ChecksumIEEE(srcatch[:len(key)])
}
return crc32.ChecksumIEEE([]byte(key))
}
//更新排序,方便查找
func (c *Consistent) updateSortedHashes() {
hashes := c.sortedHashes[:0]
//判断切片容量,是否过大,如果过大则重置
if cap(c.sortedHashes)/(c.VirtualNode*4) > len(c.circle) {
hashes = nil
}
//添加hashes
for k := range c.circle {
hashes = append(hashes, k)
}
//对所有节点hash值进行排序,
//方便之后进行二分查找
sort.Sort(hashes)
//重新赋值
c.sortedHashes = hashes
}
//向hash环中添加节点
func (c *Consistent) Add(element string) {
//加锁
c.Lock()
//解锁
defer c.Unlock()
c.add(element)
}
//添加节点
func (c *Consistent) add(element string) {
//循环虚拟节点,设置副本
for i := 0; i < c.VirtualNode; i++ {
//根据生成的节点添加到hash环中
c.circle[c.hashkey(c.generateKey(element, i))] = element
}
//更新排序
c.updateSortedHashes()
}
//删除节点
func (c *Consistent) remove(element string) {
for i := 0; i < c.VirtualNode; i++ {
delete(c.circle, c.hashkey(c.generateKey(element, i)))
}
c.updateSortedHashes()
}
//删除一个节点
func (c *Consistent) Remove(element string) {
c.Lock()
defer c.Unlock()
c.remove(element)
}
//顺时针查找最近的节点
func (c *Consistent) search(key uint32) int {
//查找算法
f := func(x int) bool {
return c.sortedHashes[x] > key
}
//使用"二分查找"算法来搜索指定切片满足条件的最小值
i := sort.Search(len(c.sortedHashes), f)
//如果超出范围则设置i=0
if i >= len(c.sortedHashes) {
i = 0
}
return i
}
//根据数据标示获取最近的服务器节点信息
func (c *Consistent) Get(name string) (string, error) {
//添加锁
c.RLock()
//解锁
defer c.RUnlock()
//如果为零则返回错误
if len(c.circle) == 0 {
return "", errEmpty
}
//计算hash值
key := c.hashkey(name)
i := c.search(key)
return c.circle[c.sortedHashes[i]], nil
}
参考:
分布式存储--一致性哈希
网友评论