美文网首页
从farbic ledger block中提取对应kafka的o

从farbic ledger block中提取对应kafka的o

作者: CodingCode | 来源:发表于2020-05-05 10:48 被阅读0次

    从farbic ledger block中提取对应kafka的offset

    如果kafka模式下面的没有block都包含对应kafka topic的offset,下面的代码从一个block里面提取kafka的offset值。

    输入:blockfile
    输出:kafkaoffset

    package main
    
    import (
        "os"
        "fmt"
        "log"
        "io/ioutil"
        "github.com/Shopify/sarama"
        "github.com/golang/protobuf/proto"
    
     cb "github.com/hyperledger/fabric/protos/common"
     ab "github.com/hyperledger/fabric/protos/orderer"
    )
    
    // Get kafka offset of a given block file
    // input: path to blockfile
    // output: kafka offset
    func main() {
        if len(os.Args) < 2 {
            log.Printf("ERROR: invalid parameter, usage: %s path/to/blockfile\n", os.Args[0])
            os.Exit(1)
        }
    
        blockBytes, err := ioutil.ReadFile(os.Args[1])
        if err != nil {
            log.Printf("ERROR: cannot read block file, error: %v\n", err)
            os.Exit(1)
        }
    
        block := &cb.Block{}
        if err = proto.Unmarshal(blockBytes, block); err != nil {
            log.Printf("ERROR: invalid block file content, unmarshal error: %v\n", err)
            os.Exit(1)
        }
        metadataValue := block.Metadata.Metadata[cb.BlockMetadataIndex_ORDERER]
    
        md := &cb.Metadata{}
        err = proto.Unmarshal(metadataValue, md)
        if err != nil {
            log.Printf("ERROR: invalid metadata value, unmarshal error: %v\n", err)
            os.Exit(1)
        }
    
        if lastoffset, _, _, err := getOffsets(md.Value); err != nil {
            os.Exit(1)
        } else {
            fmt.Printf("%d\n", lastoffset)
        }
    }
    
    func getOffsets(metadataValue []byte) (persisted int64, processed int64, resubmitted int64, err error) {
        if metadataValue != nil {
            kafkaMetadata := &ab.KafkaMetadata{}
            if err := proto.Unmarshal(metadataValue, kafkaMetadata); err != nil {
                log.Printf("ERROR: invalid kafka metavalue, unmarshal error: %v\n", err)
                return 0, 0, 0, err
            }
            return kafkaMetadata.LastOffsetPersisted,
                kafkaMetadata.LastOriginalOffsetProcessed,
                kafkaMetadata.LastResubmittedConfigOffset,
                nil
        }
        return sarama.OffsetOldest - 1, int64(0), int64(0), nil // default
    }
    

    用法:

    $ ./main path/to/config.pb 
    12203
    

    blockfile的获取可以使用peer fetch:

    $ peer channel fetch newest path/to/config.pb -o ${ORDERERADDR} -c ${CHANNEL}
    

    然后我们用kafka命令行去查看topic的值:

    $ /opt/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <KAFKAHOST>:<PORT> --topic <TOPIC>
    <TOPIC>:0:12203
    

    需要注意的是,用kafka命令行去读取的时候返回值有时会比newest块里面读出的值大于几个3,例如上述例子可能会读出:12206, 12209, 等等,12203 + 3 * N,取决于延迟了多久。
    因为这有一个延迟,orderer不停的往kafka里面写数据,数据大小是3;等我有空的时候再去研究一下这个大小是3的数据都是些什么内容。

    再补充一点前面文件怎么编译:

    1. fabric-go-sdk是一个很不成熟的产品,几乎不能用。
    2. 所以我们build都是直接用fabric本身的原代码。

    假设fabric源代码已经下载放到:${GOPATH}/src/github.com/hyperledger/fabric目录下面。

    $ cd ${GOPATH}/src/github.com/hyperledger/fabric
    $ vim main.go
    $ GOPATH=${GOPATH} go build main.go 
    $ ./main.go path/to/blockfile
    

    相关文章

      网友评论

          本文标题:从farbic ledger block中提取对应kafka的o

          本文链接:https://www.haomeiwen.com/subject/fxopghtx.html