Flink Source Code Reading: FileSystem Connector

Author: 〇白衣卿相〇 | Published 2020-12-04 23:30

    The code lives in the flink-table-runtime-blink module; see the official documentation for the user guide.

    This is still the legacy implementation; it will be reimplemented on top of FLIP-95 (tracked in FLINK-19336).

    The entry point is FileSystemTableFactory. How factory discovery works was covered in an earlier post, so it is not repeated here.
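
    As a quick reminder of what the connector looks like from the user's side, here is a minimal sketch of a partitioned filesystem sink table created through the Table API. The option names follow the Flink 1.11 documentation; the table name, columns, path, and values are made up for illustration.

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class FsConnectorExample {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            // Partitioned filesystem sink; the rolling and partition-commit options
            // are the ones discussed later in this post.
            tEnv.executeSql(
                    "CREATE TABLE fs_sink (" +
                    "  user_id STRING," +
                    "  order_amount DOUBLE," +
                    "  dt STRING," +
                    "  `hour` STRING" +
                    ") PARTITIONED BY (dt, `hour`) WITH (" +
                    "  'connector' = 'filesystem'," +
                    "  'path' = '/tmp/fs_sink'," +
                    "  'format' = 'parquet'," +
                    "  'sink.rolling-policy.file-size' = '128MB'," +
                    "  'sink.rolling-policy.rollover-interval' = '30 min'," +
                    "  'partition.time-extractor.timestamp-pattern' = '$dt $hour:00:00'," +
                    "  'sink.partition-commit.trigger' = 'partition-time'," +
                    "  'sink.partition-commit.delay' = '1 h'," +
                    "  'sink.partition-commit.policy.kind' = 'success-file'" +
                    ")");
        }
    }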

    Sink

    A FileSystemTableSink is constructed with the relevant options passed in:

    public TableSink<RowData> createTableSink(TableSinkFactory.Context context) {
            Configuration conf = new Configuration();
            context.getTable().getOptions().forEach(conf::setString);
    
            return new FileSystemTableSink(
                    context.getObjectIdentifier(),          // connector identifier
                    context.isBounded(),                    // whether the stream is bounded
                    context.getTable().getSchema(),         // table schema
                    getPath(conf),                          // file path
                    context.getTable().getPartitionKeys(),  // partition keys
                    conf.get(PARTITION_DEFAULT_NAME),       // default partition name
                    context.getTable().getOptions());       // options
        }
    

    FileSystemTableSink then builds a DataStreamSink from the DataStream.

    consumeDataStream does a few things:

    1. Builds a RowDataPartitionComputer, which separates the indexes and types of the partition fields from those of the non-partition fields.
    2. Creates an EmptyMetaStoreFactory, an empty metastore implementation.
    3. Generates the part-file prefix from a UUID.
    4. Creates a FileSystemFactory implementation.
    5. Branches depending on whether the stream is bounded.

    public final DataStreamSink<RowData> consumeDataStream(DataStream<RowData> dataStream) {
            RowDataPartitionComputer computer = new RowDataPartitionComputer(
                    defaultPartName,
                    schema.getFieldNames(),
                    schema.getFieldDataTypes(),
                    partitionKeys.toArray(new String[0]));
    
            EmptyMetaStoreFactory metaStoreFactory = new EmptyMetaStoreFactory(path);
            OutputFileConfig outputFileConfig = OutputFileConfig.builder()
                    .withPartPrefix("part-" + UUID.randomUUID().toString())
                    .build();
            FileSystemFactory fsFactory = FileSystem::get;
    
            if (isBounded) {
                FileSystemOutputFormat.Builder<RowData> builder = new FileSystemOutputFormat.Builder<>();
                builder.setPartitionComputer(computer);
                builder.setDynamicGrouped(dynamicGrouping);
                builder.setPartitionColumns(partitionKeys.toArray(new String[0]));
                builder.setFormatFactory(createOutputFormatFactory());
                builder.setMetaStoreFactory(metaStoreFactory);
                builder.setFileSystemFactory(fsFactory);
                builder.setOverwrite(overwrite);
                builder.setStaticPartitions(staticPartitions);
                builder.setTempPath(toStagingPath());
                builder.setOutputFileConfig(outputFileConfig);
                return dataStream.writeUsingOutputFormat(builder.build())
                        .setParallelism(dataStream.getParallelism());
            } else {
                Configuration conf = new Configuration();
                properties.forEach(conf::setString);
                Object writer = createWriter();
                TableBucketAssigner assigner = new TableBucketAssigner(computer);
                TableRollingPolicy rollingPolicy = new TableRollingPolicy(
                        !(writer instanceof Encoder),
                        conf.get(SINK_ROLLING_POLICY_FILE_SIZE).getBytes(),
                        conf.get(SINK_ROLLING_POLICY_ROLLOVER_INTERVAL).toMillis());
    
                BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> bucketsBuilder;
                if (writer instanceof Encoder) {
                    //noinspection unchecked
                    bucketsBuilder = StreamingFileSink.forRowFormat(
                            path, new ProjectionEncoder((Encoder<RowData>) writer, computer))
                            .withBucketAssigner(assigner)
                            .withOutputFileConfig(outputFileConfig)
                            .withRollingPolicy(rollingPolicy);
                } else {
                    //noinspection unchecked
                    bucketsBuilder = StreamingFileSink.forBulkFormat(
                            path, new ProjectionBulkFactory((BulkWriter.Factory<RowData>) writer, computer))
                            .withBucketAssigner(assigner)
                            .withOutputFileConfig(outputFileConfig)
                            .withRollingPolicy(rollingPolicy);
                }
                return createStreamingSink(
                        conf,
                        path,
                        partitionKeys,
                        tableIdentifier,
                        overwrite,
                        dataStream,
                        bucketsBuilder,
                        metaStoreFactory,
                        fsFactory,
                        conf.get(SINK_ROLLING_POLICY_CHECK_INTERVAL).toMillis());
            }
        }
    

    Streaming jobs are normally unbounded, so the else branch is taken:

    1. A writer object is created according to the configured format; Parquet, for example, is built on a BulkWriter.
    2. The RowDataPartitionComputer is wrapped in a TableBucketAssigner, so the bucket id is the partition path (a simplified sketch of that mapping follows this list).
    3. A TableRollingPolicy decides when part files are rolled; with a BulkWriter, files are rolled on each checkpoint.
    4. A BucketsBuilder is constructed from all of the above.
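
    Below is a simplified sketch of the partition-path mapping mentioned in item 2. It is not the actual RowDataPartitionComputer/TableBucketAssigner code, just an illustration of how a bucket id such as "dt=2020-12-04/hour=23/" is derived from the partition fields of a row, with a default partition name standing in for null or empty values.

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class PartitionPathSketch {

        /** Builds "k1=v1/k2=v2/" from an ordered partition spec. */
        static String partitionPath(LinkedHashMap<String, String> spec, String defaultPartName) {
            StringBuilder sb = new StringBuilder();
            for (Map.Entry<String, String> e : spec.entrySet()) {
                String value = (e.getValue() == null || e.getValue().isEmpty()) ? defaultPartName : e.getValue();
                sb.append(e.getKey()).append('=').append(value).append('/');
            }
            return sb.toString();
        }

        public static void main(String[] args) {
            LinkedHashMap<String, String> spec = new LinkedHashMap<>();
            spec.put("dt", "2020-12-04");
            spec.put("hour", "23");
            System.out.println(partitionPath(spec, "__DEFAULT_PARTITION__")); // dt=2020-12-04/hour=23/
        }
    }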

    createStreamingSink

    1. The BucketsBuilder is wrapped into a StreamingFileWriter, an operator that extends AbstractStreamOperator.
    2. That operator is appended to the inputStream; most of the processing logic lives inside it.
    3. If sink.partition-commit.policy.kind is configured, a commit step is added as another operator, e.g. registering the partition in the metastore or writing a _SUCCESS file (a sketch of a custom commit policy follows the code below).
    4. Finally a DiscardingSink function swallows the records, since they have already been fully handled by the operators above.

    public static DataStreamSink<RowData> createStreamingSink(
                Configuration conf,
                Path path,
                List<String> partitionKeys,
                ObjectIdentifier tableIdentifier,
                boolean overwrite,
                DataStream<RowData> inputStream,
                BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> bucketsBuilder,
                TableMetaStoreFactory msFactory,
                FileSystemFactory fsFactory,
                long rollingCheckInterval) {
            if (overwrite) {
                throw new IllegalStateException("Streaming mode not support overwrite.");
            }
    
            StreamingFileWriter fileWriter = new StreamingFileWriter(
                    rollingCheckInterval,
                    bucketsBuilder);
            DataStream<CommitMessage> writerStream = inputStream.transform(
                    StreamingFileWriter.class.getSimpleName(),
                    TypeExtractor.createTypeInfo(CommitMessage.class),
                    fileWriter).setParallelism(inputStream.getParallelism());
    
            DataStream<?> returnStream = writerStream;
    
            // save committer when we don't need it.
            if (partitionKeys.size() > 0 && conf.contains(SINK_PARTITION_COMMIT_POLICY_KIND)) {
                StreamingFileCommitter committer = new StreamingFileCommitter(
                        path, tableIdentifier, partitionKeys, msFactory, fsFactory, conf);
                returnStream = writerStream
                        .transform(StreamingFileCommitter.class.getSimpleName(), Types.VOID, committer)
                        .setParallelism(1)
                        .setMaxParallelism(1);
            }
            //noinspection unchecked
            return returnStream.addSink(new DiscardingSink()).setParallelism(1);
        }
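
    On item 3 above: besides the built-in metastore and success-file policies, sink.partition-commit.policy.kind also accepts custom, with the class supplied via sink.partition-commit.policy.class. The rough sketch below assumes the PartitionCommitPolicy interface and its Context#partitionPath() accessor as documented for Flink 1.11; treat the exact package and signatures as an assumption to verify against your version.

    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.table.filesystem.PartitionCommitPolicy;

    // Hypothetical custom policy: drop an empty marker file into every committed partition.
    public class MarkerFilePolicy implements PartitionCommitPolicy {

        @Override
        public void commit(Context context) throws Exception {
            Path marker = new Path(context.partitionPath(), "_COMMITTED");
            FileSystem fs = marker.getFileSystem();
            // Create (or overwrite) the empty marker file.
            fs.create(marker, FileSystem.WriteMode.OVERWRITE).close();
        }
    }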
    

    PS: The line FileSystemFactory fsFactory = FileSystem::get uses a Java 8 functional interface, which can be puzzling the first time you see it. An interface with exactly one abstract method is a functional interface, and it can be implemented in several ways: most commonly as an anonymous inner class, or via a lambda or a method/constructor reference. For example:

    FileSystemFactory fsFactory = FileSystem::get;
    //equivalent: anonymous inner class
            FileSystemFactory fileSystemFactory = new FileSystemFactory() {
                public FileSystem create(URI fsUri) throws IOException {
                    return FileSystem.get(fsUri);
                }
            };
    
    //      equivalent: lambda
            FileSystemFactory fileSystemFactory = uri -> FileSystem.get(uri);
    
    

    Writing data to the filesystem

    Record processing happens in StreamingFileWriter#processElement:

    public void processElement(StreamRecord<RowData> element) throws Exception {
            helper.onElement(
                    element.getValue(),
                    getProcessingTimeService().getCurrentProcessingTime(),
                    element.hasTimestamp() ? element.getTimestamp() : null,
                    currentWatermark);
        }
    

    Before any element arrives, initializeState creates the Buckets via the BucketsBuilder and wraps them in a StreamingFileSinkHelper:

    @Override
        public void initializeState(StateInitializationContext context) throws Exception {
            super.initializeState(context);
            buckets = bucketsBuilder.createBuckets(getRuntimeContext().getIndexOfThisSubtask());
    
            // Set listener before the initialization of Buckets.
            inactivePartitions = new HashSet<>();
            buckets.setBucketLifeCycleListener(new BucketLifeCycleListener<RowData, String>() {
                @Override
                public void bucketCreated(Bucket<RowData, String> bucket) {
                }
    
                @Override
                public void bucketInactive(Bucket<RowData, String> bucket) {
                    inactivePartitions.add(bucket.getBucketId());
                }
            });
    
            helper = new StreamingFileSinkHelper<>(
                    buckets,
                    context.isRestored(),
                    context.getOperatorStateStore(),
                    getRuntimeContext().getProcessingTimeService(),
                    bucketCheckInterval);
            currentWatermark = Long.MIN_VALUE;
        }
    

    Back in processElement: if you follow the call chain, the data is ultimately written to a part file by Bucket#write.

    void write(IN element, long currentTime) throws IOException {
        // if there is no in-progress part file, or the rolling policy says to roll, start a new one
            if (inProgressPart == null || rollingPolicy.shouldRollOnEvent(inProgressPart, element)) {
    
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Subtask {} closing in-progress part file for bucket id={} due to element {}.",
                            subtaskIndex, bucketId, element);
                }
    
                inProgressPart = rollPartFile(currentTime);
            }
            inProgressPart.write(element, currentTime);
        }
    

    The actual write then goes through the writer of a third-party library, e.g. Hadoop, Hive, Parquet, or ORC.
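
    For row formats, the writer created by createWriter() is an Encoder, which simply serializes each record onto the part file's output stream (bulk formats go through a BulkWriter.Factory instead). A minimal illustrative encoder is sketched below; it is not a Flink-provided class, and the CSV layout and field positions are made up.

    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;

    import org.apache.flink.api.common.serialization.Encoder;
    import org.apache.flink.table.data.RowData;

    // Hypothetical row-format writer: turns the first two fields of a RowData into a CSV line.
    public class CsvRowDataEncoder implements Encoder<RowData> {

        @Override
        public void encode(RowData element, OutputStream stream) throws IOException {
            String line = element.getString(0) + "," + element.getInt(1) + "\n";
            stream.write(line.getBytes(StandardCharsets.UTF_8));
        }
    }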

    checkpoint

    Checkpointing is done in the snapshotState method; the main logic lives in the Buckets class.

    public void snapshotState(
                final long checkpointId,
                final ListState<byte[]> bucketStatesContainer,
                final ListState<Long> partCounterStateContainer) throws Exception {
    
            Preconditions.checkState(
                bucketWriter != null && bucketStateSerializer != null,
                    "sink has not been initialized");
    
            LOG.info("Subtask {} checkpointing for checkpoint with id={} (max part counter={}).",
                    subtaskIndex, checkpointId, maxPartCounter);
    
            bucketStatesContainer.clear();
            partCounterStateContainer.clear();
    
            snapshotActiveBuckets(checkpointId, bucketStatesContainer);
            partCounterStateContainer.add(maxPartCounter);
        }
        
    private void snapshotActiveBuckets(
                final long checkpointId,
                final ListState<byte[]> bucketStatesContainer) throws Exception {
    
            for (Bucket<IN, BucketID> bucket : activeBuckets.values()) {
                final BucketState<BucketID> bucketState = bucket.onReceptionOfCheckpoint(checkpointId);
    
                final byte[] serializedBucketState = SimpleVersionedSerialization
                        .writeVersionAndSerialize(bucketStateSerializer, bucketState);
    
                bucketStatesContainer.add(serializedBucketState);
    
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Subtask {} checkpointing: {}", subtaskIndex, bucketState);
                }
            }
        }
    

    Here every active Bucket is snapshotted:

    BucketState<BucketID> onReceptionOfCheckpoint(long checkpointId) throws IOException {
            prepareBucketForCheckpointing(checkpointId);
    
            InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = null;
            long inProgressFileCreationTime = Long.MAX_VALUE;
    
            if (inProgressPart != null) {
                inProgressFileRecoverable = inProgressPart.persist();
                inProgressFileCreationTime = inProgressPart.getCreationTime();
                this.inProgressFileRecoverablesPerCheckpoint.put(checkpointId, inProgressFileRecoverable);
            }
    
        return new BucketState<>(bucketId, bucketPath, inProgressFileCreationTime, inProgressFileRecoverable, pendingFileRecoverablesPerCheckpoint); // return the BucketState, which will be serialized into state
        }
        
    private void prepareBucketForCheckpointing(long checkpointId) throws IOException {
            if (inProgressPart != null && rollingPolicy.shouldRollOnCheckpoint(inProgressPart)) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Subtask {} closing in-progress part file for bucket id={} on checkpoint.", subtaskIndex, bucketId);
                }
                closePartFile();
            }
    
            if (!pendingFileRecoverablesForCurrentCheckpoint.isEmpty()) {
                pendingFileRecoverablesPerCheckpoint.put(checkpointId, pendingFileRecoverablesForCurrentCheckpoint);
            pendingFileRecoverablesForCurrentCheckpoint = new ArrayList<>(); // reset
            }
        }
    

    The core logic is in closePartFile: the in-progress part file is closed and its buffered data is flushed to the filesystem, producing a PendingFileRecoverable that is added to the pendingFileRecoverablesForCurrentCheckpoint list, ready to be snapshotted.

    private InProgressFileWriter.PendingFileRecoverable closePartFile() throws IOException {
            InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable = null;
            if (inProgressPart != null) {
                pendingFileRecoverable = inProgressPart.closeForCommit();
                pendingFileRecoverablesForCurrentCheckpoint.add(pendingFileRecoverable);
            inProgressPart = null; // reset to null
            }
            return pendingFileRecoverable;
        }
    

    A part file that is still being written is in the in-progress state and cannot be read yet; when it becomes readable by downstream consumers depends on when it is committed. The previous step wrote the data to the file, but the file has not been formally committed. Recall the phases of a checkpoint (see the earlier post if you are not familiar with them): in the final phase, the CheckpointCoordinator calls notifyCheckpointComplete on every operator.

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
            super.notifyCheckpointComplete(checkpointId);
            commitUpToCheckpoint(checkpointId);
        }
    
    public void commitUpToCheckpoint(final long checkpointId) throws IOException {
            final Iterator<Map.Entry<BucketID, Bucket<IN, BucketID>>> activeBucketIt =
                    activeBuckets.entrySet().iterator();
    
            LOG.info("Subtask {} received completion notification for checkpoint with id={}.", subtaskIndex, checkpointId);
    
            while (activeBucketIt.hasNext()) {
                final Bucket<IN, BucketID> bucket = activeBucketIt.next().getValue();
                bucket.onSuccessfulCompletionOfCheckpoint(checkpointId);
    
            if (!bucket.isActive()) { // after the preceding cleanup, this bucket will no longer be active
                    // We've dealt with all the pending files and the writer for this bucket is not currently open.
                    // Therefore this bucket is currently inactive and we can remove it from our state.
                    activeBucketIt.remove();
                    notifyBucketInactive(bucket);
                }
            }
        }
    

    Files are committed in Bucket#onSuccessfulCompletionOfCheckpoint:

    void onSuccessfulCompletionOfCheckpoint(long checkpointId) throws IOException {
            checkNotNull(bucketWriter);
    
            Iterator<Map.Entry<Long, List<InProgressFileWriter.PendingFileRecoverable>>> it =
                    pendingFileRecoverablesPerCheckpoint.headMap(checkpointId, true)
                            .entrySet().iterator();
    
            while (it.hasNext()) {
                Map.Entry<Long, List<InProgressFileWriter.PendingFileRecoverable>> entry = it.next();
    
                for (InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable : entry.getValue()) {
                    bucketWriter.recoverPendingFile(pendingFileRecoverable).commit();
                }
                it.remove();
            }
    
            cleanupInProgressFileRecoverables(checkpointId);
        }
    

    The commit method renames the file so that downstream consumers can read it. For example, the Hadoop commit implementation:

    @Override
            public void commit() throws IOException {
                final Path src = recoverable.tempFile();
                final Path dest = recoverable.targetFile();
                final long expectedLength = recoverable.offset();
    
                final FileStatus srcStatus;
                try {
                    srcStatus = fs.getFileStatus(src);
                }
                catch (IOException e) {
                    throw new IOException("Cannot clean commit: Staging file does not exist.");
                }
    
                if (srcStatus.getLen() != expectedLength) {
                    // something was done to this file since the committer was created.
                    // this is not the "clean" case
                    throw new IOException("Cannot clean commit: File has trailing junk data.");
                }
    
                try {
                    fs.rename(src, dest);
                }
                catch (IOException e) {
                    throw new IOException("Committing file by rename failed: " + src + " to " + dest, e);
                }
            }
    

    Finally, some state of the in-progress files is cleaned up:

    private void cleanupInProgressFileRecoverables(long checkpointId) throws IOException {
            Iterator<Map.Entry<Long, InProgressFileWriter.InProgressFileRecoverable>> it =
                    inProgressFileRecoverablesPerCheckpoint.headMap(checkpointId, false)
                            .entrySet().iterator();
    
            while (it.hasNext()) {
                final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = it.next().getValue();
    
                // this check is redundant, as we only put entries in the inProgressFileRecoverablesPerCheckpoint map
                // list when the requiresCleanupOfInProgressFileRecoverableState() returns true, but having it makes
                // the code more readable.
    
            final boolean successfullyDeleted = bucketWriter.cleanupInProgressFileRecoverable(inProgressFileRecoverable); // returns false for every writer except S3
                if (LOG.isDebugEnabled() && successfullyDeleted) {
                    LOG.debug("Subtask {} successfully deleted incomplete part for bucket id={}.", subtaskIndex, bucketId);
                }
            it.remove(); // remove the entry
            }
        }
    

    partition commit

    This section covers how partition commits are triggered and which commit policies run.
    The trigger is either process-time or partition-time based.
    With process-time, every partition that the current checkpoint needs to commit is registered in the pendingPartitions map together with the current system time; at commit time a partition is committed if its registration time plus the delay is earlier than the current system time (with delay=0 it is committed immediately).
    So with delay=0 a partition is committed right away, which can commit it too early if data arrives late. With the delay set to the partition duration, the partitions registered at the previous checkpoint are committed after checkpoint interval + delay.

    @Override
        public void addPartition(String partition) {
            if (!StringUtils.isNullOrWhitespaceOnly(partition)) {
                this.pendingPartitions.putIfAbsent(partition, procTimeService.getCurrentProcessingTime());
            }
        }
    
        @Override
        public List<String> committablePartitions(long checkpointId) {
            List<String> needCommit = new ArrayList<>();
            long currentProcTime = procTimeService.getCurrentProcessingTime();
            Iterator<Map.Entry<String, Long>> iter = pendingPartitions.entrySet().iterator();
            while (iter.hasNext()) {
                Map.Entry<String, Long> entry = iter.next();
                long creationTime = entry.getValue();
                if (commitDelay == 0 || currentProcTime > creationTime + commitDelay) {
                    needCommit.add(entry.getKey());
                    iter.remove();
                }
            }
            return needCommit;
        }
    

    With partition-time, a partition is committed once the watermark passes the partition's time plus the delay.

    @Override
        public void addPartition(String partition) {
            if (!StringUtils.isNullOrWhitespaceOnly(partition)) {
                this.pendingPartitions.add(partition);
            }
        }
    
        @Override
        public List<String> committablePartitions(long checkpointId) {
            if (!watermarks.containsKey(checkpointId)) {
                throw new IllegalArgumentException(String.format(
                        "Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
                        checkpointId, watermarks));
            }
    
            long watermark = watermarks.get(checkpointId);
            watermarks.headMap(checkpointId, true).clear();
    
            List<String> needCommit = new ArrayList<>();
            Iterator<String> iter = pendingPartitions.iterator();
            while (iter.hasNext()) {
                String partition = iter.next();
                LocalDateTime partTime = extractor.extract(
                        partitionKeys, extractPartitionValues(new Path(partition))); // extract the time from the path, e.g. partition='day=2020-12-01/hour=11/minute=11' becomes 2020-12-01 11:11:00
                if (watermark > toMills(partTime) + commitDelay) {
                    needCommit.add(partition);
                    iter.remove();
                }
            }
            return needCommit;
        }
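
    As a worked example of this check: for a partition 'day=2020-12-01/hour=11' whose extracted time is 2020-12-01 11:00:00 (see the comment above) and a one-hour delay, the partition becomes committable once the watermark passes 2020-12-01 12:00:00. A small arithmetic sketch of the comparison (the real extractor and time zone handling are configurable; the system default zone is assumed here):

    import java.time.LocalDateTime;
    import java.time.ZoneId;

    public class PartitionTimeCheckSketch {

        static long toMills(LocalDateTime time) {
            // Assumption: use the system default zone for brevity.
            return time.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
        }

        public static void main(String[] args) {
            LocalDateTime partTime = LocalDateTime.of(2020, 12, 1, 11, 0, 0); // from 'day=2020-12-01/hour=11'
            long commitDelay = 60 * 60 * 1000L;                               // sink.partition-commit.delay = 1 h
            long watermark = toMills(LocalDateTime.of(2020, 12, 1, 12, 0, 1));
            System.out.println(watermark > toMills(partTime) + commitDelay);  // true: the partition can be committed
        }
    }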
    

    Source

    Reading data is simpler than writing it.

    A FileSystemTableSource object is created:

    public TableSource<RowData> createTableSource(TableSourceFactory.Context context) {
            Configuration conf = new Configuration();
            context.getTable().getOptions().forEach(conf::setString);
    
            return new FileSystemTableSource(
                    context.getTable().getSchema(),
                    getPath(conf),
                    context.getTable().getPartitionKeys(),
                    conf.get(PARTITION_DEFAULT_NAME),
                    context.getTable().getProperties());
        }
    

    A source function is constructed; the InputFormat passed in is used to read the source data.

    public DataStream<RowData> getDataStream(StreamExecutionEnvironment execEnv) {
       @SuppressWarnings("unchecked")
       TypeInformation<RowData> typeInfo =
             (TypeInformation<RowData>) TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());
       // Avoid using ContinuousFileMonitoringFunction
       InputFormatSourceFunction<RowData> func = new InputFormatSourceFunction<>(getInputFormat(), typeInfo);
       DataStreamSource<RowData> source = execEnv.addSource(func, explainSource(), typeInfo);
       return source.name(explainSource());
    }
    

    In its run method, records are read in a loop, split by split, and emitted to the downstream operators:

    public void run(SourceContext<OUT> ctx) throws Exception {
            try {
    
                Counter completedSplitsCounter = getRuntimeContext().getMetricGroup().counter("numSplitsProcessed");
                if (isRunning && format instanceof RichInputFormat) {
                    ((RichInputFormat) format).openInputFormat();
                }
    
                OUT nextElement = serializer.createInstance();
                while (isRunning) {
                    format.open(splitIterator.next());
    
                    // for each element we also check if cancel
                    // was called by checking the isRunning flag
    
                    while (isRunning && !format.reachedEnd()) {
                        nextElement = format.nextRecord(nextElement);
                        if (nextElement != null) {
                            ctx.collect(nextElement);
                        } else {
                            break;
                        }
                    }
                    format.close();
                    completedSplitsCounter.inc();
    
                    if (isRunning) {
                        isRunning = splitIterator.hasNext();
                    }
                }
            } finally {
                format.close();
                if (format instanceof RichInputFormat) {
                    ((RichInputFormat) format).closeInputFormat();
                }
                isRunning = false;
            }
        }
    
