美文网首页
KAFKA 消费端代码示例

KAFKA 消费端代码示例

作者: GreanTea | 来源:发表于2018-01-10 16:11 被阅读0次

    kafka 消费端代码:

    public static void main(String[] args) {
        String recordStrFormat = "offset = %d, key = %s, value = %s\n";
        Properties props = new Properties();
        props.put("bootstrap.servers", "spidercdh-01:9092");
        props.put("group.id", "default");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", 1000);
        props.put("session.timeout.ms", 30000);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        //test test2 为topic的名字
        consumer.subscribe(Arrays.asList("test","test2"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(String.format(recordStrFormat, record.offset(), record.key(), record.value()));
                }
            }
        } finally {
            if (null != consumer)
                consumer.close();
        }
    }}

    相关文章

      网友评论

          本文标题:KAFKA 消费端代码示例

          本文链接:https://www.haomeiwen.com/subject/qyvonxtx.html