Taking FlinkKafkaConsumer09 as an example.

1. The call sequence, from the outside in, is as follows (a minimal wiring sketch follows the table):
No. | Code |
---|---|
1 | StreamExecutionEnvironment#addSource(consumer) |
2 | new FlinkKafkaConsumer09[String](topic, msgSchema, consumerProperties) |
3 | kafkaFetcher.runFetchLoop() |
4 | Handover#pollNext() |
5 | KafkaConsumer#poll(pollTimeout) |
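To make steps 1 and 2 concrete, a minimal job wires the consumer up roughly like this. This is a sketch only: the topic name, schema, and properties are placeholders, and import paths differ slightly across Flink versions.

```java
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaSourceJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        consumerProperties.setProperty("group.id", "demo-group");              // placeholder

        // step 2: construct the consumer (msgSchema here is a SimpleStringSchema)
        FlinkKafkaConsumer09<String> consumer =
                new FlinkKafkaConsumer09<>("demo-topic", new SimpleStringSchema(), consumerProperties);

        // step 1: register it as a source; steps 3-5 happen inside the source at runtime
        env.addSource(consumer).print();

        env.execute("kafka-source-demo");
    }
}
```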
2. Core code

Because there is an extra Handover acting as a proxy in front of the consumer, the code looks somewhat complex.
2.1 Handover and KafkaConsumerThread

Both are initialized as fields in Kafka09Fetcher:
```java
this.handover = new Handover();

this.consumerThread = new KafkaConsumerThread(
        LOG,
        handover,
        kafkaProperties,
        unassignedPartitionsQueue,
        createCallBridge(),
        getFetcherName() + " for " + taskNameWithSubtasks,
        pollTimeout,
        useMetrics,
        consumerMetricGroup,
        subtaskMetricGroup);
```
The data-fetching part:
```java
@Override
public void runFetchLoop() throws Exception {
    try {
        final Handover handover = this.handover;

        // kick off the actual Kafka consumer
        consumerThread.start();

        while (running) {
            // this blocks until we get the next records
            // it automatically re-throws exceptions encountered in the consumer thread
            final ConsumerRecords<byte[], byte[]> records = handover.pollNext();

            // get the records for each topic partition
            for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates()) {

                // take out this single partition's records
                List<ConsumerRecord<byte[], byte[]>> partitionRecords =
                        records.records(partition.getKafkaPartitionHandle());

                for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
                    final T value = deserializer.deserialize(
                            record.key(), record.value(),
                            record.topic(), record.partition(), record.offset());

                    if (deserializer.isEndOfStream(value)) {
                        // end of stream signaled
                        running = false;
                        break;
                    }

                    // emit the actual record. this also updates offset state atomically
                    // and deals with timestamps and watermark generation
                    emitRecord(value, partition, record.offset(), record);
                }
            }
        }
    }
    finally {
        // this signals the consumer thread that no more work is to be done
        consumerThread.shutdown();
    }

    // on a clean exit, wait for the runner thread
    try {
        consumerThread.join();
    }
    catch (InterruptedException e) {
        // may be the result of a wake-up interruption after an exception.
        // we ignore this here and only restore the interruption state
        Thread.currentThread().interrupt();
    }
}
```
From the above we can see that the outer layer starts a consumer thread (consumerThread.start()), which runs the real Kafka consumer that receives the data, while the fetch loop itself obtains the records indirectly through handover.pollNext(). The Handover implementation, which is essentially a single-element blocking queue, looks like this:
```java
public final class Handover implements Closeable {

    private final Object lock = new Object();

    private ConsumerRecords<byte[], byte[]> next;
    private Throwable error;
    private boolean wakeupProducer;

    @Nonnull
    public ConsumerRecords<byte[], byte[]> pollNext() throws Exception {
        synchronized (lock) {
            while (next == null && error == null) {
                lock.wait();
            }

            ConsumerRecords<byte[], byte[]> n = next;
            if (n != null) {
                next = null;
                lock.notifyAll();
                return n;
            }
            else {
                ExceptionUtils.rethrowException(error, error.getMessage());

                // this statement cannot be reached since the above method always throws an exception
                // this is only here to silence the compiler and any warnings
                return ConsumerRecords.empty();
            }
        }
    }

    public void produce(final ConsumerRecords<byte[], byte[]> element)
            throws InterruptedException, WakeupException, ClosedException {

        checkNotNull(element);

        synchronized (lock) {
            while (next != null && !wakeupProducer) {
                lock.wait();
            }

            wakeupProducer = false;

            // if there is still an element, we must have been woken up
            if (next != null) {
                throw new WakeupException();
            }
            // if there is no error, then this is open and can accept this element
            else if (error == null) {
                next = element;
                lock.notifyAll();
            }
            // an error marks this as closed for the producer
            else {
                throw new ClosedException();
            }
        }
    }

    // ... (reportError(), wakeupProducer(), and close() omitted here; they are used further below)
}
```
If we think of Handover as a queue, its role becomes easy to understand, and its user on the producing side is the KafkaConsumerThread mentioned earlier (see the analogy sketch below).
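As a rough analogy only (not Flink's actual code), the same hand-off can be modeled with a capacity-1 blocking queue: the producer blocks while the slot is still occupied, the consumer blocks while it is empty. Handover's additional error reporting and wake-up handling are omitted here.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class HandoverAnalogy {
    public static void main(String[] args) throws InterruptedException {
        // capacity 1: at most one batch is "in flight" between the two threads
        BlockingQueue<String> slot = new ArrayBlockingQueue<>(1);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 3; i++) {
                    slot.put("batch-" + i); // blocks while the previous batch has not been taken
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        for (int i = 0; i < 3; i++) {
            System.out.println("consumed " + slot.take()); // blocks until a batch is available
        }
        producer.join();
    }
}
```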
KafkaConsumerThread uses Handover as follows:
```java
@Override
public void run() {
    // ...

    // this is the means to talk to FlinkKafkaConsumer's main thread
    final Handover handover = this.handover;

    // This method initializes the KafkaConsumer and guarantees it is torn down properly.
    // This is important, because the consumer has multi-threading issues,
    // including concurrent 'close()' calls.
    try {
        this.consumer = getConsumer(kafkaProperties);
    }
    catch (Throwable t) {
        handover.reportError(t);
        return;
    }

    // from here on, the consumer is guaranteed to be closed properly
    try {
        // ....

        // the latest bulk of records. May carry across the loop if the thread is woken up
        // from blocking on the handover
        ConsumerRecords<byte[], byte[]> records = null;

        // reused variable to hold found unassigned new partitions.
        // found partitions are not carried across loops using this variable;
        // they are carried across via re-adding them to the unassigned partitions queue
        List<KafkaTopicPartitionState<TopicPartition>> newPartitions;

        // main fetch loop
        while (running) {

            // check if there is something to commit
            if (!commitInProgress) {
                // get and reset the work-to-be committed, so we don't repeatedly commit the same
                final Tuple2<Map<TopicPartition, OffsetAndMetadata>, KafkaCommitCallback> commitOffsetsAndCallback =
                        nextOffsetsToCommit.getAndSet(null);

                if (commitOffsetsAndCallback != null) {
                    log.debug("Sending async offset commit request to Kafka broker");

                    // also record that a commit is already in progress
                    // the order here matters! first set the flag, then send the commit command.
                    commitInProgress = true;
                    consumer.commitAsync(commitOffsetsAndCallback.f0, new CommitCallback(commitOffsetsAndCallback.f1));
                }
            }

            try {
                if (hasAssignedPartitions) {
                    newPartitions = unassignedPartitionsQueue.pollBatch();
                }
                else {
                    // if no assigned partitions block until we get at least one
                    // instead of hot spinning this loop. We rely on a fact that
                    // unassignedPartitionsQueue will be closed on a shutdown, so
                    // we don't block indefinitely
                    newPartitions = unassignedPartitionsQueue.getBatchBlocking();
                }
                if (newPartitions != null) {
                    reassignPartitions(newPartitions);
                }
            } catch (AbortedReassignmentException e) {
                continue;
            }

            if (!hasAssignedPartitions) {
                // Without assigned partitions KafkaConsumer.poll will throw an exception
                continue;
            }

            // get the next batch of records, unless we did not manage to hand the old batch over
            if (records == null) {
                try {
                    records = consumer.poll(pollTimeout);
                }
                catch (WakeupException we) {
                    continue;
                }
            }

            try {
                handover.produce(records);
                records = null;
            }
            catch (Handover.WakeupException e) {
                // fall through the loop
            }
        }
        // end main fetch loop
    }
    catch (Throwable t) {
        // let the main thread know and exit
        // it may be that this exception comes because the main thread closed the handover, in
        // which case the below reporting is irrelevant, but does not hurt either
        handover.reportError(t);
    }
    finally {
        // make sure the handover is closed if it is not already closed or has an error
        handover.close();

        // make sure the KafkaConsumer is closed
        try {
            consumer.close();
        }
        catch (Throwable t) {
            log.warn("Error while closing Kafka consumer", t);
        }
    }
}
```
From the above we can see that once the consumer has polled a batch of data, it hands it over to the handover, and the fetch loop on the other side takes it from there.
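As an aside, the deserializer used in runFetchLoop above is the msgSchema passed to the consumer's constructor. A minimal sketch of a keyed schema whose isEndOfStream stops the fetch loop on a hypothetical "EOS" sentinel could look like this (the sentinel and class name are illustrative; in the older connector versions this code comes from, the interface lives in org.apache.flink.streaming.util.serialization):

```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

// Illustrative schema: the record value is read as a UTF-8 string, and a record
// whose value equals "EOS" ends the stream (running = false in runFetchLoop above).
public class StringWithEndMarkerSchema implements KeyedDeserializationSchema<String> {

    @Override
    public String deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) {
        return message == null ? null : new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return "EOS".equals(nextElement);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
```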
3. Flink checkpoints and Kafka offset commits

On every completed checkpoint, the Kafka offsets to be committed are recorded, and the consuming code then commits them to Kafka asynchronously:
No. | Code |
---|---|
1 | FlinkKafkaConsumerBase#notifyCheckpointComplete(long checkpointId) |
2 | AbstractFetcher#commitInternalOffsetsToKafka |
3 | KafkaConsumerThread#setOffsetsToCommit |
4 | nextOffsetsToCommit (the fetch-loop code above already contains the handling of this value) |

The implementation of KafkaConsumerThread#setOffsetsToCommit:
```java
void setOffsetsToCommit(
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit,
        @Nonnull KafkaCommitCallback commitCallback) {

    // record the work to be committed by the main consumer thread and make sure the consumer notices that
    if (nextOffsetsToCommit.getAndSet(Tuple2.of(offsetsToCommit, commitCallback)) != null) {
        log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
                "Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
                "This does not compromise Flink's checkpoint integrity.");
    }

    // if the consumer is blocked in a poll() or handover operation, wake it up to commit soon
    handover.wakeupProducer();

    synchronized (consumerReassignmentLock) {
        if (consumer != null) {
            consumer.wakeup();
        } else {
            // the consumer is currently isolated for partition reassignment;
            // set this flag so that the wakeup state is restored once the reassignment is complete
            hasBufferedWakeup = true;
        }
    }
}
```
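This whole path only fires when checkpointing is enabled on the job; the connector then commits offsets back to Kafka on checkpoint completion rather than via the Kafka client's auto-commit. A minimal sketch with placeholder interval and properties (setCommitOffsetsOnCheckpoints is provided by FlinkKafkaConsumerBase in the connector versions this code comes from):

```java
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class CheckpointedKafkaJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // each completed checkpoint eventually drives
        // notifyCheckpointComplete -> commitInternalOffsetsToKafka -> setOffsetsToCommit
        env.enableCheckpointing(10_000); // 10s interval, placeholder value

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "demo-group");              // placeholder

        FlinkKafkaConsumer09<String> consumer =
                new FlinkKafkaConsumer09<>("demo-topic", new SimpleStringSchema(), props);

        // explicit default: with checkpointing enabled, offsets go back to Kafka
        // on checkpoint completion rather than through the client's auto-commit
        consumer.setCommitOffsetsOnCheckpoints(true);

        env.addSource(consumer).print();
        env.execute("kafka-checkpointed-offsets-demo");
    }
}
```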
3.1 Offset commit timeouts

An offset commit timeout means that the commit triggered by one checkpoint has not finished by the time the next checkpoint completes. In that case nextOffsetsToCommit is set again, so the previously recorded offsets are dropped and never committed.
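This "lost" commit follows directly from the getAndSet in setOffsetsToCommit: nextOffsetsToCommit holds at most one pending commit request, so a newer checkpoint replaces an older request that the consumer thread has not picked up yet (it is only logged, as the warning above shows). A minimal sketch of that semantics, with illustrative names and string placeholders for the offset maps:

```java
import java.util.concurrent.atomic.AtomicReference;

public class PendingCommitOverwrite {
    public static void main(String[] args) {
        // stands in for nextOffsetsToCommit: at most one pending commit request
        AtomicReference<String> nextOffsetsToCommit = new AtomicReference<>();

        // checkpoint 1 completes; the consumer thread has not picked the request up yet
        nextOffsetsToCommit.getAndSet("offsets of checkpoint 1");

        // checkpoint 2 completes before the consumer thread ran its commit branch:
        // the previous request is returned (non-null), logged, and never committed
        String dropped = nextOffsetsToCommit.getAndSet("offsets of checkpoint 2");
        if (dropped != null) {
            System.out.println("skipped commit of: " + dropped);
        }

        // the consumer thread eventually commits only the newest offsets
        System.out.println("committing: " + nextOffsetsToCommit.getAndSet(null));
    }
}
```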