First, let's look at the FlinkKafkaConsumerBase.run method, which is effectively the entry point through which Flink pulls data from Kafka:
// entry-point method: start a source
public void run(SourceContext<T> sourceContext) throws Exception {
......
// from this point forward:
// - 'snapshotState' will draw offsets from the fetcher,
// instead of being built from `subscribedPartitionsToStartOffsets`
// - 'notifyCheckpointComplete' will start to do work (i.e. commit offsets to
// Kafka through the fetcher, if configured to do so)
// create the Fetcher that pulls data from Kafka
this.kafkaFetcher = createFetcher(
sourceContext,
subscribedPartitionsToStartOffsets,
periodicWatermarkAssigner,
punctuatedWatermarkAssigner,
(StreamingRuntimeContext) getRuntimeContext(),
offsetCommitMode,
getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
useMetrics);
if (!running) {
return;
}
// depending on whether we were restored with the current state version (1.3),
// remaining logic branches off into 2 paths:
// 1) New state - partition discovery loop executed as separate thread, with this
// thread running the main fetcher loop
// 2) Old state - partition discovery is disabled and only the main fetcher loop is executed
if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
// KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS is not configured
kafkaFetcher.runFetchLoop();
} else {
// this path still ends up calling kafkaFetcher.runFetchLoop()
runWithPartitionDiscovery();
}
}
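The branch at the end of run depends only on whether a partition discovery interval was configured. As a minimal sketch of that decision, assuming the property key `flink.partition-discovery.interval-millis` and a `Long.MIN_VALUE` sentinel for "disabled" (the class `DiscoveryConfigSketch` and its methods are hypothetical and not part of the connector):

```java
import java.util.Properties;

// Hypothetical stand-in for the branch at the end of FlinkKafkaConsumerBase.run.
// Only the property key and the "disabled" sentinel are meant to mirror the
// connector; both are assumptions for this sketch.
class DiscoveryConfigSketch {
    static final String KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS =
            "flink.partition-discovery.interval-millis";
    static final long PARTITION_DISCOVERY_DISABLED = Long.MIN_VALUE;

    // Read the interval from the consumer properties, falling back to "disabled".
    static long discoveryInterval(Properties props) {
        String value = props.getProperty(KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS);
        return value == null ? PARTITION_DISCOVERY_DISABLED : Long.parseLong(value);
    }

    // Return the name of the method that run() would call for these properties.
    static String chooseLoop(Properties props) {
        return discoveryInterval(props) == PARTITION_DISCOVERY_DISABLED
                ? "runFetchLoop"
                : "runWithPartitionDiscovery";
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(chooseLoop(props));
        props.setProperty(KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "30000");
        System.out.println(chooseLoop(props));
    }
}
```

Either way the fetch loop runs in the source thread; the second path only adds a separate discovery thread alongside it.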
The createFetcher method:
@Override
protected AbstractFetcher<T, ?> createFetcher(
SourceContext<T> sourceContext,
Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
StreamingRuntimeContext runtimeContext,
OffsetCommitMode offsetCommitMode,
MetricGroup consumerMetricGroup,
boolean useMetrics) throws Exception {
......
return new KafkaFetcher<>(
sourceContext,
assignedPartitionsWithInitialOffsets,
watermarksPeriodic,
watermarksPunctuated,
runtimeContext.getProcessingTimeService(),
runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
runtimeContext.getUserCodeClassLoader(),
runtimeContext.getTaskNameWithSubtasks(),
deserializer,
properties,
pollTimeout,
runtimeContext.getMetricGroup(),
consumerMetricGroup,
useMetrics);
}
It returns a KafkaFetcher object, so let's step into that class.
The KafkaFetcher constructor creates a KafkaConsumerThread object:
public KafkaFetcher(
SourceFunction.SourceContext<T> sourceContext,
Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
ProcessingTimeService processingTimeProvider,
long autoWatermarkInterval,
ClassLoader userCodeClassLoader,
String taskNameWithSubtasks,
KafkaDeserializationSchema<T> deserializer,
Properties kafkaProperties,
long pollTimeout,
MetricGroup subtaskMetricGroup,
MetricGroup consumerMetricGroup,
boolean useMetrics) throws Exception {
......
this.consumerThread = new KafkaConsumerThread(
LOG,
// arguments of the KafkaConsumerThread constructor
handover,
kafkaProperties,
// what exactly is unassignedPartitionsQueue? It is covered in detail in "How Flink's startupMode takes effect"
unassignedPartitionsQueue,
getFetcherName() + " for " + taskNameWithSubtasks,
pollTimeout,
useMetrics,
consumerMetricGroup,
subtaskMetricGroup);
}
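That covers createFetcher, which can be seen as the preparation for pulling data. Next, let's look at kafkaFetcher.runFetchLoop().
The runFetchLoop method in KafkaFetcher is where messages actually start being pulled from Kafka:
// fetch messages from Kafka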
public void runFetchLoop() throws Exception {
try {
// one of the arguments passed to the KafkaConsumerThread constructor
final Handover handover = this.handover;
// kick off the actual Kafka consumer
// this is where data is actually pulled from Kafka
consumerThread.start();
while (running) {
// this blocks until we get the next records
// it automatically re-throws exceptions encountered in the consumer thread
// take the next batch from the handover, then process the records
final ConsumerRecords<byte[], byte[]> records = handover.pollNext();
// get the records for each topic partition
for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates()) {
List<ConsumerRecord<byte[], byte[]>> partitionRecords =
records.records(partition.getKafkaPartitionHandle());
for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
final T value = deserializer.deserialize(record);
if (deserializer.isEndOfStream(value)) {
// end of stream signaled
running = false;
break;
}
// emit the actual record. this also updates offset state atomically
// and deals with timestamps and watermark generation
// emit the record; timestamp and watermark handling follows from here
emitRecord(value, partition, record.offset(), record);
}
}
}
}
finally {
// this signals the consumer thread that no more work is to be done
consumerThread.shutdown();
}
// on a clean exit, wait for the runner thread
try {
consumerThread.join();
}
catch (InterruptedException e) {
// may be the result of a wake-up interruption after an exception.
// we ignore this here and only restore the interruption state
Thread.currentThread().interrupt();
}
}
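The loop above relies on a hand-off between two threads: the consumer thread produces batches and the main thread blocks in pollNext until one is available. The following is a minimal sketch of that idea, assuming a single-slot exchange guarded by a lock. `HandoverSketch` is a hypothetical simplification and not Flink's Handover class, which additionally handles error reporting, wake-ups, and closing.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-slot hand-off between a producing and a consuming thread.
class HandoverSketch<T> {
    private final Object lock = new Object();
    private T next; // the single slot; null means "empty"

    // Called by the producing (consumer) thread; blocks while the slot is full.
    void produce(T element) throws InterruptedException {
        synchronized (lock) {
            while (next != null) {
                lock.wait();
            }
            next = element;
            lock.notifyAll();
        }
    }

    // Called by the main thread; blocks until a batch is available.
    T pollNext() throws InterruptedException {
        synchronized (lock) {
            while (next == null) {
                lock.wait();
            }
            T result = next;
            next = null;
            lock.notifyAll();
            return result;
        }
    }

    // Runs a producer thread and drains three batches on the calling thread.
    static List<String> demo() {
        HandoverSketch<String> handover = new HandoverSketch<>();
        Thread producer = new Thread(() -> {
            try {
                for (String batch : new String[] {"batch-1", "batch-2", "batch-3"}) {
                    handover.produce(batch);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        List<String> received = new ArrayList<>();
        try {
            for (int i = 0; i < 3; i++) {
                received.add(handover.pollNext());
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the slot holds only one batch, the producer cannot run ahead of the main thread by more than one poll, which gives a simple form of backpressure.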
Since consumerThread.start() kicks off the actual Kafka consumer, let's look at the run method of the consumer thread:
@Override
public void run() {
// early exit check
if (!running) {
return;
}
// this is the means to talk to FlinkKafkaConsumer's main thread
// constructor argument; the FlinkKafkaConsumer main thread consumes from it via the handover.pollNext() call shown earlier
final Handover handover = this.handover;
// This method initializes the KafkaConsumer and guarantees it is torn down properly.
// This is important, because the consumer has multi-threading issues,
// including concurrent 'close()' calls.
try {
// obtain the consumer
this.consumer = getConsumer(kafkaProperties);
}
catch (Throwable t) {
handover.reportError(t);
return;
}
// from here on, the consumer is guaranteed to be closed properly
......
// early exit check
if (!running) {
return;
}
// the latest bulk of records. May carry across the loop if the thread is woken up
// from blocking on the handover
ConsumerRecords<byte[], byte[]> records = null;
// reused variable to hold found unassigned new partitions.
// found partitions are not carried across loops using this variable;
// they are carried across via re-adding them to the unassigned partitions queue
List<KafkaTopicPartitionState<TopicPartition>> newPartitions;
// main fetch loop
while (running) {
// check if there is something to commit
// commitInProgress defaults to false
if (!commitInProgress) {
// get and reset the work-to-be committed, so we don't repeatedly commit the same
// for details, see [How Flink stores offsets](https://www.jianshu.com/p/ee4fe63f0182)
final Tuple2<Map<TopicPartition, OffsetAndMetadata>, KafkaCommitCallback> commitOffsetsAndCallback =
nextOffsetsToCommit.getAndSet(null);
if (commitOffsetsAndCallback != null) {
log.debug("Sending async offset commit request to Kafka broker");
// also record that a commit is already in progress
// the order here matters! first set the flag, then send the commit command.
commitInProgress = true;
consumer.commitAsync(commitOffsetsAndCallback.f0, new CommitCallback(commitOffsetsAndCallback.f1));
}
}
try {
// hasAssignedPartitions defaults to false
// when new partitions are discovered, they are added to unassignedPartitionsQueue and sub
// for details, see "How Flink's startupMode takes effect"
if (hasAssignedPartitions) {
newPartitions = unassignedPartitionsQueue.pollBatch();
}
else {
// if no assigned partitions block until we get at least one
// instead of hot spinning this loop. We rely on a fact that
// unassignedPartitionsQueue will be closed on a shutdown, so
// we don't block indefinitely
newPartitions = unassignedPartitionsQueue.getBatchBlocking();
}
if (newPartitions != null) {
reassignPartitions(newPartitions);
}
} catch (AbortedReassignmentException e) {
continue;
}
if (!hasAssignedPartitions) {
// Without assigned partitions KafkaConsumer.poll will throw an exception
continue;
}
// get the next batch of records, unless we did not manage to hand the old batch over
if (records == null) {
try {
// pull the data through the Kafka API
records = consumer.poll(pollTimeout);
}
catch (WakeupException we) {
continue;
}
}
try {
// the handover "wraps" the records so that the FlinkKafkaConsumer main thread can consume them
handover.produce(records);
records = null;
}
catch (Handover.WakeupException e) {
// fall through the loop
}
}
// end main fetch loop
}
catch (Throwable t) {
// let the main thread know and exit
// it may be that this exception comes because the main thread closed the handover, in
// which case the below reporting is irrelevant, but does not hurt either
handover.reportError(t);
}
finally {
// make sure the handover is closed if it is not already closed or has an error
handover.close();
// make sure the KafkaConsumer is closed
try {
consumer.close();
}
catch (Throwable t) {
log.warn("Error while closing Kafka consumer", t);
}
}
}
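The commit step at the top of the main fetch loop uses getAndSet(null) so that each set of offsets is taken at most once, and the commitInProgress flag keeps a second commit from being sent while one is outstanding. Here is a minimal sketch of that pattern, assuming offsets are keyed by a plain string. `CommitSlotSketch`, its methods, and the key "topic-0" are hypothetical, and the real thread sends the commit through consumer.commitAsync rather than counting it.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of the "get and reset" commit slot used by the consumer thread.
class CommitSlotSketch {
    private final AtomicReference<Map<String, Long>> nextOffsetsToCommit =
            new AtomicReference<>();
    private boolean commitInProgress = false;

    int commitsSent = 0;             // how many commits this sketch has "sent"
    Map<String, Long> lastCommitted; // offsets of the most recent commit

    // Called by another thread: publish offsets, replacing any not yet taken.
    void setOffsetsToCommit(Map<String, Long> offsets) {
        nextOffsetsToCommit.set(offsets);
    }

    // One iteration of the commit check at the top of the fetch loop.
    void loopIteration() {
        if (!commitInProgress) {
            // take the pending offsets and clear the slot in one atomic step
            Map<String, Long> toCommit = nextOffsetsToCommit.getAndSet(null);
            if (toCommit != null) {
                // set the flag first, then "send" the commit
                commitInProgress = true;
                commitsSent++;
                lastCommitted = toCommit;
            }
        }
    }

    // Stand-in for the async commit callback firing.
    void onCommitComplete() {
        commitInProgress = false;
    }

    public static void main(String[] args) {
        CommitSlotSketch slot = new CommitSlotSketch();
        slot.setOffsetsToCommit(Collections.singletonMap("topic-0", 10L));
        slot.setOffsetsToCommit(Collections.singletonMap("topic-0", 20L));
        slot.loopIteration();
        slot.loopIteration();
        System.out.println(slot.commitsSent + " " + slot.lastCommitted);
    }
}
```

Only the most recently published offsets are committed, and repeated loop iterations do not resend them.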
That concludes the walkthrough of how data is pulled from Kafka.