
Flink in Practice: Does Flink Really Require Hadoop 2.7+?

Author: 〇白衣卿相〇 | Published 2020-12-07 00:06

    Background

    Recently I have been using the filesystem connector to write to HDFS; under the hood it is implemented on top of StreamingFileSink. The official documentation lists a few caveats, the first of which reads:

    When using Hadoop < 2.7, please use the OnCheckpointRollingPolicy which rolls part files on every checkpoint. The reason is that if part files “traverse” the checkpoint interval, then, upon recovery from a failure the StreamingFileSink may use the truncate() method of the filesystem to discard uncommitted data from the in-progress file. This method is not supported by pre-2.7 Hadoop versions and Flink will throw an exception.

    In other words: when running on a Hadoop version older than 2.7, use OnCheckpointRollingPolicy, which rolls part files on every checkpoint. Otherwise a part file may span multiple checkpoints, and when recovering from a failure StreamingFileSink uses the filesystem's truncate() method to discard the uncommitted portion of the in-progress file; truncate() is only supported from Hadoop 2.7 onwards.

    In exactly which scenarios does a pre-2.7 Hadoop version actually cause problems? I ran a few experiments to find out.

    Verification

    SQL job

    I tested by building the flink-hadoop-shaded package against different Hadoop versions (how exactly to build it deserves a separate post when I find the time).
    The same SQL job was run on both Hadoop 2.6 and Hadoop 2.7, and in both cases it recovered from a checkpoint without any problem.
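
    For reference, here is a minimal sketch of the kind of SQL job I mean (the table names, path, and connector options below are illustrative placeholders, not the exact job used in the test):

    // assumes the Kafka SQL connector and the csv format jars are on the classpath
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class SqlHdfsSinkDemo {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(60000); // the filesystem sink commits files on checkpoints
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                    env, EnvironmentSettings.newInstance().inStreamingMode().build());

            // Kafka source table
            tEnv.executeSql(
                    "CREATE TABLE kafka_src (msg STRING) WITH ("
                            + " 'connector' = 'kafka',"
                            + " 'topic' = 'topic',"
                            + " 'properties.bootstrap.servers' = 'xxx:9092',"
                            + " 'properties.group.id' = 'test',"
                            + " 'format' = 'csv')");

            // filesystem sink table writing to HDFS
            tEnv.executeSql(
                    "CREATE TABLE hdfs_sink (msg STRING) WITH ("
                            + " 'connector' = 'filesystem',"
                            + " 'path' = 'hdfs://xxx/zs_test',"
                            + " 'format' = 'csv')");

            // submitting the INSERT starts the streaming job; no env.execute() is needed for executeSql
            tEnv.executeSql("INSERT INTO hdfs_sink SELECT msg FROM kafka_src");
        }
    }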

    That is a bit odd: doesn't the documentation say this failure scenario exists? Why does the SQL job not run into it? The reason is explained further down.

    Streaming job

    I wrote a demo job; the code is as follows:

    public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env.enableCheckpointing(60000);
            env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "xxx:9092");
            properties.setProperty("group.id", "test");
            DataStream<String> src = env
                    .addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));
    
            // Default rolling policy
            src.addSink(StreamingFileSink
                    .forRowFormat(
                            new Path("hdfs://xxx/zs_test"),
                            new SimpleStringEncoder<String>("UTF-8"))
                    .withRollingPolicy(DefaultRollingPolicy.builder().build()).build());
    
            /* OnCheckpoint rolling policy
            src.addSink(StreamingFileSink
                    .forRowFormat(
                            new Path("hdfs://xxx/zs_test"),
                            new SimpleStringEncoder<String>("UTF-8"))
                    .withRollingPolicy(OnCheckpointRollingPolicy.build()).build());
                    */
            env.execute("sink to hdfs");
        }
    

    The rolling policy decides when a file turns from a temporary file into a finalized one (in-progress → finished); there are two built-in choices, Default and OnCheckpoint.
    StreamingFileSink also supports two formats, RowFormat and BulkFormat.
    I first tested RowFormat with both policies against the two Hadoop versions. With the OnCheckpoint policy, recovery works on both 2.6 and 2.7; with the Default policy, recovery works on 2.7 but fails on 2.6 with the following error:

    2020-10-22 16:59:11
    java.io.IOException: Problem while truncating file: hdfs://xxxx/zs_test/2020-10-22--16/.part-2-5.inprogress.2848fb32-b428-45ab-8b85-f44f41f56e5d
    at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.safelyTruncateFile(HadoopRecoverableFsDataOutputStream.java:167)
    at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.<init>(HadoopRecoverableFsDataOutputStream.java:90)
    at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.recover(HadoopRecoverableWriter.java:83)
    at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedBucketWriter.resumeInProgressFileFrom(OutputStreamBasedPartFileWriter.java:91)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restoreInProgressFile(Bucket.java:134)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:121)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:379)
    at org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:63)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:176)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:164)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:148)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.<init>(StreamingFileSinkHelper.java:74)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:427)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.IllegalStateException: Truncation is not available in hadoop version < 2.7 , You are on Hadoop 2.6.0
    at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.truncate(HadoopRecoverableFsDataOutputStream.java:197)
    at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.safelyTruncateFile(HadoopRecoverableFsDataOutputStream.java:165)
    ... 25 more

    The error message is self-explanatory: Hadoop 2.7+ is required.
    That basically settles it, but I still verified the BulkFormat case as well. It turns out BulkFormat only supports the OnCheckpoint policy, and recovery worked on both Hadoop versions there too.
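
    For completeness, this is roughly what the BulkFormat sink looked like. The Parquet writer and the MyPojo type below are illustrative placeholders; the key point is that, in the 1.11/1.12-era API I tested, the bulk-format builder's withRollingPolicy only accepts CheckpointRollingPolicy subclasses, so OnCheckpointRollingPolicy is effectively the only choice:

    // assumes flink-parquet is on the classpath; MyPojo stands in for whatever POJO the stream carries
    StreamingFileSink<MyPojo> bulkSink = StreamingFileSink
            .forBulkFormat(
                    new Path("hdfs://xxx/zs_test"),
                    ParquetAvroWriters.forReflectRecord(MyPojo.class))
            // the bulk builder only takes checkpoint-based policies
            .withRollingPolicy(OnCheckpointRollingPolicy.build())
            .build();
    pojoStream.addSink(bulkSink); // pojoStream: a DataStream<MyPojo> from some upstream source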

    Why

    Why does only the RowFormat + Default policy combination care about the Hadoop version, while the other combinations do not? Let's look at the source code.

    SQL

    As mentioned above, the SQL job does not have this problem. The source code explains why, and it also serves as a follow-up to the previous post on the FileSystem connector.
    FileSystemTableSink#consumeDataStream:

    public final DataStreamSink<RowData> consumeDataStream(DataStream<RowData> dataStream) {
            RowDataPartitionComputer computer = new RowDataPartitionComputer(
                    defaultPartName,
                    schema.getFieldNames(),
                    schema.getFieldDataTypes(),
                    partitionKeys.toArray(new String[0]));
    
            EmptyMetaStoreFactory metaStoreFactory = new EmptyMetaStoreFactory(path);
            OutputFileConfig outputFileConfig = OutputFileConfig.builder()
                    .withPartPrefix("part-" + UUID.randomUUID().toString())
                    .build();
            FileSystemFactory fsFactory = FileSystem::get;
            FileSystemWithUserFactory fsWithUserFactory = FileSystem::getWithUser;
    
            if (isBounded) {
                FileSystemOutputFormat.Builder<RowData> builder = new FileSystemOutputFormat.Builder<>();
                builder.setPartitionComputer(computer);
                builder.setDynamicGrouped(dynamicGrouping);
                builder.setPartitionColumns(partitionKeys.toArray(new String[0]));
                builder.setFormatFactory(createOutputFormatFactory());
                builder.setMetaStoreFactory(metaStoreFactory);
                builder.setFileSystemFactory(fsFactory);
                builder.setOverwrite(overwrite);
                builder.setStaticPartitions(staticPartitions);
                builder.setTempPath(toStagingPath());
                builder.setOutputFileConfig(outputFileConfig);
                return dataStream.writeUsingOutputFormat(builder.build())
                        .setParallelism(dataStream.getParallelism());
            } else {
            // streaming jobs are unbounded, so they take this branch
                Configuration conf = new Configuration();
                properties.forEach(conf::setString);
            Object writer = createWriter(); // the configured format decides which writer is created; e.g. parquet and orc use bulk writers
                TableBucketAssigner assigner = new TableBucketAssigner(computer);
            // note: TableRollingPolicy extends CheckpointRollingPolicy, so SQL jobs always roll on checkpoints
            TableRollingPolicy rollingPolicy = new TableRollingPolicy(
                        !(writer instanceof Encoder),
                        conf.get(SINK_ROLLING_POLICY_FILE_SIZE).getBytes(),
                        conf.get(SINK_ROLLING_POLICY_ROLLOVER_INTERVAL).toMillis());
    
                BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> bucketsBuilder;
                if (writer instanceof Encoder) {
                    //noinspection unchecked
                    bucketsBuilder = StreamingFileSink.forRowFormat(
                            path, new ProjectionEncoder((Encoder<RowData>) writer, computer))
                            .withBucketAssigner(assigner)
                            .withOutputFileConfig(outputFileConfig)
                            .withRollingPolicy(rollingPolicy);
                } else {
                    //noinspection unchecked
                    bucketsBuilder = StreamingFileSink.forBulkFormat(
                            path, new ProjectionBulkFactory((BulkWriter.Factory<RowData>) writer, computer))
                            .withBucketAssigner(assigner)
                            .withOutputFileConfig(outputFileConfig)
                            .withRollingPolicy(rollingPolicy);
                }
                return createStreamingSink(
                        conf,
                        path,
                        partitionKeys,
                        tableIdentifier,
                        overwrite,
                        dataStream,
                        bucketsBuilder,
                        metaStoreFactory,
                        fsFactory,
                        fsWithUserFactory,
                        conf.get(SINK_ROLLING_POLICY_CHECK_INTERVAL).toMillis());
            }
        }
    

    As the code above shows, the SQL filesystem connector always uses a checkpoint-based rolling policy. With this policy, every in-progress file is rolled and renamed into a finalized, readable file when a checkpoint is taken, so as long as the checkpoint succeeds all files are in the finished state and none are left in-progress.
    The Default policy, by contrast, only rolls a file once it reaches the configured size or age, at some later checkpoint; an in-progress file can therefore span multiple checkpoints, and the checkpointed state can contain in-progress files.
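
    To make the contrast concrete, this is how the Default policy's thresholds can be configured on the DataStream side (the values are arbitrary examples; these are the long-based builder methods of the 1.11/1.12-era API):

    // roll a part file once it exceeds 128 MB, has been open for 15 minutes,
    // or has seen no writes for 5 minutes; until one of these fires the file
    // stays in-progress and may span several checkpoints
    DefaultRollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.builder()
            .withMaxPartSize(128 * 1024 * 1024)     // bytes
            .withRolloverInterval(15 * 60 * 1000)   // milliseconds
            .withInactivityInterval(5 * 60 * 1000)  // milliseconds
            .build();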

    Streaming

    A DataStream job, on the other hand, can freely combine format and rolling policy, which gives exactly the results described above.

    State recovery

    When restoring from the last successful checkpoint, initializeState is called:

    public void initializeState(FunctionInitializationContext context) throws Exception {
            this.helper = new StreamingFileSinkHelper<>(
                    bucketsBuilder.createBuckets(getRuntimeContext().getIndexOfThisSubtask()),
                    context.isRestored(),
                    context.getOperatorStateStore(),
                    ((StreamingRuntimeContext) getRuntimeContext()).getProcessingTimeService(),
                    bucketCheckInterval);
        }
    

    The StreamingFileSinkHelper constructor then calls buckets.initializeState:

    public StreamingFileSinkHelper(
                Buckets<IN, ?> buckets,
                boolean isRestored,
                OperatorStateStore stateStore,
                ProcessingTimeService procTimeService,
                long bucketCheckInterval) throws Exception {
            this.bucketCheckInterval = bucketCheckInterval;
            this.buckets = buckets;
            this.bucketStates = stateStore.getListState(BUCKET_STATE_DESC);
            this.maxPartCountersState = stateStore.getUnionListState(MAX_PART_COUNTER_STATE_DESC);
            this.procTimeService = procTimeService;
    
            if (isRestored) {
                buckets.initializeState(bucketStates, maxPartCountersState);
            }
    
            long currentProcessingTime = procTimeService.getCurrentProcessingTime();
            procTimeService.registerTimer(currentProcessingTime + bucketCheckInterval, this);
        }
    

    Following the calls further down:

    private void initializeActiveBuckets(final ListState<byte[]> bucketStates) throws Exception {
            for (byte[] serializedRecoveredState : bucketStates.get()) {
                final BucketState<BucketID> recoveredState =
                        SimpleVersionedSerialization.readVersionAndDeSerialize(
                            bucketStateSerializer, serializedRecoveredState); // deserialize the BucketState; if the checkpoint contained no in-progress file, the InProgressFileRecoverable is null, otherwise it is non-null, and this is the key point
                handleRestoredBucketState(recoveredState);
            }
        }
    
    private void handleRestoredBucketState(final BucketState<BucketID> recoveredState) throws Exception {
            final BucketID bucketId = recoveredState.getBucketId();
    
            if (LOG.isDebugEnabled()) {
                LOG.debug("Subtask {} restoring: {}", subtaskIndex, recoveredState);
            }
    
            final Bucket<IN, BucketID> restoredBucket = bucketFactory
                    .restoreBucket(
                            subtaskIndex,
                            maxPartCounter,
                            bucketWriter,
                            rollingPolicy,
                            recoveredState,
                            outputFileConfig
                    );
    
            updateActiveBucketId(bucketId, restoredBucket);
        }
    
    public Bucket<IN, BucketID> restoreBucket(
                final int subtaskIndex,
                final long initialPartCounter,
                final BucketWriter<IN, BucketID> bucketWriter,
                final RollingPolicy<IN, BucketID> rollingPolicy,
                final BucketState<BucketID> bucketState,
                final OutputFileConfig outputFileConfig) throws IOException {
    
            return Bucket.restore(
                    subtaskIndex,
                    initialPartCounter,
                    bucketWriter,
                    rollingPolicy,
                    bucketState,
                    outputFileConfig);
        }
    
    static <IN, BucketID> Bucket<IN, BucketID> restore(
                final int subtaskIndex,
                final long initialPartCounter,
                final BucketWriter<IN, BucketID> bucketWriter,
                final RollingPolicy<IN, BucketID> rollingPolicy,
                final BucketState<BucketID> bucketState,
                final OutputFileConfig outputFileConfig) throws IOException {
            return new Bucket<>(subtaskIndex, initialPartCounter, bucketWriter, rollingPolicy, bucketState, outputFileConfig);
        }
        
    private Bucket(
                final int subtaskIndex,
                final long initialPartCounter,
                final BucketWriter<IN, BucketID> partFileFactory,
                final RollingPolicy<IN, BucketID> rollingPolicy,
                final BucketState<BucketID> bucketState,
                final OutputFileConfig outputFileConfig) throws IOException {
    
            this(
                    subtaskIndex,
                    bucketState.getBucketId(),
                    bucketState.getBucketPath(),
                    initialPartCounter,
                    partFileFactory,
                    rollingPolicy,
                    outputFileConfig);
    
            restoreInProgressFile(bucketState); // restore the in-progress file, if any
            commitRecoveredPendingFiles(bucketState);
        }
        
    private void restoreInProgressFile(final BucketState<BucketID> state) throws IOException {
            if (!state.hasInProgressFileRecoverable()) { // nothing to resume: the checkpoint contains no in-progress file
                return;
            }
    
            // we try to resume the previous in-progress file
            final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = state.getInProgressFileRecoverable();
    
            if (bucketWriter.getProperties().supportsResume()) {
                inProgressPart = bucketWriter.resumeInProgressFileFrom(
                        bucketId, inProgressFileRecoverable, state.getInProgressFileCreationTime());
            } else {
                // if the writer does not support resume, then we close the
                // in-progress part and commit it, as done in the case of pending files.
                bucketWriter.recoverPendingFile(inProgressFileRecoverable).commitAfterRecovery();
            }
        }
    

    The key is the check !state.hasInProgressFileRecoverable(): if the restored state contains no in-progress file, the method returns immediately; only otherwise does it go on to resume the in-progress file via resumeInProgressFileFrom (shown after the sketch below).
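
    Why is there never an in-progress file in the state when a checkpoint-based policy (OnCheckpointRollingPolicy, or the SQL connector's TableRollingPolicy) is used? When a checkpoint is triggered the bucket asks the rolling policy whether to roll, and a CheckpointRollingPolicy always says yes, so the in-progress part is closed into a pending file before the state snapshot is taken. A simplified paraphrase of that checkpoint-time logic (not the exact Flink source):

    // simplified sketch of what the Bucket does when a checkpoint is taken
    void onCheckpoint() throws IOException {
        if (inProgressPart != null && rollingPolicy.shouldRollOnCheckpoint(inProgressPart)) {
            // a CheckpointRollingPolicy always returns true here,
            // so the in-progress part becomes a pending file
            closePartFile();
        }
        // the snapshot taken afterwards contains no InProgressFileRecoverable,
        // so restoreInProgressFile() returns early on recovery and truncate() is never called
    }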

    public InProgressFileWriter<IN, BucketID> resumeInProgressFileFrom(final BucketID bucketID, final InProgressFileRecoverable inProgressFileRecoverable, final long creationTime) throws IOException {
                final OutputStreamBasedInProgressFileRecoverable outputStreamBasedInProgressRecoverable = (OutputStreamBasedInProgressFileRecoverable) inProgressFileRecoverable;
                return resumeFrom(
                    bucketID,
                    recoverableWriter.recover(outputStreamBasedInProgressRecoverable.getResumeRecoverable()),//recover
                    outputStreamBasedInProgressRecoverable.getResumeRecoverable(),
                    creationTime);
            }
    
    

    Now let's look at the Hadoop implementation of recover:

    public RecoverableFsDataOutputStream recover(ResumeRecoverable recoverable) throws IOException {
            if (recoverable instanceof HadoopFsRecoverable) {
                return new HadoopRecoverableFsDataOutputStream(fs, (HadoopFsRecoverable) recoverable);
            }
            else {
                throw new IllegalArgumentException(
                        "Hadoop File System cannot recover a recoverable for another file system: " + recoverable);
            }
        }
    
    HadoopRecoverableFsDataOutputStream(
                FileSystem fs,
                HadoopFsRecoverable recoverable) throws IOException {
    
            ensureTruncateInitialized();
    
            this.fs = checkNotNull(fs);
            this.targetFile = checkNotNull(recoverable.targetFile());
            this.tempFile = checkNotNull(recoverable.tempFile());
    
            safelyTruncateFile(fs, tempFile, recoverable);
    
            out = fs.append(tempFile);
    
            // sanity check
            long pos = out.getPos();
            if (pos != recoverable.offset()) {
                IOUtils.closeQuietly(out);
                throw new IOException("Truncate failed: " + tempFile +
                        " (requested=" + recoverable.offset() + " ,size=" + pos + ')');
            }
        }
    
    private static void safelyTruncateFile(
                final FileSystem fileSystem,
                final Path path,
                final HadoopFsRecoverable recoverable) throws IOException {

            ensureTruncateInitialized();

            waitUntilLeaseIsRevoked(fileSystem, path);

            // truncate back and append
            boolean truncated;
            try {
                truncated = truncate(fileSystem, path, recoverable.offset());
            } catch (Exception e) {
                throw new IOException("Problem while truncating file: " + path, e);
            }

            if (!truncated) {
                // Truncate did not complete immediately, we must wait for
                // the operation to complete and release the lease.
                waitUntilLeaseIsRevoked(fileSystem, path);
            }
        }
    

    The Hadoop version check happens inside the truncate method:

    private static boolean truncate(final FileSystem hadoopFs, final Path file, final long length) throws IOException {
            if (!HadoopUtils.isMinHadoopVersion(2, 7)) {
                throw new IllegalStateException("Truncation is not available in hadoop version < 2.7 , You are on Hadoop " + VersionInfo.getVersion());
            }
    
            if (truncateHandle != null) {
                try {
                    return (Boolean) truncateHandle.invoke(hadoopFs, file, length);
                }
                catch (InvocationTargetException e) {
                    ExceptionUtils.rethrowIOException(e.getTargetException());
                }
                catch (Throwable t) {
                    throw new IOException(
                            "Truncation of file failed because of access/linking problems with Hadoop's truncate call. " +
                                    "This is most likely a dependency conflict or class loading problem.");
                }
            }
            else {
                throw new IllegalStateException("Truncation handle has not been initialized");
            }
            return false;
        }
    

    If the Hadoop version is below 2.7 an exception is thrown, which matches the error above. Now it all makes sense.

    Conclusion

    Only the RowFormat + Default rolling policy combination requires Hadoop 2.7+; the other combinations (RowFormat + OnCheckpoint, and BulkFormat, which only supports the OnCheckpoint policy) work on older Hadoop versions as well.
