美文网首页
ETCD 服务注册和发现

ETCD 服务注册和发现

作者: 一抹圆弧 | 来源:发表于2020-10-18 22:38 被阅读0次

    ETCD 服务发现

    在微服务中各个服务都是无状态的,不会在配置中写死上下游的访问地址,所以需要有一个地方去维护各个节点的信息。
    服务起来的时候会去注册中心拉取其他服务的节点信息,并且把自己的信息推送到注册中心。
    当运行中的服务下线或出现问题时,服务会主动把自己从注册中心摘除;或者由注册中心通过心跳、健康检查等协议检测服务状态,发现异常后将其摘除。

    package main
    
    import (
        "encoding/json"
        "fmt"
        "github.com/coreos/etcd/clientv3"
        "context"
        "log"
    )
    
    // Discovery mirrors the node records stored in etcd into a local
    // NodesManager, using pull for the initial load and watch for updates.
    type Discovery struct {
        // cli is the etcd client used for the Get and Watch calls.
        cli *clientv3.Client
        // info is the node description handed to NewDiscovery; it is stored as given.
        info *NodeInfo
        // nodes is the local registry that pull and watchEvent update.
        nodes *NodesManager
    }
    
    // NewDiscovery builds a Discovery that stores discovered nodes in mgr and
    // talks to etcd through a client created from conf.
    //
    // It returns an error when mgr is nil or when the etcd client cannot be
    // created. On error the returned *Discovery is nil, so callers never receive
    // a half-initialised value.
    func NewDiscovery(info *NodeInfo, conf clientv3.Config, mgr *NodesManager) (dis *Discovery, err error) {
        if mgr == nil {
            return nil, fmt.Errorf("[Discovery] mgr == nil")
        }
        cli, err := clientv3.New(conf)
        if err != nil {
            return nil, err
        }
        return &Discovery{cli: cli, info: info, nodes: mgr}, nil
    }
    
    // pull reads every key under the "discovery/" prefix from etcd, decodes each
    // value as a JSON NodeInfo and adds it to the node manager.
    //
    // Failures are logged instead of terminating the process: a failed Get
    // aborts this pull, and a value that cannot be decoded is skipped so that
    // one bad record does not hide the remaining nodes.
    func (d *Discovery) pull() {
        kv := clientv3.NewKV(d.cli)
        resp, err := kv.Get(context.TODO(), "discovery/", clientv3.WithPrefix())
        if err != nil {
            log.Printf("[Discovery] kv.Get err:%+v", err)
            return
        }
        for _, v := range resp.Kvs {
            node := &NodeInfo{}
            if err := json.Unmarshal(v.Value, node); err != nil {
                log.Printf("[Discovery] json.Unmarshal err:%+v", err)
                continue
            }
            d.nodes.AddNode(node)
            log.Printf("[Discovery] pull node:%+v", node)
        }
    }
    
    // watch subscribes to changes under the "discovery/" prefix and forwards
    // every batch of events to watchEvent. It blocks until the watch channel is
    // closed, so callers normally run it in its own goroutine.
    //
    // The prefix includes the trailing slash so that it covers the same keys
    // that pull reads.
    func (d *Discovery) watch() {
        watcher := clientv3.NewWatcher(d.cli)
        // Release the watcher's resources once the loop ends.
        defer watcher.Close()
        watchChan := watcher.Watch(context.TODO(), "discovery/", clientv3.WithPrefix())
        // Ranging over the channel ends the loop when it is closed; a bare
        // receive would keep yielding zero-value responses and spin forever.
        for resp := range watchChan {
            if err := resp.Err(); err != nil {
                log.Printf("[Discovery] watch err:%+v", err)
                continue
            }
            d.watchEvent(resp.Events)
        }
        log.Printf("[Discovery] watch channel closed")
    }
    
    // watchEvent applies a batch of etcd watch events to the node manager.
    //
    // A put event's value is decoded as a JSON NodeInfo and added; a delete
    // event removes the node identified by the event key. A value that cannot
    // be decoded is logged and skipped rather than terminating the process.
    func (d *Discovery) watchEvent(evs []*clientv3.Event) {
        for _, ev := range evs {
            switch ev.Type {
            case clientv3.EventTypePut:
                node := &NodeInfo{}
                if err := json.Unmarshal(ev.Kv.Value, node); err != nil {
                    log.Printf("[Discovery] json.Unmarshal err:%+v", err)
                    continue
                }
                d.nodes.AddNode(node)
                // The payload is passed as an argument, never as the format
                // string, so a '%' inside the stored data cannot garble the log.
                log.Printf("[Discovery] new node:%s", ev.Kv.Value)
            case clientv3.EventTypeDelete:
                d.nodes.DelNode(string(ev.Kv.Key))
                log.Printf("[Discovery] del node:%s data:%s", ev.Kv.Key, ev.Kv.Value)
            }
        }
    }
    
    
    

    服务注册

    在 etcd 中,服务注册通过租约(lease)实现:先按需求创建一个带存活时间(TTL)的租约(类似 Redis 中的 EXPIRE key),再把服务自身的节点信息写入与该租约绑定的 key 中,之后定时调用 KeepAliveOnce 续约。如果 KeepAliveOnce 的消息丢失,或者延迟超过租约的 TTL,etcd 会删除这个节点的信息;恢复正常后需要重新发起租约注册流程。

    package main
    
    import (
        "encoding/json"
        "fmt"
        "github.com/coreos/etcd/clientv3"
        "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
        "github.com/pkg/errors"
        "log"
        "time"
        "context"
    )
    
    // _ttl is the time-to-live passed to lease.Grant when registering; the etcd
    // lease API interprets the value as seconds.
    const _ttl = 10
    
    
    // Register publishes one node record to etcd under a lease and keeps that
    // lease alive until Stop is called.
    type Register struct {
        // cli is the etcd client used for all requests.
        cli *clientv3.Client
        // leaseId is the lease the node key is attached to; register resets it
        // to 0 until a Put has succeeded.
        leaseId clientv3.LeaseID
        // lease is the lease client created by register.
        lease clientv3.Lease
        // info is the node record that gets serialised and written to etcd.
        info *NodeInfo
        // closeChan is closed by Stop to make Run return.
        closeChan chan error
    }
    
    // NewRegister builds a Register that will publish info to etcd through a
    // client created from conf.
    //
    // It returns an error when info is nil (register dereferences it) or when
    // the etcd client cannot be created. On error the returned *Register is nil.
    func NewRegister(info *NodeInfo, conf clientv3.Config) (reg *Register, err error) {
        if info == nil {
            return nil, fmt.Errorf("[Register] info == nil")
        }
        cli, err := clientv3.New(conf)
        if err != nil {
            return nil, err
        }
        return &Register{
            cli:       cli,
            info:      info,
            closeChan: make(chan error),
        }, nil
    }
    
    
    // Run registers the node and then renews its lease once per second until
    // closeChan is closed. It blocks, so callers normally start it in a
    // goroutine.
    //
    // Errors from register and keepAlive are logged rather than dropped, and
    // the loop keeps running so that a later tick can retry.
    func (r *Register) Run() {
        ticker := time.NewTicker(time.Second)
        // Stop the ticker on exit so it does not outlive this loop.
        defer ticker.Stop()
        if err := r.register(); err != nil {
            log.Printf("[Register] Run register err:%+v", err)
        }
        for {
            select {
            case <-ticker.C:
                if err := r.keepAlive(); err != nil {
                    log.Printf("[Register] Run keepAlive err:%+v", err)
                }
            case <-r.closeChan:
                log.Printf("[Register] Run exit...")
                return
            }
        }
    }
    
    // Stop tells Run to exit and revokes the lease so the node record is
    // removed from etcd right away instead of waiting for the TTL to expire.
    //
    // Stop must be called at most once: it closes closeChan, and closing an
    // already closed channel panics.
    func (r *Register) Stop() {
        // Signal Run before revoking; otherwise a tick that fires after the
        // revoke would see a missing lease and register the node again.
        // NOTE(review): Stop does not wait for Run to return, so a keepAlive
        // that is already in flight can still race with the revoke.
        close(r.closeChan)
        if err := r.revoke(); err != nil {
            log.Printf("[Register] Stop err:%+v", err)
        }
    }
    
    // register grants a new lease with a TTL of _ttl and writes the JSON
    // encoded node info to etcd under the key r.info.UniqueId, attached to that
    // lease.
    //
    // r.leaseId is reset to 0 on entry and is only set to the new lease ID after
    // the Put has succeeded. The returned error is wrapped with the step that
    // failed (Grant, json.Marshal or Put).
    func (r *Register) register() (err error) {
        r.leaseId = 0
        kv := clientv3.NewKV(r.cli)
        // Reuse the lease client across re-registrations instead of creating a
        // new one on every call.
        if r.lease == nil {
            r.lease = clientv3.NewLease(r.cli)
        }
        leaseResp, err := r.lease.Grant(context.TODO(), _ttl)
        if err != nil {
            return errors.Wrapf(err, "[Register] register Grant err")
        }
        data, err := json.Marshal(r.info)
        if err != nil {
            // Previously this error was overwritten by the Put result.
            return errors.Wrapf(err, "[Register] register json.Marshal err %s", r.info.Name)
        }
        _, err = kv.Put(context.TODO(), r.info.UniqueId, string(data), clientv3.WithLease(leaseResp.ID))
        if err != nil {
            return errors.Wrapf(err, "[Register] register kv.Put err %s-%+v", r.info.Name, string(data))
        }
        r.leaseId = leaseResp.ID
        return nil
    }
    
    // keepAlive renews the current lease once.
    //
    // When etcd reports that the lease no longer exists, the node is registered
    // again and the result of that registration is returned, so a failed
    // re-registration is no longer silently dropped. Any other renewal error is
    // returned wrapped. The current lease ID is logged on every call.
    func (r *Register) keepAlive() (err error) {
        _, err = r.lease.KeepAliveOnce(context.TODO(), r.leaseId)
        switch {
        case err == rpctypes.ErrLeaseNotFound:
            // The lease was lost: register again.
            if err = r.register(); err != nil {
                err = errors.Wrapf(err, "[Register] keepAlive re-register err")
            }
        case err != nil:
            err = errors.Wrapf(err, "[Register] keepAlive err")
        }
        log.Printf("[Register] keepalive... leaseId:%+v", r.leaseId)
        return err
    }
    
    // revoke asks etcd to revoke the current lease. A failure is returned
    // wrapped; on success the revoked lease ID is logged and nil is returned.
    func (r *Register) revoke() (err error) {
        if _, err = r.cli.Revoke(context.TODO(), r.leaseId); err != nil {
            return errors.Wrapf(err, "[Register] revoke err")
        }
        msg := fmt.Sprintf("[Register] revoke node:%+v", r.leaseId)
        log.Print(msg)
        return nil
    }
    
    

    节点管理

    package main
    
    import (
        "log"
        "math/rand"
        "strings"
        "sync"
    )
    
    // NodeInfo describes one service instance. It is the record that gets
    // serialised to JSON and stored in etcd.
    type NodeInfo struct {
        // Addr is the address of the instance.
        Addr string
        // Name is the service name; NodesManager groups nodes by it.
        Name string
        // UniqueId identifies the instance; Register uses it as the etcd key.
        UniqueId string
    }
    
    // NodesManager is an in-memory registry of known nodes. The embedded
    // RWMutex is taken by the methods that read or modify nodes.
    type NodesManager struct {
        sync.RWMutex

        // nodes maps service name -> unique id -> node.
        nodes map[string]map[string]*NodeInfo
    }
    
    // NewNodeManager returns a NodesManager with an empty, initialised node map.
    func NewNodeManager() (m *NodesManager) {
        m = new(NodesManager)
        m.nodes = make(map[string]map[string]*NodeInfo)
        return m
    }
    
    // AddNode stores node under its service name and unique id, replacing any
    // entry that already uses the same id. A nil node is ignored.
    func (n *NodesManager) AddNode(node *NodeInfo) {
        if node == nil {
            return
        }
        n.Lock()
        defer n.Unlock()
        bucket, ok := n.nodes[node.Name]
        if !ok {
            // First node seen for this service name: create its bucket.
            bucket = make(map[string]*NodeInfo)
            n.nodes[node.Name] = bucket
        }
        bucket[node.UniqueId] = node
    }
    
    // DelNode removes the node whose unique id is id, whichever service name it
    // was stored under. Unknown ids are ignored.
    //
    // The previous implementation took the second-to-last "/"-separated
    // segment of id as the service name. That panicked for an id without "/",
    // and for keys shaped like "discovery/<name>/instance_id/<id>" it selected
    // "instance_id", so the node was never removed.
    func (n *NodesManager) DelNode(id string) {
        n.Lock()
        defer n.Unlock()
        // remove deletes id from the bucket called name and reports whether it
        // was present. An emptied bucket is dropped as well.
        remove := func(name string) bool {
            bucket, ok := n.nodes[name]
            if !ok {
                return false
            }
            if _, ok := bucket[id]; !ok {
                return false
            }
            delete(bucket, id)
            if len(bucket) == 0 {
                delete(n.nodes, name)
            }
            return true
        }
        // Fast path: the service name is usually one of the path segments of
        // the key, so try those buckets first.
        for _, seg := range strings.Split(id, "/") {
            if remove(seg) {
                return
            }
        }
        // Fallback for ids that do not embed the service name: check every
        // bucket. Deleting map entries while ranging over the map is safe in Go.
        for name := range n.nodes {
            remove(name)
        }
    }
    
    // Pick returns a uniformly random node registered under name, or nil when
    // no node is known for that name.
    func (n *NodesManager) Pick(name string) *NodeInfo {
        n.RLock()
        defer n.RUnlock()
        nodes := n.nodes[name]
        // Covers both a missing bucket and a bucket whose nodes were all
        // deleted; rand.Intn panics when its argument is 0.
        if len(nodes) == 0 {
            return nil
        }
        // Maps cannot be indexed by position, so pick a random offset and walk
        // the map until that many entries have been skipped.
        idx := rand.Intn(len(nodes))
        for _, v := range nodes {
            if idx == 0 {
                return v
            }
            idx--
        }
        return nil
    }
    
    // Dump logs every known node together with its service name and unique id.
    //
    // It holds the read lock while iterating, like the other methods, so it can
    // run while another goroutine adds or removes nodes.
    func (n *NodesManager) Dump() {
        n.RLock()
        defer n.RUnlock()
        for k, v := range n.nodes {
            for kk, vv := range v {
                log.Printf("[NodesManager] Name:%s Id:%s Node:%+v", k, kk, vv)
            }
        }
    }
    

    测试效果

    package main
    
    import (
        "github.com/coreos/etcd/clientv3"
        "log"
        "time"
    )
    
    func main() {
    
        nodes := NewNodeManager()
        dis, _ := NewDiscovery(&NodeInfo{
            Name: "server name/aaaa",
            Addr: "127.0.0.1:8888",
        }, clientv3.Config{
            Endpoints:   []string{"tx.cxc233.com:8888"},
            DialTimeout: 5 * time.Second,
        }, nodes)
    
        reg, _ := NewRegister(&NodeInfo{
            Name: "testsvr",
            Addr: "127.0.0.1:8888",
            UniqueId: "discovery/testsvr/instance_id/aaabbbccc",
        }, clientv3.Config{
            Endpoints:   []string{"tx.cxc233.com:8888"},
            DialTimeout: 5 * time.Second,
        })
    
        reg2, _ := NewRegister(&NodeInfo{
            Name: "testsvr",
            Addr: "127.0.0.1:8888",
            UniqueId: "discovery/testsvr/instance_id/testqqqqq",
        }, clientv3.Config{
            Endpoints:   []string{"tx.cxc233.com:8888"},
            DialTimeout: 5 * time.Second,
        })
    
        go reg.Run()
        time.Sleep(time.Second * 2)
        dis.pull()
        go dis.watch()
        time.Sleep(time.Second * 1)
        go reg2.Run()
        time.Sleep(time.Second * 1)
        nodes.Dump()
        log.Printf("[Main] nodes pick:%+v",nodes.Pick("testsvr"))
        time.Sleep(time.Second * 5)
    }
    
    
    2020-10-18 22:36:39.102233 I | [Register] keepalive... leaseId:7587849142468224212
    2020-10-18 22:36:40.100365 I | [Register] keepalive... leaseId:7587849142468224212
    2020-10-18 22:36:40.100365 I | [Discovery] pull node:&{Addr:127.0.0.1:8888 Name:testsvr UniqueId:discovery/testsvr/instance_id/aaabbbccc}
    2020-10-18 22:36:41.100389 I | [Register] keepalive... leaseId:7587849142468224212
    2020-10-18 22:36:41.143304 I | [Discovery] new node:{"Addr":"127.0.0.1:8888","Name":"testsvr","UniqueId":"discovery/testsvr/instance_id/testqqqqq"}
    2020-10-18 22:36:42.097662 I | [Register] keepalive... leaseId:7587849142468224212
    2020-10-18 22:36:42.100819 I | [NodesManager] Name:testsvr Id:discovery/testsvr/instance_id/aaabbbccc Node:&{Addr:127.0.0.1:8888 Name:testsvr UniqueId:discovery/testsvr/instance_id/aaabbbccc}
    2020-10-18 22:36:42.100819 I | [NodesManager] Name:testsvr Id:discovery/testsvr/instance_id/testqqqqq Node:&{Addr:127.0.0.1:8888 Name:testsvr UniqueId:discovery/testsvr/instance_id/testqqqqq}
    2020-10-18 22:36:42.100819 I | [Main] nodes pick:&{Addr:127.0.0.1:8888 Name:testsvr UniqueId:discovery/testsvr/instance_id/testqqqqq}
    2020-10-18 22:36:42.107588 I | [Register] keepalive... leaseId:7587849142468224216
    2020-10-18 22:36:43.091650 I | [Register] keepalive... leaseId:7587849142468224212
    2020-10-18 22:36:43.109634 I | [Register] keepalive... leaseId:7587849142468224216
    2020-10-18 22:36:44.093966 I | [Register] keepalive... leaseId:7587849142468224212
    2020-10-18 22:36:44.108130 I | [Register] keepalive... leaseId:7587849142468224216
    2020-10-18 22:36:45.093075 I | [Register] keepalive... leaseId:7587849142468224212
    2020-10-18 22:36:45.114071 I | [Register] keepalive... leaseId:7587849142468224216
    2020-10-18 22:36:46.107634 I | [Register] keepalive... leaseId:7587849142468224212
    2020-10-18 22:36:46.114016 I | [Register] keepalive... leaseId:7587849142468224216
    2020-10-18 22:36:47.091863 I | [Register] keepalive... leaseId:7587849142468224212
    

    相关文章

      网友评论

          本文标题:ETCD 服务注册和发现

          本文链接:https://www.haomeiwen.com/subject/vhrmyktx.html