消费者
KafkaConsumer.go
package cws
import (
"github.com/IBM/sarama"
"log"
"os"
"os/signal"
)
func KafkaConsumer() {
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
client, err := sarama.NewClient([]string{"localhost:9192", "localhost:9292", "localhost:9392"}, config)
defer client.Close()
if err != nil {
panic(err)
}
consumer, err := sarama.NewConsumerFromClient(client)
defer consumer.Close()
if err != nil {
panic(err)
}
// get partitionId list
partitions, err := consumer.Partitions("my_topic")
if err != nil {
panic(err)
}
for _, partitionId := range partitions {
// create partitionConsumer for every partitionId
partitionConsumer, err := consumer.ConsumePartition("my_topic", partitionId, sarama.OffsetNewest)
if err != nil {
panic(err)
}
go func(pc *sarama.PartitionConsumer) {
defer (*pc).Close()
// block
for message := range (*pc).Messages() {
value := string(message.Value)
log.Printf("Partitionid: %d; offset:%d, value: %s\n", message.Partition, message.Offset, value)
}
}(&partitionConsumer)
}
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
select {
case <-signals:
}
}
消费组
KafkaConsumerGroup.go
package cws
import (
"context"
"fmt"
"github.com/IBM/sarama"
"os"
"os/signal"
"sync"
)
type consumerGroupHandler struct {
name string
}
func (consumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (consumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
fmt.Printf("%s Message topic:%q partition:%d offset:%d value:%s\n", h.name, msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
// 手动确认消息
sess.MarkMessage(msg, "")
//手动的话需要提交下。
sess.Commit()
}
return nil
}
func handleErrors(group *sarama.ConsumerGroup, wg *sync.WaitGroup) {
wg.Done()
for err := range (*group).Errors() {
fmt.Println("ERROR", err)
}
}
func consume(group *sarama.ConsumerGroup, wg *sync.WaitGroup, name string) {
fmt.Println(name + "start")
wg.Done()
ctx := context.Background()
for {
topics := []string{"my_topic"}
handler := consumerGroupHandler{name: name}
err := (*group).Consume(ctx, topics, handler)
if err != nil {
panic(err)
}
}
}
func KafkaConsumerGroup() {
var wg sync.WaitGroup
config := sarama.NewConfig()
config.Consumer.Return.Errors = false
config.Version = sarama.V0_10_2_0
client, err := sarama.NewClient([]string{"localhost:9192", "localhost:9292", "localhost:9392"}, config)
defer client.Close()
if err != nil {
panic(err)
}
group1, err := sarama.NewConsumerGroupFromClient("c1", client)
if err != nil {
panic(err)
}
group2, err := sarama.NewConsumerGroupFromClient("c2", client)
if err != nil {
panic(err)
}
group3, err := sarama.NewConsumerGroupFromClient("c3", client)
if err != nil {
panic(err)
}
defer group1.Close()
defer group2.Close()
defer group3.Close()
wg.Add(3)
go consume(&group1, &wg, "c1")
go consume(&group2, &wg, "c2")
go consume(&group3, &wg, "c3")
wg.Wait()
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
select {
case <-signals:
}
}
生产者
KafkaProducer.go
package cws
import (
"github.com/IBM/sarama"
"log"
"os"
"os/signal"
"sync"
)
func main() {
config := sarama.NewConfig()
config.Producer.Return.Successes = true
config.Producer.Partitioner = sarama.NewRandomPartitioner
client, err := sarama.NewClient([]string{"192.168.0.104:9192", "localhost:9292", "localhost:9392"}, config)
defer client.Close()
if err != nil {
panic(err)
}
producer, err := sarama.NewAsyncProducerFromClient(client)
if err != nil {
panic(err)
}
// Trap SIGINT to trigger a graceful shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
var (
wg sync.WaitGroup
enqueued, successes, errors int
)
wg.Add(1)
// start a groutines to count successes num
go func() {
defer wg.Done()
for range producer.Successes() {
successes++
}
}()
wg.Add(1)
// start a groutines to count error num
go func() {
defer wg.Done()
for err := range producer.Errors() {
log.Println(err)
errors++
}
}()
ProducerLoop:
for {
message := &sarama.ProducerMessage{Topic: "my_topic", Value: sarama.StringEncoder("testing 123")}
select {
case producer.Input() <- message:
enqueued++
case <-signals:
producer.AsyncClose() // Trigger a shutdown of the producer.
break ProducerLoop
}
}
wg.Wait()
log.Printf("Successfully produced: %d; errors: %d\n", successes, errors)
}
网友评论