美文网首页
聊聊flink的TextOutputFormat

聊聊flink的TextOutputFormat

作者: go4it | 来源:发表于2018-12-03 10:23 被阅读116次

    本文主要研究一下flink的TextOutputFormat

    DataStream.writeAsText

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

        /**
         * Writes a DataStream to the file specified by path in text format.
         *
         * <p>For every element of the DataStream the result of {@link Object#toString()} is written.
         *
         * @param path
         *            The path pointing to the location the text file is written to
         * @param writeMode
         *            Controls the behavior for existing files. Options are
         *            NO_OVERWRITE and OVERWRITE.
         *
         * @return The closed DataStream.
         */
        @PublicEvolving
        public DataStreamSink<T> writeAsText(String path, WriteMode writeMode) {
            TextOutputFormat<T> tof = new TextOutputFormat<>(new Path(path));
            tof.setWriteMode(writeMode);
            return writeUsingOutputFormat(tof);
        }
    
        /**
         * Writes the dataStream into an output, described by an OutputFormat.
         *
         * <p>The output is not participating in Flink's checkpointing!
         *
         * <p>For writing to a file system periodically, the use of the "flink-connector-filesystem"
         * is recommended.
         *
         * @param format The output format
         * @return The closed DataStream
         */
        @PublicEvolving
        public DataStreamSink<T> writeUsingOutputFormat(OutputFormat<T> format) {
            return addSink(new OutputFormatSinkFunction<>(format));
        }
    
    • DataStream的writeAsText方法创建了TextOutputFormat,然后通过OutputFormatSinkFunction包装为sink function

    TextOutputFormat

    flink-java-1.7.0-sources.jar!/org/apache/flink/api/java/io/TextOutputFormat.java

    /**
     * A {@link FileOutputFormat} that writes objects to a text file.
     *
     * <p>Objects are converted to Strings using either {@link Object#toString()} or a {@link TextFormatter}.
     * @param <T> type of elements
     */
    @PublicEvolving
    public class TextOutputFormat<T> extends FileOutputFormat<T> {
    
        private static final long serialVersionUID = 1L;
    
        private static final int NEWLINE = '\n';
    
        private String charsetName;
    
        private transient Charset charset;
    
        // --------------------------------------------------------------------------------------------
    
    
        /**
         * Formatter that transforms values into their {@link String} representations.
         * @param <IN> type of input elements
         */
        public interface TextFormatter<IN> extends Serializable {
            String format(IN value);
        }
    
        public TextOutputFormat(Path outputPath) {
            this(outputPath, "UTF-8");
        }
    
        public TextOutputFormat(Path outputPath, String charset) {
            super(outputPath);
            this.charsetName = charset;
        }
    
        public String getCharsetName() {
            return charsetName;
        }
    
        public void setCharsetName(String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException {
            if (charsetName == null) {
                throw new NullPointerException();
            }
    
            if (!Charset.isSupported(charsetName)) {
                throw new UnsupportedCharsetException("The charset " + charsetName + " is not supported.");
            }
    
            this.charsetName = charsetName;
        }
    
        // --------------------------------------------------------------------------------------------
    
        @Override
        public void open(int taskNumber, int numTasks) throws IOException {
            super.open(taskNumber, numTasks);
    
            try {
                this.charset = Charset.forName(charsetName);
            }
            catch (IllegalCharsetNameException e) {
                throw new IOException("The charset " + charsetName + " is not valid.", e);
            }
            catch (UnsupportedCharsetException e) {
                throw new IOException("The charset " + charsetName + " is not supported.", e);
            }
        }
    
        @Override
        public void writeRecord(T record) throws IOException {
            byte[] bytes = record.toString().getBytes(charset);
            this.stream.write(bytes);
            this.stream.write(NEWLINE);
        }
    
        // --------------------------------------------------------------------------------------------
    
        @Override
        public String toString() {
            return "TextOutputFormat (" + getOutputFilePath() + ") - " + this.charsetName;
        }
    }
    
    • TextOutputFormat继承了FileOutputFormat,其open方法主要是调用FileOutputFormat的open方法,而writeRecord方法则直接往stream进行write,写完一条record之后再写一个换行(\n)

    FileOutputFormat

    flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/io/FileOutputFormat.java

    /**
     * The abstract base class for all Rich output formats that are file based. Contains the logic to
     * open/close the target
     * file streams.
     */
    @Public
    public abstract class FileOutputFormat<IT> extends RichOutputFormat<IT> implements InitializeOnMaster, CleanupWhenUnsuccessful {
        //......
    
        /**
         * Initialization of the distributed file system if it is used.
         *
         * @param parallelism The task parallelism.
         */
        @Override
        public void initializeGlobal(int parallelism) throws IOException {
            final Path path = getOutputFilePath();
            final FileSystem fs = path.getFileSystem();
            
            // only distributed file systems can be initialized at start-up time.
            if (fs.isDistributedFS()) {
                
                final WriteMode writeMode = getWriteMode();
                final OutputDirectoryMode outDirMode = getOutputDirectoryMode();
    
                if (parallelism == 1 && outDirMode == OutputDirectoryMode.PARONLY) {
                    // output is not written in parallel and should be written to a single file.
                    // prepare distributed output path
                    if(!fs.initOutPathDistFS(path, writeMode, false)) {
                        // output preparation failed! Cancel task.
                        throw new IOException("Output path could not be initialized.");
                    }
    
                } else {
                    // output should be written to a directory
    
                    // only distributed file systems can be initialized at start-up time.
                    if(!fs.initOutPathDistFS(path, writeMode, true)) {
                        throw new IOException("Output directory could not be created.");
                    }
                }
            }
        }
        
        @Override
        public void tryCleanupOnError() {
            if (this.fileCreated) {
                this.fileCreated = false;
                
                try {
                    close();
                } catch (IOException e) {
                    LOG.error("Could not properly close FileOutputFormat.", e);
                }
    
                try {
                    FileSystem.get(this.actualFilePath.toUri()).delete(actualFilePath, false);
                } catch (FileNotFoundException e) {
                    // ignore, may not be visible yet or may be already removed
                } catch (Throwable t) {
                    LOG.error("Could not remove the incomplete file " + actualFilePath + '.', t);
                }
            }
        }
    
        @Override
        public void configure(Configuration parameters) {
            // get the output file path, if it was not yet set
            if (this.outputFilePath == null) {
                // get the file parameter
                String filePath = parameters.getString(FILE_PARAMETER_KEY, null);
                if (filePath == null) {
                    throw new IllegalArgumentException("The output path has been specified neither via constructor/setters" +
                            ", nor via the Configuration.");
                }
                
                try {
                    this.outputFilePath = new Path(filePath);
                }
                catch (RuntimeException rex) {
                    throw new RuntimeException("Could not create a valid URI from the given file path name: " + rex.getMessage()); 
                }
            }
            
            // check if have not been set and use the defaults in that case
            if (this.writeMode == null) {
                this.writeMode = DEFAULT_WRITE_MODE;
            }
            
            if (this.outputDirectoryMode == null) {
                this.outputDirectoryMode = DEFAULT_OUTPUT_DIRECTORY_MODE;
            }
        }
    
        @Override
        public void open(int taskNumber, int numTasks) throws IOException {
            if (taskNumber < 0 || numTasks < 1) {
                throw new IllegalArgumentException("TaskNumber: " + taskNumber + ", numTasks: " + numTasks);
            }
            
            if (LOG.isDebugEnabled()) {
                LOG.debug("Opening stream for output (" + (taskNumber+1) + "/" + numTasks + "). WriteMode=" + writeMode +
                        ", OutputDirectoryMode=" + outputDirectoryMode);
            }
            
            Path p = this.outputFilePath;
            if (p == null) {
                throw new IOException("The file path is null.");
            }
            
            final FileSystem fs = p.getFileSystem();
    
            // if this is a local file system, we need to initialize the local output directory here
            if (!fs.isDistributedFS()) {
                
                if (numTasks == 1 && outputDirectoryMode == OutputDirectoryMode.PARONLY) {
                    // output should go to a single file
                    
                    // prepare local output path. checks for write mode and removes existing files in case of OVERWRITE mode
                    if(!fs.initOutPathLocalFS(p, writeMode, false)) {
                        // output preparation failed! Cancel task.
                        throw new IOException("Output path '" + p.toString() + "' could not be initialized. Canceling task...");
                    }
                }
                else {
                    // numTasks > 1 || outDirMode == OutputDirectoryMode.ALWAYS
                    
                    if(!fs.initOutPathLocalFS(p, writeMode, true)) {
                        // output preparation failed! Cancel task.
                        throw new IOException("Output directory '" + p.toString() + "' could not be created. Canceling task...");
                    }
                }
            }
    
    
    
            // Suffix the path with the parallel instance index, if needed
            this.actualFilePath = (numTasks > 1 || outputDirectoryMode == OutputDirectoryMode.ALWAYS) ? p.suffix("/" + getDirectoryFileName(taskNumber)) : p;
    
            // create output file
            this.stream = fs.create(this.actualFilePath, writeMode);
            
            // at this point, the file creation must have succeeded, or an exception has been thrown
            this.fileCreated = true;
        }
    
        @Override
        public void close() throws IOException {
            final FSDataOutputStream s = this.stream;
            if (s != null) {
                this.stream = null;
                s.close();
            }
        }
    }
    
    • FileOutputFormat继承了RichOutputFormat,实现了InitializeOnMaster(initializeGlobal方法)、CleanupWhenUnsuccessful(tryCleanupOnError方法)接口
    • initializeGlobal主要是判断,如果文件是分布式系统文件,那么就在启动的时候全局初始化一下;tryCleanupOnError方法先close,然后再delete文件
    • FileOutputFormat还实现了OutputFormat接口的configure、open、close方法,而writeRecord方法由子类来实现;configure方法主要是配置outputFilePath、writeMode、outputDirectoryMode这几个属性;open方法则根据taskNumber来获取actualFilePath(对于numTasks大于1的,则根据tasknumber在配置的outputFilePath目录下新增文件,文件名为tasknumber对应的数值+1),然后创建stream;close方法只要是关闭stream

    RichOutputFormat

    flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/io/RichOutputFormat.java

    /**
     * An abstract stub implementation for Rich output formats.
     * Rich formats have access to their runtime execution context via {@link #getRuntimeContext()}.
     */
    @Public
    public abstract class RichOutputFormat<IT> implements OutputFormat<IT> {
        
        private static final long serialVersionUID = 1L;
        
        // --------------------------------------------------------------------------------------------
        //  Runtime context access
        // --------------------------------------------------------------------------------------------
        
        private transient RuntimeContext runtimeContext;
    
        public void setRuntimeContext(RuntimeContext t) {
            this.runtimeContext = t;
        }
        
        public RuntimeContext getRuntimeContext() {
            if (this.runtimeContext != null) {
                return this.runtimeContext;
            } else {
                throw new IllegalStateException("The runtime context has not been initialized yet. Try accessing " +
                        "it in one of the other life cycle methods.");
            }
        }
    }
    
    • RichOutputFormat声明实现OutputFormat接口,它主要是增加了RuntimeContext属性

    OutputFormat

    flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/io/OutputFormat.java

    /**
     * The base interface for outputs that consumes records. The output format
     * describes how to store the final records, for example in a file.
     * <p>
     * The life cycle of an output format is the following:
     * <ol>
     *   <li>configure() is invoked a single time. The method can be used to implement initialization from
     *       the parameters (configuration) that may be attached upon instantiation.</li>
     *   <li>Each parallel output task creates an instance, configures it and opens it.</li>
     *   <li>All records of its parallel instance are handed to the output format.</li>
     *   <li>The output format is closed</li>
     * </ol>
     * 
     * @param <IT> The type of the consumed records. 
     */
    @Public
    public interface OutputFormat<IT> extends Serializable {
        
        /**
         * Configures this output format. Since output formats are instantiated generically and hence parameterless, 
         * this method is the place where the output formats set their basic fields based on configuration values.
         * <p>
         * This method is always called first on a newly instantiated output format. 
         *  
         * @param parameters The configuration with all parameters.
         */
        void configure(Configuration parameters);
        
        /**
         * Opens a parallel instance of the output format to store the result of its parallel instance.
         * <p>
         * When this method is called, the output format it guaranteed to be configured.
         * 
         * @param taskNumber The number of the parallel instance.
         * @param numTasks The number of parallel tasks.
         * @throws IOException Thrown, if the output could not be opened due to an I/O problem.
         */
        void open(int taskNumber, int numTasks) throws IOException;
        
        
        /**
         * Adds a record to the output.
         * <p>
         * When this method is called, the output format it guaranteed to be opened.
         * 
         * @param record The records to add to the output.
         * @throws IOException Thrown, if the records could not be added to to an I/O problem.
         */
        void writeRecord(IT record) throws IOException;
        
        /**
         * Method that marks the end of the life-cycle of parallel output instance. Should be used to close
         * channels and streams and release resources.
         * After this method returns without an error, the output is assumed to be correct.
         * <p>
         * When this method is called, the output format it guaranteed to be opened.
         *  
         * @throws IOException Thrown, if the input could not be closed properly.
         */
        void close() throws IOException;
    }
    
    • OutputFormat接口定义了configure、open、writeRecord、close方法

    小结

    • DataStream的writeAsText方法创建了TextOutputFormat,然后通过OutputFormatSinkFunction包装为sink function
    • TextOutputFormat继承了FileOutputFormat,其open方法主要是调用FileOutputFormat的open方法,而writeRecord方法则直接往stream进行write,写完一条record之后再写一个换行(\n)
    • FileOutputFormat继承了RichOutputFormat,实现了InitializeOnMaster(initializeGlobal方法)、CleanupWhenUnsuccessful(tryCleanupOnError方法)接口,以及OutputFormat接口的configure、open、close方法,而writeRecord方法由子类来实现;
    • FileOutputFormat的open方法则根据taskNumber来获取actualFilePath(对于numTasks大于1的,则根据tasknumber在配置的outputFilePath目录下新增文件,文件名为tasknumber对应的数值+1),然后创建stream
    • RichOutputFormat声明实现OutputFormat接口,它主要是增加了RuntimeContext属性;OutputFormat接口则定义了configure、open、writeRecord、close方法

    doc

    相关文章

      网友评论

          本文标题:聊聊flink的TextOutputFormat

          本文链接:https://www.haomeiwen.com/subject/mfgecqtx.html