logAgent

作者: 韩小禹 | 来源:发表于2020-04-21 23:05 被阅读0次

安装kafka

  • 下载并安装kafka, 地址 http://kafka.apache.org/downloads

  • 启动zookeeper
    zookeeper-server-start.bat D:\kafka\config\zookeeper.properties

  • 启动kafka
    kafka-server-start.bat D:\kafka\config\server.properties

  • 注意:ZooKeeper 和 Kafka 都运行在 JVM 上,执行上面的启动命令之前需要先安装 JDK 并配置好 JAVA_HOME 环境变量

目录结构

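(原文此处为目录结构截图 image.png,以下按代码中的导入路径和文件名还原)

logAgent/
├── conf/
│   ├── config.go
│   └── config.ini
├── kafka/
│   └── kafka.go
├── tailLog/
│   └── tailLog.go
└── main.go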
  • main.go
package main
import (
    "fmt"
    "gopkg.in/ini.v1"
    "logAgent/conf"
    "logAgent/kafka"
    "logAgent/tailLog"
    "time"
)
// cfg holds the application configuration. main fills it from
// ./conf/config.ini with ini.MapTo before any other code reads it.
var (
    cfg = new(conf.AppConfig)
)
// run forwards collected log lines to Kafka until the tail channel is closed.
//
// It polls the channel returned by tailLog.ReadChan: every available line is
// published to the topic configured in cfg.Topic, and when no line is pending
// the loop sleeps for one second before polling again.
func run() {
    lines := tailLog.ReadChan()
    for {
        select {
        case line, ok := <-lines:
            if !ok {
                // A closed channel keeps yielding nil lines; stop here rather
                // than dereferencing nil on line.Text.
                fmt.Println("tail channel closed, stop collecting logs")
                return
            }
            // Publish the line to Kafka.
            kafka.SendToKafka(cfg.Topic, line.Text)
        default:
            time.Sleep(time.Second)
        }
    }
}
// main is the logAgent entry point: it loads the configuration, connects to
// Kafka, starts tailing the log file and then hands control to run. Any
// failing step is printed and ends the program.
func main() {
    // Load the configuration file into the package-level cfg.
    if loadErr := ini.MapTo(cfg, "./conf/config.ini"); loadErr != nil {
        fmt.Println("load config.ini failed, err:", loadErr)
        return
    }

    // Step 1: initialise the Kafka connection.
    brokers := []string{cfg.Address}
    if kafkaErr := kafka.Init(brokers); kafkaErr != nil {
        fmt.Println("Init Kafka failed, err:", kafkaErr)
        return
    }
    fmt.Println("init kafka success")

    // Step 2: open the log file so that lines can be collected.
    if tailErr := tailLog.Init(cfg.FileName); tailErr != nil {
        fmt.Println("Init Tail failed, err:", tailErr)
        return
    }
    fmt.Println("init tailLog success")

    run()
}
  • kafka.go
package kafka

import (
    "fmt"
    "github.com/Shopify/sarama"
)

// The module dedicated to writing logs to Kafka.
var (
    // client is the producer assigned by Init and used by SendToKafka.
    client sarama.SyncProducer
)

// Init creates the package-level synchronous Kafka producer used by
// SendToKafka.
//
// addr lists the broker addresses to connect to. The producer is configured
// with RequiredAcks = sarama.WaitForAll, a random partitioner, and
// Return.Successes enabled.
//
// It returns nil on success. On failure it returns the sarama error wrapped
// with the broker list; reporting is left to the caller so that the failure
// is logged only once.
func Init(addr []string) (err error) {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    config.Producer.Return.Successes = true

    client, err = sarama.NewSyncProducer(addr, config)
    if err != nil {
        return fmt.Errorf("create kafka sync producer for %v: %w", addr, err)
    }
    return nil
}

// SendToKafka publishes data as one message on topic through the producer
// created by Init, then prints either the send error or the partition and
// offset reported for the message.
func SendToKafka(topic, data string) {
    message := &sarama.ProducerMessage{
        Topic: topic,
        Value: sarama.StringEncoder(data),
    }

    partition, offset, sendErr := client.SendMessage(message)
    if sendErr != nil {
        fmt.Println("send msg failed, err:", sendErr)
        return
    }
    fmt.Printf("pid:%v, offset:%v\n", partition, offset)
}
  • tailLog.go
package tailLog

import (
    "fmt"
    "github.com/hpcloud/tail"
)

// The module that collects log files.

var (
    // tailObj is the tailer created by Init; ReadChan exposes its Lines channel.
    tailObj *tail.Tail
    // LogChan is declared here but not referenced by any code visible in this file.
    LogChan chan string
)

// Init starts tailing fileName and stores the tailer in the package-level
// tailObj so that ReadChan can hand out its line channel.
//
// The tailer is configured to keep following the file (Follow), reopen it
// when it is recreated (ReOpen), start reading at the end of the file, not
// require the file to exist yet (MustExist: false), and poll for changes
// (Poll).
//
// It returns nil on success. On failure it returns the underlying error
// wrapped with the file name; reporting is left to the caller so that the
// failure is logged only once.
func Init(fileName string) (err error) {
    config := tail.Config{
        ReOpen: true,
        Follow: true,
        // Whence 2 means "relative to the end of the file" (io.SeekEnd).
        Location:  &tail.SeekInfo{Offset: 0, Whence: 2},
        MustExist: false,
        Poll:      true,
    }

    tailObj, err = tail.TailFile(fileName, config)
    if err != nil {
        return fmt.Errorf("tail file %q: %w", fileName, err)
    }
    return nil
}

// ReadChan returns, as a receive-only channel, the Lines channel of the
// tailer stored in tailObj by Init.
func ReadChan() <-chan *tail.Line {
    var lines <-chan *tail.Line = tailObj.Lines
    return lines
}

  • config.go
package conf

// AppConfig is the root configuration object; it embeds one struct per
// config.ini section.
type AppConfig struct {
    // Kafka settings, tagged with the "kafka" section name.
    KafkaConf `ini:"kafka"`
    // Log-tailing settings, tagged with the "taillog" section name.
    TaillogConf `ini:"taillog"`
}

// KafkaConf holds the Kafka settings, read from the ini keys named in the tags.
type KafkaConf struct {
    // Address is the Kafka broker address (ini key "address").
    Address string `ini:"address"`
    // Topic is the Kafka topic name (ini key "topic").
    Topic string `ini:"topic"`
}

// TaillogConf holds the log-collection settings.
type TaillogConf struct {
    // FileName is the log file path; note that its ini key is "path".
    FileName string `ini:"path"`
}
  • config.ini
[kafka]
address=127.0.0.1:9092
topic=web_log

[taillog]
path=./my.log

相关文章

网友评论

      本文标题:logAgent

      本文链接:https://www.haomeiwen.com/subject/xxcvihtx.html