美文网首页
Flink SocketWindowWordCount 源码分

Flink SocketWindowWordCount 源码分

作者: Bamboooooo_Yoo | 来源:发表于2018-04-18 15:52 被阅读0次

    今天搭好了Flink单机环境,试了自带的单词统计程序,为了尽快上手使用Flink开发,来看一下单词统计的源码~

    public class SocketWindowWordCount {
        public static void main(String[] args) throws Exception {
    
            // 用final修饰符定义端口号,表示不可变
            final int port;
            try {
                final ParameterTool params = ParameterTool.fromArgs(args);
                port = params.getInt("port");
            } catch (Exception e) {
                System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'");
                return;
            }
    
            // (1)获取执行环境
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // (2)获取数据流,例子中是从指定端口的socket中获取用户输入的文本
            DataStream<String> text = env.socketTextStream("localhost", port, "\n");
    
            // (3)transformation操作,对数据流实现算法
            DataStream<WordWithCount> windowCounts = text
                    //将用户输入的文本流以非空白符的方式拆开来,得到单个的单词,存入命名为out的Collector中
                    .flatMap(new FlatMapFunction<String, WordWithCount>() {
                        public void flatMap(String value, Collector<WordWithCount> out) {
                            for (String word : value.split("\\s")) {
                                out.collect(new WordWithCount(word, 1L));
                            }
                        }
                    })
                    //将输入的文本分为不相交的分区,每个分区包含的都是具有相同key的元素。也就是说,相同的单词被分在了同一个区域,下一步的reduce就是统计分区中的个数
                    .keyBy("word")
                    //滑动窗口机制,每1秒计算一次最近5秒
                    .timeWindow(Time.seconds(5), Time.seconds(1))
                    //一个在KeyedDataStream上“滚动”进行的reduce方法。将上一个reduce过的值和当前element结合,产生新的值并发送出。
                    //此处是说,对输入的两个对象进行合并,统计该单词的数量和
                    .reduce(new ReduceFunction<WordWithCount>() {
                        public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                            return new WordWithCount(a.word, a.count + b.count);
                        }
                    });
    
            // 单线程执行该程序
            windowCounts.print().setParallelism(1);
    
            env.execute("Socket Window WordCount");
        }
    
        // 统计单词的数据结构,包含两个变量和三个方法
        public static class WordWithCount {
            //两个变量存储输入的单词及其数量
            public String word;
            public long count;
            
            public WordWithCount() {}
            
            public WordWithCount(String word, long count) {
                this.word = word;
                this.count = count;
            }
    
            @Override
            public String toString() {
                return word + " : " + count;
            }
        }
    }
    
    1. 时间窗口:

    (1) timeWindow(Time.seconds(2)),只有一个参数,表示是翻滚时间窗口(Tumbling window),即不重叠的时间窗口,只统计本窗口内的数据;
    (2) timeWindow(Time.seconds(5),Time.seconds(1)),有两个参数,表示是滑动时间窗口(Sliding window),即每过t2时间,统计前t1时间内的数据。 本例中就是,每秒计算前5秒内的数据。

    2. DataStream的transformation操作 API

    参考:https://www.cnblogs.com/lanyun0520/p/5730403.html

    相关文章

      网友评论

          本文标题:Flink SocketWindowWordCount 源码分

          本文链接:https://www.haomeiwen.com/subject/zfbvkftx.html