安装kafka
-
下载并安装kafka, 地址 http://kafka.apache.org/downloads
-
启动zookeeper
zookeeper-server-start.bat D:\kafka\config\zookeeper.properties
-
启动kafka
kafka-server-start.bat D:\kafka\config\server.properties
-
启动kafka需要配置JDK
目录结构
image.png- main.go
package main
import (
"fmt"
"gopkg.in/ini.v1"
"logAgent/conf"
"logAgent/kafka"
"logAgent/tailLog"
"time"
)
var (
cfg = new(conf.AppConfig)
)
func run(){
// 读取日志 发送到kafka
for {
select {
case line := <-tailLog.ReadChan():
//发送到kafka
kafka.SendToKafka(cfg.Topic, line.Text)
default:
time.Sleep(time.Second)
}
}
}
// logAgent 程序入口
func main(){
//加载配置文件
err := ini.MapTo(cfg, "./conf/config.ini")
if err != nil{
fmt.Println("load config.ini failed, err:", err)
return
}
//1:初始化kafka连接
err = kafka.Init([]string{cfg.Address})
if err != nil{
fmt.Println("Init Kafka failed, err:", err)
return
}
fmt.Println("init kafka success")
//2:打开日志文件准备收集日志
err = tailLog.Init(cfg.FileName)
if err != nil{
fmt.Println("Init Tail failed, err:", err)
return
}
fmt.Println("init tailLog success")
run()
}
- kafka.go
package kafka
import (
"fmt"
"github.com/Shopify/sarama"
)
// 专门向kafka中写日志的模块
var (
client sarama.SyncProducer
)
func Init(addr []string) (err error){
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.Return.Successes = true
client,err = sarama.NewSyncProducer(addr, config)
if err!= nil{
fmt.Println("Producer closed ,err :",err)
return
}
return
}
func SendToKafka(topic, data string){
msg := &sarama.ProducerMessage{}
msg.Topic = topic
msg.Value = sarama.StringEncoder(data)
pid, offset, err := client.SendMessage(msg)
if err != nil{
fmt.Println("send msg failed, err:",err)
return
}
fmt.Printf("pid:%v, offset:%v\n", pid, offset)
}
- tailLog.go
package tailLog
import (
"fmt"
"github.com/hpcloud/tail"
)
// 收集日志文件的模块
var (
tailObj *tail.Tail
LogChan chan string
)
func Init(fileName string) (err error){
config := tail.Config{
ReOpen:true,
Follow:true,
Location:&tail.SeekInfo{Offset:0,Whence:2},
MustExist:false,
Poll:true,
}
tailObj,err = tail.TailFile(fileName, config)
if err != nil{
fmt.Println("tail file failed ,err :",err)
return
}
return
}
func ReadChan() <-chan *tail.Line{
return tailObj.Lines
}
- config.go
package conf
type AppConfig struct {
KafkaConf `ini:"kafka"`
TaillogConf `ini:"taillog"`
}
type KafkaConf struct {
Address string `ini:"address"`
Topic string `ini:"topic"`
}
type TaillogConf struct {
FileName string `ini:"path"`
}
- config.ini
[kafka]
address=127.0.0.1:9092
topic=web_log
[taillog]
path=./my.log
网友评论