背景:单调哈希的问题是一旦添加或者删除节点(节点挂掉),需要对所有节点上的key重新哈希并迁移数据。此间会导致服务不可用。
实节点:真正提供服务的节点。
虚拟槽:圆环上的一个位置。当有实节点占据这个位置的槽,则该槽的状态为可用。否则为不可以用状态。
原理:固定数量N个槽均匀沿圆环分布,一个key经过hash后落到对应的槽,每个实节点负责处理落到槽中的key。当节点失效时,可用的槽变为不可以用。这个时候沿着圆环顺时针找到一个能用的槽。这样子避免了单调哈希需要重新哈希的问题。
要点:
1.给实节点分配虚拟槽时,一定要避免发生碰撞。一旦发生碰撞,会导致同一个槽对应不同的实节点,此时添加实节点失败。
2.每个实节点占用的虚拟槽的数量并不是越多越好。实节点占据的虚拟槽越多,添加新节点的失败概率越大。理论上,只要哈希后key能均匀分布即可满足要求。测试用例中,50个复制数已经可以实现比较好的分布效果。
3.为什么采用crc32哈希?因为crc32对随机的key进行哈希得到值分布比较均匀。
package main
import (
"crypto/md5"
"hash/crc32"
"strconv"
"fmt"
"errors"
)
type Node struct {
id string
ip string
}
const SLOT_MAX int = 0xffff
type CHashing struct {
replicas int
count int
nodes [SLOT_MAX]*Node
}
func (s *CHashing)Init(rel int, nodes []*Node) error {
s.replicas = rel
s.count = 0
for _,nodes := range nodes {
err := s.Add(nodes)
if err!=nil {
return err
}
s.count++;
}
return nil
}
func (s *CHashing)hash_func(key string) int {
md5Chan := make(chan []byte, 1)
md5Sum := md5.Sum([]byte(key))
md5Chan <- md5Sum[:]
return int(crc32.ChecksumIEEE(<-md5Chan))%SLOT_MAX
}
func (s *CHashing)SetReplicas(repl int) {
s.replicas = repl
}
func (s *CHashing)Add(n *Node) error {
for i:=0;i<s.replicas;i++ {
slot := s.hash_func(n.id+strconv.Itoa(i*11))
// 可能会有碰撞情况
// 解决方法:id加一个新的盐,遇到碰撞则不许添加节点
if s.nodes[slot] != nil {
return errors.New("collision on allocating slot.")
}
s.nodes[slot] = n
}
s.count++;
return nil
}
func (s *CHashing)Del(n *Node) {
for i:=0;i<s.replicas;i++ {
slot := s.hash_func(n.id+strconv.Itoa(i*11))
s.nodes[slot] = nil
}
s.count--;
if(s.count<0) {
s.count=0
}
}
func (s *CHashing)Clear() {
for idx := range s.nodes {
s.nodes[idx] = nil
}
s.replicas = 0
s.count = 0
}
func (s *CHashing)Get(key string) *Node {
slot := s.hash_func(key)
if(s.nodes[slot] != nil) {
return s.nodes[slot]
}
for i:=slot+1;i<SLOT_MAX;i++ {
if(s.nodes[i]!=nil) {
return s.nodes[i]
}
}
for i:=0;i<slot;i++ {
if(s.nodes[i]!=nil) {
return s.nodes[i]
}
}
//this will never happen
return nil
}
func test(ch *CHashing, key_num int) {
fmt.Printf("%d个实节点,每个实节点使用%d个虚拟槽,%d个测试key 访问分布结果 :\n",ch.count,ch.replicas,key_num)
stat := make(map[string]int)
for i:=0;i<key_num;i++ {
nodes := ch.Get(strconv.Itoa(i))
count := stat[nodes.id]
stat[nodes.id] = count+1
}
// 统计各个key的分布情况
for k,v := range stat {
fmt.Printf("%s\t%d\n",k,v)
}
fmt.Printf("========================\n")
}
func main() {
var ch CHashing
nodes := [5]Node{
Node{"0", "192.168.0.0.0"},
Node{"1", "192.168.0.0.1"},
Node{"2", "192.168.0.0.2"},
Node{"3", "192.168.0.0.3"},
Node{"4", "192.168.0.0.4"},
}
ch.SetReplicas(2)
// 先测试2个实节点,2个虚拟槽的情况
err := ch.Add(&nodes[0])
if err!=nil {
fmt.Println("add nodes fails:",err)
return
}
err = ch.Add(&nodes[1])
if err!=nil {
fmt.Println("add nodes fails:",err)
return
}
test(&ch, SLOT_MAX)
test(&ch, SLOT_MAX*2)
test(&ch, SLOT_MAX*3)
// 先测试3个实节点,2个虚拟槽的情况
err = ch.Add(&nodes[2])
if err!=nil {
fmt.Println("add nodes fails:",err)
return
}
test(&ch, SLOT_MAX)
test(&ch, SLOT_MAX*2)
test(&ch, SLOT_MAX*3)
// 先测试4个实节点,2个虚拟槽的情况
err = ch.Add(&nodes[3])
if err!=nil {
fmt.Println("add nodes fails:",err)
return
}
test(&ch, SLOT_MAX)
test(&ch, SLOT_MAX*2)
test(&ch, SLOT_MAX*3)
// 2个实节点,50个虚拟槽的情况
ch.Clear()
ch.SetReplicas(50)
err = ch.Add(&nodes[0])
if err!=nil {
fmt.Println("add nodes fails:",err)
return
}
err = ch.Add(&nodes[1])
if err!=nil {
fmt.Println("add nodes fails:",err)
return
}
test(&ch, SLOT_MAX)
test(&ch, SLOT_MAX*2)
test(&ch, SLOT_MAX*3)
// 3个实节点,50个虚拟槽的情况
err = ch.Add(&nodes[2])
if err!=nil {
fmt.Println("add nodes fails:",err)
return
}
test(&ch, SLOT_MAX)
test(&ch, SLOT_MAX*2)
test(&ch, SLOT_MAX*3)
// 4个实节点,50个虚拟槽的情况
err = ch.Add(&nodes[3])
if err!=nil {
fmt.Println("add nodes fails:",err)
return
}
test(&ch, SLOT_MAX)
test(&ch, SLOT_MAX*2)
test(&ch, SLOT_MAX*3)
}
网友评论