最近写了个kafka的接收消息的功能,需要使用回调处理收到的消息。
一个是基本的回调,一个是使用接口功能实现回调,对接口是个很好的学习。
1.正常回调
kafka的接收消息处。收到消息后,使用传入的Onmessage进行处理。
type Onmessage func(string)
func (kafka *KafkaClient) Consumer(topic string, onmessage Onmessage) {
.......
onmessage(string(msg.Value))
}
调用kafka接收消息的单元,并在调用方写好回调
func CollectResult() {
kafkaclient := tools.KAFKA
TOPICTASK := config.CONFIG.GetString("kafka.topicresult")
kafkaclient.Consumer(TOPICTASK, Onmessage)
}
//收到消息的回调函数
func Onmessage(message string) {
fmt.Println("Im callback", message)
}
- 上述是使用正常的回调,突然想到可以使用更常用的接口。
先到kafka消息接收的单元定义接口
type MessageHandler interface {
Onmessage(message string)
}
//接收消息,并回传给回调函数
func (kafka *KafkaClient) Consumer(topic string, onm MessageHandler) {
.......
//使用接口定义的方法
onm.Onmessage(string(msg.Value))
}
在调用方实现回调需要执行的方法
type MM struct{}
func (*MM) Onmessage(message string) {
fmt.Println("Im callback", message)
}
func CollectResult() {
......
//传入实现了方法的结构体
kafkaclient.Consumer(TOPICTASK, &MM{})
}
感觉还是使用基本回调相对简单点,接口就当学习了。
另外跨包的接口的方法要大写!定位了好久发现个入门的问题。
网友评论