美文网首页程序员之言Java
海量日志实时收集系统架构设计与go语言实现

海量日志实时收集系统架构设计与go语言实现

作者: java面试笔试 | 来源:发表于2018-09-10 16:54 被阅读11次

日志收集系统应该说是到达一定规模的公司的标配了,一个能满足业务需求、运维成本低、稳定的日志收集系统对于运维的同学和日志使用方的同学都是非常nice的。然而这时理想中的日志收集系统,现实往往不是这样的...本篇的主要内容是:首先吐槽一下公司以前的日志收集和上传;介绍新的实时日志收集系统架构;用go语言实现。澄清一下,并不是用go语言实现全部,比如用到卡夫卡肯定不能重写一个kafka吧……

logagent所有代码已上传到github:https://github.com/zingp/logagent。

1 老系统吐槽

我司以前的日志收集系统概述如下:

日志收集的频率有每小时收集一次、每5分钟收集一次、实时收集三种。大部分情况是每小时收集上传一次。

(1) 每5分钟上传一次和每小时上传一次的情况是这样的:

每台机器上都需要部署一个日志收集agengt,部署一个日志上传agent,每台机器都需要挂载hadoop集群的客户端。

日志收集agent负责切割日志,上传agent整点的时候启动利用hadoop客户端,将切割好的前1小时或前5分钟日志打包上传到hadoop集群。

(2) 实时传输的情况是这样的

每台机器上部署另一个agent,该agent实时收集日志传输到kafka。

看到这里你可能都看不下去了,这么复杂臃肿费劲的日志收集系统是怎么设计出来的?额...先辩解一下,这套系统有4年以上的历史了,当时的解决方案确实有限。辩解完之后还是得吐槽一下系统存在的问题:

(1) 首先部署在每台机器上的agent没有做统一的配置入口,需要根据不同业务到不同机器上配置,运维成本太大;十台机器也就罢了,问题是现在有几万台机器,几千个服务。

(2) 最无语的是针对不同的hadoop集群,需要挂载多个hadoop客户端,也就是存在一台机器上部署几个hadoop客户端的情况。运维成本太大……

(3) 没做限流,整点的时候传输压力变大。某些机器有很多日志,一到整点压力就上来了。无图无真相,我们来看下:

CPU:看绿色的线条

负载:

网卡:

这组机器比较典型(这就是前文说的有多个hadoop客户端的情况),截图是凌晨至上午的时间段,还未到真正的高峰期。不过总体上可看出整点的压力是明显比非正点高很多的,已经到了不能忍的地步。

(4) 省略n条吐槽……

2 新系统架构

首先日志收集大可不必在客户端分为1小时、5分钟、实时这几种频率,只需要实时一种就能满足前面三种需求。

其次可以砍掉在机器上挂载hadoop客户端,放在其他地方做日志上传hadoop流程。

第三,做统一的配置管理系统,提供友好的web界面,用户只需要在web界面上配置一组service需要收集的日志,便可通知该组service下的所有机器上的日志收集agent。

第四,流量削峰。应该说实时收集可以避免旧系统整点负载过大情况,但依旧应该做限流功能,防止高峰期agent过度消耗资源影响业务。

第五,日志补传...

实际上公司有的部门在用flume做日志收集,但觉得太重。经过一段时间调研和结合自身业务特点,利用开源软件在适当做些开发会比较好。go应该擅长做这个事,而且方便运维。好了,附上架构图。

将用go实现logagent,Web,transfer这个三个部分。

logagent主要负责按照配置实时收集日志发送到kafka,此外还需watch etcd中的配置,如改变,需要热更新。

web部分主要用于更新etcd中的配置,etcd已提供接口,我们只需要集成到资源管理系统或CMDB系统的管理界面中去即可。

transfer 做的是消费kafka队列中的日志,发送到es/hadoop/storm中去。

3 实现logagent

3.1 配置设计

首先思考下logagent的配置文件内容:

etcd_addr = 10.134.123.183:2379# etcd 地址

etcd_timeout = 5# 连接etcd超时时间

etcd_watch_key = /logagent/%s/logconfig# etcd key 格式

kafka_addr = 10.134.123.183:9092# 卡夫卡地址

thread_num = 4# 线程数

log = ./log/logagent.log# agent的日志文件

level = debug# 日志级别

# 监听哪些日志,日志限流大小,发送到卡夫卡的哪个topic  这个部分可以放到etcd中去。

