接上文的环境,我们已经搭建了拥有3个节点的kafka集群,并创建了topic(分区数为4,副本因子为3(3个broker))
# docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
4056558ea9ab wurstmeister/kafka "start-kafka.sh" 2 hours ago Up 2 hours 0.0.0.0:19094->9092/tcp kafka3
fc3e6eb2fdea wurstmeister/kafka "start-kafka.sh" 2 hours ago Up 2 hours 0.0.0.0:19093->9092/tcp kafka2
dd1536016f31 wurstmeister/kafka "start-kafka.sh" 2 hours ago Up 2 hours 0.0.0.0:19092->9092/tcp kafka1
5e9c4f4911a0 zookeeper "/docker-entrypoin..." 2 hours ago Up 2 hours 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, 8080/tcp zookeeper
# ./kafka-topics.sh --zookeeper 192.168.48.134:2181 --describe --topic test
Topic: test TopicId: _A5YjE72QWeECBPEOFCquw PartitionCount: 4 ReplicationFactor: 3 Configs:
Topic: test Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
Topic: test Partition: 1 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
Topic: test Partition: 2 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
Topic: test Partition: 3 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
消费者
这里我们使用go语言,kafka 客户端使用 github.com/Shopify/sarama v1.29.1 版本
-
生产者
// Command producer publishes a single test message to a Kafka topic using a
// sarama SyncProducer and prints the partition and offset it was stored at.
package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

// kafkaAddresses lists the bootstrap brokers the producer connects to.
var kafkaAddresses = []string{"192.168.48.134:19092", "192.168.48.134:19093", "192.168.48.134:19094"}

// topic is the Kafka topic the message is sent to.
var topic = "test"

func main() {
	config := sarama.NewConfig()
	// Wait for the acknowledgement level sarama calls WaitForAll before a
	// send is considered successful.
	config.Producer.RequiredAcks = sarama.WaitForAll
	// Choose a random partition for every message.
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// Successfully delivered messages are reported back on the success
	// channel; the SyncProducer relies on this being enabled.
	config.Producer.Return.Successes = true

	// Connect to Kafka.
	client, err := sarama.NewSyncProducer(kafkaAddresses, config)
	if err != nil {
		// This branch means the producer could not be created, not that it
		// was closed, so say so.
		fmt.Println("create producer failed, err:", err)
		return
	}
	defer func() {
		if err := client.Close(); err != nil {
			fmt.Println("close producer failed, err:", err)
		}
	}()

	// Build the message.
	msg := &sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.StringEncoder("this is a test log"),
	}

	// Send it and report where it landed.
	pid, offset, err := client.SendMessage(msg)
	if err != nil {
		fmt.Println("send msg failed, err:", err)
		return
	}
	fmt.Printf("pid:%v offset:%v\n", pid, offset)
}
-
消费者
// Command consumer reads every partition of a Kafka topic with standalone
// sarama partition consumers (no consumer group) and prints each message
// until the process receives SIGINT or SIGTERM.
package main

import (
	"fmt"
	"os"
	"os/signal"
	"sync"
	"syscall"

	"github.com/Shopify/sarama"
)

// kafkaAddresses lists the bootstrap brokers the consumer connects to.
var kafkaAddresses = []string{"192.168.48.134:19092", "192.168.48.134:19093", "192.168.48.134:19094"}

// topic is the Kafka topic to consume from.
var topic = "test"

func main() {
	config := sarama.NewConfig()
	fmt.Println("clientID: ", config.ClientID)

	consumer, err := sarama.NewConsumer(kafkaAddresses, config)
	if err != nil {
		fmt.Printf("fail to start consumer, err:%v\n", err)
		return
	}
	// Registered first so it runs last, after every partition consumer has
	// been shut down by the deferred function below.
	defer func() {
		if err := consumer.Close(); err != nil {
			fmt.Printf("fail to close consumer, err:%v\n", err)
		}
	}()

	// Look up all partitions of the topic.
	partitionList, err := consumer.Partitions(topic)
	if err != nil {
		fmt.Printf("fail to get list of partition, err:%v\n", err)
		return
	}
	fmt.Println("partitionList:", partitionList)

	var (
		wg                 sync.WaitGroup
		partitionConsumers []sarama.PartitionConsumer
	)
	// On every exit path, ask each partition consumer to stop and wait for
	// the reader goroutines to drain their Messages channels.
	defer func() {
		for _, pc := range partitionConsumers {
			pc.AsyncClose()
		}
		wg.Wait()
	}()

	// Range over the partition IDs themselves; the slice index is only
	// coincidentally equal to the ID when partitions are numbered 0..n-1.
	for _, partition := range partitionList {
		// Create one partition consumer per partition.
		pc, err := consumer.ConsumePartition(topic, partition, sarama.OffsetNewest)
		if err != nil {
			fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
			return
		}
		partitionConsumers = append(partitionConsumers, pc)

		// Consume each partition asynchronously.
		wg.Add(1)
		go func(pc sarama.PartitionConsumer) {
			defer wg.Done()
			for msg := range pc.Messages() {
				// Key and Value are byte slices; %s prints them as text.
				fmt.Printf("Partition:%d Offset:%d Key:%s Value:%s\n", msg.Partition, msg.Offset, msg.Key, msg.Value)
			}
		}(pc)
	}

	// Block until interrupted instead of spinning in an empty loop, so no
	// CPU is burned and the deferred cleanup actually runs.
	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	<-sigterm
}
消费者组
-
生产者
生产者代码不变
-
消费者
代码源自 https://github.com/Shopify/sarama/tree/v1.29.1/examples/consumergrouppackage main import ( "context" "github.com/Shopify/sarama" "log" "os" "os/signal" "sync" "syscall" ) var kafkaAddresses = []string{"192.168.48.134:19092", "192.168.48.134:19093", "192.168.48.134:19094"} var topic = []string{"test"} func main() { config := sarama.NewConfig() consumer := Consumer{ ready: make(chan bool), } ctx, cancel := context.WithCancel(context.Background()) client, err := sarama.NewConsumerGroup(kafkaAddresses, "111", config) if err != nil { log.Panicf("Error creating consumer group client: %v", err) } wg := &sync.WaitGroup{} wg.Add(1) go func() { defer wg.Done() for { // `Consume` should be called inside an infinite loop, when a // server-side rebalance happens, the consumer session will need to be // recreated to get the new claims if err := client.Consume(ctx, topic, &consumer); err != nil { log.Panicf("Error from consumer: %v", err) } // check if context was cancelled, signaling that the consumer should stop if ctx.Err() != nil { return } consumer.ready = make(chan bool) } }() log.Println("Sarama consumer up and running!...") sigterm := make(chan os.Signal, 1) signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) select { case <-ctx.Done(): log.Println("terminating: context cancelled") case <-sigterm: log.Println("terminating: via signal") } cancel() wg.Wait() if err = client.Close(); err != nil { log.Panicf("Error closing client: %v", err) } } // Consumer represents a Sarama consumer group consumer type Consumer struct { ready chan bool } // Setup is run at the beginning of a new session, before ConsumeClaim func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error { // Mark the consumer as ready close(consumer.ready) return nil } // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error { return nil } // ConsumeClaim must start a consumer loop of 
ConsumerGroupClaim's Messages(). func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { // NOTE: // Do not move the code below to a goroutine. // The `ConsumeClaim` itself is called within a goroutine, see: // https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29 for message := range claim.Messages() { log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic) session.MarkMessage(message, "") } return nil }
网友评论