美文网首页GolangGolang语言社区
golang 操作etcd租约以及监听kv变化

golang 操作etcd租约以及监听kv变化

作者: 我的饭卡呢 | 来源:发表于2018-11-17 16:17 被阅读5次

定义错误常量:

const (
    NewLeaseErr  = 101
    LeasTtlErr   = 102
    KeepAliveErr = 103
    PutErr       = 104
    GetErr       = 105
    RevokeErr    = 106
)

创建client:

var conf = clientv3.Config{
        Endpoints:   []string{"172.16.196.129:2380", "192.168.50.250:2380"},
        DialTimeout: 5 * time.Second,
}
client, err := clientv3.New(conf)
defer client.Close()
if err != nil {
    fmt.Printf("创建client失败:\n", err.Error())
    os.Exit(NewLeaseErr)
}

创建租约:

//创建租约
lease := clientv3.NewLease(client)

//设置租约时间
leaseResp, err := lease.Grant(context.TODO(), 10)
if err != nil {
    fmt.Printf("设置租约时间失败:%s\n", err.Error())
    os.Exit(LeasTtlErr)
}

设置续租:

//设置续租
leaseID := leaseResp.ID
ctx, cancelFunc := context.WithCancel(context.TODO())
leaseRespChan, err := lease.KeepAlive(ctx, leaseID)
if err != nil {
    fmt.Printf("续租失败:%s\n", err.Error())
    os.Exit(KeepAliveErr)
}

监听租约:

go func() {
        for  {
            select {
            case leaseKeepResp := <-leaseRespChan:
                if leaseKeepResp == nil {
                    fmt.Printf("已经关闭续租功能\n")
                    return
                } else {
                    fmt.Printf("续租成功\n")
                    goto END
                }
            }
            END:
                time.Sleep(500*time.Millisecond)
        }
    }()

监听某个key的变化

//ctx1, _ := context.WithTimeout(context.TODO(),20)  //设置超时ctx传入Watch里会使watch监听失败,可能是watch是个永久监听,不支持设置timeCtx(我也不太清楚)
go func() {
    wc := client.Watch(context.TODO(), "/job/v3/1", clientv3.WithPrevKV())
    for v := range wc {
        for _, e := range v.Events {
        fmt.Printf("type:%v kv:%v  prevKey:%v \n ", e.Type, string(e.Kv.Key), e.PrevKv)
    }
    }
}()

put操作:

kv := clientv3.NewKV(client)
time.Sleep(3*time.Second)
//通过租约put
putResp, err := kv.Put(context.TODO(), "/job/v3/1", "koock",clientv3.WithLease(leaseID))
if err != nil {
    fmt.Printf("put 失败:%s", err.Error())
    os.Exit(PutErr)
}
fmt.Printf("%v\n",putResp.Header)

取消续租以及撤销租约:

撤销租约会使当前租约的所关联的key-value失效

 //关闭续租
cancelFunc() 
//撤销租约
_, err = lease.Revoke(context.TODO(), leaseID)
    if err != nil {
        fmt.Printf("撤销租约失败:%s\n",err.Error())
        os.Exit(RevokeErr)
    }
fmt.Printf("撤销租约成功")

完整代码段:

package main

import (
    "context"
    "fmt"
    "go.etcd.io/etcd/clientv3"
    "os"
    "time"
)

const (
    NewLeaseErr  = 101
    LeasTtlErr   = 102
    KeepAliveErr = 103
    PutErr       = 104
    GetErr       = 105
    RevokeErr    = 106
)