如上所说,监听哪些日志,日志限流大小,发送到卡夫卡的哪个topic 这个部分可以放到etcd中去。etcd中存储的value格式设计如下:

`[

{

"service":"test_service",

"log_path":"/search/nginx/logs/ping-android.shouji.sogou.com_access_log","topic":"nginx_log",

"send_rate": 1000

},

{

"service":"srv.android.shouji.sogou.com",

"log_path":"/search/nginx/logs/srv.android.shouji.sogou.com_access_log","topic":"nginx_log",

"send_rate": 2000

}

]`

-"service":"服务名称",

-"log_path":"应该监听的日志文件",

-"topic":"kfk topic",

-"send_rate":"日志条数限制"

其实可以将更多的配置放入etcd中,根据自身业务情况可自行定义,本次就做如此设计,接下来可以写解析配置文件的代码了。

config.go

packagemain

import(

"fmt"

"github.com/astaxie/beego/config"

)

typeAppConfigstruct{

EtcdAddrstring

EtcdTimeOutint

EtcdWatchKeystring

KafkaAddrstring

ThreadNumint

LogFilestring

LogLevelstring

}

varappConf = &AppConfig{}

funcinitConfig(filestring)(err error){

conf, err := config.NewConfig("ini", file)

iferr !=nil{

fmt.Println("new config failed, err:", err)

return

}

appConf.EtcdAddr = conf.String("etcd_addr")

appConf.EtcdTimeOut = conf.DefaultInt("etcd_timeout",5)

appConf.EtcdWatchKey = conf.String("etcd_watch_key")

appConf.KafkaAddr = conf.String("kafka_addr")

appConf.ThreadNum = conf.DefaultInt("thread_num",4)

appConf.LogFile = conf.String("log")

appConf.LogLevel = conf.String("level")

return

}

代码主要定义了一个AppConf结构体,然后读取配置文件,存放到结构体中。

此外,还有部分配置在etcd中,需要做两件事,第一次启动程序时将配置从etcd拉取下来;然后启动一个协程去watch etcd中的配置是否更改,如果更改需要拉取并更新到内存中。代码如下:

etcd.go:

packagemain

import(

"context"

"fmt"

"sync"

"time"

"github.com/astaxie/beego/logs"

client"github.com/coreos/etcd/clientv3"

)

var(

confChan  =make(chanstring,10)

cli       *client.Client

waitGroup sync.WaitGroup

)

funcinitEtcd(addr []string, keyFormatstring, timeout time.Duration)(err error){

// init a global var cli and can not close

cli, err = client.New(client.Config{

Endpoints:   addr,

DialTimeout: timeout,

})

iferr !=nil{

fmt.Println("connect etcd error:", err)

return

}

logs.Debug("init etcd success")

// defer cli.Close()   //can not close

varetcdKeys []string

ips, err := getLocalIP()

iferr !=nil{

fmt.Println("get local ip error:", err)

return

}

for_, ip :=rangeips {

key := fmt.Sprintf(keyFormat, ip)

etcdKeys =append(etcdKeys, key)

}

// first, pull conf from etcd

for_, key :=rangeetcdKeys {

ctx, cancel := context.WithTimeout(context.Background(), time.Second)

resp, err := cli.Get(ctx, key)

cancel()

iferr !=nil{

fmt.Println("get etcd key failed, error:", err)

continue

}

for_, ev :=rangeresp.Kvs {

// return result is not string

confChan <-string(ev.Value)

fmt.Printf("etcd key = %s , etcd value = %s", ev.Key, ev.Value)

}

}

waitGroup.Add(1)

// second, start a goroutine to watch etcd

goetcdWatch(etcdKeys)

return

}

// watch etcd

funcetcdWatch(keys []string){

deferwaitGroup.Done()

varwatchChans []client.WatchChan

for_, key :=rangekeys {

rch := cli.Watch(context.Background(), key)

watchChans =append(watchChans, rch)

}

for{

for_, watchC :=rangewatchChans {

select{

casewresp := <-watchC:

for_, ev :=rangewresp.Events {

confChan <-string(ev.Kv.Value)

logs.Debug("etcd key = %s , etcd value = %s", ev.Kv.Key, ev.Kv.Value)

}

default:

}

}

time.Sleep(time.Second)

}

}

//GetEtcdConfChan is func get etcd conf add to chan

funcGetEtcdConfChan()chanstring{

returnconfChan

}

