美文网首页
​Etcd源码剖析(一)

​Etcd源码剖析(一)

作者: 神奇的考拉 | 来源:发表于2018-08-23 19:43 被阅读0次

    简介

    etcd是一个高可用的分布式键值(key-value)数据库。etcd内部采用raft协议作为一致性算法,etcd基于Go语言实现。
    etcd是一个服务发现系统,具备以下的特点:
    简单:安装配置简单,而且提供了HTTP API进行交互,使用也很简单
    安全:支持SSL证书验证
    快速:根据官方提供的benchmark数据,单实例支持每秒2k+读操作
    可靠:采用raft算法,实现分布式系统数据的可用性和一致性

    一、store 存储

    1、接口定义 Store

    type Store interface {
       Version() int  // 记录版本
      Index() uint64 // 唯一id
    
      Get(nodePath string, recursive, sorted bool) (*Event, error)  // 查询
      Set(nodePath string, dir bool, value string, expireOpts TTLOptionSet) (*Event, error) // 设置
      Update(nodePath string, newValue string, expireOpts TTLOptionSet) (*Event, error) // 更新
       Create(nodePath string, dir bool, value string, unique bool,
          expireOpts TTLOptionSet) (*Event, error) // 创建
       CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
          value string, expireOpts TTLOptionSet) (*Event, error)  // 交换
       Delete(nodePath string, dir, recursive bool) (*Event, error)  // 删除
       CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error) // 比较删除
    
       Watch(prefix string, recursive, stream bool, sinceIndex uint64) (Watcher, error) // watcher
    
       Save() ([]byte, error)  // 保存
       Recovery(state []byte) error  // 恢复
    
       Clone() Store   // 备份
       SaveNoCopy() ([]byte, error) // 保存
    
       JsonStats() []byte   // 统计
       DeleteExpiredKeys(cutoff time.Time)  // 删除失效期的key
    
      HasTTLKeys() bool  // 是否有ttl的key
    
    }
    

    在Store里面定义了etcd涉及存储方面的操作行为: 创建、修改、删除、查询以及额外的node恢复、有效期、watch机制等

    2、TTL接口定义: TTLOptionSet

    type TTLOptionSet struct {
        ExpireTime time.Time  // key的有效期
       Refresh    bool       // 刷新
    }
    

    用于指定key的ttl

    3、node的存储具体类

    type store struct {
        Root           *node            // 根节点
       WatcherHub     *watcherHub      // 关于node的所有key的watcher
       CurrentIndex   uint64           // 对应存储内容的index
       Stats          *Stats           // 
       CurrentVersion int              // 最新数据的版本
       ttlKeyHeap     *ttlKeyHeap      // 用于数据恢复的(需手动操作)
       worldLock      sync.RWMutex     // 停止当前存储的world锁
       clock          clockwork.Clock    // 
       readonlySet    types.Set       //  只读操作
    }
    

    etcd中真正的存储类结构

    4、创建Store 基于指定的namespace创建对应的目录

    func New(namespaces ...string) Store {
       s := newStore(namespaces...)         // 
       s.clock = clockwork.NewRealClock()         // 存储时间
       return s
    }
    

    该方法定义新建Store的工厂类

    以下方法具体创建Store的方法

    func newStore(namespaces ...string) *store {
       s := new(store)                        // 新增store实例
      s.CurrentVersion = defaultVersion       // 指定其当前版本   
    
      s.Root = newDir(s, "/", s.CurrentIndex, nil, Permanent) //创建其在etcd中对应的目录,第一个目录是以(/)
    
    for _, namespace := range namespaces {  // 循环迭代namespace,一个namespace对应一个目录
          s.Root.Add(newDir(s, namespace, s.CurrentIndex, s.Root, Permanent))
       }
       s.Stats = newStats()       // 累计值
       s.WatcherHub = newWatchHub(1000)  // 新建关于store的water数
       s.ttlKeyHeap = newTtlKeyHeap()    // 新建key heap
       s.readonlySet = types.NewUnsafeSet(append(namespaces, "/")...) //
       return s
    }
    

    5、store的具体操作

    // 当前store的版本.
    func (s *store) Version() int {
       return s.CurrentVersion
    }
    // 当前store的index.
    // 通过使用store的world lock【读锁】锁定当前store的 防止读取当前store的index出现数据安全问题
    func (s *store) Index() uint64 {
       s.worldLock.RLock() 
       defer s.worldLock.RUnlock()
       return s.CurrentIndex
    }
    
    Get操作

    // 当recursive=true时,即将获取指定的node下面所有的内容 否则只获取当前node内容(不包括子node内容)
    // 当stored=true,将按照key的自然排序输出node的内容

    func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error) {
       var err *etcdErr.Error  // 定义etcd的 error
       // get操作时为了防止内容读取过程中出现变更 通过使用读取锁store的world lock,来锁定当前store
      s.worldLock.RLock()  
       defer s.worldLock.RUnlock()
    defer func() {
    // 读取成功的操作
    if err == nil {
    // 变更stats的内容  增加成次数
             s.Stats.Inc(GetSuccess)
             if recursive {  // 若是recurise=true
                reportReadSuccess(GetRecursive)
             } else {
                reportReadSuccess(Get)
             }
             return
          }
       // 读取失败
    
        // 增长fail次数
    
          s.Stats.Inc(GetFail)
          if recursive {
             reportReadFailure(GetRecursive)
          } else {
             reportReadFailure(Get)
          }
       }()
       // 
       n, err := s.internalGet(nodePath)
       if err != nil {
          return nil, err
       }
       // 若是get操作成功 则需返回一个event
       e := newEvent(Get, nodePath, n.ModifiedIndex, n.CreatedIndex)
       e.EtcdIndex = s.CurrentIndex
       e.Node.loadInternalNode(n, recursive, sorted, s.clock)
    
       return e, nil
    }
    

    // 辅助类: 获取指定的nodepath对应的node

    func (s *store) internalGet(nodePath string) (*node, *etcdErr.Error) {
     // 得到当前node的文件path形式内容
    
    nodePath = path.Clean(path.Join("/", nodePath))
       // 根据指定node获取指定name是否在当前的node的子node中存在
       walkFunc := func(parent *node, name string) (*node, *etcdErr.Error) {
          // 当前给定的父node类型不为directory 是不能够进行添加子node
          if !parent.IsDir() {
             err := etcdErr.NewError(etcdErr.EcodeNotDir, parent.Path, s.CurrentIndex)
             return nil, err
          }
       // 当前给定的父node=directory 则判断当前父node下面的子node是否存在对应符合要求的node
    
    // 存在则直接返回对应的node和nil【没有error出现】
         child, ok := parent.Children[name]
          if ok {
             return child, nil
          }
          // 当不存在对应的node时 直接返回对应的error: Ecode key not found
          return nil, etcdErr.NewError(etcdErr.EcodeKeyNotFound, path.Join(parent.Path, name), s.CurrentIndex)
       }
       // 若是key-value,则直接返回对应的root对应的node
    
    // 若是directory,则需要迭代node path中的每个node 直到找到最后一个node  
        f, err := s.walk(nodePath, walkFunc)
    
       if err != nil {  // 出现error 只需要返回对应的error内容 无合适node返回
          return nil, err
       }
       return f, nil // 返回符合要求的node path对应的最后一个node的内容 即为查询所需的内容
    }
    

    // 辅助方法:walk 遍历所有nodepath并在每个directory上应用walkfunc

    func (s *store) walk(nodePath string, walkFunc func(prev *node, component string) (*node, *etcdErr.Error)) (*node, *etcdErr.Error) {
    
      // 拆分指定node path
    
       components := strings.Split(nodePath, "/")
       // 当前store的对应root
       curr := s.Root
       var err *etcdErr.Error
       // 遍历node path
       for i := 1; i < len(components); i++ {
          if len(components[i]) == 0 { // 忽略空字符串 代表当前的nodepath只有root目录 不含有子node
             return curr, nil
          }
          // 迭代获取node path中最后一个node
          curr, err = walkFunc(curr, components[i])
          if err != nil {
             return nil, err
          }
       }
    
       return curr, nil
    }
    
    创建操作
    // 在node path新增node,同时创建出来的node默认是没有ttl
    // 若是node已经存在node path中,则创建失败 返回error
    // 若是node path中的任意一个node是file 则创建失败 返回error
    func (s *store) Create(nodePath string, dir bool, value string, unique bool, expireOpts TTLOptionSet) (*Event, error) {
       var err *etcdErr.Error
        // 该操作属于安全的
       s.worldLock.Lock()
       defer s.worldLock.Unlock()
    
       defer func() {
          if err == nil {  // 创建成功
             s.Stats.Inc(CreateSuccess) // 变更stats 记录create success记录
             reportWriteSuccess(Create) // 记录写成功
             return
          }
    
          s.Stats.Inc(CreateFail)  // 失败 变更stats中的create fail
          reportWriteFailure(Create) // 记录写失败
       }()
    
       e, err := s.internalCreate(nodePath, dir, value, unique, false, expireOpts.ExpireTime, Create)
       if err != nil {
          return nil, err
       }
    
       e.EtcdIndex = s.CurrentIndex
       s.WatcherHub.notify(e)
    
       return e, nil
    }
    

    // 辅助方法:内部执行create

    func (s *store) internalCreate(nodePath string, dir bool, value string, unique, replace bool,
       expireTime time.Time, action string) (*Event, *etcdErr.Error) {
        // 获取store当前的index及下一个新的index【默认在当前index+1】
       currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
    
        // 当unique=true 在当前node path追加唯一项
       if unique { // append unique item under the node path
          nodePath += "/" + fmt.Sprintf("%020s", strconv.FormatUint(nextIndex, 10))
       }
    
       nodePath = path.Clean(path.Join("/", nodePath))
    
    
        // 防止用户改变"/"
       if s.readonlySet.Contains(nodePath) {
          return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", currIndex)
       }
    
       // Assume expire times that are way in the past are
       // This can occur when the time is serialized to JS
    
       if expireTime.Before(minExpireTime) {
          expireTime = Permanent
       }
       dirName, nodeName := path.Split(nodePath)
       // 遍历nodePath,创建dirs并获得最后一个目录节点
       d, err := s.walk(dirName, s.checkDir)
    
      // 创建dir失败 
       if err != nil {
          s.Stats.Inc(SetFail)
          reportWriteFailure(action)
          err.Index = currIndex
          return nil, err
       }
    
       e := newEvent(action, nodePath, nextIndex, nextIndex)
       eNode := e.Node
    
       n, _ := d.GetChild(nodeName)
    
       // force will try to replace an existing file
       if n != nil {
          if replace {
             if n.IsDir() {
                return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex)
             }
             e.PrevNode = n.Repr(false, false, s.clock)
    
             n.Remove(false, false, nil)
          } else {
             return nil, etcdErr.NewError(etcdErr.EcodeNodeExist, nodePath, currIndex)
          }
       }
    
       if !dir { // create file
          // copy the value for safety
          valueCopy := value
          eNode.Value = &valueCopy
    
          n = newKV(s, nodePath, value, nextIndex, d, expireTime)
    
       } else { // create directory
          eNode.Dir = true
    
          n = newDir(s, nodePath, nextIndex, d, expireTime)
       }
    
       // we are sure d is a directory and does not have the children with name n.Name
       d.Add(n)
    
       // node with TTL
       if !n.IsPermanent() {
          s.ttlKeyHeap.push(n)
    
          eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock)
       }
    
       s.CurrentIndex = nextIndex
    
       return e, nil
    }
    

    本次未完待续...

    相关文章

      网友评论

          本文标题:​Etcd源码剖析(一)

          本文链接:https://www.haomeiwen.com/subject/dzgmiftx.html