美文网首页
Flink WordCount 和 SQL 实现

Flink WordCount 和 SQL 实现

作者: 一生逍遥一生 | 来源:发表于2021-09-06 11:47 被阅读0次

本文基于 Scala 2.11、Flink 1.12.0。

Maven Dependencies

<!-- Shared build properties for the examples in this article. -->
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!-- Scala binary version; substituted into the Scala-suffixed Flink artifact ids in the dependency list. -->
    <scala.binary.version>2.11</scala.binary.version>
    <!-- Flink release referenced by the version element of every Flink dependency. -->
    <flink.version>1.12.0</flink.version>
    <!-- NOTE(review): java.version is not referenced in the visible snippet; confirm it is used elsewhere in the pom. -->
    <java.version>1.8</java.version>
    <!-- Java source/target level, both set to 1.8. -->
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <!-- NOTE(review): encoding is not referenced in the visible snippet; confirm it is used elsewhere in the pom. -->
    <encoding>UTF-8</encoding>
</properties>
<!-- Flink dependencies used by the DataSet and DataStream word-count examples. -->
<dependencies>
    <!-- DataSet (batch) API. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- DataStream (streaming) API; the artifact id carries the Scala binary version suffix. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Must be declared explicitly when using Flink versions after 1.10. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

单词计数

DataSet WordCount

/**
 * Batch word count written against the Flink DataSet API.
 *
 * <p>Splits three in-memory lines into words, sums the occurrences of each
 * word and prints the resulting (word, count) tuples via {@code printToErr()}.
 */
public class DataSetWordCount {
    public static void main(String[] args) throws Exception {
        // Obtain the execution environment the batch program runs in.
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Build the input DataSet from a fixed set of lines.
        DataSet<String> lines = env.fromElements(
                "Flink Spark Storm",
                "Flink Flink Flink",
                "Storm Storm Storm"
        );

        // Turn every line into (word, 1) tuples.
        DataSet<Tuple2<String, Integer>> pairs = lines.flatMap(new LineSplitter());

        // Group on tuple field 0 (the word) and sum tuple field 1 (the count).
        DataSet<Tuple2<String, Integer>> counts = pairs.groupBy(0).sum(1);

        counts.printToErr();
    }

    /**
     * Splits a line on runs of non-word characters and emits one
     * {@code (token, 1)} tuple for every non-empty token.
     */
    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : value.split("\\W+")) {
                // A line that starts with a separator yields a leading empty token; skip it.
                if (word.isEmpty()) {
                    continue;
                }
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}

DataStream WordCount

/**
 * A word paired with the number of times it has been counted.
 *
 * <p>Accessors, constructors and {@code toString()} are generated by the
 * Lombok annotations below. The field order is significant: it determines
 * the parameter order of the generated all-args constructor,
 * {@code (word, count)}.
 */
@NoArgsConstructor
@AllArgsConstructor
@Data
@ToString
public class WordWithCount {
    /** The word being counted. */
    private String word;

    /** Number of occurrences attributed to {@link #word}. */
    private long count;
}
/**
 * Splits a line of text into words and emits one {@link WordWithCount}
 * with a count of 1 for every word.
 */
public class WordCountFlatMapFunction implements FlatMapFunction<String, WordWithCount> {
    /**
     * Emits a {@code WordWithCount(word, 1)} for each whitespace-separated
     * word in {@code value}.
     *
     * @param value the input line
     * @param out   collector receiving one record per word
     */
    @Override
    public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
        // Split on runs of whitespace so consecutive separators do not create empty words.
        for (String word : value.split("\\s+")) {
            // A blank line or leading whitespace still yields one empty token; never count it as a word.
            if (!word.isEmpty()) {
                out.collect(new WordWithCount(word, 1L));
            }
        }
    }
}
/**
 * Merges two {@link WordWithCount} records into a new record whose count is
 * the sum of both inputs.
 */
public class WordCountReduceFunction implements ReduceFunction<WordWithCount> {
    /**
     * Combines two records.
     *
     * @param a first record; its word is carried over to the result
     * @param b second record
     * @return a new record with {@code a}'s word and the summed count
     */
    @Override
    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
        long total = a.getCount() + b.getCount();
        return new WordWithCount(a.getWord(), total);
    }
}
/**
 * Streaming word count: reads lines from a socket, counts words per
 * 5-second tumbling processing-time window and prints the counts.
 */
public class DataStreamWordCount {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read text lines from the socket at 127.0.0.1:9000.
        DataStream<String> text = env.socketTextStream("127.0.0.1", 9000);

        // Split every incoming line into single-word records.
        DataStream<WordWithCount> words = text.flatMap(new WordCountFlatMapFunction());

        // Key by word, collect 5-second tumbling windows and sum the counts within each window.
        DataStream<WordWithCount> windowCounts = words
                .keyBy(record -> record.getWord())
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .reduce(new WordCountReduceFunction());

        // Print with a parallelism of 1.
        windowCounts.print().setParallelism(1);

        env.execute("Socket Windows WordCount");
    }
}

Flink Table & SQL WordCount

可以将 DataSet、DataStream 转换为 Table,再使用 SQL 进行查询。

/**
 * Word count expressed with the Flink Table API and SQL on top of a DataSet.
 *
 * <p>Builds a DataSet of {@link WC} records from a fixed sentence, registers
 * it as the view {@code WordCount}, aggregates it with a SQL query and prints
 * the result via {@code printToErr()}.
 */
public class TableWordCount {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnv = BatchTableEnvironment.create(batchEnv);

        // One WC record with frequency 1 for every token of the sample sentence.
        ArrayList<WC> rows = new ArrayList<>();
        for (String token : "Hello Flink Hello World".split("\\W+")) {
            rows.add(new WC(token, 1));
        }
        DataSet<WC> input = batchEnv.fromCollection(rows);

        // Convert the DataSet into a Table, naming the fields explicitly.
        Table source = tableEnv.fromDataSet(input, "word,frequency");
        source.printSchema();

        // Register the table under a name so the SQL query can refer to it.
        tableEnv.createTemporaryView("WordCount", source);

        String query = "select word as word, sum(frequency) as frequency from WordCount GROUP BY word";
        Table aggregated = tableEnv.sqlQuery(query);

        // Convert the result table back into a DataSet of WC records.
        DataSet<WC> result = tableEnv.toDataSet(aggregated, WC.class);
        result.printToErr();
    }

    /**
     * Record holding a word and its frequency.
     *
     * <p>The field names match the column names passed to
     * {@code fromDataSet} and the aliases used in the SQL query.
     */
    public static class WC {
        /** The word. */
        public String word;
        /** How often the word occurred. */
        public long frequency;

        /** Creates an empty record. */
        public WC() {
        }

        /**
         * Creates a populated record.
         *
         * @param word      the word
         * @param frequency the word's occurrence count
         */
        public WC(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        /** Renders the record as {@code "<word>, <frequency>"}. */
        @Override
        public String toString() {
            return word + ", " + frequency;
        }
    }

}

相关文章

网友评论

      本文标题:Flink WordCount 和 SQL 实现

      本文链接:https://www.haomeiwen.com/subject/hbdkwltx.html