其中,有一个比较个性化的设计,就是一台主机对应的etcd 中的key我们设置成/logagent/本机ip/logconfig的格式,因此还需要一个获取本机IP的功能,注意一台机器可能存在多个IP。

ip.go:

packagemain

import(

"fmt"

"net"

)

// var a slice for ip addr

varipArray []string

funcgetLocalIP()(ips []string, err error){

ifaces, err := net.Interfaces()

iferr !=nil{

fmt.Println("get ip interfaces error:", err)

return

}

for_, i :=rangeifaces {

addrs, errRet := i.Addrs()

iferrRet !=nil{

continue

}

for_, addr :=rangeaddrs {

varip net.IP

switchv := addr.(type) {

case*net.IPNet:

ip = v.IP

ifip.IsGlobalUnicast() {

ips =append(ips, ip.String())

}

}

}

}

return

}

3.2 初始化kafka

初始化kafka很简单,就是创建kafka实例,提供发送日志功能。只不过发送是并发的。

packagemain

import(

"fmt"

"github.com/Shopify/sarama"

"github.com/astaxie/beego/logs"

)

varkafkaSend = &KafkaSend{}

typeMessagestruct{

linestring

topicstring

}

typeKafkaSendstruct{

client   sarama.SyncProducer

lineChanchan*Message

}

funcinitKafka(kafkaAddrstring, threadNumint)(err error){

kafkaSend, err = NewKafkaSend(kafkaAddr, threadNum)

return

}

// NewKafkaSend is

funcNewKafkaSend(kafkaAddrstring, threadNumint)(kafka *KafkaSend, err error){

kafka = &KafkaSend{

lineChan:make(chan*Message,10000),

}

config := sarama.NewConfig()

config.Producer.RequiredAcks = sarama.WaitForAll// wait kafka ack

config.Producer.Partitioner = sarama.NewRandomPartitioner// random partition

config.Producer.Return.Successes =true

client, err := sarama.NewSyncProducer([]string{kafkaAddr}, config)

iferr !=nil{

logs.Error("init kafka client err: %v", err)

return

}

kafka.client = client

fori :=0; i < threadNum; i++ {

fmt.Println("start to send kfk")

waitGroup.Add(1)

gokafka.sendMsgToKfk()

}

return

}

func(k *KafkaSend)sendMsgToKfk(){

deferwaitGroup.Done()

forv :=rangek.lineChan {

msg := &sarama.ProducerMessage{}

msg.Topic = v.topic

msg.Value = sarama.StringEncoder(v.line)

_, _, err := k.client.SendMessage(msg)

iferr !=nil{

logs.Error("send massage to kafka error: %v", err)

return

}

}

}

func(k *KafkaSend)addMessage(linestring, topicstring)(err error){

k.lineChan <- &Message{line: line, topic: topic}

return

}

3.3 实时读取日志,发送到kafka

用到第三方包:"github.com/hpcloud/tail"。将每个监听的日志,都抽象成一个对象。

packagemain

import(

"encoding/json"

"fmt"

"strings"

"sync"

"github.com/astaxie/beego/logs"

"github.com/hpcloud/tail"

)

// TailObj is TailMgr's instance

typeTailObjstruct{

tail     *tail.Tail

offsetint64

logConf  LogConfig

secLimit *SecondLimit

exitChanchanbool

}

vartailMgr *TailMgr

//TailMgr to manage tailObj

typeTailMgrstruct{

tailObjMapmap[string]*TailObj

lock       sync.Mutex

}

// NewTailMgr init TailMgr obj

funcNewTailMgr()*TailMgr{

return&TailMgr{

tailObjMap:make(map[string]*TailObj,16),

}

}

//AddLogFile to Add tail obj

func(t *TailMgr)AddLogFile(conf LogConfig)(err error){

t.lock.Lock()

defert.lock.Unlock()

_, ok := t.tailObjMap[conf.LogPath]

ifok {

err = fmt.Errorf("duplicate filename:%s", conf.LogPath)

return

}

tail, err := tail.TailFile(conf.LogPath, tail.Config{

ReOpen:true,

Follow:true,

Location:  &tail.SeekInfo{Offset:0, Whence:2},// read to tail

MustExist:false,//file does not exist, it does not return an error

Poll:true,

})

iferr !=nil{

fmt.Println("tail file err:", err)

return

}

tailObj := &TailObj{

tail:     tail,

offset:0,

logConf:  conf,

secLimit: NewSecondLimit(int32(conf.SendRate)),

exitChan:make(chanbool,1),

}

t.tailObjMap[conf.LogPath] = tailObj

waitGroup.Add(1)

gotailObj.readLog()

return

}

