consul

作者: auguszou | 来源:发表于2017-11-09 16:29 被阅读0次

    consulgolang开发的一个高可用的分布式服务注册系统,有service discoverykey/value,健康检查, 节点选举,多数据中心等功能,与zookeeperetcd等相似。微服务框架go-micro默认使用consul作服务发现。

    安装

    go get github.com/hashicorp/consul
    

    运行

    安装好consul后, 我们需要运行agentagent可以以server mode或者client mode方式运行,通常一个数据中心需要3个或者5个server mode, 其他的运行client modeclient agent是一个轻量级的进程,完成健康检查和转发请求到相应的server agent, 可参考consul cluster

    consul agent -dev
    # or
    consul agent -dev -advertise=127.0.0.1
    

    服务运行后, 我们可以在浏览器中打开consul的控制界面了,访问http://localhost:8500,界面如下图:

    consul web ui

    服务发现

    consul 提供服务注册和服务查询的功能。服务指定的我们提供某个需求实现的接口,在consul中,我们的服务看起来如下所示:

    {
      "service": {
        "name": "redis",
        "tags": ["primary"],
        "address": "",
        "port": 8000,
        "enable_tag_override": false,
        "checks": [
          {
            "script": "/usr/local/bin/check_redis.py",
            "interval": "10s"
          }
        ]
      }
    }
    

    上面就算是定义了一个服务name=redis, 我们给出了该服务的tags, 以及服务的addressportchecks指定我们对该服务需要做定时健康检查。

    consul服务注册有两种方式:服务定义和HTTP API调用。通常我们使用服务定义,下面我们说明如何来注册服务。

    • 服务定义
      保存下面文件到/etc/consul.d/web.json
    {
      "service": {
        "name": "web",
        "tags": ["rails"],
        "address": "127.0.0.1",
        "port": 8000
      }
    }
    

    启动consul agent

    consul agent -dev -advertise=127.0.0.1 -config-dir=/etc/consul.d
    

    consul控制界面,可以发现已经成功注册了我们的web服务,如下图所示:

    consul web ui
    • HTTP API
    package main
    
    import (
        "fmt"
        consulapi "github.com/hashicorp/consul/api"
        "log"
        "time"
    )
    
    // start consul
    // consul agent -dev -enable-script-checks --advertise=127.0.0.1
    
    // to listen localhost:8080
    // nc -lp 8000
    
    // consul web
    // http://127.0.0.1:8500/ui
    
    const Id = "1234567890"
    
    func testRegister() {
    
        fmt.Println("test begin .")
        config := consulapi.DefaultConfig()
        //config.Address = "localhost"
        fmt.Println("defautl config : ", config)
        client, err := consulapi.NewClient(config)
        if err != nil {
            log.Fatal("consul client error : ", err)
        }
        //创建一个新服务。
        registration := new(consulapi.AgentServiceRegistration)
        registration.ID = Id
        registration.Name = "web"
        registration.Port = 8000
        registration.Tags = []string{"rails"}
        registration.Address = "127.0.0.1"
    
        //  //增加check。
        //  check := new(consulapi.AgentServiceCheck)
        //  check.HTTP = fmt.Sprintf("http://%s:%d%s", registration.Address, registration.Port, "/check")
        //  //设置超时 5s。
        //  check.Timeout = "5s"
        //  //设置间隔 5s。
        //  check.Interval = "5s"
        //  //注册check服务。
        //  registration.Check = check
        //  log.Println("get check.HTTP:", check)
        //
        //  err = client.Agent().ServiceRegister(registration)
        //
        //  if err != nil {
        //      log.Fatal("register server error : ", err)
        //  }
    
        //增加check。
        check := new(consulapi.AgentServiceCheck)
    
        check.Args = []string{"sh", "-c", "sleep 1 && exit 0"}
        //设置超时 5s。
        check.Timeout = "5s"
        //设置间隔 5s。
        check.Interval = "5s"
        //注册check服务。
        registration.Check = check
    
        err = client.Agent().ServiceRegister(registration)
    
        if err != nil {
            log.Fatal("register server error : ", err)
        }
    }
    
    func testDeregister() {
        fmt.Println("test begin .")
        config := consulapi.DefaultConfig()
        //config.Address = "localhost"
        fmt.Println("defautl config : ", config)
        client, err := consulapi.NewClient(config)
        if err != nil {
            log.Fatal("consul client error : ", err)
        }
    
        err = client.Agent().ServiceDeregister(Id)
        if err != nil {
            log.Fatal("register server error : ", err)
        }
    }
    
    func main() {
        log.Println("ready to register service")
        testRegister()
    
        time.Sleep(1 * time.Minute)
    
        log.Println("ready to deregister service")
        testDeregister()
    }
    

    服务查询, 服务消费者为了或者某个名字的服务提供者的具体信息(服务地址,端口等等),需要从consul agent进行查询。

    • 域名查询 dig @127.0.0.1 -p 8600 web.service.consul
      结果如下所示:
      dig dns query
      另外还可以通过命令dig @127.0.0.1 -p 8600 web.service.consul SRV获得更详细的服务信息。
      dig service query
    • 通过HTTP API查询curl http://localhost:8500/v1/catalog/service/web
      HTTP API service query

    key/value

    consul提供用于存储分布式环境需要的配置以及服务信息的key/value数据库。当然我们也可以根据需要存储我们自己的数据,示例演示了基本的key/value操作,代码如下所示:

    package main
    
    import (
        "github.com/hashicorp/consul/api"
        "log"
    )
    
    func main() {
        config := api.DefaultConfig()
        client, err := api.NewClient(config)
        if err != nil {
            log.Fatal(err)
        }
        kv := client.KV()
    
        pair := &api.KVPair{Key: "key-1", Value: []byte("10")}
    
        if _, err := kv.Put(pair, nil); err != nil {
            log.Fatal(err)
        }
    
        if p, _, err := kv.Get(pair.Key, nil); err != nil {
            log.Fatal(err)
        } else {
            log.Println(p.Key, "=>", string(p.Value))
        }
    
        if ps, _, err := kv.List(pair.Key, nil); err != nil {
            log.Fatal(err)
        } else {
            for _, p := range ps {
                log.Println(p.Key, "=>", string(p.Value))
            }
        }
    
        if ks, _, err := kv.Keys(pair.Key, "/", nil); err != nil {
            log.Fatal(err)
        } else {
            for _, k := range ks {
                log.Println(k)
            }
        }
    
        if _, err := kv.Delete(pair.Key, nil); err != nil {
            log.Fatal(err)
        } else {
            log.Println("delete key '", pair.Key, "' successfully")
        }
    }
    

    健康检查

    consul在每个server上都运行了一个轻量级的agent,因此健康检查(health check)可以从两个层次来进行,一个是针对service的应用级别检查,另一个是针对系统级别的检查。跟service一样,我们同样可以采用两种方式来定义检查: 检查定义和HTTP APIconsul提供五种方式的健康检查:

    • Script + Interval

    这种方式即使采用脚本定时(默认30s)的去检查,脚本的exit code和输出内容(最大长度4K,多余的将被截取)作为检查的结果。consul agent启动时,需设置enable_script_checks选项true

    • HTTP + Interval

    定时(默认30s)发送HTTP请求,根据请求返回的状态码来确定服务的检查结果,2xx表示服务正常, 429表示请求过多(警告状态, warning), 其他状态表示服务不正常(failure)。通常采用curl或者某种HTTP工具来实现。

    • TCP + Interval

    定时(默认30s)连接指定的hostname/ip(默认locahost)和port,如果连接成功,表示服务正常,否则不正常。

    • Time to Live (TTL)

    由服务定时(指定的TTL时长)去维护某个检查状态,如果服务无法正常去更新检查状态,则该状态会被标记为失败(failure)。

    • Docker + Interval

    我们的服务采用docker方式来部署并使用Docker Exec API来运行时,需设置enable_script_checks选项true

    如何定义健康检查可参考前文所述服务注册#HTTP API

    分布式锁的实现

    参考文章
    分布式锁的实现方式目前有redisSETNXetcd。下面使用consul来实现。
    consul官方关于信号量的说明

    package main
    
    import (
        "fmt"
        consulapi "github.com/hashicorp/consul/api"
        "log"
        "sync"
        "time"
    )
    
    var n int = 0
    
    func print(wg *sync.WaitGroup, char string) {
        go func() {
            config := consulapi.DefaultConfig()
            client, err := consulapi.NewClient(config)
    
            lock, err := client.LockKey("lockthis")
            if err != nil {
                log.Fatal(err)
            }
    
            defer wg.Done()
            for {
                _, err := lock.Lock(nil)
                if err != nil {
                    log.Fatal(err)
                }
                if n >= 20 {
                    if err := lock.Unlock(); err != nil {
                        log.Fatal(err)
                    }
                    break
                }
                n++
                fmt.Println(char, "->", n)
                time.Sleep(1 * time.Second)
                if err := lock.Unlock(); err != nil {
                    log.Fatal(err)
                }
            }
        }()
    }
    
    func main() {
        wg := &sync.WaitGroup{}
        wg.Add(2)
    
        print(wg, "A")
        print(wg, "B")
    
        wg.Wait()
        fmt.Println("Done")
    }
    

    执行该代码会依次打印出字母AB

    相关文章

      网友评论

        本文标题:consul

        本文链接:https://www.haomeiwen.com/subject/ukqpmxtx.html