美文网首页
kafka_09_消费消息

kafka_09_消费消息

作者: 平头哥2 | 来源:发表于2019-03-31 11:38 被阅读0次

    1. 消费消息 poll方法

    //3. 循环消费消息
    while (true) {
        //这里返回的ConsumerRecords,并不是ConsumerRecord,可以认为是一批ConsumerRecord
        //一个ConsumerRecord 和 ProducerRecorde对应
        //使用for循环遍历
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("--------------->:" + record.value());
        }  
        
        //使用Iterator遍历
        Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
        while(iterator.hasNext()) {
            ConsumerRecord<String,String> next = iterator.next();
            System.out.println("--------------->:" + next.value());
        }
    }
    

    ConsumerRecord代码如下:

    import org.apache.kafka.common.header.Headers;
    import org.apache.kafka.common.header.internals.RecordHeaders;
    import org.apache.kafka.common.record.DefaultRecord;
    import org.apache.kafka.common.record.RecordBatch;
    import org.apache.kafka.common.record.TimestampType;
    
    /**
     * A key/value pair to be received from Kafka. This also consists of a topic name and 
     * a partition number from which the record is being received, an offset that points 
     * to the record in a Kafka partition, and a timestamp as marked by the corresponding ProducerRecord.
     */
    public class ConsumerRecord<K, V> {
        public static final long NO_TIMESTAMP = RecordBatch.NO_TIMESTAMP;
        public static final int NULL_SIZE = -1;
        public static final int NULL_CHECKSUM = -1;
    
        private final String topic; //主题
        private final int partition;//分区
        private final long offset;//偏移量
        private final long timestamp;//时间戳
        private final TimestampType timestampType;//时间戳
        private final int serializedKeySize;//序列化的key的大小
        private final int serializedValueSize;
        private final Headers headers;//头信息
        private final K key;//key
        private final V value;//value
    
        private volatile Long checksum;//校验信息
    }
    

    按照分区维度来消费消息

    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
    //records.partitions(): 获取消息集中所有分区
    //按照分区维度来消费消息
    for (TopicPartition tp : records.partitions()) {
        //public List<ConsumerRecord<K, V>> records(TopicPartition partition)
        //按照TopicPartition 来获取消息
        List<ConsumerRecord<String, String>> consumerRecordList = records.records(tp);
        consumerRecordList.forEach(r -> System.out.println(r.partition()+"---->"+r.value()));
    }
    

    按照主题维度来消费消息

    在ConsumerRecords类中并没有一个方法topics()来获取消费者所订阅的所有主题。因此按照主题消费,就只能根据消费者订阅的主题来处理。

    //2. 订阅主题
    List<String> asList = Arrays.asList("topic1","topic2");
    consumer.subscribe(asList);
    while (true) {
         ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
        for (String s : asList) {
            //按照主题消费消息
            for (ConsumerRecord<String, String> record : records.records(s)) {
                System.out.println(record.partition()+"---->"+record.value());
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:kafka_09_消费消息

          本文链接:https://www.haomeiwen.com/subject/yipybqtx.html