
Using Kafka in Go

Author: realPeanut | Published 2020-10-20 22:28
Install the consumer-group client (it pulls in github.com/Shopify/sarama, which the producer example below uses directly):

go get github.com/bsm/sarama-cluster
  • consumer
package main

import (
    "fmt"
    "log"
    "os"
    "os/signal"

    cluster "github.com/bsm/sarama-cluster"
)

/**
    Consumer
*/
func main() {

    // init (custom) config, enable errors and notifications
    config := cluster.NewConfig()
    config.Consumer.Return.Errors = true
    config.Group.Return.Notifications = true

    // init consumer
    brokers := []string{"127.0.0.1:9092"}
    // multiple topics can be subscribed to at once
    topics := []string{"topic_0", "topic_1", "topic_2", "topic_3", "topic_4"}
    consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
    if err != nil {
        panic(err)
    }
    // note: the deferred Close must come after the error check, otherwise Close would be called on a nil consumer
    defer consumer.Close()

    // trap SIGINT to trigger a shutdown.
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    // consume errors
    go func() {
        for err := range consumer.Errors() {
            log.Printf("Error: %s\n", err.Error())
        }
    }()

    // consume notifications
    go func() {
        for ntf := range consumer.Notifications() {
            log.Printf("Rebalanced: %+v\n", ntf)
        }
    }()

    // loop reading messages from the channel
    // msg.Topic     message topic
    // msg.Partition message partition
    // msg.Offset    message offset
    // msg.Key       message key
    // msg.Value     message value
    for {
        select {
        case msg, ok := <-consumer.Messages():
            if ok {
                fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
                consumer.MarkOffset(msg, "") // mark the message as processed (commit the offset)
            }
        case <-signals:
            return
        }
    }
}
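A common tweak to this config, if a brand-new consumer group should start from the earliest retained messages instead of only seeing messages produced after it joins, is to set the initial offset before creating the consumer. A minimal sketch (the helper name is illustrative):

package main

import (
    "github.com/Shopify/sarama"
    cluster "github.com/bsm/sarama-cluster"
)

// newConfig builds the same consumer config as above, but tells brand-new
// consumer groups to begin at the oldest retained offset
// (the default is sarama.OffsetNewest).
func newConfig() *cluster.Config {
    config := cluster.NewConfig()
    config.Consumer.Return.Errors = true
    config.Group.Return.Notifications = true
    // cluster.Config embeds sarama.Config, so sarama's offset constants apply
    config.Consumer.Offsets.Initial = sarama.OffsetOldest
    return config
}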

  • The messages delivered on the consumer channel follow sarama's ConsumerMessage struct:
type ConsumerMessage struct {
    Headers        []*RecordHeader // only set if kafka is version 0.11+
    Timestamp      time.Time       // only set if kafka is version 0.10+, inner message timestamp
    BlockTimestamp time.Time       // only set if kafka is version 0.10+, outer (compressed) block timestamp

    Key, Value []byte
    Topic      string
    Partition  int32
    Offset     int64
}
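As a quick illustration of those fields, a handler might log them like the sketch below; the function name and log format are made up for this example, and Headers/Timestamp are only populated when the broker is new enough to supply them:

package main

import (
    "log"

    "github.com/Shopify/sarama"
)

// handleMessage is an illustrative handler that reads every field of a
// sarama.ConsumerMessage delivered by consumer.Messages().
func handleMessage(msg *sarama.ConsumerMessage) {
    log.Printf("topic=%s partition=%d offset=%d key=%s time=%s",
        msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Timestamp)
    for _, h := range msg.Headers {
        // record headers require Kafka 0.11+
        log.Printf("header %s=%s", h.Key, h.Value)
    }
    log.Printf("value=%s", msg.Value)
}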
  • producer
package kafka

import (
    "fmt"
    "github.com/Shopify/sarama"
    "log"
    "os"
    "time"
)

var Address = []string{"127.0.0.1:9092"}

func SaramaProducer(message, topic string) {
    syncProducer(Address, message, topic)
}

// synchronous send mode
func syncProducer(address []string, message string, topic string) {
    // producer config
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true
    config.Producer.Timeout = 5 * time.Second
    p, err := sarama.NewSyncProducer(address, config)
    if err != nil {
        log.Printf("sarama.NewSyncProducer err=%s \n", err)
        return
    }
    defer p.Close()
    msg := &sarama.ProducerMessage{
        Topic: topic,
        Value: sarama.ByteEncoder(message),
    }
    part, offset, err := p.SendMessage(msg)
    if err != nil {
        log.Printf("send message(%s) err=%s \n", message, err)
    } else {
        fmt.Fprintf(os.Stdout, "%s sent successfully, partition=%d, offset=%d \n", message, part, offset)
    }

}
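Calling the package is then a one-liner; a usage sketch follows (the import path and topic name are placeholders for your own project layout):

package main

// the import path is a placeholder; point it at wherever the kafka package
// above lives in your module
import "example.com/demo/kafka"

func main() {
    // sends one message synchronously; SaramaProducer logs the partition
    // and offset on success
    kafka.SaramaProducer("hello kafka", "topic_0")
}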
  • This is only a simple example; for more advanced usage see the documentation link
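As one example of more advanced usage, sarama also ships an asynchronous producer that queues messages on a channel instead of blocking per send. The sketch below reuses the Address variable defined above; the function name is illustrative and error handling is kept minimal:

package kafka

import (
    "log"
    "time"

    "github.com/Shopify/sarama"
)

// AsyncProducer is an illustrative sketch of sarama's asynchronous API:
// messages are queued on Input(), and acknowledgements or errors come back
// on the Successes()/Errors() channels.
func AsyncProducer(message, topic string) {
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true
    config.Producer.Return.Errors = true
    config.Producer.Timeout = 5 * time.Second

    p, err := sarama.NewAsyncProducer(Address, config)
    if err != nil {
        log.Printf("sarama.NewAsyncProducer err=%s \n", err)
        return
    }
    defer p.AsyncClose()

    // queue the message without waiting for the broker
    p.Input() <- &sarama.ProducerMessage{
        Topic: topic,
        Value: sarama.ByteEncoder(message),
    }

    // wait for the acknowledgement (or error) of this single message
    select {
    case ok := <-p.Successes():
        log.Printf("sent ok, partition=%d, offset=%d \n", ok.Partition, ok.Offset)
    case e := <-p.Errors():
        log.Printf("send err=%s \n", e.Err)
    }
}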
