美文网首页
kafka_09_消费消息

kafka_09_消费消息

作者: 平头哥2 | 来源:发表于2019-03-31 11:38 被阅读0次

1. 消费消息 poll方法

//3. 循环消费消息
while (true) {
    //这里返回的ConsumerRecords,并不是ConsumerRecord,可以认为是一批ConsumerRecord
    //一个ConsumerRecord 和 ProducerRecorde对应
    //使用for循环遍历
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("--------------->:" + record.value());
    }  
    
    //使用Iterator遍历
    Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
    while(iterator.hasNext()) {
        ConsumerRecord<String,String> next = iterator.next();
        System.out.println("--------------->:" + next.value());
    }
}

ConsumerRecord代码如下:

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.DefaultRecord;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;

/**
 * A key/value pair to be received from Kafka. This also consists of a topic name and 
 * a partition number from which the record is being received, an offset that points 
 * to the record in a Kafka partition, and a timestamp as marked by the corresponding ProducerRecord.
 */
public class ConsumerRecord<K, V> {
    public static final long NO_TIMESTAMP = RecordBatch.NO_TIMESTAMP;
    public static final int NULL_SIZE = -1;
    public static final int NULL_CHECKSUM = -1;

    private final String topic; //主题
    private final int partition;//分区
    private final long offset;//偏移量
    private final long timestamp;//时间戳
    private final TimestampType timestampType;//时间戳
    private final int serializedKeySize;//序列化的key的大小
    private final int serializedValueSize;
    private final Headers headers;//头信息
    private final K key;//key
    private final V value;//value

    private volatile Long checksum;//校验信息
}

按照分区维度来消费消息

ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
//records.partitions(): 获取消息集中所有分区
//按照分区维度来消费消息
for (TopicPartition tp : records.partitions()) {
    //public List<ConsumerRecord<K, V>> records(TopicPartition partition)
    //按照TopicPartition 来获取消息
    List<ConsumerRecord<String, String>> consumerRecordList = records.records(tp);
    consumerRecordList.forEach(r -> System.out.println(r.partition()+"---->"+r.value()));
}

按照主题维度来消费消息

在ConsumerRecords类中并没有一个方法topics()来获取消费者所订阅的所有主题。因此按照主题消费,就只能根据消费者订阅的主题来处理。

//2. 订阅主题
List<String> asList = Arrays.asList("topic1","topic2");
consumer.subscribe(asList);
while (true) {
     ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
    for (String s : asList) {
        //按照主题消费消息
        for (ConsumerRecord<String, String> record : records.records(s)) {
            System.out.println(record.partition()+"---->"+record.value());
        }
    }
}

相关文章

  • kafka_09_消费消息

    1. 消费消息 poll方法 ConsumerRecord代码如下: 按照分区维度来消费消息 按照主题维度来消费消...

  • 消息消费

    总览 消息消费分为两种形式并发消费、顺序消费;这次主要讲并发消费。 消息从Broker拉取到客户端之后,等待客户端...

  • RocketMQ基础篇 Consumer消费消息

    消费消息逻辑 消费消息逻辑主要分为三个模块 Rebalance 拉取消息 消费消息 Rebalance 集群模式下...

  • RocketMQ消息重试

    RocketMQ为了保证消息被消费采用ACK确认机制,消费者消费消息时需要给Broker反馈消息消费的情况,成功或...

  • 消息队列的消费语义和投递语义

    消费语义 如何保证消息最多消费一次 如何保证消息至少消费一次 如何保证消息恰好消费一次 投递语义 如何保证消息最多...

  • 消费消息(二)

    批量消息发送 批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitSt...

  • 消费消息(一)

    创建消费者Consumer,制定消费者组名 自定NameServer地址 订阅主题Topic和Tag 设置回调参数...

  • 消息中间件—RocketMQ消息消费(三)(消息消费重试)

    摘要:如果Consumer端消费消息失败,那么RocketMQ是如何对失败的异常情况进行处理?前面两篇Rocket...

  • RocketMQ消息消费概览

    概览 RocketMQ消息消费以消费组为消费维度的,消费组之间消息消费有集群模式与广播模式两种消费模式。 广播模式...

  • Apache Pulsar 消息传递模型(2)-消息确认

    当使用跨机器分布的消息传递系统时,可能会发生故障。在消费者从消息传递系统中的主题消费消息的情况下,消费消息的消费者...

网友评论

      本文标题:kafka_09_消费消息

      本文链接:https://www.haomeiwen.com/subject/yipybqtx.html