首先去下载sarama
1.生产者
package main
import (
"fmt"
"github.com/Shopify/sarama"
)
func produce(value, msgType string) bool {
config := sarama.NewConfig()
// 等待服务器所有副本都保存成功后的响应
config.Producer.RequiredAcks = sarama.WaitForAll
// 随机的分区类型:返回一个分区器,该分区器每次选择一个随机分区
config.Producer.Partitioner = sarama.NewRandomPartitioner
// 是否等待成功和失败后的响应
config.Producer.Return.Successes = true
// 使用给定代理地址和配置创建一个同步生产者
producer, err := sarama.NewSyncProducer([]string{"192.168.0.121:9092"}, config)
if err != nil {
panic(err)
}
defer producer.Close()
//构建发送的消息,
msg := &sarama.ProducerMessage{
Topic: "test", //包含了消息的主题
Partition: int32(10), //
Key: sarama.StringEncoder("key"), //
}
for {
// _, err := fmt.Scanf("%s", &value)
// if err != nil {
// a := err.Error
// fmt.Println(a)
// }
// fmt.Scanf("%s", &msgType)
fmt.Println("msgType = ", msgType, ",value = ", value)
msg.Topic = msgType
//将字符串转换为字节数组
msg.Value = sarama.ByteEncoder(value)
//fmt.Println(value)
//SendMessage:该方法是生产者生产给定的消息
//生产成功的时候返回该消息的分区和所在的偏移量
//生产失败的时候返回error
partition, offset, err := producer.SendMessage(msg)
if err != nil {
fmt.Println("Send message Fail")
return false
}
fmt.Printf("Partition = %d, offset=%d\n", partition, offset)
return true
}
}
2.消费者
package main
import (
"encoding/json"
"fmt"
"sync"
"github.com/Shopify/sarama"
)
var (
wg sync.WaitGroup
)
func Consumer(topics []string, ip []string, grouid string) {
defer wg.Done()
config := cluster.NewConfig()
config.Consumer.Return.Errors = true
config.Group.Return.Notifications = true
config.Consumer.Offsets.Initial = sarama.OffsetNewest
// init consumer
consumer, err := cluster.NewConsumer(ip, grouid, topics, config)
if err != nil {
log.Printf("%s: sarama.NewSyncProducer err, message=%s \n", grouid, err)
return
}
defer consumer.Close()
// trap SIGINT to trigger a shutdown
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
// consume errors
go func() {
for err := range consumer.Errors() {
log.Printf("%s:Error: %s\n", grouid, err.Error())
}
}()
// consume notifications
go func() {
for ntf := range consumer.Notifications() {
log.Printf("%s:Rebalanced: %+v \n", grouid, ntf)
}
}()
// consume messages, watch signals
var successes int
Loop:
for {
select {
case msg, ok := <-consumer.Messages():
if ok {
fmt.Fprintf(os.Stdout, "%s:%s/%d/%d\t%s\t%s\n", grouid, msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
consumer.MarkOffset(msg, "") //这里是给消费过的offset 打上标记 下次启动从这里进行消费
}
case <-signals:
break Loop
}
}
3.以上经过测试可以使用 来源 (稍微有一点点改动)https://studygolang.com/articles/17912
网友评论