下面我们来进行consumer的开发。先来写一个Consumer类,在构造方法里初始化KafkaConsumer对象,并设置相应的参数。
private final KafkaConsumer<Integer,String> consumer;
private List<String> topics = new ArrayList<String>();
public Consumer() {
Properties props = new Properties();
props.put("bootstrap.servers","192.168.61.158:9092");
props.put("zookeeper.connect", "192.168.61.151:2181,192.168.61.152:2181,192.168.61.153:2181");
props.put("group.id", "0");
props.put("auto.offset.reset","earliest");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit","true");
props.put("zookeeper.session.timeout.ms", "4000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
topics.add("goods");
//配置consumer的值
consumer = new KafkaConsumer(props);
consumer.subscribe(topics);
}
Kafka是通过Properties配置参数的。这里的参数比较多,我们来详细的说一下。
1.bootstrap.servers,服务的节点列表,这个不用再说了。
2.zookeeper.connect,Zookeeper的地址
3.group.id,Consumer Group的id,我们知道,每个Consumer都会属于一个Consumer Group,这里就是给Consumer指定一个所属group的id。
4.auto.offset.reset,从哪里开始消费,earliest是从最早的offset开始消费数据。
5.key.deserializer,key的反序列化器。
6.value.deserializer,value的反序列化器。
7.enable.auto.commit,是否自动提交offset,如果要做到不重复消费,可以设置为手动提交,在业务中处理offset提交的问题。
8.zookeeper.session.timeout.ms,和Zookeeper连接的超时时间。
9.zookeeper.sync.time.ms,Zookeeper同步数据的时间。
10.auto.commit.interval.ms,多长时间进行offset的自动提交。
初始化KafkaConsumer后,我们要把KafkaConsumer订阅要关注的topic。下面看一下consume方法
public void consume(){
ConsumerRecords<Integer,String> consumerRecord = consumer.poll(1000);
Iterator iterator = consumerRecord.iterator();
while (iterator.hasNext()){
ConsumerRecord<Integer,String> record = (ConsumerRecord<Integer,String>)iterator.next();
LOG.info("offset:"+record.offset()+",key:"+record.key()+",value:"+record.value());
}
}
通过poll方法去broker上拉取消息。下面来写一个测试方法
Consumer consumer = new Consumer();
consumer.consume();
Kafka的consumer开发是很简单的,但是要注意,consumer消费消息的offset管理,否则可能会产生重复消费的情况,在配置中没有设置自动提交offset的话,需要在业务中手动进行处理。
Kafka的consumer开发就介绍到这里了。
网友评论