func main() {

    var conf = clientv3.Config{
        Endpoints:   []string{"172.16.196.129:2380", "192.168.50.250:2380"},
        DialTimeout: 5 * time.Second,
    }

    client, err := clientv3.New(conf)
    defer client.Close()
    if err != nil {
        fmt.Printf("创建client失败:\n", err.Error())
        os.Exit(NewLeaseErr)
    }

    //创建租约
    lease := clientv3.NewLease(client)

    //设置租约时间
    leaseResp, err := lease.Grant(context.TODO(), 10)
    if err != nil {
        fmt.Printf("设置租约时间失败:%s\n", err.Error())
        os.Exit(LeasTtlErr)
    }

    //设置续租
    leaseID := leaseResp.ID
    ctx, cancelFunc := context.WithCancel(context.TODO())
    leaseRespChan, err := lease.KeepAlive(ctx, leaseID)
    if err != nil {
        fmt.Printf("续租失败:%s\n", err.Error())
        os.Exit(KeepAliveErr)
    }

    //监听租约
    go func() {
        for  {
            select {
            case leaseKeepResp := <-leaseRespChan:
                if leaseKeepResp == nil {
                    fmt.Printf("已经关闭续租功能\n")
                    return
                } else {
                    fmt.Printf("续租成功\n")
                    goto END
                }
            }
            END:
                time.Sleep(500*time.Millisecond)
        }

    }()

    //监听某个key的变化
    //ctx1, _ := context.WithTimeout(context.TODO(),20)
    go func() {
        wc := client.Watch(context.TODO(), "/job/v3/1", clientv3.WithPrevKV())
            for v := range wc {
                for _, e := range v.Events {
                    fmt.Printf("type:%v kv:%v  prevKey:%v \n ", e.Type, string(e.Kv.Key), e.PrevKv)
                }
            }
    }()

    kv := clientv3.NewKV(client)
    //通过租约put
    putResp, err := kv.Put(context.TODO(), "/job/v3/1", "koock",clientv3.WithLease(leaseID))
    if err != nil {
        fmt.Printf("put 失败:%s", err.Error())
        os.Exit(PutErr)
    }
    fmt.Printf("%v\n",putResp.Header)

    cancelFunc()

    time.Sleep(2*time.Second)
    _, err = lease.Revoke(context.TODO(), leaseID)
    if err != nil {
        fmt.Printf("撤销租约失败:%s\n",err.Error())
        os.Exit(RevokeErr)
    }
    fmt.Printf("撤销租约成功")
    getResp,err := kv.Get(context.TODO(),"/job/v3/1")
    if err != nil{
        fmt.Printf("get 失败:%s",err.Error())
        os.Exit(GetErr)
    }
    fmt.Printf("%v",getResp.Kvs)
    time.Sleep(20 * time.Second)


}

在put操作时,watch会监听到操作,打印出:type:PUT kv:/job/v3/1 prevKey:<nil>
撤销租约后,watch会监听到delete操作,打印出:type:DELETE kv:/job/v3/1 prevKey:key:"/job/v3/1"

相关文章

  • golang 操作etcd租约以及监听kv变化

    定义错误常量: 创建client: 创建租约: 设置续租: 监听租约: 监听某个key的变化 put操作: 取消续...

  • 转载:深入浅出etcd之raft实现

    转载:深入浅出etcd之raft实现 导语 etcd是coreOS使用golang开发的分布式,一致性的kv存储系...

  • Etcd 多节点安装实践

    Etcd主要功能 键值写入与读取。 过期时间。 观察者。 租约。 集群管理相关操作。 维护操作。 用户及权限管理。...

  • golang操作etcd

    windows 安装 etcdhttps://www.jianshu.com/p/2cc988ea2191[htt...

  • golang对etcd的简单操作

    首先获取clientv3: 连接etcd: kv是一个用于操作kv的连接,其实它本质上是用了client的conn...

  • docker etcd 环境搭建

    etcd 是用 golang 编写,raft 协议实现的分布式高可靠的 kv 存储系统,常用来作为配置共享和服务注...

  • etcd事件监听

    etcd中的数据变化触发相关的事件,事件监听分为一次性监听(watch)和永久监听(stream)。 docker...

  • Golang简单操作etcd

    go.mod文件内容 main.go文件内容 程序输出如下

  • etcd本地集群部署

    准备工作 golang环境 etcd地址:https://github.com/etcd-io/etcd gore...

  • 一次etcd踩坑记

    最近项目中部分服务迁移到go,为了方便监控服务,采用了etcd。启动go服务的时候,去etcd获取一个租约,然后每...

网友评论

    本文标题:golang 操作etcd租约以及监听kv变化

    本文链接:https://www.haomeiwen.com/subject/sgfifqtx.html