A look at storagetapper's pipe

Author: go4it | Published 2021-03-02 23:45

    This article takes a look at storagetapper's pipe.

    Pipe

    storagetapper/pipe/pipe.go

    type Pipe interface {
        NewConsumer(topic string) (Consumer, error)
        NewProducer(topic string) (Producer, error)
        Type() string
        Config() *config.PipeConfig
        Close() error
    }
    

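    The Pipe interface defines five methods: NewConsumer and NewProducer each take a topic and return a Consumer or a Producer (or an error), Type returns a string, Config returns the *config.PipeConfig, and Close returns an error.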
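
    To make the shape of the interface concrete, here is a minimal sketch of an in-memory pipe. Everything in it is an assumption made for illustration: chanPipe, chanEnd and newChanPipe are hypothetical names, the interfaces are trimmed to the methods the sketch needs (Config is left out), and the one-buffered-channel-per-topic design is not how any storagetapper pipe is known to work.

```go
package main

import (
	"errors"
	"fmt"
)

// Trimmed copies of the quoted interfaces (assumption: only the
// methods this sketch exercises are kept).
type Producer interface {
	Push(data interface{}) error
}

type Consumer interface {
	FetchNext() (interface{}, error)
}

type Pipe interface {
	NewConsumer(topic string) (Consumer, error)
	NewProducer(topic string) (Producer, error)
	Type() string
	Close() error
}

// chanPipe is a hypothetical pipe: one buffered channel per topic.
// It is not safe for concurrent use.
type chanPipe struct {
	topics map[string]chan interface{}
}

func newChanPipe() *chanPipe {
	return &chanPipe{topics: make(map[string]chan interface{})}
}

// topic returns the channel for name, creating it on first use.
func (p *chanPipe) topic(name string) chan interface{} {
	ch, ok := p.topics[name]
	if !ok {
		ch = make(chan interface{}, 16)
		p.topics[name] = ch
	}
	return ch
}

// chanEnd serves as both the producer and the consumer end of a topic.
type chanEnd struct{ ch chan interface{} }

// Push blocks once the topic's buffer (16 messages) is full.
func (e chanEnd) Push(data interface{}) error { e.ch <- data; return nil }

// FetchNext blocks until a message arrives or the pipe is closed.
func (e chanEnd) FetchNext() (interface{}, error) {
	msg, ok := <-e.ch
	if !ok {
		return nil, errors.New("pipe closed")
	}
	return msg, nil
}

func (p *chanPipe) NewConsumer(t string) (Consumer, error) { return chanEnd{p.topic(t)}, nil }
func (p *chanPipe) NewProducer(t string) (Producer, error) { return chanEnd{p.topic(t)}, nil }
func (p *chanPipe) Type() string                           { return "chan" }

// Close closes every topic channel; pending reads then drain and fail.
func (p *chanPipe) Close() error {
	for name, ch := range p.topics {
		close(ch)
		delete(p.topics, name)
	}
	return nil
}

func main() {
	var p Pipe = newChanPipe()
	prod, _ := p.NewProducer("orders")
	cons, _ := p.NewConsumer("orders")

	_ = prod.Push("hello")
	msg, err := cons.FetchNext()
	fmt.Println(p.Type(), msg, err)
	_ = p.Close()
}
```

    The point of the sketch is only that a caller holds a Pipe, asks it for a producer and a consumer on the same topic, and never needs to know which transport sits behind it.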

    Consumer

    storagetapper/pipe/pipe.go

    type Consumer interface {
        Close() error
        //CloseOnFailure doesn't save offsets
        CloseOnFailure() error
        Message() chan interface{}
        Error() chan error
        FetchNext() (interface{}, error)
        //Allows to explicitly persists current consumer position
        SaveOffset() error
    
        //SetFormat allow to tell consumer the format of the file when there is no
        //header
        SetFormat(format string)
    }
    

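    The Consumer interface defines Close, CloseOnFailure, Message, Error, FetchNext, SaveOffset and SetFormat. According to the comments, CloseOnFailure closes the consumer without saving offsets, SaveOffset explicitly persists the current consumer position, and SetFormat tells the consumer the format of the file when there is no header.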
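
    The comment on CloseOnFailure suggests a calling pattern: close normally when processing succeeded, and use CloseOnFailure when it did not, so that offsets are not saved. The sketch below illustrates that pattern under stated assumptions. The consumer interface is a trimmed subset, fakeConsumer and consumeN are hypothetical names, and the fake's behaviour of returning an error when it runs out of messages is invented for the example rather than taken from storagetapper.

```go
package main

import (
	"errors"
	"fmt"
)

// consumer is the subset of the quoted Consumer interface used here.
type consumer interface {
	FetchNext() (interface{}, error)
	Close() error
	CloseOnFailure() error
}

// fakeConsumer is a hypothetical in-memory stand-in for a real consumer.
type fakeConsumer struct {
	msgs   []interface{}
	pos    int
	closed string // records which close method was called
}

// FetchNext returns the next message, or an error when none are left
// (assumption: real consumers may block or behave differently).
func (f *fakeConsumer) FetchNext() (interface{}, error) {
	if f.pos >= len(f.msgs) {
		return nil, errors.New("no more messages")
	}
	m := f.msgs[f.pos]
	f.pos++
	return m, nil
}

func (f *fakeConsumer) Close() error          { f.closed = "Close"; return nil }
func (f *fakeConsumer) CloseOnFailure() error { f.closed = "CloseOnFailure"; return nil }

// consumeN fetches n messages and passes each to handle. On the first
// fetch or handler error it calls CloseOnFailure and returns the error;
// if all n messages succeed it calls Close.
func consumeN(c consumer, n int, handle func(interface{}) error) error {
	for i := 0; i < n; i++ {
		msg, err := c.FetchNext()
		if err == nil {
			err = handle(msg)
		}
		if err != nil {
			_ = c.CloseOnFailure()
			return err
		}
	}
	return c.Close()
}

func main() {
	ok := &fakeConsumer{msgs: []interface{}{"a", "b"}}
	err := consumeN(ok, 2, func(m interface{}) error {
		fmt.Println("got", m)
		return nil
	})
	fmt.Println(ok.closed, err)

	// Asking for more messages than exist triggers the failure path.
	bad := &fakeConsumer{msgs: []interface{}{"a"}}
	err = consumeN(bad, 2, func(interface{}) error { return nil })
	fmt.Println(bad.closed, err)
}
```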

    Producer

    storagetapper/pipe/pipe.go

    type Producer interface {
        Push(data interface{}) error
        PushK(key string, data interface{}) error
        PushSchema(key string, data []byte) error
        //PushBatch queues the messages instead of sending immediately
        PushBatch(key string, data interface{}) error
        //PushBatchCommit writes out all the messages queued by PushBatch
        PushBatchCommit() error
        Close() error
        CloseOnFailure() error
    
        SetFormat(format string)
    
        PartitionKey(source string, key string) string
    }
    

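    The Producer interface defines Push, PushK, PushSchema, PushBatch, PushBatchCommit, Close, CloseOnFailure, SetFormat and PartitionKey. According to the comments, PushBatch queues messages instead of sending them immediately, and PushBatchCommit writes out everything queued by PushBatch.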
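
    The queue-then-commit behaviour described in the comments on PushBatch and PushBatchCommit can be sketched with an in-memory producer. This is an illustration only: batchProducer is a trimmed subset of the interface, and memProducer and record are hypothetical types that keep "sent" messages in a slice instead of writing them anywhere.

```go
package main

import "fmt"

// batchProducer is the subset of the quoted Producer interface used here.
type batchProducer interface {
	PushBatch(key string, data interface{}) error
	PushBatchCommit() error
}

// record pairs a key with its payload.
type record struct {
	key  string
	data interface{}
}

// memProducer is a hypothetical producer that only tracks state in memory.
type memProducer struct {
	queued []record // accepted by PushBatch, not yet written
	sent   []record // written out by PushBatchCommit
}

// Compile-time check that memProducer satisfies the interface.
var _ batchProducer = (*memProducer)(nil)

// PushBatch queues the message without sending it.
func (p *memProducer) PushBatch(key string, data interface{}) error {
	p.queued = append(p.queued, record{key: key, data: data})
	return nil
}

// PushBatchCommit moves every queued message to sent and empties the queue.
func (p *memProducer) PushBatchCommit() error {
	p.sent = append(p.sent, p.queued...)
	p.queued = nil
	return nil
}

func main() {
	p := &memProducer{}
	for i := 0; i < 3; i++ {
		_ = p.PushBatch(fmt.Sprintf("k%d", i), i)
	}
	fmt.Println("before commit:", len(p.queued), "queued,", len(p.sent), "sent")

	_ = p.PushBatchCommit()
	fmt.Println("after commit:", len(p.queued), "queued,", len(p.sent), "sent")
}
```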

    Create

    storagetapper/pipe/pipe.go

    func Create(pipeType string, cfg *config.PipeConfig, db *sql.DB) (Pipe, error) {
    
        init := Pipes[strings.ToLower(pipeType)]
        if init == nil {
            return nil, fmt.Errorf("unsupported pipe: %s", strings.ToLower(pipeType))
        }
    
        pipe, err := init(cfg, db)
        if err != nil {
            return nil, err
        }
    
        return pipe, nil
    }
    
    type constructor func(cfg *config.PipeConfig, db *sql.DB) (Pipe, error)
    
    //Pipes is the list of registered pipes
    //Plugins insert their constructors into this map
    var Pipes map[string]constructor
    
    //registerPlugin should be called from plugin's init
    func registerPlugin(name string, init constructor) {
        if Pipes == nil {
            Pipes = make(map[string]constructor)
        }
        Pipes[name] = init
    }
    

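    Create lower-cases pipeType and looks up the matching constructor in the Pipes map. If no constructor is registered under that name it returns an "unsupported pipe" error; otherwise it calls the constructor with the PipeConfig and db and returns the resulting Pipe. Plugins add their constructors to Pipes by calling registerPlugin from their init functions.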
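
    The registration and lookup flow can be tried out in a single file. The sketch below copies the logic of registerPlugin and Create, with the following assumptions: PipeConfig is an empty stand-in for config.PipeConfig, Pipe is trimmed to its Type method, the local variable is named ctor instead of init, and demoPipe is a hypothetical plugin that does not exist in storagetapper.

```go
package main

import (
	"database/sql"
	"fmt"
	"strings"
)

// PipeConfig is an empty stand-in for config.PipeConfig (assumption).
type PipeConfig struct{}

// Pipe is trimmed to a single method for this sketch.
type Pipe interface {
	Type() string
}

type constructor func(cfg *PipeConfig, db *sql.DB) (Pipe, error)

// Pipes maps a lower-case pipe name to its constructor.
var Pipes map[string]constructor

// registerPlugin lazily creates the map, then stores the constructor.
func registerPlugin(name string, ctor constructor) {
	if Pipes == nil {
		Pipes = make(map[string]constructor)
	}
	Pipes[name] = ctor
}

// demoPipe is a hypothetical plugin used only to exercise the registry.
type demoPipe struct{ cfg *PipeConfig }

func (p *demoPipe) Type() string { return "demo" }

// init registers the plugin before main runs.
func init() {
	registerPlugin("demo", func(cfg *PipeConfig, db *sql.DB) (Pipe, error) {
		return &demoPipe{cfg: cfg}, nil
	})
}

// Create looks up the constructor by lower-cased name and invokes it.
func Create(pipeType string, cfg *PipeConfig, db *sql.DB) (Pipe, error) {
	ctor := Pipes[strings.ToLower(pipeType)]
	if ctor == nil {
		return nil, fmt.Errorf("unsupported pipe: %s", strings.ToLower(pipeType))
	}
	return ctor(cfg, db)
}

func main() {
	// The lookup is case-insensitive because Create lower-cases the name.
	p, err := Create("DEMO", &PipeConfig{}, nil)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("created pipe of type", p.Type())

	// An unregistered name yields an error instead of a pipe.
	_, err = Create("nope", &PipeConfig{}, nil)
	fmt.Println("error:", err)
}
```

    Note that registerPlugin stores the name exactly as given while Create lower-cases the requested type, so in this sketch a plugin registered under a name containing upper-case letters would never be found.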

    Summary

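    storagetapper's Pipe interface defines the NewConsumer, NewProducer, Type, Config and Close methods. Its Create function builds a pipe from pipeType, PipeConfig and db by looking up the constructor registered for that pipe type.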
