kafka工作原理(新人向,代码向)
以上来自:链接
怎么向kafka send消息呢?
如何向kafka send消息
可以看出,由producer对象中的send方法中用ProducerRecord封装了个由topic,key,value组成的消息。
producer:
原来就有的Producer接口 其中producer对象是由原来就有的Producer接口生成的。send:
Producer接口中send方法介绍消息的组成:topic,key和value
topic:
send时的目标主题。
key:
用atomicLong实现若空时自增的键。
value:
将value对象转化为Json字符串send到kafka
如何转化?
JSON.toJSONString则是将对象转化为Json字符串。如何向kafka poll消息呢?
如何向kafka poll消息呢consumer:与producer同理,只不过是消费消息罢了。
Duration.of:
大体意思是一段时间,比如这里10000millis,意思是10秒
duration.of含义ConsumerRecords records =consumer.poll(Duration.of(10000, ChronoUnit.MILLIS));
在10秒内poll拿到的放进ConsumerRecord接口生成的records中,再把所有拿到的value放入result这个Arraylist中。(这部分与send部分类似,不详细写了)
commitAsync():
commitAsync详解鄙人的拙劣翻译如下:
背景介绍:
要以文件存放消费的消息时,为了方便查找,对于过大的文件分成许多个小文件,并由offset来给这些小文件打标签(和数组的offset一个意思)。这个过程叫平均/重组(rebalance)。
我的翻译:
在最后一次从指定的主题分区中poll时,会拿到各个offset。
这个拿offset的行为只在kafka用。在最开始拿消息或重组之后,这个行为才会用这个api。所以,你想用这个api给不是kafka上的消息拿offset是不可能的。
这是个异步的过程,任何时候都不会被打断。遇到任何报错都会扔给callback(如果提供的话)或者被丢弃。
在多个调用这个api时,会根据调用先后来给offset。相应的callback也会以相同顺序给出。拿offset这个操作一定会在commitSync()返回前做完。
参数:callback——为了说明这个commitAsync操作完成了。
也就是说,这句话是设定怎么设置offset用的。
网友评论