consul是golang
开发的一个高可用的分布式服务注册系统,有service discovery
和key/value
,健康检查, 节点选举,多数据中心等功能,与zookeeper
和etcd
等相似。微服务框架go-micro
默认使用consul
作服务发现。
安装
go get github.com/hashicorp/consul
运行
安装好consul
后, 我们需要运行agent
, agent
可以以server mode
或者client mode
方式运行,通常一个数据中心需要3个或者5个server mode
, 其他的运行client mode
。client agent
是一个轻量级的进程,完成健康检查和转发请求到相应的server agent
, 可参考consul cluster。
consul agent -dev
# or
consul agent -dev -advertise=127.0.0.1
服务运行后, 我们可以在浏览器中打开consul
的控制界面了,访问http://localhost:8500
,界面如下图:
服务发现
consul
提供服务注册和服务查询的功能。服务指定的我们提供某个需求实现的接口,在consul
中,我们的服务看起来如下所示:
{
"service": {
"name": "redis",
"tags": ["primary"],
"address": "",
"port": 8000,
"enable_tag_override": false,
"checks": [
{
"script": "/usr/local/bin/check_redis.py",
"interval": "10s"
}
]
}
}
上面就算是定义了一个服务name=redis
, 我们给出了该服务的tags
, 以及服务的address
和port
,checks
指定我们对该服务需要做定时健康检查。
consul服务注册有两种方式:服务定义和HTTP API
调用。通常我们使用服务定义,下面我们说明如何来注册服务。
- 服务定义
保存下面文件到/etc/consul.d/web.json
。
{
"service": {
"name": "web",
"tags": ["rails"],
"address": "127.0.0.1",
"port": 8000
}
}
启动consul agent
consul agent -dev -advertise=127.0.0.1 -config-dir=/etc/consul.d
在consul
控制界面,可以发现已经成功注册了我们的web服务,如下图所示:
- HTTP API
package main
import (
"fmt"
consulapi "github.com/hashicorp/consul/api"
"log"
"time"
)
// start consul
// consul agent -dev -enable-script-checks --advertise=127.0.0.1
// to listen localhost:8080
// nc -lp 8000
// consul web
// http://127.0.0.1:8500/ui
const Id = "1234567890"
func testRegister() {
fmt.Println("test begin .")
config := consulapi.DefaultConfig()
//config.Address = "localhost"
fmt.Println("defautl config : ", config)
client, err := consulapi.NewClient(config)
if err != nil {
log.Fatal("consul client error : ", err)
}
//创建一个新服务。
registration := new(consulapi.AgentServiceRegistration)
registration.ID = Id
registration.Name = "web"
registration.Port = 8000
registration.Tags = []string{"rails"}
registration.Address = "127.0.0.1"
// //增加check。
// check := new(consulapi.AgentServiceCheck)
// check.HTTP = fmt.Sprintf("http://%s:%d%s", registration.Address, registration.Port, "/check")
// //设置超时 5s。
// check.Timeout = "5s"
// //设置间隔 5s。
// check.Interval = "5s"
// //注册check服务。
// registration.Check = check
// log.Println("get check.HTTP:", check)
//
// err = client.Agent().ServiceRegister(registration)
//
// if err != nil {
// log.Fatal("register server error : ", err)
// }
//增加check。
check := new(consulapi.AgentServiceCheck)
check.Args = []string{"sh", "-c", "sleep 1 && exit 0"}
//设置超时 5s。
check.Timeout = "5s"
//设置间隔 5s。
check.Interval = "5s"
//注册check服务。
registration.Check = check
err = client.Agent().ServiceRegister(registration)
if err != nil {
log.Fatal("register server error : ", err)
}
}
func testDeregister() {
fmt.Println("test begin .")
config := consulapi.DefaultConfig()
//config.Address = "localhost"
fmt.Println("defautl config : ", config)
client, err := consulapi.NewClient(config)
if err != nil {
log.Fatal("consul client error : ", err)
}
err = client.Agent().ServiceDeregister(Id)
if err != nil {
log.Fatal("register server error : ", err)
}
}
func main() {
log.Println("ready to register service")
testRegister()
time.Sleep(1 * time.Minute)
log.Println("ready to deregister service")
testDeregister()
}
服务查询, 服务消费者为了或者某个名字的服务提供者的具体信息(服务地址,端口等等),需要从consul agent
进行查询。
- 域名查询
dig @127.0.0.1 -p 8600 web.service.consul
结果如下所示:
dig dns query
另外还可以通过命令dig @127.0.0.1 -p 8600 web.service.consul SRV
获得更详细的服务信息。
dig service query - 通过HTTP API查询
curl http://localhost:8500/v1/catalog/service/web
HTTP API service query
key/value
consul
提供用于存储分布式环境需要的配置以及服务信息的key/value
数据库。当然我们也可以根据需要存储我们自己的数据,示例演示了基本的key/value
操作,代码如下所示:
package main
import (
"github.com/hashicorp/consul/api"
"log"
)
func main() {
config := api.DefaultConfig()
client, err := api.NewClient(config)
if err != nil {
log.Fatal(err)
}
kv := client.KV()
pair := &api.KVPair{Key: "key-1", Value: []byte("10")}
if _, err := kv.Put(pair, nil); err != nil {
log.Fatal(err)
}
if p, _, err := kv.Get(pair.Key, nil); err != nil {
log.Fatal(err)
} else {
log.Println(p.Key, "=>", string(p.Value))
}
if ps, _, err := kv.List(pair.Key, nil); err != nil {
log.Fatal(err)
} else {
for _, p := range ps {
log.Println(p.Key, "=>", string(p.Value))
}
}
if ks, _, err := kv.Keys(pair.Key, "/", nil); err != nil {
log.Fatal(err)
} else {
for _, k := range ks {
log.Println(k)
}
}
if _, err := kv.Delete(pair.Key, nil); err != nil {
log.Fatal(err)
} else {
log.Println("delete key '", pair.Key, "' successfully")
}
}
健康检查
consul
在每个server
上都运行了一个轻量级的agent
,因此健康检查(health check
)可以从两个层次来进行,一个是针对service
的应用级别检查,另一个是针对系统级别的检查。跟service
一样,我们同样可以采用两种方式来定义检查: 检查定义和HTTP API
。consul
提供五种方式的健康检查:
- Script + Interval
这种方式即使采用脚本定时(默认30s)的去检查,脚本的
exit code
和输出内容(最大长度4K,多余的将被截取)作为检查的结果。consul agent
启动时,需设置enable_script_checks
选项true
。
- HTTP + Interval
定时(默认30s)发送
HTTP
请求,根据请求返回的状态码来确定服务的检查结果,2xx
表示服务正常,429
表示请求过多(警告状态,warning
), 其他状态表示服务不正常(failure
)。通常采用curl
或者某种HTTP
工具来实现。
- TCP + Interval
定时(默认30s)连接指定的
hostname/ip
(默认locahost
)和port
,如果连接成功,表示服务正常,否则不正常。
- Time to Live (TTL)
由服务定时(指定的
TTL
时长)去维护某个检查状态,如果服务无法正常去更新检查状态,则该状态会被标记为失败(failure
)。
- Docker + Interval
我们的服务采用
docker
方式来部署并使用Docker Exec API
来运行时,需设置enable_script_checks
选项true
。
如何定义健康检查可参考前文所述服务注册#HTTP API
。
分布式锁的实现
参考文章
分布式锁的实现方式目前有redis
的SETNX
,etcd
。下面使用consul
来实现。
consul官方关于信号量的说明
package main
import (
"fmt"
consulapi "github.com/hashicorp/consul/api"
"log"
"sync"
"time"
)
var n int = 0
func print(wg *sync.WaitGroup, char string) {
go func() {
config := consulapi.DefaultConfig()
client, err := consulapi.NewClient(config)
lock, err := client.LockKey("lockthis")
if err != nil {
log.Fatal(err)
}
defer wg.Done()
for {
_, err := lock.Lock(nil)
if err != nil {
log.Fatal(err)
}
if n >= 20 {
if err := lock.Unlock(); err != nil {
log.Fatal(err)
}
break
}
n++
fmt.Println(char, "->", n)
time.Sleep(1 * time.Second)
if err := lock.Unlock(); err != nil {
log.Fatal(err)
}
}
}()
}
func main() {
wg := &sync.WaitGroup{}
wg.Add(2)
print(wg, "A")
print(wg, "B")
wg.Wait()
fmt.Println("Done")
}
执行该代码会依次打印出字母A
和B
。
网友评论