美文网首页
golang kafka小试消息队列

golang kafka小试消息队列

作者: Jancd | 来源:发表于2018-06-23 19:41 被阅读670次

    Kafka 安装配置、更多资料请参考其官网。

    启动 kafka server

    在这之前需要启动 zookeeper 做服务治理(单机)。

    $ bin/zkServer.sh status conf/zoo_sample.cfg
    

    如提示权限限制加上 sudo

    启动 kafka server

    $ bin/kafka-server-start.sh config/server.properties
    

    启动消息队列(本部分仅为测试 server)

    新建 Topic

    $ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    
    $ bin/kafka-topics.sh --list --zookeeper localhost:2181 (test)
    

    1. 启动 Producer

    $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    

    2. 启动 Consumer

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
    

    此时在 Producer 端发送消息,在 Consumer 就会显示,如下图所示。

    (上图中 Consumer 多出了好几个消息是我截图之前测试发出的)


    Action

    本文使用 sarama 库作为 kafka 的 go API。sarama 库没有给出很具体的文档,可以参考其源码。

    Producer

    package main
    
    import (
        "fmt"
        "github.com/Shopify/sarama"
    )
    
    func main() {
        config := sarama.NewConfig()
        config.Producer.RequiredAcks = sarama.WaitForAll
        config.Producer.Partitioner = sarama.NewRandomPartitioner
        config.Producer.Return.Successes = true
    
        addr := []string{"localhost:9092"}
    
        producer, err := sarama.NewSyncProducer(addr, config)
        if err != nil {
            panic(err)
        }
    
        defer producer.Close()
    
        msg := &sarama.ProducerMessage{
            Topic:     "hello",
            Partition: int32(-1),
            Key:       sarama.StringEncoder("key"),
        }
    
        var value string
        for {
            _, err := fmt.Scanf("%s", &value)
            if err != nil {
                break
            }
            msg.Value = sarama.ByteEncoder(value)
            fmt.Println(value)
    
            partition, offset, err := producer.SendMessage(msg)
            if err != nil {
                fmt.Println("Send message Fail")
            }
            fmt.Printf("Partition = %d, offset=%d\n", partition, offset)
        }
    }
    
    

    Consumer

    package main
    
    import (
        "fmt"
        "sync"
        "github.com/Shopify/sarama"
    )
    
    var (
        wg  sync.WaitGroup
    )
    
    func main() {
        consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
        if err != nil {
            panic(err)
        }
    
        partitionList, err := consumer.Partitions("hello")
        if err != nil {
            panic(err)
        }
    
        for partition := range partitionList {
            pc, err := consumer.ConsumePartition("hello", int32(partition), sarama.OffsetNewest)
            if err != nil {
                panic(err)
            }
    
            defer pc.AsyncClose()
    
            wg.Add(1)
    
            go func(sarama.PartitionConsumer) {
                defer wg.Done()
                for msg := range pc.Messages() {
                    fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
                }
    
            }(pc)
        }
        wg.Wait()
        consumer.Close()
    }
    

    结果如下:

    相关文章

      网友评论

          本文标题:golang kafka小试消息队列

          本文链接:https://www.haomeiwen.com/subject/pmddyftx.html