1. 消费消息 poll方法
//3. 循环消费消息
while (true) {
//这里返回的ConsumerRecords,并不是ConsumerRecord,可以认为是一批ConsumerRecord
//一个ConsumerRecord 和 ProducerRecorde对应
//使用for循环遍历
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, String> record : records) {
System.out.println("--------------->:" + record.value());
}
//使用Iterator遍历
Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
while(iterator.hasNext()) {
ConsumerRecord<String,String> next = iterator.next();
System.out.println("--------------->:" + next.value());
}
}
ConsumerRecord代码如下:
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.DefaultRecord;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
/**
* A key/value pair to be received from Kafka. This also consists of a topic name and
* a partition number from which the record is being received, an offset that points
* to the record in a Kafka partition, and a timestamp as marked by the corresponding ProducerRecord.
*/
public class ConsumerRecord<K, V> {
public static final long NO_TIMESTAMP = RecordBatch.NO_TIMESTAMP;
public static final int NULL_SIZE = -1;
public static final int NULL_CHECKSUM = -1;
private final String topic; //主题
private final int partition;//分区
private final long offset;//偏移量
private final long timestamp;//时间戳
private final TimestampType timestampType;//时间戳
private final int serializedKeySize;//序列化的key的大小
private final int serializedValueSize;
private final Headers headers;//头信息
private final K key;//key
private final V value;//value
private volatile Long checksum;//校验信息
}
按照分区维度来消费消息
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
//records.partitions(): 获取消息集中所有分区
//按照分区维度来消费消息
for (TopicPartition tp : records.partitions()) {
//public List<ConsumerRecord<K, V>> records(TopicPartition partition)
//按照TopicPartition 来获取消息
List<ConsumerRecord<String, String>> consumerRecordList = records.records(tp);
consumerRecordList.forEach(r -> System.out.println(r.partition()+"---->"+r.value()));
}
按照主题维度来消费消息
在ConsumerRecords类中并没有一个方法topics()来获取消费者所订阅的所有主题。因此按照主题消费,就只能根据消费者订阅的主题来处理。
//2. 订阅主题
List<String> asList = Arrays.asList("topic1","topic2");
consumer.subscribe(asList);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
for (String s : asList) {
//按照主题消费消息
for (ConsumerRecord<String, String> record : records.records(s)) {
System.out.println(record.partition()+"---->"+record.value());
}
}
}
网友评论