How Flink Pulls Data from Kafka

Author: shengjk1 | Published 2019-04-07 12:23

Let's start with the FlinkKafkaConsumerBase.run method, which is effectively the entry point for pulling data from Kafka in Flink:

    // entry point: start a source
        public void run(SourceContext<T> sourceContext) throws Exception {
            ......
            // from this point forward:
            //   - 'snapshotState' will draw offsets from the fetcher,
            //     instead of being built from `subscribedPartitionsToStartOffsets`
            //   - 'notifyCheckpointComplete' will start to do work (i.e. commit offsets to
            //     Kafka through the fetcher, if configured to do so)
            // create the fetcher that pulls data from Kafka
            this.kafkaFetcher = createFetcher(
                    sourceContext,
                    subscribedPartitionsToStartOffsets,
                    periodicWatermarkAssigner,
                    punctuatedWatermarkAssigner,
                    (StreamingRuntimeContext) getRuntimeContext(),
                    offsetCommitMode,
                    getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
                    useMetrics);
    
            if (!running) {
                return;
            }
    
            // depending on whether we were restored with the current state version (1.3),
            // remaining logic branches off into 2 paths:
            //  1) New state - partition discovery loop executed as separate thread, with this
            //                 thread running the main fetcher loop
            //  2) Old state - partition discovery is disabled and only the main fetcher loop is executed
            
            if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
                // KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS was not configured
                kafkaFetcher.runFetchLoop();
            } else {
                // runWithPartitionDiscovery() still calls kafkaFetcher.runFetchLoop() internally
                runWithPartitionDiscovery();
            }
        }
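
The branch above shows that partition discovery only runs when a discovery interval has been configured. As a minimal sketch (not part of the original article), assuming the universal flink-connector-kafka dependency and placeholder broker, group id and topic names, setting FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS in the consumer Properties is what gives discoveryIntervalMillis a value other than PARTITION_DISCOVERY_DISABLED, so run() takes the runWithPartitionDiscovery() branch:

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;

    public class PartitionDiscoveryExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address
            props.setProperty("group.id", "demo-group");              // placeholder group id

            // with this property set, discoveryIntervalMillis != PARTITION_DISCOVERY_DISABLED,
            // so FlinkKafkaConsumerBase.run() calls runWithPartitionDiscovery()
            props.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "30000");

            FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("demo-topic", new SimpleStringSchema(), props);

            env.addSource(consumer).print();
            env.execute("partition-discovery-demo");
        }
    }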
    
    

The createFetcher method:

    @Override
        protected AbstractFetcher<T, ?> createFetcher(
            SourceContext<T> sourceContext,
            Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
            SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
            SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
            StreamingRuntimeContext runtimeContext,
            OffsetCommitMode offsetCommitMode,
            MetricGroup consumerMetricGroup,
            boolean useMetrics) throws Exception {
    ......
    
            return new KafkaFetcher<>(
                sourceContext,
                assignedPartitionsWithInitialOffsets,
                watermarksPeriodic,
                watermarksPunctuated,
                runtimeContext.getProcessingTimeService(),
                runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
                runtimeContext.getUserCodeClassLoader(),
                runtimeContext.getTaskNameWithSubtasks(),
                deserializer,
                properties,
                pollTimeout,
                runtimeContext.getMetricGroup(),
                consumerMetricGroup,
                useMetrics);
        }
    

It returns a KafkaFetcher object. Stepping into it, the KafkaFetcher constructor creates a KafkaConsumerThread:

    public KafkaFetcher(
            SourceFunction.SourceContext<T> sourceContext,
            Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
            SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
            SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
            ProcessingTimeService processingTimeProvider,
            long autoWatermarkInterval,
            ClassLoader userCodeClassLoader,
            String taskNameWithSubtasks,
            KafkaDeserializationSchema<T> deserializer,
            Properties kafkaProperties,
            long pollTimeout,
            MetricGroup subtaskMetricGroup,
            MetricGroup consumerMetricGroup,
            boolean useMetrics) throws Exception {
    ......
            this.consumerThread = new KafkaConsumerThread(
                LOG,
                // parameters passed to the KafkaConsumerThread constructor
                handover,
                kafkaProperties,
                // what exactly is unassignedPartitionsQueue? It is covered in detail in "How Flink's startupMode takes effect"
                unassignedPartitionsQueue,
                getFetcherName() + " for " + taskNameWithSubtasks,
                pollTimeout,
                useMetrics,
                consumerMetricGroup,
                subtaskMetricGroup);
        }
    
    

That concludes createFetcher, which can be seen as the preparation for pulling data. Next, let's look at kafkaFetcher.runFetchLoop(). The runFetchLoop method of KafkaFetcher is where fetching messages from Kafka actually starts:

    // fetch messages from Kafka
        public void runFetchLoop() throws Exception {
            try {
                // one of the parameters passed to the KafkaConsumerThread constructor
                final Handover handover = this.handover;
    
                // kick off the actual Kafka consumer
                // this is where data is actually pulled from Kafka
                consumerThread.start();
    
                while (running) {
                    // this blocks until we get the next records
                    // it automatically re-throws exceptions encountered in the consumer thread
                    // take records from the handover, then process them
                    final ConsumerRecords<byte[], byte[]> records = handover.pollNext();
    
                    // get the records for each topic partition
                    for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates()) {
    
                        List<ConsumerRecord<byte[], byte[]>> partitionRecords =
                            records.records(partition.getKafkaPartitionHandle());
    
                        for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
                            final T value = deserializer.deserialize(record);
    
                            if (deserializer.isEndOfStream(value)) {
                                // end of stream signaled
                                running = false;
                                break;
                            }
    
                            // emit the actual record. this also updates offset state atomically
                            // and deals with timestamps and watermark generation
                            // emit the record; timestamp and watermark handling come next
                            emitRecord(value, partition, record.offset(), record);
                        }
                    }
                }
            }
            finally {
                // this signals the consumer thread that no more work is to be done
                consumerThread.shutdown();
            }
    
            // on a clean exit, wait for the runner thread
            try {
                consumerThread.join();
            }
            catch (InterruptedException e) {
                // may be the result of a wake-up interruption after an exception.
                // we ignore this here and only restore the interruption state
                Thread.currentThread().interrupt();
            }
        }
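
The calls to deserializer.deserialize(record) and deserializer.isEndOfStream(value) in the loop above go to the user-supplied KafkaDeserializationSchema. As a minimal sketch (the class name is mine, not from the Flink sources), a schema that turns every record value into a String and never signals end of stream could look like this:

    import java.nio.charset.StandardCharsets;

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    public class StringValueDeserializationSchema implements KafkaDeserializationSchema<String> {

        @Override
        public boolean isEndOfStream(String nextElement) {
            // returning true here would make runFetchLoop() set running = false and stop
            return false;
        }

        @Override
        public String deserialize(ConsumerRecord<byte[], byte[]> record) {
            // called once for every ConsumerRecord in the loop above
            return new String(record.value(), StandardCharsets.UTF_8);
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return Types.STRING;
        }
    }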
    

Since consumerThread.start() kicks off the actual Kafka consumer, let's look at the run method of KafkaConsumerThread:

    @Override
        public void run() {
            // early exit check
            if (!running) {
                return;
            }
    
            // this is the means to talk to FlinkKafkaConsumer's main thread
            // constructor parameter, consumed by the FlinkKafkaConsumer main thread via handover.pollNext() as shown above
            final Handover handover = this.handover;
    
            // This method initializes the KafkaConsumer and guarantees it is torn down properly.
            // This is important, because the consumer has multi-threading issues,
            // including concurrent 'close()' calls.
            try {
                // create the KafkaConsumer
                this.consumer = getConsumer(kafkaProperties);
            }
            catch (Throwable t) {
                handover.reportError(t);
                return;
            }
    
            // from here on, the consumer is guaranteed to be closed properly
            ......
    
                // early exit check
                if (!running) {
                    return;
                }
    
                // the latest bulk of records. May carry across the loop if the thread is woken up
                // from blocking on the handover
                ConsumerRecords<byte[], byte[]> records = null;
    
                // reused variable to hold found unassigned new partitions.
                // found partitions are not carried across loops using this variable;
                // they are carried across via re-adding them to the unassigned partitions queue
                List<KafkaTopicPartitionState<TopicPartition>> newPartitions;
    
                // main fetch loop
                while (running) {
    
                    // check if there is something to commit
                    //default false
                    if (!commitInProgress) {
                        // get and reset the work-to-be committed, so we don't repeatedly commit the same
                        // for details, see "How Flink saves offsets" (https://www.jianshu.com/p/ee4fe63f0182)
                        final Tuple2<Map<TopicPartition, OffsetAndMetadata>, KafkaCommitCallback> commitOffsetsAndCallback =
                                nextOffsetsToCommit.getAndSet(null);
    
                        if (commitOffsetsAndCallback != null) {
                            log.debug("Sending async offset commit request to Kafka broker");
    
                            // also record that a commit is already in progress
                            // the order here matters! first set the flag, then send the commit command.
                            commitInProgress = true;
                            consumer.commitAsync(commitOffsetsAndCallback.f0, new CommitCallback(commitOffsetsAndCallback.f1));
                        }
                    }
    
                    try {
                        // hasAssignedPartitions defaults to false
                        // when new partitions are discovered, they are added to unassignedPartitionsQueue and the subscribed partition states
                        // for details, see "How Flink's startupMode takes effect"
                        if (hasAssignedPartitions) {
                            newPartitions = unassignedPartitionsQueue.pollBatch();
                        }
                        else {
                            // if no assigned partitions block until we get at least one
                            // instead of hot spinning this loop. We rely on a fact that
                            // unassignedPartitionsQueue will be closed on a shutdown, so
                            // we don't block indefinitely
                            newPartitions = unassignedPartitionsQueue.getBatchBlocking();
                        }
                        if (newPartitions != null) {
                            reassignPartitions(newPartitions);
                        }
                    } catch (AbortedReassignmentException e) {
                        continue;
                    }
    
                    if (!hasAssignedPartitions) {
                        // Without assigned partitions KafkaConsumer.poll will throw an exception
                        continue;
                    }
    
                    // get the next batch of records, unless we did not manage to hand the old batch over
                    if (records == null) {
                        try {
                            // pull data via the Kafka consumer API
                            records = consumer.poll(pollTimeout);
                        }
                        catch (WakeupException we) {
                            continue;
                        }
                    }
    
                    try {
                        // hand the records over for the FlinkKafkaConsumer main thread to consume
                        handover.produce(records);
                        records = null;
                    }
                    catch (Handover.WakeupException e) {
                        // fall through the loop
                    }
                }
                // end main fetch loop
            }
            catch (Throwable t) {
                // let the main thread know and exit
                // it may be that this exception comes because the main thread closed the handover, in
                // which case the below reporting is irrelevant, but does not hurt either
                handover.reportError(t);
            }
            finally {
                // make sure the handover is closed if it is not already closed or has an error
                handover.close();
    
                // make sure the KafkaConsumer is closed
                try {
                    consumer.close();
                }
                catch (Throwable t) {
                    log.warn("Error while closing Kafka consumer", t);
                }
            }
        }
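
What connects the two threads is the handover: the consumer thread blocks in handover.produce(records) until the main fetch loop has taken the previous batch with handover.pollNext(). The following is a much-simplified sketch of that single-slot exchange, not Flink's actual Handover class (wakeup, error reporting and close handling are omitted):

    // a single-slot exchange between a producer thread and a consumer thread
    public class SimpleHandover<T> {

        private final Object lock = new Object();
        private T element; // the single "slot"

        // called by the Kafka consumer thread
        public void produce(T next) throws InterruptedException {
            synchronized (lock) {
                while (element != null) {
                    lock.wait(); // slot still occupied, wait until the main thread takes it
                }
                element = next;
                lock.notifyAll();
            }
        }

        // called by the main fetch loop
        public T pollNext() throws InterruptedException {
            synchronized (lock) {
                while (element == null) {
                    lock.wait(); // nothing handed over yet
                }
                T next = element;
                element = null;
                lock.notifyAll();
                return next;
            }
        }
    }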
    

That concludes how Flink pulls data from Kafka.
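
One last practical note, not from the original article: the commitAsync branch at the top of the main fetch loop only fires once nextOffsetsToCommit has been filled, which happens when offsets are committed on checkpoints (the notifyCheckpointComplete path mentioned in the comments of run()). A hedged usage sketch with placeholder broker, group id and topic names:

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    public class OffsetCommitOnCheckpointExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // with checkpointing enabled, offsets are committed back to Kafka when a checkpoint
            // completes (notifyCheckpointComplete -> nextOffsetsToCommit -> consumer.commitAsync)
            env.enableCheckpointing(10_000);

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address
            props.setProperty("group.id", "demo-group");              // placeholder group id

            FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("demo-topic", new SimpleStringSchema(), props);
            // already the default when checkpointing is enabled; shown explicitly for clarity
            consumer.setCommitOffsetsOnCheckpoints(true);

            env.addSource(consumer).print();
            env.execute("offset-commit-on-checkpoint-demo");
        }
    }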
