A Look at golang zap's ZapKafkaWriter

Author: go4it | Published: 2020-12-10 23:46

    This article examines ZapKafkaWriter, a writer that sends log output from golang's zap logging library to Kafka.

    ZapKafkaWriter

    package logger
    
    import (
        "errors"
        "sync"
        "sync/atomic"
        "syscall"
    )
    
    // ZapKafkaWriter is a zap WriteSyncer (io.Writer) that writes messages to Kafka
    type ZapKafkaWriter struct {
        kp        *KafkaProducer
        ce        *CloudEvents
        closed    int32          // Nonzero if closing, must access atomically
        pendingWg sync.WaitGroup // WaitGroup for pending messages
        closeMut  sync.Mutex
    }
    
    // newZapKafkaWriter returns a kafka io.writer instance
    func newZapKafkaWriter(
        kpCfg ProducerConfiguration, cloudEvents *CloudEvents,
        ceCfg CloudEventsConfiguration) (*ZapKafkaWriter, error) {
    
        // create an async producer
        kp, err := newKafkaProducer(kpCfg, cloudEvents, ceCfg)
        if err != nil {
            return nil, err
        }
    
        zw := &ZapKafkaWriter{
            kp: kp,
            ce: cloudEvents,
        }
        return zw, nil
    }
    

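    ZapKafkaWriter has five fields: kp (a *KafkaProducer), ce (a *CloudEvents), closed, pendingWg, and closeMut. The constructor newZapKafkaWriter builds a KafkaProducer from the ProducerConfiguration, the CloudEvents value, and the CloudEventsConfiguration, then wraps that producer in a ZapKafkaWriter, leaving closed, pendingWg, and closeMut at their zero values.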

    zapcore.WriteSyncer

    // Sync satisfies zapcore.WriteSyncer interface, zapcore.AddSync works as well
    func (zw *ZapKafkaWriter) Sync() error {
        return nil
    }
    
    // Write sends byte slices to Kafka ignoring error responses (Thread-safe)
    // Write might block if the Input() channel of the AsyncProducer is full
    func (zw *ZapKafkaWriter) Write(msg []byte) (int, error) {
        if zw.Closed() {
            return 0, syscall.EINVAL
        }
    
        if zw.kp.producer == nil {
            return 0, errors.New("No producer defined")
        }
    
        zw.pendingWg.Add(1)
        defer zw.pendingWg.Done()
    
        err := zw.kp.sendMessage(msg)
        return len(msg), err
    }
    
    // Closed returns true if the writer is closed, false otherwise (Thread-safe)
    func (zw *ZapKafkaWriter) Closed() bool {
        return atomic.LoadInt32(&zw.closed) != 0
    }
    
    // Close must be called when the writer is no longer needed (Thread-safe)
    func (zw *ZapKafkaWriter) Close() (err error) {
        zw.closeMut.Lock()
        defer zw.closeMut.Unlock()
    
        if zw.Closed() {
            return syscall.EINVAL
        }
    
        atomic.StoreInt32(&zw.closed, 1)
    
        zw.pendingWg.Wait()
        return nil
    }
    

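    ZapKafkaWriter implements the zapcore.WriteSyncer interface. Its Write method first checks that the writer is not closed and that a producer exists, then sends the message through the KafkaProducer; its Sync method is currently a no-op. It also provides a Close method, which sets the closed flag and waits for pending writes to finish, so the type satisfies the zap.Sink interface as well.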
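
    The closed-flag and WaitGroup pattern described above can be tried out without Kafka. The following is a minimal sketch, not the library's code: demoWriter, errClosed, and the in-memory sent slice are hypothetical stand-ins (the original returns syscall.EINVAL and calls kp.sendMessage), and only the standard library is used.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// errClosed is a hypothetical sentinel; the original code returns syscall.EINVAL.
var errClosed = errors.New("writer is closed")

// demoWriter is a hypothetical stand-in for ZapKafkaWriter: it appends
// messages to an in-memory slice instead of sending them to Kafka.
type demoWriter struct {
	mu        sync.Mutex // guards sent
	sent      [][]byte
	closed    int32          // nonzero once Close has been called
	pendingWg sync.WaitGroup // tracks in-flight Write calls
	closeMut  sync.Mutex     // serializes Close calls
}

// Write rejects input once the writer is closed, otherwise records the message.
func (w *demoWriter) Write(msg []byte) (int, error) {
	if atomic.LoadInt32(&w.closed) != 0 {
		return 0, errClosed
	}
	w.pendingWg.Add(1)
	defer w.pendingWg.Done()

	// Copy the bytes: io.Writer implementations must not retain the caller's slice.
	cp := append([]byte(nil), msg...)
	w.mu.Lock()
	w.sent = append(w.sent, cp)
	w.mu.Unlock()
	return len(msg), nil
}

// Sync is a no-op, as in the original.
func (w *demoWriter) Sync() error { return nil }

// Close marks the writer closed and waits for in-flight writes to finish.
// A second Close returns errClosed.
func (w *demoWriter) Close() error {
	w.closeMut.Lock()
	defer w.closeMut.Unlock()
	if atomic.LoadInt32(&w.closed) != 0 {
		return errClosed
	}
	atomic.StoreInt32(&w.closed, 1)
	w.pendingWg.Wait()
	return nil
}

func main() {
	w := &demoWriter{}
	n, err := w.Write([]byte("hello"))
	fmt.Println(n, err)    // 5 <nil>
	fmt.Println(w.Close()) // <nil>
	_, err = w.Write([]byte("late"))
	fmt.Println(err)       // writer is closed
	fmt.Println(w.Close()) // writer is closed
}
```

    As in the original, the check of the closed flag and the call to pendingWg.Add are two separate steps, so a Write that races with Close can still pass the check after Close has begun; the sketch keeps that behavior rather than fixing it.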

    Summary

    zapcore.WriteSyncer embeds the io.Writer interface and adds a Sync method; zap.Sink embeds zapcore.WriteSyncer and io.Closer. ZapKafkaWriter implements both zap.Sink and zapcore.WriteSyncer, and its Write method sends the data straight to Kafka.
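
    The interface relationships in this summary can be sketched with the standard library alone. The WriteSyncer and Sink declarations below are local mirrors of the shapes of zapcore.WriteSyncer and zap.Sink, written out here only for illustration, and nopSink is a hypothetical type that exists just to satisfy them.

```go
package main

import (
	"fmt"
	"io"
)

// WriteSyncer mirrors the shape of zapcore.WriteSyncer:
// an io.Writer plus a Sync method.
type WriteSyncer interface {
	io.Writer
	Sync() error
}

// Sink mirrors the shape of zap.Sink: a WriteSyncer plus io.Closer.
type Sink interface {
	WriteSyncer
	io.Closer
}

// nopSink is a hypothetical type that counts bytes and discards them.
type nopSink struct{ written int }

func (s *nopSink) Write(p []byte) (int, error) {
	s.written += len(p)
	return len(p), nil
}

func (s *nopSink) Sync() error  { return nil }
func (s *nopSink) Close() error { return nil }

// Compile-time checks: a type with Write, Sync, and Close satisfies
// both interfaces, which is the situation ZapKafkaWriter is in.
var (
	_ WriteSyncer = (*nopSink)(nil)
	_ Sink        = (*nopSink)(nil)
)

func main() {
	var s Sink = &nopSink{}
	n, _ := s.Write([]byte("log line\n"))
	fmt.Println(n, s.Sync(), s.Close()) // 9 <nil> <nil>
}
```

    The blank-identifier assignments make the compiler verify the method set, so a missing Sync or Close would surface as a build error instead of a runtime surprise.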

