Flink DataSet and DataStream Print Methods

Author: biggeng | Published 2019-04-23 11:21

    Among the Flink examples there are two very similar WordCount programs: a batch WordCount and a streaming WordCount. Their usage is also quite similar.

    WordCount example
    1. The batch WordCount
    • Usage
      In Flink on YARN mode, after setting HADOOP_CONF_DIR and YARN_CONF_DIR, run:
    bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 -yqu exampleQ  examples/batch/WordCount.jar
    

    By default, if no input or output parameters are given, the job reads the built-in WordCountData, counts the words, and prints the result to stdout. After the job finishes, the client prints output like the following:

    (wish,1)
    (with,3)
    (would,2)
    (wrong,1)
    (you,1)
    Program execution finished
    Job with JobID 21eca2a01dbbc9594525824e9590c453 has finished.
    Job Runtime: 12435 ms
    Accumulator Results:
    - 71de74f85dc472654c31b6df79701cf5 (java.util.ArrayList) [170 elements]
    
    2. The streaming WordCount
    • Usage
      Compared with the batch WordCount, the command is:
    bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 -yqu exampleQ  examples/streaming/WordCount.jar
    

    By default, if no input or output parameters are given, the job reads the built-in streaming WordCountData, counts the words, and prints the result to stdout. But unlike the batch example, when the job finishes the client does not print the count tuples.

    So where did the output go?

    Let's look at the example source code and compare.
    The batch WordCount:

            // get input data
            DataSet<String> text;
            if (params.has("input")) {
                // read the text file from given input path
                text = env.readTextFile(params.get("input"));
            } else {
                // get default test text data
                System.out.println("Executing WordCount example with default input data set.");
                System.out.println("Use --input to specify file input.");
                text = WordCountData.getDefaultTextLineDataSet(env);
            }
    
            DataSet<Tuple2<String, Integer>> counts =
                    // split up the lines in pairs (2-tuples) containing: (word,1)
                    text.flatMap(new Tokenizer())
                    // group by the tuple field "0" and sum up tuple field "1"
                    .groupBy(0)
                    .sum(1);
    
            // emit result
            if (params.has("output")) {
                counts.writeAsCsv(params.get("output"), "\n", " ");
                // execute program
                env.execute("WordCount Example");
            } else {
                System.out.println("Printing result to stdout. Use --output to specify output path.");
                counts.print();
            }
    

    As you can see, the default data is read into the DataSet text; a map and reduce pass over text computes the counts result, which is then printed to stdout.
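    The essence of this pipeline can be sketched in plain Java (an illustrative model only; the class name WordCountModel is invented for this sketch and is not part of Flink). It applies the Tokenizer's splitting rule and then sums a count of 1 per word occurrence, which is what flatMap(new Tokenizer()).groupBy(0).sum(1) computes:

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of the WordCount pipeline: tokenize each line into
// lowercase words, then sum a count of 1 per occurrence.
public class WordCountModel {
    public static Map<String, Integer> count(String[] lines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            // Same splitting rule as the example's Tokenizer:
            // non-word characters separate tokens, empty tokens are skipped.
            for (String token : line.toLowerCase().split("\\W+")) {
                if (!token.isEmpty()) {
                    counts.merge(token, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> result = count(new String[] {"To be, or not to be"});
        System.out.println(result.get("to")); // "to" occurs twice
    }
}
```

    In Flink the same computation runs distributed across TaskManagers, but the per-word result tuples are the same.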
    The streaming WordCount:

            // get input data
            DataStream<String> text;
            if (params.has("input")) {
                // read the text file from given input path
                text = env.readTextFile(params.get("input"));
            } else {
                System.out.println("Executing WordCount example with default input data set.");
                System.out.println("Use --input to specify file input.");
                // get default test text data
                text = env.fromElements(WordCountData.WORDS);
            }
    
            DataStream<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                // group by the tuple field "0" and sum up tuple field "1"
                .keyBy(0).sum(1);
    
            // emit result
            if (params.has("output")) {
                counts.writeAsText(params.get("output"));
            } else {
                System.out.println("Printing result to stdout. Use --output to specify output path.");
                counts.print();
            }
    
            // execute program
            env.execute("Streaming WordCount");
    

    The overall flow is much the same as the batch version, except that it operates on a DataStream. So why does print() not deliver the results to the client in the streaming case?
    Let's compare DataSet's print() with DataStream's print().

        /**
         * Prints the elements in a DataSet to the standard output stream {@link System#out} of the JVM that calls
         * the print() method. For programs that are executed in a cluster, this method needs
         * to gather the contents of the DataSet back to the client, to print it there.
         *
         * <p>The string written for each element is defined by the {@link Object#toString()} method.
         *
         * <p>This method immediately triggers the program execution, similar to the
         * {@link #collect()} and {@link #count()} methods.
         *
         * @see #printToErr()
         * @see #printOnTaskManager(String)
         */
        public void print() throws Exception {
            List<T> elements = collect();
            for (T e: elements) {
                System.out.println(e);
            }
        }
    

    As you can see, it first performs a collect(): the client fetches the DataSet's contents as a list and then prints them locally. Because this gathers and transfers the full result set to the client, avoid calling print() casually on large data sets.
    DataStream's print():

        /**
         * Writes a DataStream to the standard output stream (stdout).
         *
         * <p>For each element of the DataStream the result of {@link Object#toString()} is written.
         *
         * <p>NOTE: This will print to stdout on the machine where the code is executed, i.e. the Flink
         * worker.
         *
         * @return The closed DataStream.
         */
        @PublicEvolving
        public DataStreamSink<T> print() {
            PrintSinkFunction<T> printFunction = new PrintSinkFunction<>();
            return addSink(printFunction).name("Print to Std. Out");
        }
    

    As you can see, this merely attaches a print sink; no data is collected back to the client. The stdout output therefore does not appear on the client but on the machines where this code actually runs, i.e. Flink's TaskManagers (in each TaskManager's *.out log file, also viewable in the Flink web UI).
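    The behavioral difference can be summed up with a plain-Java toy model (the class name PrintSemantics is invented for this sketch; this is not Flink API): the DataSet version collects and prints eagerly in the calling JVM, while the DataStream version only registers a sink whose printing happens later, in whatever JVM eventually runs it:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model of the two print() semantics (illustrative only, not Flink API).
public class PrintSemantics {
    // Like DataSet#print(): collect() ships every element back to the
    // caller's JVM, and printing happens right here, right now.
    public static <T> List<T> eagerPrint(List<T> data) {
        List<T> collected = new ArrayList<>(data); // stands in for collect()
        for (T e : collected) {
            System.out.println(e); // printed in the client JVM
        }
        return collected;
    }

    // Like DataStream#print(): nothing is printed when this is called.
    // It only returns a sink; printing happens later, inside whichever
    // JVM (a TaskManager, in Flink) eventually invokes the sink.
    public static <T> Consumer<T> lazyPrintSink() {
        return e -> System.out.println(e);
    }

    public static void main(String[] args) {
        eagerPrint(List.of("a", "b"));        // prints immediately, in this JVM
        Consumer<String> sink = lazyPrintSink(); // prints nothing yet
        sink.accept("c");                        // only prints where/when invoked
    }
}
```

    In the real streaming job, the sink's invoke happens on the TaskManagers, which is why the client sees nothing.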

    Conclusion

    The DataSet and DataStream implementations of print() differ, so their behavior differs, and that is exactly the discrepancy between the two WordCount runs described at the beginning of this article.


    Original article: https://www.haomeiwen.com/subject/qnsmgqtx.html