美文网首页
Consistent Hashing

Consistent Hashing

作者: JunChow520 | 来源:发表于2021-03-22 19:09 被阅读0次

    哈希

    哈希的本质是一个长度可变的数组,哈希的时间复杂度为O(1)。

    哈希为什么不用在数组中遍历呢?其他类型的数据查找都需要遍历,即便是树、二叉树也要经过几次比对,才能判定查找对象的位置,时间复杂度为O(Log(n))。原因在于哈希由存储的对象本身的哈希值(HashCode)以及数组长度来决定在数组中的位置。计算出这个位置(哈希值)在各个版本中是不同的。

    集群

    一致性哈希(Consistent Hashing)是一种特殊的哈希算法,可用于解决分布式缓存问题,一致性哈希解决了简单哈希算法在分布式哈希表(Distributed Hash Table,DHT)中存在动态伸缩的问题。在添加或移除一个节点服务器时,能够尽可能小的改变已存在的服务请求与处理请求服务器之间的映射关系,尽可能满足单调性的要求。

    在普通分布式集群中,服务请求与处理请求服务器之间可以一一对应,也就是说固定服务请求与处理服务器之间的映射关系,某个请求由固定的服务器去处理。这种方式无法对整个系统进行负载均衡,可能会造成某些服务器过于繁忙以至于无法处理新来的请求。而另一些服务器则过于空闲,整体系统的资源利用率低,并且在当分布式集群中某个服务宕机,会直到导致某些服务请求无法处理。

    改进可利用哈希算法对服务请求与处理服务器之间的关系进行映射,以达到动态分配的目的。通过哈希算法对服务请求进行转换,转换后的结果对服务器节点值进行取模运算,取模后的值就是服务请求对应的请求处理服务器。这种方式可以对应节点失效的情况,当某个分布式集群节点宕机,服务请求可以通过哈希算法重新分配到其他可用的服务器上,避免无法处理请求的状况出现。

    原理

    一致性哈希算法是当前较主流的分布式哈希表(Distributed Hash Table,DHT)协议之一,它是对简单哈希算法进行了修正,解决了热点(hotpot)问题。

    一致性哈希算法原理分为两步

    1. 对存储节点的哈希值进行计算,将其存储空间抽象为一个环(哈希环,HashRing),将存储节点配置到环上。环上所有的节点都具有一个哈希值。
    2. 对数据进行哈希计算,按顺时针方向将其映射到离最近的节点上去。

    当集群中有服务器节点出现故障或离线时,按照一致性哈希算法的映射方法,受影响的仅仅为环上故障点开始逆时针至下一个节点之间区间的数据对象,而这些对象本身就是映射到故障节点之上的。

    一致性哈希

    示例

    例如:假如具有N台缓存服务器,常用的缓存负载均衡策略是对资源key的请求使用hash(key) = key mod N来映射到一台服务器上。

    如果增加或减少一台服务器,此时N会变为N+1或N-1,这样有可能会改变所有资源对应的哈希值,导致服务期间大量的数据迁移,从而造成服务器不稳定。

    这种使用槽映射的方式有一个缺点是所有节点都需要知道槽与节点对应关系,如果客户端不存槽与节点对应的关系,客户端就需要实现重定向的逻辑。也就是说所有的缓存都失效了,使得缓存服务器大量集中地向原始数据服务器请求更新缓存。

    因此需要一种哈希算法来避免这样的问题,新的哈希算法需要满足:

    • 新增或减少一台服务器以后不会造成大面积的访问错误
    • 同一资源仍然会映射到之前同样的服务器上
    • 新增一台服务器时,新增的服务器需要尽量地分担存储其他服务器的缓存资源。
    • 减少一台服务器,其他服务器也可以尽量分担存储它的缓存资源。
    ins := web.NewConsistentHash(4, nil)
    ins.Add("127.0.0.1:9091", "127.0.0.1:9092", "127.0.0.1:9093", "127.0.0.1:9094")
    addr := ins.Get("uid:15")
    t.Log(addr)
    

    特点

    一致性哈希算法的特点

    • 均衡性(Balance)
      均衡性主要是通过算法分配,集群中各节点应该要尽可能均衡。
    • 单调性(Monotonicity)
      单调性主要是指当集群发生变化时,已经分配到的老节点的key尽可能继续分配到之前的节点,防止大量数据迁移。
    • 分散性(Spread)
      分散性主要针对同一个key,当在不同的客户端操作时可能存在客户端获取到的缓存集群的数量不一致,从而导致将key映射到不同节点的问题,这会引起数据的不一致性。
    • 负载(Load)
      负载主要针对一个缓存而言,同一缓存有可能会被用户映射到不同的key上,从而导致该缓存的状态不一致。

    相关

    分布式系统中对象与接口的映射关系

    • 硬哈希算法

    传统方案使用对象的哈希值,即对节点个数取模后再映射到相应编号的节点,这种方案缺陷在于当节点个数增减变动时,绝大多数对象的映射关系会失效而需要数据迁移。

    分布式系统中,假设有n个节点,传统方案使用mod(key, n)映射数据到节点。当节点扩容或缩容时(哪怕只是增减1个节点),映射关系变为mode(key, n + 1)mod(key, n - 1),绝大多数原来的数据映射关系都会失效。

    • 一致性哈希算法(Consistent Hashing)

    当节点个数变动时映射关系失效的对象非常少,迁移成本也非常小。

    1997年,MIT麻省理工学院的David Karger等6个人发布学术论文 《Consistent hashing and random trees:distributed caching protocots for relieving hot spots on the World Wide Web》(一致性哈希和随机数:用于缓解万维网上热点的分布式缓存协议)。

    对于K个关键字和n个槽位(分布式系统中的节点)的哈希表,增减槽位后,平均只需对K/n个关键字重新映射。


    分布式集群中缓存类数据key节点分配问题的解决方案

    • 哈希取模

    哈希取模均衡性没有什么问题,如果集群中新增一个节点,将会有N/(N+1)的数据实效,当N越大失效率越高,这显然是不可接受的。

    • 槽映射(slots)

    Redis采用的槽映射,槽映射核心思想是将key值作一定运算,比如crc16crc32hash,以获得一个整数值。再将该整数值与固定的槽数取模,每个节点处理固定的槽数slots

    获取key所在节点时,先要计算出key与槽的对应关系,再通过槽与节点的对应关系找到节点。每次增减节点时,只需要迁移一定槽对应的key即可,而不迁移的槽点key值不会失效。这种方式将失效率降低到1/(N+1)

    槽映射缺点是,所有节点都需要知道槽与节点之间的对应关系,如果客户端不保存槽与节点的对应关系,则需要实现重定向的逻辑。

    • 一致性哈希

    一致性哈希新增一个节点的失效率仅为1/(N+1),通过一致性哈希最大程度的降低了失效率。同时相比于槽映射的方式,不需要引入槽来做中间对应,最大限度的简化了实现。

    哈希环

    一致性哈希算法的核心思想是将keyhash运算得到一个0到(2^32-1)之间的整数值,这个整数值表示key在哈希环上的位置。

    1. 将代表服务器的key做哈希运算得到服务器节点shard在哈希环上的位置
    2. 将缓存的key使用同样的方式计算出在哈希环上的位置

    在哈希环(HashRing)按顺时针方向找到第一个大于等于缓存key的服务器节点(shard)的位置,从而得到该key需要分配的服务器地址。

    一致性算法

    虚拟节点

    • 虚拟节点提高均衡性

    哈希算法并不能保证100%的平衡性,如果实际节点过少,对象不能均匀地映射到节点上。为了解决这个问题,一致性哈希算法引入虚拟节点的概念。

    由于实际节点过少存在某些节点所在位置周围存在大量的哈希点,从而导致分配到这些节点到key要比其他节点多的多,这样会导致集群中各节点负载不均衡。为解决这个问题,引入虚拟节点,即一个实际节点对应多个虚拟节点。缓存的key作映射时先找到对应的虚拟节点再对应到实际节点。

    虚拟节点是实际节点在哈希空间中的复制品(replica),实际节点对应了若干个虚拟节点,每个实际节点对应的虚拟节点个数称为复制个数,虚拟节点在哈希空间以哈希值排列。

    引入虚拟节点之后,映射关系就从 “对象->节点”变成了“对象->虚拟节点”,虚拟节点的哈希计算可以采用IP加数字后缀的方式。

    算法

    使用Go实现一致性哈希

    • 按常用哈希算法将对应的键hash到一个具有2^32的槽位空间中,将数字头尾相连形成一个闭合环形。

    例如:Redis中会把缓存的key分配到16384个槽位(slot)中

    • 把分布式集群服务器节点对象映射到环形空间

    • 把待缓存数据通过哈希算法处理后映射到环形空间

    每个缓存key都可以通过哈希算法转换为一个32位的二进制数,对应环形空间中某个缓冲区,把所有的缓存key映射到环形空间的不同位置。

    • 将机器通过哈希算法映射到环上

    每个缓存节点(Shard)遵循相同的哈希算法,比如利用IP做哈希映射到环形空间。

    一致性哈希算法常用于分布式分片数据存储系统的key路由设计,把数据key均匀的隐射到集群中的各个存储节点上。

    采用一致性哈希算法优势

    • key均匀的映射到集群中的各个存储节点
    • 当集群中增加和删除节点时,只会影响一部分原先映射到它的相邻节点上的key,不会导致集群中大量的key失效。

    流程

    1. 哈希环

    使用普通哈希算法转换为一个32位的二进制数,生成一个具有2^32个哈希槽位(hash slots)的环形哈希空间。

    var slot uint32 = crc32.ChecksumIEEE([]byte(str))
    
    哈希环
    1. 将分布式集群服务器节点通过哈希算法映射到哈希环上

    对节点的IP或唯一主机名做哈希计算得到一个0到2^32-1之间的散列值(Hash Value),然后映射到哈希环上。

    addr = "192.168.1.1:8080"
    var node uint32 = crc32.ChecksumIEEE([]byte(addr))
    
    节点映射
    1. 把待缓存数据的key映射到哈希环上

    在确定数据的读取或写入节点时,对数据的key进行哈希运算得到一个在0到2^32-1之间的散列值,然后映射到哈希环上。

    key : = "xxx"
    var data uint32 = crc32.ChecksumIEEE([]byte(key))
    
    键映射
    1. 顺时针查找数据对应的节点

    确定了数据在哈希环上的位置后,按顺时针方向在哈希上遇到的第一个节点,即作为写入或读取数据的节点。

    index  := sort.Search(len(HashRing), func(i int) bool {
      return HashRing[i] >= hashvalue
    })
    

    实现1

    $ vim ./web/consistent_hash.go
    
    package web
    
    import (
        "hash/crc32"
        "sort"
        "strconv"
    )
    
    //UInt32Slice 哈希环
    type UInt32Slice []uint32
    
    func (u UInt32Slice) Len() int {
        return len(u)
    }
    func (u UInt32Slice) Less(i, j int) bool {
        return u[i] < u[j]
    }
    func (u UInt32Slice) Swap(i, j int) {
        u[i], u[j] = u[j], u[i]
    }
    
    //HashFunc 哈桑算法
    type HashFunc func(bytes []byte) uint32
    
    //ConsistentHash 一致性哈希数据结构
    type ConsistentHash struct {
        hash     HashFunc          //哈希算法生成节点槽位
        replicas int               //虚拟节点复制数量
        nodes    UInt32Slice       //已排序的节点哈希切片
        kv       map[uint32]string //节点哈希和键的map[node]addr,键是哈希值,值为节点
    }
    
    //NewConsistentHash 创建一致性哈希实例
    func NewConsistentHash(replicas int, fn HashFunc) *ConsistentHash {
        ins := &ConsistentHash{
            replicas: replicas,
            hash:     fn,
            kv:       make(map[uint32]string),
        }
        //默认使用CRC32算法作为哈希算法
        if ins.hash == nil {
            ins.hash = crc32.ChecksumIEEE
        }
        return ins
    }
    
    //IsEmpty 判断节点是否为空
    func (h *ConsistentHash) IsEmpty() bool {
        return len(h.nodes) == 0
    }
    
    //Add 添加节点
    func (h *ConsistentHash) Add(addrs ...string) {
        for _, addr := range addrs {
            //遍历复制因子计算所有虚拟节点的哈希值
            for i := 0; i < h.replicas; i++ {
                //获取哈希值
                key := addr + strconv.Itoa(i) //虚拟节点的哈希计算:IP地址 + 数字
                slot := h.hash([]byte(key))   //计算目标槽位
                //保存所有的key
                h.nodes = append(h.nodes, slot)
                //保存节点哈希值和地址的映射
                h.kv[slot] = addr
            }
        }
        //对虚拟节点的哈希值排序以方便二分查找
        sort.Sort(h.nodes)
    }
    
    //Get 根据给定key获取最靠近它节点的addr
    func (h *ConsistentHash) Get(key string) string {
        //判断节点是否为空
        if h.IsEmpty() {
            return ""
        }
        //获取哈希值
        slot := h.hash([]byte(key))
        //二分法查找最优节点,获取第一个节点,当前哈希值大于目标哈希值的为最优节点
        index := sort.Search(len(h.nodes), func(i int) bool {
            return h.nodes[i] >= slot
        })
        //判断目标索引是否为最大索引
        if index == len(h.nodes) {
            index = 0
        }
    
        k := h.nodes[index]
        addr := h.kv[k]
        return addr
    }
    

    测试

    $ vim ./test/fn_test.go
    
    package test
    
    import (
        "gfw/web"
        "testing"
    )
    
    func TestCache(t *testing.T) {
        ins := web.NewConsistentHash(4, nil)
        ins.Add("127.0.0.1:9091", "127.0.0.1:9092", "127.0.0.1:9093", "127.0.0.1:9094")
        addr := ins.Get("uid:15")
        t.Log(addr)
    }
    
    $ go test -v -run TestCache fn_test.go
    === RUN   TestCache
        fn_test.go:12: 127.0.0.1:9094
    --- PASS: TestCache (0.00s)
    PASS
    ok      command-line-arguments  0.322s
    

    实现2

    • 针对多缓存服务器应用场景,当缓存服务器增减变动时,希望缓存服务器中的数据尽可能被使用到。
    • 需要做好负载均衡,不能因为添加或删除缓存节点造成部分节点负载过重。
    $ vim ./web/consistent_hash.go
    
    package web
    
    import (
        "hash/crc32"
        "sort"
        "strconv"
        "sync"
    )
    
    //HashRing 哈希环 0到2^32-1个槽位 顺时针
    type HashRing []uint32
    
    func (r HashRing) Len() int           { return len(r) }
    func (r HashRing) Less(i, j int) bool { return r[i] < r[j] }
    func (r HashRing) Swap(i, j int)      { r[i], r[j] = r[j], r[i] }
    
    //RingNode 哈希环节点
    type RingNode struct {
        Id     int    //节点编号
        Host   string //节点主机地址
        Port   int    //节点端口号
        Name   string //节点名称
        Weight int    //权重值
    }
    
    //NewRingNode 创建哈希环节点
    func NewRingNode(id int, host string, port int, name string, weight int) *RingNode {
        return &RingNode{Id: id, Host: host, Port: port, Name: name, Weight: weight}
    }
    
    //HashFunc 哈希算法
    type HashFunc func([]byte) uint32
    
    //默认虚拟节点复制数量
    const DEFAULT_REPLICAS = 160
    
    //ConsistentHash 一致性哈希
    type ConsistentHash struct {
        Replicas     int                  //虚拟节点复制数量
        Ring         HashRing             //哈希环 顺时针查找
        Nodes        map[uint32]*RingNode //节点哈希散列值与节点地址的映射
        Resources    map[int]bool         //已存在节点
        sync.RWMutex                      //读写锁
        hash         HashFunc             //哈希算法
    }
    
    //NewConstentHash 创建一致性哈希
    func NewConsistentHash(replicas int, fn HashFunc) *ConsistentHash {
        if replicas == 0 {
            replicas = DEFAULT_REPLICAS
        }
        if fn == nil {
            fn = crc32.ChecksumIEEE
        }
        return &ConsistentHash{
            Replicas:  replicas,
            Ring:      HashRing{},
            Nodes:     make(map[uint32]*RingNode),
            Resources: make(map[int]bool),
            hash:      fn,
        }
    }
    
    //sortRing 哈希环重新排序
    func (c *ConsistentHash) sortRing() {
        c.Ring = HashRing{}
        for key := range c.Nodes {
            c.Ring = append(c.Ring, key)
        }
        sort.Sort(c.Ring)
    }
    
    //value 获取节点唯一哈希值
    func (c *ConsistentHash) value(i int, node *RingNode) uint32 {
        name := node.Host + "*" + strconv.Itoa(node.Weight) + "-" + strconv.Itoa(i) + "-" + strconv.Itoa(node.Id)
        return c.hash([]byte(name))
    }
    
    //Add 添加节点
    func (c *ConsistentHash) Add(node *RingNode) bool {
        c.Lock()
        defer c.Unlock()
    
        //判断节点是否已经存在
        if _, ok := c.Resources[node.Id]; ok {
            return false
        }
        //设置节点
        for i := 0; i < c.Replicas*node.Weight; i++ {
            hash := c.value(i, node)
            c.Nodes[hash] = node
        }
        c.sortRing()
    
        c.Resources[node.Id] = true
    
        return true
    }
    
    //Remove 删除节点
    func (c *ConsistentHash) Remove(node *RingNode) {
        //加锁
        c.Lock()
        defer c.Unlock()
        //判断节点是否存在
        if _, ok := c.Resources[node.Id]; !ok {
            return
        }
        //删除节点
        delete(c.Resources, node.Id)
        for i := 0; i < c.Replicas*node.Weight; i++ {
            hash := c.value(i, node)
            delete(c.Nodes, hash)
        }
        //重新排序
        c.sortRing()
    }
    
    //Get 从节点中获取目标
    func (c *ConsistentHash) Get(key string) *RingNode {
        //加读锁
        c.RLock()
        defer c.RUnlock()
        //计算哈希值
        hash := c.hash([]byte(key))
        //搜索节点
        index := c.Search(hash)
        item := c.Ring[index]
        node := c.Nodes[item]
    
        return node
    }
    
    //Search 从哈希环中获取目标位置
    func (c *ConsistentHash) Search(hash uint32) int {
        index := sort.Search(len(c.Ring), func(i int) bool {
            return c.Ring[i] >= hash
        })
        last := len(c.Ring) - 1
        if index >= len(c.Ring) {
            return last
        }
        if index == last {
            return 0
        } else {
            return index
        }
    }
    

    测试用例

    $ vim ./test/fn_test.go
    
    package test
    
    import (
        "fmt"
        "gfw/web"
        "testing"
    )
    
    func TestCache(t *testing.T) {
        ch := web.NewConsistentHash(3, nil)
        for i := 0; i < 10; i++ {
            name := fmt.Sprintf("cache_addr_%d", i)
            host := fmt.Sprintf("172.15.1.%d", i)
            node := web.NewRingNode(i, host, 8080, name, 1)
            ch.Add(node)
        }
        /*
            for k, v := range ch.Nodes {
                t.Logf("Node: hash = %d, Id = %d, Host = %s", k, v.Id, v.Host)
            }
        */
        addrMap := make(map[string]int, 0)
        for i := 0; i < 100; i++ {
            key := fmt.Sprintf("key:%d", i)
            node := ch.Get(key)
            if _, ok := addrMap[node.Host]; ok {
                addrMap[node.Host] += 1
            } else {
                addrMap[node.Host] = 1
            }
        }
    
        for k, v := range addrMap {
            t.Logf("Host = %s, Count = %d", k, v)
        }
    }
    

    命令行测试

    $ go test -v -run TestCache fn_test.go
    === RUN   TestCache
        fn_test.go:34: Host = 172.15.1.7, Count = 10
        fn_test.go:34: Host = 172.15.1.0, Count = 8
        fn_test.go:34: Host = 172.15.1.6, Count = 7
        fn_test.go:34: Host = 172.15.1.1, Count = 10
        fn_test.go:34: Host = 172.15.1.4, Count = 15
        fn_test.go:34: Host = 172.15.1.3, Count = 16
        fn_test.go:34: Host = 172.15.1.5, Count = 5
        fn_test.go:34: Host = 172.15.1.8, Count = 11
        fn_test.go:34: Host = 172.15.1.2, Count = 15
        fn_test.go:34: Host = 172.15.1.9, Count = 3
    --- PASS: TestCache (0.00s)
    PASS
    ok      command-line-arguments  0.311s
    

    实现3

    • 由于哈希环具有2^32个槽位,因此使用uint32类型来表示哈希槽位。
    //Slot 哈希环槽位
    type Slots []uint32
    
    • 为了能够使用sort.Sort排序函数对哈希环中虚拟节点的槽位进行排序,因此自定义一个[]uint32类型同时实现sort.Sort函数约定的参数接口。
    //实现sort.Sort排序接口,完成顺时针查找。
    func (s Slots) Len() int           { return len(s) }
    func (s Slots) Less(i, j int) bool { return s[i] < s[j] }
    func (s Slots) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
    

    实现一致性哈希算法的数据结构

    • nodes 使用一个map保存所有虚拟节点和哈希操作的映射关系,mapkey是哈希槽位的uint32类型编号,value是虚拟节点的主机地址或端口。
    • slots 使用一个slice切片按从大到小的顺序保存所有虚拟节点映射到的哈希槽位
    //一致性哈希
    type Hash struct {
        replicas int               //分布式集群节点服务器在哈希环上创建虚拟节点的数量
        slots    Slots             //虚拟节点对应槽位切片
        nodes    map[uint32]string //虚拟节点槽位与主机地址的映射关系
        sync.RWMutex                   //读写锁
    }
    func NewHash(replicas int) *Hash {
        return &Hash{replicas: replicas, slots: Slots{}, nodes: make(map[uint32]string, 0)}
    }
    
    • 默认选择crc32算法作为哈希散列函数以生成uint32散列值
    //hash 散列函数 使用crc32算法计算生成哈希散列值
    func (h *Hash) hash(key string) uint32 {
        return crc32.ChecksumIEEE([]byte(key))
    }
    

    当计算出任意一个key的散列值后,在slots切片中通过二分查找检索出大于等于散列值的最小值以确定虚拟节点位置,然后再通过槽位从nodes中检索到虚拟节点对应的主机地址或端口。

    // sort 将虚拟节点映射到哈希槽位排序,排序后保存到切片。
    func (h *Hash) sort() {
        //将虚拟节点映射到操作
        slots := h.slots[:]
        for slot := range h.nodes {
            slots = append(slots, slot)
        }
        //排序后重新保存
        sort.Sort(slots)
        h.slots = slots
    }
    
    • 为分布式集群哈希环中添加单个节点
    //Add 分布式集群哈希环中新增节点服务器
    func (h *Hash) Add(addr string) {
        //加锁
        h.Lock()
        defer h.Unlock()
        //根据复制因子定义的数量为单台服务器生成多个虚拟节点
        for i := 0; i < h.replicas; i++ {
            //生成唯一哈希槽位标识
            name := fmt.Sprintf("%s%d", addr, i)
            slot := h.hash(name)
            //设置槽位与节点映射关系
            h.nodes[slot] = addr
        }
        //对槽位顺时针排序
        h.sort()
    }
    
    • 为分布式集群哈希环中批量添加多个服务器节点
    //AddAll 批量添加
    func (h *Hash) AddAll(addrs ...string) {
        for _, addr := range addrs {
            h.Add(addr)
        }
    }
    
    • 删除单个节点
    //Delete 删除节点
    func (h *Hash) Delete(addr string) {
        //加锁
        h.Lock()
        h.Unlock()
        //删除虚拟节点
        for i := 0; i < h.replicas; i++ {
            name := fmt.Sprintf("%s%d", addr, i)
            slot := h.hash(name)
            delete(h.nodes, slot)
        }
        //重新排序
        h.sort()
    }
    
    • 查找用于读写某个key的节点
    //Get 二分查找法获取最近位置的节点
    func (h *Hash) Get(key string) (string, error) {
        hash := h.hash(key)
        //二分查找获取最近位置
        index := sort.Search(len(h.slots), func(i int) bool {
            return h.slots[i] >= hash
        })
        //哈希环型结构
        if index >= len(h.slots) {
            index = 0
        }
        //获取节点
        slot := h.slots[index]
        addr, ok := h.nodes[slot]
        if !ok {
            return "", errors.New("node not found")
        }
    
        return addr, nil
    }
    

    测试用例

    $ vim ./test/fn_test.go
    
    package test
    
    import (
        "gfw/web"
        "testing"
    )
    
    func TestCache(t *testing.T) {
        h := web.NewHash(3)
        h.AddAll("124.10.50.11:8080", "124.10.50.12:8080", "124.10.50.13:8080")
        addr, err := h.Get("uid:100")
        t.Log(addr, err)
    }
    

    执行测试

    $ go test -v -run TestCache fn_test.go
    === RUN   TestCache
        fn_test.go:12: 124.10.50.13:8080 <nil>
    --- PASS: TestCache (0.00s)
    PASS
    ok      command-line-arguments  0.311s
    

    实现4

    $ vim ./web/consistent_hash.go
    
    package web
    
    import (
        "fmt"
        "hash/crc32"
        "sort"
        "strconv"
    )
    
    //HashFunc 哈希算法 计算哈希环上的槽位
    type HashFunc func(data []byte) uint32
    
    //ConsistentHash 一致性哈希数据结构
    type ConsistentHash struct {
        replicas int            //虚拟节点复制倍数
        peers    []int          //哈希环 虚拟节点槽位
        hashpeer map[int]string //虚拟节点与真实节点的映射表
        hash     HashFunc       //依赖注入 哈希算法
    }
    
    //NewConsistentHash 构造函数 创建一致性哈希
    func NewConsistentHash(replicas int, fn HashFunc) *ConsistentHash {
        instance := &ConsistentHash{replicas: replicas, hash: fn, hashpeer: make(map[int]string)}
        if instance.hash == nil {
            instance.hash = crc32.ChecksumIEEE
        }
        return instance
    }
    
    //Add 添加节点服务器到哈希环
    func (h *ConsistentHash) Add(addrs ...string) {
        for _, addr := range addrs {
            h.Append(addr)
        }
        //哈希环的哈希值排序
        sort.Ints(h.peers)
    }
    
    //Append 添加节点服务器到哈希环
    func (h *ConsistentHash) Append(addr string) {
        for i := 0; i < h.replicas; i++ {
            key := strconv.Itoa(i) + addr
            hash := int(h.hash([]byte(key)))
            fmt.Printf("addr:%v peer:%v hash:%v\n", addr, key, hash)
            h.peers = append(h.peers, hash)
            h.hashpeer[hash] = addr
        }
    }
    
    //Get 获取key对应的节点
    func (h *ConsistentHash) Get(key string) string {
        //判断节点是否存在
        if len(h.peers) == 0 {
            return ""
        }
        //获取key的哈希值
        hash := int(h.hash([]byte(key)))
        //二分法获取目标节点
        index := sort.Search(len(h.peers), func(i int) bool {
            return h.peers[i] >= hash
        })
        //获取目标虚拟节点
        idx := index % len(h.peers) //对环形结构取余运算获取虚拟节点下标
        peer := h.peers[idx]
        //获取虚拟节点对应的真实节点
        addr := h.hashpeer[peer]
        fmt.Printf("key:%v hash:%v peer:%v addr:%v\n", key, hash, peer, addr)
        return addr
    }
    

    测试用例

    $ vim ./test/ins_test.go
    
    package test
    
    import (
        "gfw/web"
        "strconv"
        "testing"
    )
    
    func TestHash(t *testing.T) {
        ins := web.NewConsistentHash(3, func(key []byte) uint32 {
            i, _ := strconv.Atoi(string(key))
            return uint32(i)
        })
        ins.Add("1", "2", "3")
        ins.Get("2")
    }
    

    命令行测试

    $ go test -v -run TestHash ins_test.go
    === RUN   TestHash
    addr:1 peer:01 hash:1
    addr:1 peer:11 hash:11
    addr:1 peer:21 hash:21
    addr:2 peer:02 hash:2
    addr:2 peer:12 hash:12
    addr:2 peer:22 hash:22
    addr:3 peer:03 hash:3
    addr:3 peer:13 hash:13
    addr:3 peer:23 hash:23
    key:2 hash:2 peer:2 addr:2
    --- PASS: TestHash (0.00s)
    PASS
    ok      command-line-arguments  0.298s
    

    相关文章

      网友评论

          本文标题:Consistent Hashing

          本文链接:https://www.haomeiwen.com/subject/pymwcltx.html