func(t *TailMgr)reloadConfig(logConfArr []LogConfig)(err error){

for_, conf :=rangelogConfArr {

tailObj, ok := t.tailObjMap[conf.LogPath]

if!ok {

err = t.AddLogFile(conf)

iferr !=nil{

logs.Error("add log file failed:%v", err)

continue

}

continue

}

tailObj.logConf = conf

tailObj.secLimit.limit =int32(conf.SendRate)

t.tailObjMap[conf.LogPath] = tailObj

}

forkey, tailObj :=ranget.tailObjMap {

varfound =false

for_, newValue :=rangelogConfArr {

ifkey == newValue.LogPath {

found =true

break

}

}

iffound ==false{

logs.Warn("log path :%s is remove", key)

tailObj.exitChan <-true

delete(t.tailObjMap, key)

}

}

return

}

// Process hava two func get new log conf and reload conf

func(t *TailMgr)Process(){

forconf :=rangeGetEtcdConfChan() {

logs.Debug("log conf: %v", conf)

varlogConfArr []LogConfig

err := json.Unmarshal([]byte(conf), &logConfArr)

iferr !=nil{

logs.Error("unmarshal failed, err: %v conf :%s", err, conf)

continue

}

err = t.reloadConfig(logConfArr)

iferr !=nil{

logs.Error("reload config from etcd failed: %v", err)

continue

}

logs.Debug("reload config from etcd success")

}

}

func(t *TailObj)readLog(){

forline :=ranget.tail.Lines {

ifline.Err !=nil{

logs.Error("read line error:%v ", line.Err)

continue

}

lineStr := strings.TrimSpace(line.Text)

iflen(lineStr) ==0|| lineStr[0] =='\n'{

continue

}

kafkaSend.addMessage(line.Text, t.logConf.Topic)

t.secLimit.Add(1)

t.secLimit.Wait()

select{

case<-t.exitChan:

logs.Warn("tail obj is exited: config:", t.logConf)

return

default:

}

}

waitGroup.Done()

}

funcrunServer(){

tailMgr = NewTailMgr()

tailMgr.Process()

waitGroup.Wait()

}

此处设计了一个限流功能,逻辑大概如下:设置阈值A,如阈值为1000条,如果这秒钟已经发送1000条,那么这一秒剩下的时间就sleep。limit.go代码如下:

packagemain

import(

"sync/atomic"

"time"

"github.com/astaxie/beego/logs"

)

// SecondLimit to limit num in one second

typeSecondLimitstruct{

unixSecondint64

curCountint32

limitint32

}

// NewSecondLimit to init a SecondLimit obj

funcNewSecondLimit(limitint32)*SecondLimit{

secLimit := &SecondLimit{

unixSecond: time.Now().Unix(),

curCount:0,

limit:      limit,

}

returnsecLimit

}

// Add is func to

func(s *SecondLimit)Add(countint){

sec := time.Now().Unix()

ifsec == s.unixSecond {

atomic.AddInt32(&s.curCount,int32(count))

return

}

atomic.StoreInt64(&s.unixSecond, sec)

atomic.StoreInt32(&s.curCount,int32(count))

}

// Wait to limit num

func(s *SecondLimit)Wait()bool{

for{

sec := time.Now().Unix()

if(sec == atomic.LoadInt64(&s.unixSecond)) && s.curCount >= s.limit {

time.Sleep(time.Millisecond)

logs.Debug("limit is runing, limit: %d s.curCount:%d", s.limit, s.curCount)

continue

}

ifsec != atomic.LoadInt64(&s.unixSecond) {

atomic.StoreInt64(&s.unixSecond, sec)

atomic.StoreInt32(&s.curCount,0)

}

logs.Debug("limit is exited")

returnfalse

}

}

此外,写日志的代码非主要代码,这里就不介绍了。所有代码均上传到github上,如有兴趣可前去clone,地址已经在文章开头处给出。

出处:http://www.cnblogs.com/zingp/p/9365010.html

公众号:javafirst

相关文章

网友评论

    本文标题:海量日志实时收集系统架构设计与go语言实现

    本文链接:https://www.haomeiwen.com/subject/utosgftx.html