Importing and Exporting the Data of a Kafka-Based Channel

Author: CodingCode | Published 2020-06-01 08:52

    A Go implementation of exporting and importing the contents of a channel from Kafka:

    1. Exporting a channel
    package main
    
    import (
        "flag"
        "fmt"
        "log"
        "time"
        "os"
        "strings"
        "encoding/binary"
    
        "github.com/Shopify/sarama"
        "github.com/golang/protobuf/proto"
     ab "github.com/hyperledger/fabric/protos/orderer"
    )
    
    var (
        brokers     string
        topic       string
        partition   int
    )
    
    func main() {
        flag.StringVar(&brokers,    "brokers",      "localhost:9093",   "Kafka brokers")
        flag.StringVar(&topic,      "topic",        "topic",            "Kafka topic")
        flag.IntVar(&partition,     "partition",    0,                  "Kafka topic partition")
        flag.Parse()
    
        config := sarama.NewConfig()
        client, err := sarama.NewClient(strings.Split(brokers, ","), config)
        if err != nil {
            log.Fatalf("Unable to create kafka client, error: %v\n", err)
        }
    
        err = exportTopic(client, topic, partition)
        if err != nil {
            log.Fatalf("Unabled to export topic, error: %v\n", err)
        }
    }
    
    func exportTopic(client sarama.Client, topic string, partition int) error {
        consumer, err := sarama.NewConsumerFromClient(client)
        if err != nil {
            return err
        }
        defer consumer.Close()
    
        // consume from the oldest offset so the entire channel log is exported;
        // use sarama.OffsetNewest instead to tail only newly arriving messages
        partitionConsumer, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetOldest)
        if err != nil {
            return err
        }
        defer partitionConsumer.Close()
    
        file, err := os.OpenFile(fmt.Sprintf("%s.dat", topic), os.O_CREATE|os.O_WRONLY, 0644)
        if err != nil {
            return err
        }
        defer file.Close()
    
    
        var countConnect   int64 = 0
        var countTimeToCut int64 = 0
        var countRegular   int64 = 0
        var lastOffset     int64 = 0
        msg := new(ab.KafkaMessage)
        for {
            select {
            case err = <- partitionConsumer.Errors():
                return err
            case in, ok := <- partitionConsumer.Messages():
                if !ok {
                    return fmt.Errorf("kafka consumer closed")
                }
                if err := proto.Unmarshal(in.Value, msg); err != nil {
                    return err
                }
    
                // export the message and record its offset
                lastOffset = in.Offset
                if err := exportMessage(file, in.Key, in.Value); err != nil {
                    return err
                }
    
                switch msg.Type.(type) {
                case *ab.KafkaMessage_Connect:
                    countConnect++
                case *ab.KafkaMessage_TimeToCut:
                    countTimeToCut++
                case *ab.KafkaMessage_Regular:
                    countRegular++
                default:
                    return fmt.Errorf("unknown kafka message")
                }
            case <- time.After(5 * time.Second):
                // no new message for 5 seconds: assume the end of the topic was reached
                // (lastOffset+1 equals the message count when the topic starts at offset 0)
                fmt.Printf("export summary total: %d (Connect=%d, TimeToCut=%d, Regular=%d)\n", lastOffset+1, countConnect, countTimeToCut, countRegular)
                return nil
            }
        }
    }
    
    func exportMessage(file *os.File, key []byte, value []byte) error {
        if err := exportField(file, key); err != nil {
            return err
        }
    
        if err := exportField(file, value); err != nil {
            return err
        }
    
        return nil
    }
    
    func exportField(file *os.File, data []byte) error {
        // each field is a little-endian int32 length prefix followed by the raw bytes
        l := len(data)
        if err := binary.Write(file, binary.LittleEndian, int32(l)); err != nil {
            return err
        }
    
        if l > 0 {
            if n, err := file.Write(data); err != nil {
                return err
            } else if n != l {
                return fmt.Errorf("incorrect bytes written expect %d, but %d", l, n)
            }
        }
        return nil
    }
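
    The export file is a flat sequence of records: for every Kafka message, first the key and then the value, each written as a little-endian int32 length prefix followed by the raw bytes. That makes a dump easy to sanity-check before re-importing it. The following standalone sketch (hypothetical, not part of the tools above; the hard-coded file name is an assumption) walks the length prefixes and counts complete records:

    package main

    import (
        "encoding/binary"
        "fmt"
        "io"
        "io/ioutil"
        "log"
        "os"
    )

    // skipField consumes one length-prefixed field without keeping its bytes.
    func skipField(f *os.File) error {
        var l int32
        if err := binary.Read(f, binary.LittleEndian, &l); err != nil {
            return err
        }
        if l > 0 {
            // CopyN returns io.EOF if the file ends before l bytes are read
            if _, err := io.CopyN(ioutil.Discard, f, int64(l)); err == io.EOF {
                return io.ErrUnexpectedEOF // file ended inside the field body
            } else if err != nil {
                return err
            }
        }
        return nil
    }

    func main() {
        f, err := os.Open("topic.dat") // whatever <topic>.dat the export tool wrote
        if err != nil {
            log.Fatalf("open: %v", err)
        }
        defer f.Close()

        records := 0
        for {
            if err := skipField(f); err == io.EOF {
                break // clean end of file: the last record was complete
            } else if err != nil {
                log.Fatalf("reading key: %v", err)
            }
            if err := skipField(f); err != nil {
                log.Fatalf("reading value: %v", err) // any error here means a truncated record
            }
            records++
        }
        fmt.Printf("%d complete records\n", records)
    }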
    
    2. Importing a channel
    package main
    
    import (
        "flag"
        "io"
        "fmt"
        "log"
        "os"
        "strings"
        "encoding/binary"
    
        "github.com/Shopify/sarama"
        "github.com/golang/protobuf/proto"
     ab "github.com/hyperledger/fabric/protos/orderer"
    )
    
    var (
        brokers     string
        topic       string
    )
    
    func main() {
        flag.StringVar(&brokers,    "brokers",      "localhost:9093",   "Kafka brokers")
        flag.StringVar(&topic,      "topic",        "topic",            "Kafka topic")
        flag.Parse()
    
        config := sarama.NewConfig()
        client, err := sarama.NewClient(strings.Split(brokers, ","), config)
        if err != nil {
            log.Fatalf("Unable to create kafka client, error: %v\n", err)
        }
    
        err = importTopic(client, topic)
        if err != nil {
            log.Fatalf("Unabled to export topic, error: %v\n", err)
        }
    }
    
    func importTopic(client sarama.Client, topic string) error {
        producer, err := sarama.NewAsyncProducerFromClient(client)
        if err != nil {
            return err
        }
        defer producer.Close()

        // the async producer's Errors() channel must be drained, otherwise the
        // producer can eventually block on Input()
        go func() {
            for err := range producer.Errors() {
                log.Printf("failed to produce message: %v", err)
            }
        }()
    
        file, err := os.OpenFile(fmt.Sprintf("%s.dat", topic), os.O_RDONLY, 0644)
        if err != nil {
            return err
        }
        defer file.Close()
    
        var countConnect   int64 = 0
        var countTimeToCut int64 = 0
        var countRegular   int64 = 0
        msg := new(ab.KafkaMessage)
        for {
            key, value, err := importMessage(file)
            if err == io.EOF {
                fmt.Printf("import summary total: %d (Connect=%d, TimeToCut=%d, Regular=%d)\n", (countConnect + countTimeToCut + countRegular), countConnect, countTimeToCut, countRegular)
                return nil
            } else if err != nil {
                return err
            }
    
            if err := proto.Unmarshal(value, msg); err != nil {
                return err
            }
    
            switch msg.Type.(type) {
            case *ab.KafkaMessage_Connect:
                countConnect++
            case *ab.KafkaMessage_TimeToCut:
                countTimeToCut++
            case *ab.KafkaMessage_Regular:
                countRegular++
            default:
                return fmt.Errorf("unknown kafka message")
            }
    
            producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: sarama.ByteEncoder(key), Value: sarama.ByteEncoder(value)}
        }
    }
    
    func importMessage(file *os.File) ([]byte, []byte, error) {
        key, err := importField(file)
        if err != nil {
            return nil, nil, err
        }
    
        // EOF between a key and its value means the export file is truncated
        value, err := importField(file)
        if err == io.EOF {
            return nil, nil, fmt.Errorf("unexpected EOF while reading message value")
        } else if err != nil {
            return nil, nil, err
        }
    
        return key, value, nil
    }
    
    
    func importField(file *os.File) ([]byte, error) {
        // read the little-endian int32 length prefix written by exportField
        var l int32
        err := binary.Read(file, binary.LittleEndian, &l)
        if err != nil {
            return nil, err
        }
    
        if l == 0 {
            return nil, nil
        }
    
        data := make([]byte, l)
        // io.ReadFull retries on short reads, which a single file.Read would not
        if _, err := io.ReadFull(file, data); err != nil {
            return nil, err
        }
    
        return data, nil
    }
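
    Both programs use sarama's default configuration, which connects over plaintext TCP. The default broker address points at port 9093, which in many Fabric deployments is the TLS listener; if the brokers require TLS, the configuration must be TLS-enabled before sarama.NewClient is called. A minimal sketch, assuming client certificate and CA files whose paths are purely illustrative:

    package main

    import (
        "crypto/tls"
        "crypto/x509"
        "io/ioutil"
        "log"

        "github.com/Shopify/sarama"
    )

    func main() {
        // the certificate and CA paths below are assumptions for illustration
        cert, err := tls.LoadX509KeyPair("client.crt", "client.key")
        if err != nil {
            log.Fatalf("load key pair: %v", err)
        }
        caPEM, err := ioutil.ReadFile("ca.crt")
        if err != nil {
            log.Fatalf("read CA certificate: %v", err)
        }
        pool := x509.NewCertPool()
        pool.AppendCertsFromPEM(caPEM)

        config := sarama.NewConfig()
        config.Net.TLS.Enable = true // switch sarama's transport to TLS
        config.Net.TLS.Config = &tls.Config{
            Certificates: []tls.Certificate{cert},
            RootCAs:      pool,
        }

        client, err := sarama.NewClient([]string{"localhost:9093"}, config)
        if err != nil {
            log.Fatalf("create client: %v", err)
        }
        defer client.Close()
        // the client can now be passed to exportTopic / importTopic as above
    }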
    

Original article: https://www.haomeiwen.com/subject/vbqjzhtx.html