golang中使用kafka

作者: yandaren | 来源:发表于2018-09-29 18:18 被阅读0次

    golang中比较好用的kafka client有 sarama(github.com/Shopify/sarama)、confluent-kafka-go 等。

    其中 sarama 的使用者应该是最多的，此外还有一个 sarama 的 cluster 版本 sarama-cluster。

    本文简单介绍 sarama 的一些基本用法。

    生产者接口

    // producer_test reads tokens from stdin with fmt.Scanln and publishes each
    // one to the "kafka_go_test" topic through a sarama async producer connected
    // to localhost:9092. After every send it waits for the matching success or
    // error report and prints it before reading the next token.
    //
    // The function returns when the producer cannot be created or when stdin can
    // no longer be scanned (EOF, an empty line, or a malformed line).
    func producer_test() {
        fmt.Printf("producer_test\n")
        config := sarama.NewConfig()
        config.Producer.RequiredAcks = sarama.WaitForAll
        config.Producer.Partitioner = sarama.NewRandomPartitioner
        // Both result channels are enabled, so every send below must be followed
        // by a receive from Successes() or Errors().
        config.Producer.Return.Successes = true
        config.Producer.Return.Errors = true
        config.Version = sarama.V0_11_0_2

        producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
        if err != nil {
            fmt.Printf("producer_test create producer error :%s\n", err.Error())
            return
        }

        // Use the synchronous Close rather than AsyncClose: nothing reads the
        // result channels once this function has returned, and sarama documents
        // that AsyncClose requires the caller to keep draining them.
        defer func() {
            if err := producer.Close(); err != nil {
                fmt.Printf("producer_test close producer error :%s\n", err.Error())
            }
        }()

        var value string
        for {
            // Stop on any scan error. Without this check an EOF on stdin would
            // leave value unchanged and resend it forever.
            if _, err := fmt.Scanln(&value); err != nil {
                fmt.Printf("producer_test stop reading input: %s\n", err.Error())
                return
            }
            fmt.Printf("input [%s]\n", value)

            // Build a fresh message for every send instead of mutating one that
            // has already been handed to the producer.
            msg := &sarama.ProducerMessage{
                Topic: "kafka_go_test",
                Key:   sarama.StringEncoder("go_test"),
                Value: sarama.ByteEncoder(value),
            }

            // send to chain
            producer.Input() <- msg

            select {
            case suc := <-producer.Successes():
                fmt.Printf("offset: %d,  timestamp: %s\n", suc.Offset, suc.Timestamp.String())
            case fail := <-producer.Errors():
                fmt.Printf("err: %s\n", fail.Err.Error())
            }
        }
    }
    

    消费者接口

    // consumer_test connects a sarama consumer to localhost:9092 and prints every
    // message of partition 0 of the "kafka_go_test" topic, starting from the
    // oldest available offset. Consumer errors are printed as they arrive.
    //
    // NOTE(review): only partition 0 is consumed. producer_test uses a random
    // partitioner, so on a topic with more than one partition some messages will
    // not show up here.
    func consumer_test() {
        fmt.Printf("consumer_test\n")

        config := sarama.NewConfig()
        // Deliver consume errors on the Errors() channel so the loop below can
        // print them.
        config.Consumer.Return.Errors = true
        config.Version = sarama.V0_11_0_2

        // consumer
        consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
        if err != nil {
            fmt.Printf("consumer_test create consumer error %s\n", err.Error())
            return
        }

        defer func() {
            if err := consumer.Close(); err != nil {
                fmt.Printf("consumer_test close consumer error %s\n", err.Error())
            }
        }()

        partitionConsumer, err := consumer.ConsumePartition("kafka_go_test", 0, sarama.OffsetOldest)
        if err != nil {
            fmt.Printf("try create partition_consumer error %s\n", err.Error())
            return
        }
        // Deferred calls run in reverse order, so the partition consumer is
        // closed before its parent consumer.
        defer func() {
            if err := partitionConsumer.Close(); err != nil {
                fmt.Printf("consumer_test close partition_consumer error %s\n", err.Error())
            }
        }()

        for {
            select {
            case msg, ok := <-partitionConsumer.Messages():
                if !ok {
                    // Channel closed: nothing more will be delivered.
                    return
                }
                fmt.Printf("msg offset: %d, partition: %d, timestamp: %s, value: %s\n",
                    msg.Offset, msg.Partition, msg.Timestamp.String(), string(msg.Value))
            case consumeErr, ok := <-partitionConsumer.Errors():
                if !ok {
                    // A closed channel yields a nil error value; bail out
                    // instead of dereferencing it.
                    return
                }
                fmt.Printf("err :%s\n", consumeErr.Error())
            }
        }
    }
    

    元数据接口

    // metadata_test opens a sarama client against localhost:9092 and prints the
    // topic names and broker addresses the client reports, each list preceded by
    // its element count.
    func metadata_test() {
        fmt.Printf("metadata test\n")

        cfg := sarama.NewConfig()
        cfg.Version = sarama.V0_11_0_2

        addrs := []string{"localhost:9092"}
        cli, err := sarama.NewClient(addrs, cfg)
        if err != nil {
            fmt.Printf("metadata_test try create client err :%s\n", err.Error())
            return
        }
        defer cli.Close()

        // Topic listing.
        topicNames, err := cli.Topics()
        if err != nil {
            fmt.Printf("try get topics err %s\n", err.Error())
            return
        }
        fmt.Printf("topics(%d):\n", len(topicNames))
        for i := range topicNames {
            fmt.Printf("%s\n", topicNames[i])
        }

        // Broker listing.
        brokerList := cli.Brokers()
        fmt.Printf("broker set(%d):\n", len(brokerList))
        for i := range brokerList {
            fmt.Println(brokerList[i].Addr())
        }
    }
    

    相关文章

      网友评论

        本文标题:golang中使用kafka

        本文链接:https://www.haomeiwen.com/subject/peuboftx.html