美文网首页
开发一个filebeat output websocket插件

开发一个filebeat output websocket插件

作者: ShootHzj | 来源:发表于2021-07-04 09:38 被阅读0次

    开发一个filebeat的websocket插件, 代码仓地址: https://github.com/Shoothzj/beats_output_websocket

    引入对beat的依赖

    go get github.com/elastic/beats/v7
    

    定义在filebeat中的配置文件

    filebeat通常以配置文件的方式加载插件。让我们定义一下必须的配置,就像elasticsearch中的连接地址等等一样。

    output.websocket:
      # worker
      # 用于工作的websocket客户端数量
      workers: 1
      # 日志批量的最大大小
      batch_size: 1
      # 重试的最大次数,0代表不重试
      retry_limit: 1
      # conn
      # ws/wss
      schema: "ws"
      # websocket连接地址
      addr: "localhost:8080"
      # websocket路径
      path: "/echo"
      # websocket心跳间隔,用于保活
      ping_interval: 30
    

    go文件中的配置

    type clientConfig struct {
        // Number of worker goroutines publishing log events
        Workers int `config:"workers" validate:"min=1"`
        // Max number of events in a batch to send to a single client
        BatchSize int `config:"batch_size" validate:"min=1"`
        // Max number of retries for single batch of events
        RetryLimit int `config:"retry_limit"`
        // Schema WebSocket Schema
        Schema string `config:"schema"`
        // Addr WebSocket Addr
        Addr string `config:"addr"`
        // Path WebSocket Path
        Path string `config:"path"`
        // PingInterval WebSocket PingInterval
        PingInterval int `config:"ping_interval"`
    }
    

    初始化加载插件

    加载插件

    在某个init函数中注册插件

    func init() {
        outputs.RegisterType("websocket", newWsOutput)
    }
    

    newWsOutput中卸载配置,并提供配置给WebSocket客户端

    func newWsOutput(_ outputs.IndexManager, _ beat.Info, stats outputs.Observer, cfg *common.Config) (outputs.Group, error) {
        config := clientConfig{}
        // 卸载配置,将配置用于初始化WebSocket客户端
        if err := cfg.Unpack(&config); err != nil {
            return outputs.Fail(err)
        }
        clients := make([]outputs.NetworkClient, config.Workers)
        for i := 0; i < config.Workers; i++ {
            clients[i] = &wsClient{
                stats:  stats,
                Schema: config.Schema,
                Host:   config.Addr,
                Path:   config.Path,
                PingInterval: config.PingInterval,
            }
        }
    
        return outputs.SuccessNet(true, config.BatchSize, config.RetryLimit, clients)
    }
    

    初始化WebSocket客户端

    WebSocket客户端不仅仅是一个WebSocket客户端,而且还需要实现filebeat中的NetworkClient接口,接下来,让我们来关注接口中的每一个方法的作用及实现

    String()接口

    String作为客户端的名字,用来标识日志以及指标。是最简单的一个接口

    func (w *wsClient) String() string {
        return "websocket"
    }
    

    Connect()接口

    Connect用来初始化客户端

    func (w *wsClient) Connect() error {
        u := url.URL{Scheme: w.Schema, Host: w.Host, Path: w.Path}
        dial, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
        if err == nil {
            w.conn = dial
            ticker := time.NewTicker(time.Duration(w.PingInterval) * time.Second)
            go func() {
                for range ticker.C {
                    w.conn.WriteMessage(websocket.PingMessage, nil)
                }
            }()
        } else {
            time.Sleep(10 * time.Second)
        }
        return err
    }
    

    注意,这里初始化失败,需要Sleep一段时间,否则,filebeat会一直重试。这绝非是你想要的。或许对于场景来说,退避重试可能会更好

    Close()接口

    关闭客户端,也是很简单的接口

    func (w *wsClient) Close() error {
        return w.conn.Close()
    }
    

    Publish()接口

    func (w *wsClient) Publish(_ context.Context, batch publisher.Batch) error {
        events := batch.Events()
        // 记录这批日志
        w.stats.NewBatch(len(events))
        failEvents, err := w.PublishEvents(events)
        if err != nil {
            // 如果发送正常,则ACK
            batch.ACK()
        } else {
            // 发送失败,则重试。受RetryLimit的限制
            batch.RetryEvents(failEvents)
        }
        return err
    }
    
    func (w *wsClient) PublishEvents(events []publisher.Event) ([]publisher.Event, error) {
        for i, event := range events {
            err := w.publishEvent(&event)
            if err != nil {
                // 如果单条消息发送失败,则将剩余的消息直接重试
                return events[i:], err
            }
        }
        return nil, nil
    }
    
    func (w *wsClient) publishEvent(event *publisher.Event) error {
        bytes, err := encode(&event.Content)
        if err != nil {
            // 如果编码失败,就不重试了,重试也不会成功
            // encode error, don't retry.
            // consider being success
            return nil
        }
        err = w.conn.WriteMessage(websocket.TextMessage, bytes)
        if err != nil {
            // 写入WebSocket Server失败
            return err
        }
        return nil
    }
    

    编码

    编码的逻辑因人而异,事实上,这可能是大家最大的差异所在。这里只是做一个简单地例子

    type LogOutput struct {
        Timestamp time.Time `json:"timestamp"`
        Message   string    `json:"message"`
    }
    
    func encode(event *beat.Event) ([]byte, error) {
        logOutput := &LogOutput{}
        value, err := event.Fields.GetValue("message")
        if err != nil {
            return nil, err
        }
        logOutput.Timestamp = event.Timestamp
        logOutput.Message = value.(string)
        return json.Marshal(logOutput)
    }
    

    最后是我们的wsclient

    type wsClient struct {
        // construct field
        Schema       string
        Host         string
        Path         string
        PingInterval int
    
        stats outputs.Observer
        conn  *websocket.Conn
    }
    

    添加额外的功能:大包丢弃

    你可能会想保护你的WebSocket服务器,避免接收到超级大的日志。我们可以在配置项中添加一个配置

    maxLen用来限制日志长度,超过maxLen的日志直接丢弃。为什么不使用filebeat中的max_bytes

    因为filebeatmax_bytes的默认行为是截断,截断的日志在某些场景下不如丢弃。(比如,日志是json格式,截断后格式无法解析)

    配置中添加maxLen

      max_len: 1024
    

    省略掉那些重复的添加结构体,读取max_len在encode的时候忽略掉

        s := value.(string)
        if len(s) >= w.MaxLen {
            return nil, err
        }
    

    参考文献

    相关文章

      网友评论

          本文标题:开发一个filebeat output websocket插件

          本文链接:https://www.haomeiwen.com/subject/zesmultx.html