美文网首页
go项目Kafka示例

go项目Kafka示例

作者: yichen_china | 来源:发表于2023-08-19 15:34 被阅读0次

    消费者

    KafkaConsumer.go

    package cws
    import (
        "github.com/IBM/sarama"
        "log"
        "os"
        "os/signal"
    )
    
    // KafkaConsumer connects to the three local brokers, starts one goroutine per
    // partition of "my_topic" that logs every message arriving from
    // sarama.OffsetNewest onwards, and blocks until the process receives
    // os.Interrupt. On interrupt it shuts the partition consumers down before the
    // parent consumer and the client are closed.
    //
    // It panics if the client, the consumer, the partition list or any partition
    // consumer cannot be created.
    func KafkaConsumer() {
        config := sarama.NewConfig()
        config.Consumer.Return.Errors = true

        client, err := sarama.NewClient([]string{"localhost:9192", "localhost:9292", "localhost:9392"}, config)
        if err != nil {
            panic(err)
        }
        // Registered only after the error check: on failure client is nil and a
        // deferred Close on it would panic and hide the real error.
        defer client.Close()

        consumer, err := sarama.NewConsumerFromClient(client)
        if err != nil {
            panic(err)
        }
        defer consumer.Close()

        // Get the partition ID list.
        partitions, err := consumer.Partitions("my_topic")
        if err != nil {
            panic(err)
        }

        // done receives one value per message goroutine when it has finished.
        done := make(chan struct{})
        partitionConsumers := make([]sarama.PartitionConsumer, 0, len(partitions))

        for _, partitionID := range partitions {
            // Create a partition consumer for every partition ID.
            pc, err := consumer.ConsumePartition("my_topic", partitionID, sarama.OffsetNewest)
            if err != nil {
                panic(err)
            }
            partitionConsumers = append(partitionConsumers, pc)

            // Consumer.Return.Errors is enabled above, so the Errors channel must
            // be drained or the partition consumer eventually stalls.
            go func(pc sarama.PartitionConsumer) {
                for consumeErr := range pc.Errors() {
                    log.Println(consumeErr)
                }
            }(pc)

            go func(pc sarama.PartitionConsumer) {
                defer func() { done <- struct{}{} }()
                // Blocks until the Messages channel is closed.
                for message := range pc.Messages() {
                    value := string(message.Value)
                    log.Printf("Partitionid: %d; offset:%d, value: %s\n", message.Partition, message.Offset, value)
                }
            }(pc)
        }

        signals := make(chan os.Signal, 1)
        signal.Notify(signals, os.Interrupt)
        <-signals

        // Partition consumers have to be shut down before the deferred
        // consumer.Close runs; AsyncClose closes Messages and Errors, which lets
        // the goroutines above return.
        for _, pc := range partitionConsumers {
            pc.AsyncClose()
        }
        for range partitionConsumers {
            <-done
        }
    }
    

    消费组

    KafkaConsumerGroup.go

    package cws
    
    import (
        "context"
        "fmt"
        "github.com/IBM/sarama"
        "os"
        "os/signal"
        "sync"
    )
    
    // consumerGroupHandler is the handler value passed to ConsumerGroup.Consume;
    // its ConsumeClaim method prints every message it receives.
    type consumerGroupHandler struct {
        // name is printed as a prefix on every message line.
        name string
    }
    
    // Setup does nothing and always reports success.
    func (h consumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
        return nil
    }
    // Cleanup does nothing and always reports success.
    func (h consumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
        return nil
    }
    // ConsumeClaim prints every message of the claim prefixed with the handler's
    // name, marks it as processed and commits the offset straight away.
    //
    // It returns nil when the claim's Messages channel is closed or when the
    // session context is done, so the handler does not keep blocking once the
    // session has ended (for example during a rebalance).
    func (h consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
        for {
            select {
            case msg, ok := <-claim.Messages():
                if !ok {
                    // Channel closed: the claim is over.
                    return nil
                }
                fmt.Printf("%s Message topic:%q partition:%d offset:%d  value:%s\n", h.name, msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
                // Manually acknowledge the message.
                sess.MarkMessage(msg, "")
                // A manual acknowledgement has to be committed explicitly.
                sess.Commit()
            case <-sess.Context().Done():
                // Session finished; stop consuming so the group can move on.
                return nil
            }
        }
    }
    
    // handleErrors calls wg.Done right away and then prints every error received
    // from the group's Errors channel, returning once that channel is closed.
    func handleErrors(group *sarama.ConsumerGroup, wg *sync.WaitGroup) {
        wg.Done()
        errs := (*group).Errors()
        for {
            groupErr, open := <-errs
            if !open {
                return
            }
            fmt.Println("ERROR", groupErr)
        }
    }
    
    // consume announces that the named consumer has started, calls wg.Done, and
    // then keeps joining the group for "my_topic" in a loop, because Consume
    // returns whenever a session ends and must be called again to rejoin.
    //
    // It returns once the group has been closed and panics on any other error
    // reported by Consume.
    func consume(group *sarama.ConsumerGroup, wg *sync.WaitGroup, name string) {
        fmt.Println(name + "start")
        wg.Done()

        ctx := context.Background()
        // Loop-invariant values are built once instead of on every iteration.
        topics := []string{"my_topic"}
        handler := consumerGroupHandler{name: name}

        for {
            err := (*group).Consume(ctx, topics, handler)
            // After the group is closed (normal shutdown) Consume reports
            // ErrClosedConsumerGroup; treat that as the signal to stop instead of
            // panicking while the caller is shutting down.
            if err == sarama.ErrClosedConsumerGroup {
                return
            }
            if err != nil {
                panic(err)
            }
        }
    }
    
    // KafkaConsumerGroup starts three consumer groups ("c1", "c2", "c3") against
    // the three local brokers, runs consume for each in its own goroutine, waits
    // until every goroutine has signalled the WaitGroup, and then blocks until
    // the process receives os.Interrupt. The groups are closed when the function
    // returns.
    //
    // It panics if any consumer group cannot be created.
    func KafkaConsumerGroup() {
        config := sarama.NewConfig()
        config.Consumer.Return.Errors = false
        config.Version = sarama.V0_10_2_0

        brokers := []string{"localhost:9192", "localhost:9292", "localhost:9392"}
        groupIDs := []string{"c1", "c2", "c3"}

        var wg sync.WaitGroup
        wg.Add(len(groupIDs))
        for _, groupID := range groupIDs {
            // Each group gets its own client: sarama documents that consumer
            // groups may re-use a client but must not share one.
            group, err := sarama.NewConsumerGroup(brokers, groupID, config)
            if err != nil {
                panic(err)
            }
            // Deferred on purpose inside the loop: every group must stay open
            // until the function returns. Registered only after the error check
            // so Close is never called on a nil group.
            defer group.Close()

            go consume(&group, &wg, groupID)
        }
        wg.Wait()

        signals := make(chan os.Signal, 1)
        signal.Notify(signals, os.Interrupt)
        <-signals
    }
    

    生产者

    KafkaProducer.go

    package cws
    
    import (
        "github.com/IBM/sarama"
        "log"
        "os"
        "os/signal"
        "sync"
    )
    
    // main floods "my_topic" with the message "testing 123" through an async
    // producer until the process receives os.Interrupt, then shuts the producer
    // down and logs how many messages were delivered and how many failed.
    //
    // It panics if the client or the producer cannot be created.
    func main() {
        config := sarama.NewConfig()

        // Successes must be enabled for the Successes channel to be populated.
        config.Producer.Return.Successes = true
        config.Producer.Partitioner = sarama.NewRandomPartitioner

        client, err := sarama.NewClient([]string{"192.168.0.104:9192", "localhost:9292", "localhost:9392"}, config)
        if err != nil {
            panic(err)
        }
        // Registered only after the error check: on failure client is nil and a
        // deferred Close on it would panic and hide the real error.
        defer client.Close()

        producer, err := sarama.NewAsyncProducerFromClient(client)
        if err != nil {
            panic(err)
        }

        // Trap SIGINT to trigger a graceful shutdown.
        signals := make(chan os.Signal, 1)
        signal.Notify(signals, os.Interrupt)

        var (
            wg                                  sync.WaitGroup
            enqueued, successes, producerErrors int
        )

        wg.Add(1)
        // Start a goroutine that counts successful deliveries.
        go func() {
            defer wg.Done()
            for range producer.Successes() {
                successes++
            }
        }()

        wg.Add(1)
        // Start a goroutine that logs and counts delivery errors.
        go func() {
            defer wg.Done()
            for produceErr := range producer.Errors() {
                log.Println(produceErr)
                producerErrors++
            }
        }()

    ProducerLoop:
        for {
            // A fresh message value is built for every send.
            message := &sarama.ProducerMessage{Topic: "my_topic", Value: sarama.StringEncoder("testing 123")}
            select {
            case producer.Input() <- message:
                enqueued++

            case <-signals:
                producer.AsyncClose() // Trigger a shutdown of the producer.
                break ProducerLoop
            }
        }

        // AsyncClose closes Successes and Errors once everything is flushed, which
        // ends both counting goroutines; the counters are safe to read after that.
        wg.Wait()

        log.Printf("Successfully produced: %d; errors: %d\n", successes, producerErrors)
    }
    
    

    相关文章

      网友评论

          本文标题:go项目Kafka示例

          本文链接:https://www.haomeiwen.com/subject/uwejmdtx.html