美文网首页
深入理解Kafka(六) Consumer开发

深入理解Kafka(六) Consumer开发

作者: skyguard | 来源:发表于2019-01-10 22:33 被阅读0次

    下面我们来进行consumer的开发。先来写一个Consumer类,在构造方法里初始化KafkaConsumer对象,并设置相应的参数。

    private final KafkaConsumer<Integer,String> consumer;
    
    private List<String> topics = new ArrayList<String>();
    
    public Consumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers","192.168.61.158:9092");
        props.put("zookeeper.connect", "192.168.61.151:2181,192.168.61.152:2181,192.168.61.153:2181");
        props.put("group.id", "0");
        props.put("auto.offset.reset","earliest");
        props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit","true");
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
    
    
        topics.add("goods");
        //配置consumer的值
        consumer = new KafkaConsumer(props);
        consumer.subscribe(topics);
    
    }
    

    Kafka是通过Properties配置参数的。这里的参数比较多,我们来详细的说一下。
    1.bootstrap.servers,服务的节点列表,这个不用再说了。
    2.zookeeper.connect,Zookeeper的地址
    3.group.id,Consumer Group的id,我们知道,每个Consumer都会属于一个Consumer Group,这里就是给Consumer指定一个所属group的id。
    4.auto.offset.reset,从哪里开始消费,earliest是从最早的offset开始消费数据。
    5.key.deserializer,key的反序列化器。
    6.value.deserializer,value的反序列化器。
    7.enable.auto.commit,是否自动提交offset,如果要做到不重复消费,可以设置为手动提交,在业务中处理offset提交的问题。
    8.zookeeper.session.timeout.ms,和Zookeeper连接的超时时间。
    9.zookeeper.sync.time.ms,Zookeeper同步数据的时间。
    10.auto.commit.interval.ms,多长时间进行offset的自动提交。
    初始化KafkaConsumer后,我们要把KafkaConsumer订阅要关注的topic。下面看一下consume方法

    public void consume(){
    
        ConsumerRecords<Integer,String> consumerRecord = consumer.poll(1000);
        Iterator iterator = consumerRecord.iterator();
        while (iterator.hasNext()){
            ConsumerRecord<Integer,String> record = (ConsumerRecord<Integer,String>)iterator.next();
            LOG.info("offset:"+record.offset()+",key:"+record.key()+",value:"+record.value());
        }
    
    }
    

    通过poll方法去broker上拉取消息。下面来写一个测试方法

    Consumer consumer = new Consumer();
    consumer.consume();
    

    Kafka的consumer开发是很简单的,但是要注意,consumer消费消息的offset管理,否则可能会产生重复消费的情况,在配置中没有设置自动提交offset的话,需要在业务中手动进行处理。
    Kafka的consumer开发就介绍到这里了。

    相关文章

      网友评论

          本文标题:深入理解Kafka(六) Consumer开发

          本文链接:https://www.haomeiwen.com/subject/ryizrqtx.html