Transfer Version
VERSION = "0.0.17"
COMMIT = "e249d8a"
Transfer Component Function
Transfer receives collected data from its RPC/TCP services, normalizes it, enqueues it, keeps statistics, and forwards it to the backend Graph, Judge, and TSDB components.
Transfer Component Logic Diagram

main Entry Point Analysis
func main() {
    // parse command-line flags
    cfg := flag.String("c", "cfg.json", "configuration file")
    version := flag.Bool("v", false, "show version")
    versionGit := flag.Bool("vg", false, "show version")
    flag.Parse()
    // print the program version
    if *version {
        fmt.Println(g.VERSION)
        os.Exit(0)
    }
    // print the git commit version
    if *versionGit {
        fmt.Println(g.VERSION, g.COMMIT)
        os.Exit(0)
    }
    // parse the global configuration (detailed analysis below)
    g.ParseConfig(*cfg)
    // proc is the statistics module; here it only logs "proc.Start, ok".
    // Both the sender and receiver modules use its counters.
    proc.Start()
    // send and receive monitoring data
    sender.Start()   // core business logic (detailed analysis below)
    receiver.Start() // core business logic (detailed analysis below)
    // start the HTTP API server (detailed analysis below)
    http.Start()
    select {} // block forever
}
g.ParseConfig(*cfg): Global Configuration Parsing
# Defaults to cfg.json in the current directory
func ParseConfig(cfg string) {
    if cfg == "" {
        log.Fatalln("use -c to specify configuration file")
    }
    // check that the file exists
    if !file.IsExist(cfg) {
        log.Fatalln("config file:", cfg, "is not existent. maybe you need `mv cfg.example.json cfg.json`")
    }
    ConfigFile = cfg
    // read the config file into a string
    configContent, err := file.ToTrimString(cfg)
    if err != nil {
        log.Fatalln("read config file:", cfg, "fail:", err)
    }
    var c GlobalConfig
    // unmarshal the JSON into the config struct
    err = json.Unmarshal([]byte(configContent), &c)
    if err != nil {
        log.Fatalln("parse config file:", cfg, "fail:", err)
    }
    // convert the host list strings into slices
    c.Judge.ClusterList = formatClusterItems(c.Judge.Cluster)
    c.Graph.ClusterList = formatClusterItems(c.Graph.Cluster)
    configLock.Lock()
    defer configLock.Unlock()
    config = &c
    log.Println("g.ParseConfig ok, file ", cfg)
}
# e.g. map["node"]="host1,host2" --> map["node"]=["host1", "host2"]
# Splits each host list string into a slice, using "," as the separator
func formatClusterItems(cluster map[string]string) map[string]*ClusterNode {
    ret := make(map[string]*ClusterNode)
    for node, clusterStr := range cluster {
        items := strings.Split(clusterStr, ",") // split
        nitems := make([]string, 0)
        for _, item := range items {
            nitems = append(nitems, strings.TrimSpace(item)) // trim whitespace
        }
        ret[node] = NewClusterNode(nitems) // &ClusterNode{nitems}
    }
    return ret
}
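For illustration, assuming ClusterNode exposes the parsed addresses as Addrs (which is how initConnPools below reads it), the conversion behaves like this minimal sketch:
cluster := map[string]string{"node-1": "10.0.0.1:6070, 10.0.0.2:6070"}
nodes := formatClusterItems(cluster)
fmt.Println(nodes["node-1"].Addrs) // illustrative output: [10.0.0.1:6070 10.0.0.2:6070]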
# Global configuration struct
type GlobalConfig struct {
    Debug   bool          `json:"debug"`
    MinStep int           `json:"minStep"` // minimum step, in seconds
    Http    *HttpConfig   `json:"http"`
    Rpc     *RpcConfig    `json:"rpc"`
    Socket  *SocketConfig `json:"socket"`
    Judge   *JudgeConfig  `json:"judge"`
    Graph   *GraphConfig  `json:"graph"`
    Tsdb    *TsdbConfig   `json:"tsdb"`
}
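A minimal cfg.json sketch that this struct would accept might look like the following. The nested field names are assumptions inferred from how the config is read elsewhere in this walkthrough (g.Config().Http.Listen, cfg.Judge.Cluster, and so on), and the values are illustrative, not shipped defaults:
{
    "debug": true,
    "minStep": 30,
    "http":   { "enabled": true, "listen": "0.0.0.0:6060" },
    "rpc":    { "enabled": true, "listen": "0.0.0.0:8433" },
    "socket": { "enabled": true, "listen": "0.0.0.0:4444", "timeout": 3600 },
    "judge":  { "enabled": true, "replicas": 500, "cluster": { "judge-00": "127.0.0.1:6080" } },
    "graph":  { "enabled": true, "replicas": 500, "cluster": { "graph-00": "127.0.0.1:6070" } },
    "tsdb":   { "enabled": false, "address": "127.0.0.1:4242" }
}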
http.Start(): HTTP API Server Startup and Listening
# Runs in a background goroutine
func Start() {
    go startHttpServer()
}
func startHttpServer() {
    if !g.Config().Http.Enabled {
        return
    }
    addr := g.Config().Http.Listen
    if addr == "" {
        return
    }
    configCommonRoutes()    // common API routes
    configProcHttpRoutes()  // statistics API routes
    configDebugHttpRoutes() // debug routes
    configApiRoutes()       // "/api/push" data push route
    s := &http.Server{
        Addr:           addr,
        MaxHeaderBytes: 1 << 30,
    }
    log.Println("http.startHttpServer ok, listening", addr)
    log.Fatalln(s.ListenAndServe()) // run
}
# Common API routes (handler bodies elided)
func configCommonRoutes() {
    // health check
    http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {})
    // version info
    http.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) {})
    // working directory
    http.HandleFunc("/workdir", func(w http.ResponseWriter, r *http.Request) {})
    // global configuration
    http.HandleFunc("/config", func(w http.ResponseWriter, r *http.Request) {})
    // reload the configuration
    http.HandleFunc("/config/reload", func(w http.ResponseWriter, r *http.Request) {})
}
# Push new metric values
func configApiRoutes() {
    http.HandleFunc("/api/push", api_push_datapoints)
}
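For reference, a push to this route is an HTTP POST whose body is a JSON array of metric values. A hedged sketch follows; the payload field names (endpoint, metric, value, step, counterType, tags, timestamp) are assumed from the validation logic in RecvMetricValues below rather than copied from cmodel:
package main

import (
    "fmt"
    "net/http"
    "strings"
)

func main() {
    // One illustrative data point; tags travel as a "k=v,k=v" string on the wire.
    payload := `[{"endpoint": "host-1", "metric": "cpu.idle", "value": 99.5,
        "step": 60, "counterType": "GAUGE", "tags": "core=0", "timestamp": 1466516940}]`
    resp, err := http.Post("http://127.0.0.1:6060/api/push", "application/json",
        strings.NewReader(payload))
    if err != nil {
        fmt.Println("push fail:", err)
        return
    }
    defer resp.Body.Close()
    fmt.Println("push status:", resp.Status)
}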
# Statistics API routes
func configProcHttpRoutes() {
    // view all counters
    http.HandleFunc("/counter/all", func(w http.ResponseWriter, r *http.Request) {})
    // legacy statistics route, deprecated
    http.HandleFunc("/statistics/all", func(w http.ResponseWriter, r *http.Request) {})
    // view the minimum reporting step
    http.HandleFunc("/proc/step", func(w http.ResponseWriter, r *http.Request) {})
    // trace info "RecvDataTrace"
    http.HandleFunc("/trace/", func(w http.ResponseWriter, r *http.Request) {})
    // filter info "RecvDataFilter"
    http.HandleFunc("/filter/", func(w http.ResponseWriter, r *http.Request) {})
}
# Debug routes
func configDebugHttpRoutes() {
    // inspect connection pool info ("judge" OR "graph")
    http.HandleFunc("/debug/connpool/", func(w http.ResponseWriter, r *http.Request) {})
}
sender.Start(): Initialize the Data-Sending Service
# Initializes the data-sending service
func Start() {
    // initialize defaults
    MinStep = g.Config().MinStep
    if MinStep < 1 {
        MinStep = 30 // default 30s
    }
    initConnPools()   // initialize the connection pools
    initSendQueues()  // initialize the send queues (doubly linked SafeList)
    initNodeRings()   // initialize the consistent hash rings (distribute data across the backend clusters)
    startSendTasks()  // SendTasks depends on the components above; sends data to the backends
    startSenderCron() // periodically collect and log statistics
    log.Println("send.Start, ok")
}
- initConnPools(): Initialize the Backend Connection Pools
## Initialize the backend connection pools
func initConnPools() {
    cfg := g.Config()
    // build the judgeInstances set; judgeInstances.ToSlice() converts it to a slice
    // judge: create connection pools from the global cluster config
    judgeInstances := nset.NewStringSet()
    for _, instance := range cfg.Judge.Cluster {
        judgeInstances.Add(instance)
    }
    JudgeConnPools = backend.CreateSafeRpcConnPools(cfg.Judge.MaxConns, cfg.Judge.MaxIdle,
        cfg.Judge.ConnTimeout, cfg.Judge.CallTimeout, judgeInstances.ToSlice())
    // tsdb: create the connection pool helper if TSDB is enabled
    if cfg.Tsdb.Enabled {
        TsdbConnPoolHelper = backend.NewTsdbConnPoolHelper(cfg.Tsdb.Address, cfg.Tsdb.MaxConns, cfg.Tsdb.MaxIdle, cfg.Tsdb.ConnTimeout, cfg.Tsdb.CallTimeout)
    }
    // graph: create connection pools from the global cluster config
    graphInstances := nset.NewSafeSet()
    for _, nitem := range cfg.Graph.ClusterList {
        for _, addr := range nitem.Addrs {
            graphInstances.Add(addr)
        }
    }
    GraphConnPools = backend.CreateSafeRpcConnPools(cfg.Graph.MaxConns, cfg.Graph.MaxIdle,
        cfg.Graph.ConnTimeout, cfg.Graph.CallTimeout, graphInstances.ToSlice())
}
- initSendQueues(): Initialize the Send Queues for Each Backend Server and Cluster
## Initialize the send queues for each backend server and cluster
func initSendQueues() {
    cfg := g.Config()
    // create one queue per Judge cluster node from the global config
    for node := range cfg.Judge.Cluster {
        Q := nlist.NewSafeListLimited(DefaultSendQueueMaxSize)
        JudgeQueues[node] = Q
    }
    // create one queue per Graph cluster node address from the global config
    for node, nitem := range cfg.Graph.ClusterList {
        for _, addr := range nitem.Addrs {
            Q := nlist.NewSafeListLimited(DefaultSendQueueMaxSize)
            GraphQueues[node+addr] = Q
        }
    }
    // create the TSDB queue if TSDB is enabled in the global config
    if cfg.Tsdb.Enabled {
        TsdbQueue = nlist.NewSafeListLimited(DefaultSendQueueMaxSize)
    }
}
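The queues are bounded: PushFront fails (and the caller counts a drop) once DefaultSendQueueMaxSize is reached, while the send tasks drain batches from the back. A minimal sketch of those semantics, assuming a mutex-guarded deque; the real implementation is SafeListLimited in github.com/toolkits/container/list:
package main

import (
    "fmt"
    "sync"
)

// Illustrative only: a size-capped, mutex-guarded deque.
type boundedQueue struct {
    mu    sync.Mutex
    items []interface{}
    limit int
}

// PushFront rejects new items once the cap is hit, so producers drop
// data rather than grow memory without bound.
func (q *boundedQueue) PushFront(v interface{}) bool {
    q.mu.Lock()
    defer q.mu.Unlock()
    if len(q.items) >= q.limit {
        return false
    }
    q.items = append([]interface{}{v}, q.items...)
    return true
}

// PopBackBy removes up to max items from the back, i.e. the oldest ones,
// since producers push to the front.
func (q *boundedQueue) PopBackBy(max int) []interface{} {
    q.mu.Lock()
    defer q.mu.Unlock()
    if max > len(q.items) {
        max = len(q.items)
    }
    out := make([]interface{}, 0, max)
    for i := 0; i < max; i++ {
        n := len(q.items)
        out = append(out, q.items[n-1])
        q.items = q.items[:n-1]
    }
    return out
}

func main() {
    q := &boundedQueue{limit: 2}
    fmt.Println(q.PushFront("a"), q.PushFront("b"), q.PushFront("c")) // true true false
    fmt.Println(q.PopBackBy(10))                                      // [a b]
}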
- initNodeRings(): Initialize the Consistent Hash Rings
## Initialize the consistent hash rings (JudgeNodeRing, GraphNodeRing)
## Maps data to nodes (Data -> Node)
## See the basics of consistent hashing if the technique is unfamiliar
func initNodeRings() {
    cfg := g.Config()
    JudgeNodeRing = rings.NewConsistentHashNodesRing(int32(cfg.Judge.Replicas), cutils.KeysOfMap(cfg.Judge.Cluster))
    GraphNodeRing = rings.NewConsistentHashNodesRing(int32(cfg.Graph.Replicas), cutils.KeysOfMap(cfg.Graph.Cluster))
}
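To make the Data -> Node mapping concrete, here is a minimal consistent-hash-ring sketch. It is illustrative only; the real implementation lives in github.com/toolkits/consistent/rings, and the `replicas` argument above plays the role of the virtual-node count used to smooth the distribution:
package main

import (
    "fmt"
    "hash/crc32"
    "sort"
)

// Each node is hashed onto a ring of uint32 positions (many virtual points
// per node); a key is assigned to the first node clockwise from its hash.
type ring struct {
    points []uint32          // sorted hash positions
    owner  map[uint32]string // position -> node name
}

func newRing(nodes []string, replicas int) *ring {
    r := &ring{owner: map[uint32]string{}}
    for _, n := range nodes {
        for i := 0; i < replicas; i++ { // virtual nodes
            h := crc32.ChecksumIEEE([]byte(fmt.Sprintf("%s#%d", n, i)))
            r.points = append(r.points, h)
            r.owner[h] = n
        }
    }
    sort.Slice(r.points, func(i, j int) bool { return r.points[i] < r.points[j] })
    return r
}

func (r *ring) GetNode(key string) string {
    h := crc32.ChecksumIEEE([]byte(key))
    i := sort.Search(len(r.points), func(i int) bool { return r.points[i] >= h })
    if i == len(r.points) {
        i = 0 // wrap around the ring
    }
    return r.owner[r.points[i]]
}

func main() {
    r := newRing([]string{"judge-00", "judge-01"}, 500)
    fmt.Println(r.GetNode("host-1/cpu.idle")) // the same key always maps to the same node
}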
- startSendTasks(): Send Data to the Backends
func startSendTasks() {
    cfg := g.Config()
    // init semaphores
    judgeConcurrent := cfg.Judge.MaxConns
    graphConcurrent := cfg.Graph.MaxConns
    tsdbConcurrent := cfg.Tsdb.MaxConns
    if tsdbConcurrent < 1 {
        tsdbConcurrent = 1
    }
    if judgeConcurrent < 1 {
        judgeConcurrent = 1
    }
    if graphConcurrent < 1 {
        graphConcurrent = 1
    }
    // send data to the Judge cluster
    for node := range cfg.Judge.Cluster {
        queue := JudgeQueues[node]
        go forward2JudgeTask(queue, node, judgeConcurrent) // background goroutine: periodic send task
    }
    // send data to the Graph cluster
    for node, nitem := range cfg.Graph.ClusterList {
        for _, addr := range nitem.Addrs {
            queue := GraphQueues[node+addr]
            go forward2GraphTask(queue, node, addr, graphConcurrent) // background goroutine: periodic send task
        }
    }
    // send data to TSDB
    if cfg.Tsdb.Enabled {
        go forward2TsdbTask(tsdbConcurrent) // background goroutine: periodic send task
    }
}
// Judge send task: drains the Judge send queue and ships the data to Judge over the RPC connection pool
func forward2JudgeTask(Q *list.SafeListLimited, node string, concurrent int) {
    batch := g.Config().Judge.Batch // at most batch items per send
    addr := g.Config().Judge.Cluster[node]
    sema := nsema.NewSemaphore(concurrent)
    for {
        items := Q.PopBackBy(batch) // pop a batch from the back of the queue
        count := len(items)
        if count == 0 {
            time.Sleep(DefaultSendTaskSleepInterval) // idle interval
            continue
        }
        judgeItems := make([]*cmodel.JudgeItem, count)
        for i := 0; i < count; i++ {
            judgeItems[i] = items[i].(*cmodel.JudgeItem)
        }
        // synchronous Call with bounded concurrency
        sema.Acquire()
        go func(addr string, judgeItems []*cmodel.JudgeItem, count int) {
            defer sema.Release()
            resp := &cmodel.SimpleRpcResponse{}
            var err error
            sendOk := false
            for i := 0; i < 3; i++ { // retry up to 3 times
                err = JudgeConnPools.Call(addr, "Judge.Send", judgeItems, resp) // RPC send to Judge
                if err == nil {
                    sendOk = true
                    break
                }
                time.Sleep(time.Millisecond * 10) // retry interval after a failure
            }
            // count successes and failures
            if !sendOk {
                log.Printf("send judge %s:%s fail: %v", node, addr, err)
                proc.SendToJudgeFailCnt.IncrBy(int64(count))
            } else {
                proc.SendToJudgeCnt.IncrBy(int64(count))
            }
        }(addr, judgeItems, count)
    }
}
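The "synchronous Call + bounded concurrency" pattern above boils down to a counting semaphore: at most `concurrent` sender goroutines are in flight per queue, matching the connection pool size. A minimal sketch of such a semaphore, assuming a buffered channel (the real one lives in github.com/toolkits/concurrent):
// A counting semaphore on a buffered channel: Acquire blocks once the
// buffer is full, capping in-flight senders at n.
type semaphore chan struct{}

func newSemaphore(n int) semaphore { return make(semaphore, n) }
func (s semaphore) Acquire()       { s <- struct{}{} }
func (s semaphore) Release()       { <-s }

// Usage mirrors the loop above:
//   sema := newSemaphore(concurrent)
//   sema.Acquire()
//   go func() { defer sema.Release(); /* RPC call */ }()
Each iteration Acquires before spawning a sender and Releases when the RPC call finishes, so a slow backend throttles the drain loop instead of spawning unbounded goroutines.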
type JudgeItem struct {
    Endpoint  string            `json:"endpoint"`
    Metric    string            `json:"metric"`
    Value     float64           `json:"value"`
    Timestamp int64             `json:"timestamp"`
    JudgeType string            `json:"judgeType"`
    Tags      map[string]string `json:"tags"`
}
// Graph send task: drains the Graph send queue and ships the data to Graph over the RPC connection pool
func forward2GraphTask(Q *list.SafeListLimited, node string, addr string, concurrent int) {
    batch := g.Config().Graph.Batch // at most batch items per send
    sema := nsema.NewSemaphore(concurrent)
    for {
        items := Q.PopBackBy(batch)
        count := len(items)
        if count == 0 {
            time.Sleep(DefaultSendTaskSleepInterval)
            continue
        }
        graphItems := make([]*cmodel.GraphItem, count)
        for i := 0; i < count; i++ {
            graphItems[i] = items[i].(*cmodel.GraphItem)
        }
        sema.Acquire()
        go func(addr string, graphItems []*cmodel.GraphItem, count int) {
            defer sema.Release()
            resp := &cmodel.SimpleRpcResponse{}
            var err error
            sendOk := false
            for i := 0; i < 3; i++ { // retry up to 3 times
                err = GraphConnPools.Call(addr, "Graph.Send", graphItems, resp) // RPC send to Graph
                if err == nil {
                    sendOk = true
                    break
                }
                time.Sleep(time.Millisecond * 10) // retry interval after a failure
            }
            // count successes and failures
            if !sendOk {
                log.Printf("send to graph %s:%s fail: %v", node, addr, err)
                proc.SendToGraphFailCnt.IncrBy(int64(count))
            } else {
                proc.SendToGraphCnt.IncrBy(int64(count))
            }
        }(addr, graphItems, count)
    }
}
// DsType is the RRD datasource type: GAUGE|COUNTER|DERIVE
type GraphItem struct {
    Endpoint  string            `json:"endpoint"`
    Metric    string            `json:"metric"`
    Tags      map[string]string `json:"tags"`
    Value     float64           `json:"value"`
    Timestamp int64             `json:"timestamp"`
    DsType    string            `json:"dstype"`
    Step      int               `json:"step"`
    Heartbeat int               `json:"heartbeat"`
    Min       string            `json:"min"`
    Max       string            `json:"max"`
}
// TSDB send task: ships data to TSDB over its API
func forward2TsdbTask(concurrent int) {
    batch := g.Config().Tsdb.Batch         // at most batch items per send
    retry := g.Config().Tsdb.MaxRetry      // maximum number of retries
    sema := nsema.NewSemaphore(concurrent) // concurrency control (semaphore)
    for {
        items := TsdbQueue.PopBackBy(batch)
        if len(items) == 0 {
            time.Sleep(DefaultSendTaskSleepInterval)
            continue
        }
        // synchronous send with bounded concurrency
        sema.Acquire()
        go func(itemList []interface{}) {
            defer sema.Release()
            var tsdbBuffer bytes.Buffer
            // format the data
            for i := 0; i < len(itemList); i++ {
                tsdbItem := itemList[i].(*cmodel.TsdbItem)
                tsdbBuffer.WriteString(tsdbItem.TsdbString()) // serialize the item
                tsdbBuffer.WriteString("\n")                  // separator
            }
            var err error
            for i := 0; i < retry; i++ {
                err = TsdbConnPoolHelper.Send(tsdbBuffer.Bytes()) // send the data to TSDB
                // count successes
                if err == nil {
                    proc.SendToTsdbCnt.IncrBy(int64(len(itemList)))
                    break
                }
                time.Sleep(100 * time.Millisecond)
            }
            // count failures and log
            if err != nil {
                proc.SendToTsdbFailCnt.IncrBy(int64(len(itemList)))
                log.Println(err)
                return
            }
        }(items)
    }
}
- startSenderCron(): Sender Statistics
## Background goroutines that report sender statistics
func startSenderCron() {
    go startProcCron() // refresh send-queue statistics
    go startLogCron()  // log connection pool statistics
}
func startProcCron() {
    for {
        time.Sleep(DefaultProcCronPeriod)
        refreshSendingCacheSize() // refresh send-queue sizes
    }
}
func startLogCron() {
    for {
        time.Sleep(DefaultLogCronPeriod)
        logConnPoolsProc() // log Graph connection pool statistics
    }
}
func refreshSendingCacheSize() {
    proc.JudgeQueuesCnt.SetCnt(calcSendCacheSize(JudgeQueues))
    proc.GraphQueuesCnt.SetCnt(calcSendCacheSize(GraphQueues))
}
func calcSendCacheSize(mapList map[string]*list.SafeListLimited) int64 {
    var cnt int64 = 0
    for _, list := range mapList {
        if list != nil {
            cnt += int64(list.Len())
        }
    }
    return cnt
}
func logConnPoolsProc() {
    log.Printf("connPools proc: \n%v", strings.Join(GraphConnPools.Proc(), "\n"))
}
type TsdbItem struct {
    Metric    string            `json:"metric"`
    Tags      map[string]string `json:"tags"`
    Value     float64           `json:"value"`
    Timestamp int64             `json:"timestamp"`
}
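TsdbString (used by forward2TsdbTask above) serializes an item into OpenTSDB's telnet-style "put" line. A hedged sketch of what it plausibly produces; the exact formatting lives in cmodel and is not shown in this walkthrough:
// Illustrative only: one OpenTSDB telnet-style "put" line per item, e.g.
//   put cpu.idle 1466516940 99.5 endpoint=host-1 core=0
func tsdbString(t *TsdbItem) string {
    s := fmt.Sprintf("put %s %d %v", t.Metric, t.Timestamp, t.Value)
    for k, v := range t.Tags {
        s += fmt.Sprintf(" %s=%s", k, v)
    }
    return s
}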
receiver.Start(): Receive Data and Enqueue It
// Start the RPC and TCP services that receive data
func Start() {
    go rpc.StartRpc()
    go socket.StartSocket()
}
RPC Server Startup and Listening
func StartRpc() {
    if !g.Config().Rpc.Enabled {
        return
    }
    addr := g.Config().Rpc.Listen
    tcpAddr, err := net.ResolveTCPAddr("tcp", addr) // default port 8433
    if err != nil {
        log.Fatalf("net.ResolveTCPAddr fail: %s", err)
    }
    listener, err := net.ListenTCP("tcp", tcpAddr)
    if err != nil {
        log.Fatalf("listen %s fail: %s", addr, err)
    } else {
        log.Println("rpc listening", addr)
    }
    server := rpc.NewServer()      // RPC server
    server.Register(new(Transfer)) // register the RPC type
    for {
        conn, err := listener.Accept()
        if err != nil {
            log.Println("listener.Accept occur error:", err)
            continue
        }
        go server.ServeCodec(jsonrpc.NewServerCodec(conn))
    }
}
// RPC "Transfer.Ping"
func (t *Transfer) Ping(req cmodel.NullRpcRequest, resp *cmodel.SimpleRpcResponse) error {
    return nil
}
// RPC "Transfer.Update"
func (t *Transfer) Update(args []*cmodel.MetricValue, reply *cmodel.TransferResponse) error {
    return RecvMetricValues(args, reply, "rpc") // process the received data
}
// process new metric values
func RecvMetricValues(args []*cmodel.MetricValue, reply *cmodel.TransferResponse, from string) error {
    start := time.Now() // start the timer, used to compute total processing latency
    reply.Invalid = 0
    items := []*cmodel.MetaData{}
    for _, v := range args {
        if v == nil {
            reply.Invalid += 1
            continue
        }
        // Legacy issue: old agents reported metric=kernel.hostname with a
        // string value, which is no longer supported, so it is hard-coded out here.
        if v.Metric == "kernel.hostname" {
            reply.Invalid += 1
            continue
        }
        if v.Metric == "" || v.Endpoint == "" {
            reply.Invalid += 1
            continue
        }
        // counter type (rrdtool: COUNTER/GAUGE/DERIVE)
        if v.Type != g.COUNTER && v.Type != g.GAUGE && v.Type != g.DERIVE {
            reply.Invalid += 1
            continue
        }
        if v.Value == "" {
            reply.Invalid += 1
            continue
        }
        if v.Step <= 0 {
            reply.Invalid += 1
            continue
        }
        if len(v.Metric)+len(v.Tags) > 510 {
            reply.Invalid += 1
            continue
        }
        // TODO: this could be more elegant
        now := start.Unix()
        if v.Timestamp <= 0 || v.Timestamp > now*2 {
            v.Timestamp = now
        }
        fv := &cmodel.MetaData{
            Metric:      v.Metric,
            Endpoint:    v.Endpoint,
            Timestamp:   v.Timestamp,
            Step:        v.Step,
            CounterType: v.Type,
            Tags:        cutils.DictedTagstring(v.Tags), // TODO: cap the number of tag key-value pairs
        }
        valid := true
        var vv float64
        var err error
        // type-switch on Value and convert it to float64
        switch cv := v.Value.(type) {
        case string:
            vv, err = strconv.ParseFloat(cv, 64)
            if err != nil {
                valid = false
            }
        case float64:
            vv = cv
        case int64:
            vv = float64(cv)
        default:
            valid = false
        }
        if !valid {
            reply.Invalid += 1
            continue
        }
        fv.Value = vv
        items = append(items, fv)
    }
    // receive counters (statistics)
    cnt := int64(len(items))
    proc.RecvCnt.IncrBy(cnt)
    if from == "rpc" {
        proc.RpcRecvCnt.IncrBy(cnt)
    } else if from == "http" {
        proc.HttpRecvCnt.IncrBy(cnt)
    }
    cfg := g.Config()
    if cfg.Graph.Enabled {
        sender.Push2GraphSendQueue(items) // enqueue for Graph
    }
    if cfg.Judge.Enabled {
        sender.Push2JudgeSendQueue(items) // enqueue for Judge
    }
    if cfg.Tsdb.Enabled {
        sender.Push2TsdbSendQueue(items) // enqueue for TSDB
    }
    reply.Message = "ok"
    reply.Total = len(args)
    reply.Latency = (time.Now().UnixNano() - start.UnixNano()) / 1000000
    return nil
}
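Since the server registers Transfer with a jsonrpc codec, a client can call Transfer.Update over plain TCP. A hedged sketch; the MetricValue/TransferResponse shapes below are assumed from the validation and reply fields above rather than copied from cmodel:
package main

import (
    "fmt"
    "log"
    "net"
    "net/rpc"
    "net/rpc/jsonrpc"
    "time"
)

// Assumed wire shape, inferred from RecvMetricValues above.
type MetricValue struct {
    Endpoint  string      `json:"endpoint"`
    Metric    string      `json:"metric"`
    Value     interface{} `json:"value"`
    Step      int64       `json:"step"`
    Type      string      `json:"counterType"`
    Tags      string      `json:"tags"`
    Timestamp int64       `json:"timestamp"`
}

// Assumed reply shape; field names follow the reply fields set above.
type TransferResponse struct {
    Message string
    Total   int
    Invalid int
    Latency int64
}

func main() {
    conn, err := net.Dial("tcp", "127.0.0.1:8433")
    if err != nil {
        log.Fatal(err)
    }
    client := rpc.NewClientWithCodec(jsonrpc.NewClientCodec(conn))
    defer client.Close()
    args := []*MetricValue{{
        Endpoint: "host-1", Metric: "cpu.idle", Value: 99.5,
        Step: 60, Type: "GAUGE", Tags: "core=0", Timestamp: time.Now().Unix(),
    }}
    var reply TransferResponse
    if err := client.Call("Transfer.Update", args, &reply); err != nil {
        log.Fatal(err)
    }
    fmt.Printf("total=%d invalid=%d msg=%s\n", reply.Total, reply.Invalid, reply.Message)
}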
// Pushes data into one Judge's send queue; which Judge is decided by consistent hashing
func Push2JudgeSendQueue(items []*cmodel.MetaData) {
    for _, item := range items {
        pk := item.PK()
        node, err := JudgeNodeRing.GetNode(pk) // consistent hashing picks the node
        if err != nil {
            log.Println("E:", err)
            continue
        }
        // align the timestamp
        step := int(item.Step)
        if step < MinStep {
            step = MinStep
        }
        ts := alignTs(item.Timestamp, int64(step))
        // build the JudgeItem
        judgeItem := &cmodel.JudgeItem{
            Endpoint:  item.Endpoint,
            Metric:    item.Metric,
            Value:     item.Value,
            Timestamp: ts,
            JudgeType: item.CounterType,
            Tags:      item.Tags,
        }
        Q := JudgeQueues[node]
        isSuccess := Q.PushFront(judgeItem) // push to the front of the queue
        // count drops
        if !isSuccess {
            proc.SendToJudgeDropCnt.Incr()
        }
    }
}
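item.PK() is the key fed into the hash ring. A hedged sketch of what it plausibly computes (the real implementation is cutils.PK, not shown here): endpoint/metric, with sorted tags appended when present, so the same series always lands on the same node:
func pk(endpoint, metric string, tags map[string]string) string {
    if len(tags) == 0 {
        return endpoint + "/" + metric
    }
    kv := make([]string, 0, len(tags))
    for k, v := range tags {
        kv = append(kv, k+"="+v)
    }
    sort.Strings(kv) // deterministic order regardless of map iteration
    return endpoint + "/" + metric + "/" + strings.Join(kv, ",")
}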
// Pushes data into one Graph's send queue; which Graph is decided by consistent hashing
func Push2GraphSendQueue(items []*cmodel.MetaData) {
    cfg := g.Config().Graph
    for _, item := range items {
        graphItem, err := convert2GraphItem(item) // convert to a GraphItem
        if err != nil {
            log.Println("E:", err)
            continue
        }
        pk := item.PK()
        // statistics: placed here for efficiency, so trace/filter only work when graph is enabled
        proc.RecvDataTrace.Trace(pk, item)
        proc.RecvDataFilter.Filter(pk, item.Value, item)
        node, err := GraphNodeRing.GetNode(pk) // consistent hashing picks the node
        if err != nil {
            log.Println("E:", err)
            continue
        }
        cnode := cfg.ClusterList[node]
        errCnt := 0
        for _, addr := range cnode.Addrs {
            Q := GraphQueues[node+addr]
            if !Q.PushFront(graphItem) { // push to the front of the queue
                errCnt += 1
            }
        }
        // statistics
        if errCnt > 0 {
            proc.SendToGraphDropCnt.Incr()
        }
    }
}
// Data sent to Graph must constrain step, counterType, and timestamp to fit rrdtool's semantics
func convert2GraphItem(d *cmodel.MetaData) (*cmodel.GraphItem, error) {
    item := &cmodel.GraphItem{}
    item.Endpoint = d.Endpoint
    item.Metric = d.Metric
    item.Tags = d.Tags
    item.Timestamp = d.Timestamp
    item.Value = d.Value
    item.Step = int(d.Step)
    if item.Step < MinStep {
        item.Step = MinStep
    }
    item.Heartbeat = item.Step * 2
    if d.CounterType == g.GAUGE {
        item.DsType = d.CounterType
        item.Min = "U"
        item.Max = "U"
    } else if d.CounterType == g.COUNTER {
        item.DsType = g.DERIVE
        item.Min = "0"
        item.Max = "U"
    } else if d.CounterType == g.DERIVE {
        item.DsType = g.DERIVE
        item.Min = "0"
        item.Max = "U"
    } else {
        return item, fmt.Errorf("not_supported_counter_type")
    }
    item.Timestamp = alignTs(item.Timestamp, int64(item.Step)) // item.Timestamp - item.Timestamp%int64(item.Step)
    return item, nil
}
// Pushes raw data into the TSDB send queue
func Push2TsdbSendQueue(items []*cmodel.MetaData) {
    for _, item := range items {
        tsdbItem := convert2TsdbItem(item)
        isSuccess := TsdbQueue.PushFront(tsdbItem)
        if !isSuccess {
            proc.SendToTsdbDropCnt.Incr()
        }
    }
}
// Convert to the TSDB format
func convert2TsdbItem(d *cmodel.MetaData) *cmodel.TsdbItem {
    t := cmodel.TsdbItem{Tags: make(map[string]string)}
    for k, v := range d.Tags {
        t.Tags[k] = v
    }
    t.Tags["endpoint"] = d.Endpoint
    t.Metric = d.Metric
    t.Timestamp = d.Timestamp
    t.Value = d.Value
    return &t
}
func alignTs(ts int64, period int64) int64 {
    return ts - ts%period
}
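For example, alignTs(1466517003, 60) returns 1466517000: the timestamp is snapped down to the nearest step boundary so samples land on rrdtool bucket edges.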
TCP Socket Server Startup, Listening, and Forwarding
func StartSocket() {
    if !g.Config().Socket.Enabled {
        return
    }
    addr := g.Config().Socket.Listen // default port 4444
    tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
    if err != nil {
        log.Fatalf("net.ResolveTCPAddr fail: %s", err)
    }
    listener, err := net.ListenTCP("tcp", tcpAddr)
    if err != nil {
        log.Fatalf("listen %s fail: %s", addr, err)
    } else {
        log.Println("socket listening", addr)
    }
    defer listener.Close()
    for {
        conn, err := listener.Accept()
        if err != nil {
            log.Println("listener.Accept occur error:", err)
            continue
        }
        go socketTelnetHandle(conn) // parse, format, and forward data to the backends
    }
}
## Parse, format, and forward data to the backends
func socketTelnetHandle(conn net.Conn) {
    defer conn.Close()
    items := []*cmodel.MetaData{} // parsed data items
    buf := bufio.NewReader(conn)
    cfg := g.Config()
    timeout := time.Duration(cfg.Socket.Timeout) * time.Second
    for {
        conn.SetReadDeadline(time.Now().Add(timeout))
        line, err := buf.ReadString('\n') // read one line
        if err != nil {
            break
        }
        line = strings.Trim(line, "\n")
        if line == "quit" { // "quit" closes the TCP connection
            break
        }
        if line == "" {
            continue
        }
        t := strings.Fields(line) // split on whitespace
        if len(t) < 2 {
            continue
        }
        cmd := t[0]
        if cmd != "update" {
            continue
        }
        item, err := convertLine2MetaData(t[1:]) // parse into MetaData
        if err != nil {
            continue
        }
        items = append(items, item) // collect the items
    }
    // statistics
    proc.SocketRecvCnt.IncrBy(int64(len(items)))
    proc.RecvCnt.IncrBy(int64(len(items)))
    if cfg.Graph.Enabled {
        sender.Push2GraphSendQueue(items) // enqueue for Graph
    }
    if cfg.Judge.Enabled {
        sender.Push2JudgeSendQueue(items) // enqueue for Judge
    }
    return
}
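The wire protocol is line-oriented and telnet-friendly: each data line starts with "update", and "quit" (or a read timeout) ends the session, at which point the collected items are forwarded. A hedged client sketch; the exact field layout after "update" is defined by convertLine2MetaData, which this walkthrough does not show, so it is elided here:
package main

import (
    "fmt"
    "log"
    "net"
)

func main() {
    conn, err := net.Dial("tcp", "127.0.0.1:4444")
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()
    // Fields after "update" must match what convertLine2MetaData expects (not shown).
    fmt.Fprintln(conn, "update <fields parsed by convertLine2MetaData>")
    fmt.Fprintln(conn, "quit") // ends the session; the server then enqueues the items
}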
type MetaData struct {
    Metric      string            `json:"metric"`
    Endpoint    string            `json:"endpoint"`
    Timestamp   int64             `json:"timestamp"`
    Step        int64             `json:"step"`
    Value       float64           `json:"value"`
    CounterType string            `json:"counterType"`
    Tags        map[string]string `json:"tags"`
}
Further Reading
- github.com/toolkits/proc: statistics utilities (CounterBase/CounterQps)
- github.com/toolkits/container/set: string sets
- github.com/toolkits/consistent/rings: consistent hashing
- github.com/toolkits/container/list: doubly linked lists
- github.com/toolkits/concurrent/tree/master/semaphore: concurrency semaphore
- github.com/toolkits/conn_pool: connection pool
- github.com/toolkits/conn_pool/rpc_conn_pool: RPC connection pool