logAgent

作者: 韩小禹 | 来源:发表于2020-04-21 23:05 被阅读0次

    安装kafka

    • 下载并安装kafka, 地址 http://kafka.apache.org/downloads

    • 启动zookeeper
      zookeeper-server-start.bat D:\kafka\config\zookeeper.properties

    • 启动kafka
      kafka-server-start.bat D:\kafka\config\server.properties

    • 注意: 启动 zookeeper 和 kafka 之前需要先安装 JDK, 并配置好 JAVA_HOME 或 PATH

    目录结构

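    logAgent/
    ├── conf/
    │   ├── config.go
    │   └── config.ini
    ├── kafka/
    │   └── kafka.go
    ├── tailLog/
    │   └── tailLog.go
    └── main.go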
    • main.go
    package main
    import (
        "fmt"
        "gopkg.in/ini.v1"
        "logAgent/conf"
        "logAgent/kafka"
        "logAgent/tailLog"
        "time"
    )
    var (
        // cfg holds the application configuration. main fills it from
        // ./conf/config.ini and run reads the Kafka topic from it.
        cfg = new(conf.AppConfig)
    )
    // run reads lines from the tail reader and sends each one to Kafka under
    // the configured topic.
    //
    // It loops until the lines channel is closed, then returns. When no line
    // is ready it sleeps for one second before checking again.
    func run() {
        // The channel is the same on every iteration, so look it up once.
        lines := tailLog.ReadChan()
        for {
            select {
            case line, ok := <-lines:
                if !ok {
                    // A closed channel yields nil lines forever; stop instead
                    // of dereferencing a nil *tail.Line.
                    fmt.Println("tail lines channel closed, stop reading")
                    return
                }
                if line == nil {
                    continue
                }
                // Send the line to Kafka.
                kafka.SendToKafka(cfg.Topic, line.Text)
            default:
                // Nothing to read right now; back off before polling again.
                time.Sleep(time.Second)
            }
        }
    }
    // main is the logAgent entry point. It loads ./conf/config.ini into cfg,
    // initialises the Kafka producer, opens the log file for tailing and then
    // hands over to run. Any initialisation failure is printed and main returns.
    func main() {
        // Load the configuration file.
        if loadErr := ini.MapTo(cfg, "./conf/config.ini"); loadErr != nil {
            fmt.Println("load config.ini failed, err:", loadErr)
            return
        }

        // Step 1: initialise the Kafka connection.
        brokers := []string{cfg.Address}
        if kafkaErr := kafka.Init(brokers); kafkaErr != nil {
            fmt.Println("Init Kafka failed, err:", kafkaErr)
            return
        }
        fmt.Println("init kafka success")

        // Step 2: open the log file so its lines can be collected.
        if tailErr := tailLog.Init(cfg.FileName); tailErr != nil {
            fmt.Println("Init Tail failed, err:", tailErr)
            return
        }
        fmt.Println("init tailLog success")

        run()
    }
    
    • kafka.go
    package kafka
    
    import (
        "fmt"
        "github.com/Shopify/sarama"
    )
    
    // Module dedicated to writing logs to Kafka.
    var (
        // client is the shared synchronous producer. Init assigns it and
        // SendToKafka uses it to send messages.
        client sarama.SyncProducer
    )
    
    // Init creates the package-level synchronous Kafka producer connected to the
    // brokers listed in addr.
    //
    // It returns a wrapped error when the producer cannot be created; in that
    // case the package-level client is left untouched. Reporting the error is
    // left to the caller so the failure is not logged twice.
    func Init(addr []string) (err error) {
        config := sarama.NewConfig()
        // Wait for acknowledgement from all in-sync replicas before a send
        // counts as successful.
        config.Producer.RequiredAcks = sarama.WaitForAll
        // Choose a random partition for each message.
        config.Producer.Partitioner = sarama.NewRandomPartitioner
        // A SyncProducer requires successes to be returned.
        config.Producer.Return.Successes = true

        producer, err := sarama.NewSyncProducer(addr, config)
        if err != nil {
            return fmt.Errorf("create kafka sync producer for %v: %w", addr, err)
        }
        client = producer
        return nil
    }
    
    // SendToKafka sends data as a single message to the given topic through the
    // package-level producer. It prints the partition and offset on success and
    // the error on failure; nothing is returned to the caller.
    func SendToKafka(topic, data string) {
        message := &sarama.ProducerMessage{
            Topic: topic,
            Value: sarama.StringEncoder(data),
        }

        partition, position, sendErr := client.SendMessage(message)
        if sendErr == nil {
            fmt.Printf("pid:%v, offset:%v\n", partition, position)
            return
        }
        fmt.Println("send msg failed, err:", sendErr)
    }
    
    • tailLog.go
    package tailLog
    
    import (
        "fmt"
        "github.com/hpcloud/tail"
    )
    
    // Module that collects lines from a log file.

    var (
        // tailObj is the tail handle created by Init; ReadChan exposes its
        // Lines channel.
        tailObj *tail.Tail
        // LogChan is not referenced by any code shown in this listing.
        LogChan chan string
    )
    
    // Init starts tailing fileName and stores the resulting handle in the
    // package-level tailObj. A failure is printed and also returned.
    func Init(fileName string) (err error) {
        tailObj, err = tail.TailFile(fileName, tail.Config{
            // Whence 2 is io.SeekEnd: start at the end of the file.
            Location:  &tail.SeekInfo{Offset: 0, Whence: 2},
            ReOpen:    true,  // reopen the file when it is recreated
            Follow:    true,  // keep waiting for new lines
            MustExist: false, // do not fail if the file is not there yet
            Poll:      true,  // poll for changes instead of using inotify
        })
        if err == nil {
            return nil
        }
        fmt.Println("tail file failed ,err :", err)
        return err
    }
    
    // ReadChan returns the receive-only channel of lines from the tail handle
    // created by Init.
    func ReadChan() <-chan *tail.Line {
        lines := tailObj.Lines
        return lines
    }
    
    
    • config.go
    package conf
    
    type (
        // AppConfig is the root configuration. Its embedded structs are tagged
        // with the "kafka" and "taillog" ini section names.
        AppConfig struct {
            KafkaConf   `ini:"kafka"`
            TaillogConf `ini:"taillog"`
        }

        // KafkaConf holds the Kafka settings: the "address" and "topic" keys.
        KafkaConf struct {
            Address string `ini:"address"`
            Topic   string `ini:"topic"`
        }

        // TaillogConf holds the log file settings; FileName is read from the
        // "path" key.
        TaillogConf struct {
            FileName string `ini:"path"`
        }
    )
    
    • config.ini
    [kafka]
    address=127.0.0.1:9092
    topic=web_log
    
    [taillog]
    path=./my.log
    

    相关文章

      网友评论

          本文标题:logAgent

          本文链接:https://www.haomeiwen.com/subject/xxcvihtx.html