Flink Introductory Example: Streaming WordCount

Author: Anthons | Published 2021-03-29 14:51

    1. Dependencies in the Maven project's pom.xml

        <properties>
            <flink.version>1.9.1</flink.version>
        </properties>
        <!-- Flink dependencies -->
        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <!-- flink-cep is not used by the WordCount example below -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-cep_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
        </dependencies>
    

    2. Test data

    Input file: hello.txt (the program below reads it from src/main/resources)

    hello world
    hello flink
    hello spark
    hello scala
    how are you
    fine thank you
    and you
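To reproduce the run, the sample lines above must exist as a file at the path the program reads. The following is a minimal plain-Java sketch that writes them out. It assumes it is run from the project root and that src/main/resources/hello.txt is the desired location; the class name WriteSampleInput is hypothetical.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: writes the sample input used by the WordCount example.
public class WriteSampleInput {

    static final List<String> LINES = Arrays.asList(
            "hello world", "hello flink", "hello spark", "hello scala",
            "how are you", "fine thank you", "and you");

    // Writes one sample line per text line, UTF-8 encoded.
    public static void write(Path target) throws IOException {
        Files.write(target, LINES, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // ASSUMPTION: the working directory is the Maven project root.
        Path target = Paths.get("src", "main", "resources", "hello.txt");
        Files.createDirectories(target.getParent());
        write(target);
        System.out.println("wrote " + target.toAbsolutePath());
    }
}
```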

    3. Flink WordCount (Java)

    package com.cn.wc;
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;
    
    public class StreamWordCount {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Set the default parallelism for the operators in this job
            env.setParallelism(2);
            String inputPath = "C:\\workstation\\maven_project\\flink_wordcount\\src\\main\\resources\\hello.txt";
    
            // DataStreamSource extends SingleOutputStreamOperator, which in turn extends DataStream
            // Read the file line by line to simulate a stream of records
            DataStreamSource<String> inputDataStream = env.readTextFile(inputPath);
    
            // Alternatively, read a text stream from a socket (localhost:9000 here; host and port can be changed)
            // DataStreamSource<String> inputDataStream2 = env.socketTextStream("localhost", 9000);
    
            // Transform the stream: split each line into (word, 1) pairs, group by word, sum the counts
            SingleOutputStreamOperator<Tuple2<String, Integer>> result = inputDataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                    String[] words = s.split(" ");
                    for (String word : words) {
                        collector.collect(new Tuple2<String, Integer>(word, 1));
                    }
                }
            })
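            .keyBy(0) // group records by word (tuple field 0); the batch API uses groupBy, the streaming API uses keyBy
            .sum(1); // running sum of tuple field 1 (the count) for each word
            result.print(); // only registers a print sink; nothing is output at this point
            // Trigger execution of the streaming job
            env.execute();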
        }
    }
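Note that the job never prints a single final total per word: sum(1) on a keyed stream emits an updated running count every time a word arrives. The following is a minimal plain-Java sketch of that behavior for a single subtask, with no Flink dependency. A HashMap stands in for Flink's keyed state, and the names RunningWordCount and runningCounts are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java imitation (not Flink) of flatMap -> keyBy(0) -> sum(1):
// one updated (word,count) record is emitted per input word.
public class RunningWordCount {

    public static List<String> runningCounts(List<String> lines) {
        Map<String, Integer> state = new HashMap<>(); // per-word running totals
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split(" ")) { // same tokenization as the flatMap
                int count = state.merge(word, 1, Integer::sum); // add 1 to this word's total
                emitted.add("(" + word + "," + count + ")");
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("hello world", "hello flink", "how are you", "and you");
        for (String record : runningCounts(lines)) {
            System.out.println(record);
        }
    }
}
```

Running main prints (hello,1), (world,1), (hello,2), (flink,1), (how,1), (are,1), (you,1), (and,1), (you,2), one per line. Because everything runs in one thread, this order is deterministic, unlike the parallel Flink job.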
    
    

    4. Output

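    Note: the job emits an updated count every time a word arrives. The prefix n> is the 1-based index of the parallel subtask that printed the record (1 or 2, since the parallelism is 2). The two subtasks run concurrently, so the interleaving of lines can differ between runs. Each word, however, is always handled by the same subtask, so its count increases 1, 2, 3, ... in order.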
    2> (how,1)
    1> (hello,1)
    2> (you,1)
    2> (fine,1)
    1> (hello,2)
    2> (you,2)
    1> (hello,3)
    2> (and,1)
    1> (spark,1)
    1> (hello,4)
    2> (you,3)
    1> (scala,1)
    2> (world,1)
    1> (are,1)
    1> (thank,1)
    2> (flink,1)
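The reason a word such as hello always appears under the same prefix is that keyBy routes every occurrence of a key to one subtask. The sketch below illustrates the idea in plain Java. It is a simplified stand-in, not Flink's algorithm: the hypothetical subtaskFor method uses Math.floorMod(key.hashCode(), parallelism), whereas Flink hashes keys into key groups, so the numbers printed here need not match a real Flink run.

```java
import java.util.Arrays;
import java.util.List;

// Simplified illustration of routing keys to parallel subtasks.
// ASSUMPTION: floorMod(hashCode, parallelism) stands in for Flink's
// key-group hashing, so subtask numbers need not match a real Flink run.
public class KeyRouting {

    // Returns a 1-based subtask index, like the "n>" prefix printed by print().
    public static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism) + 1;
    }

    public static void main(String[] args) {
        int parallelism = 2; // matches env.setParallelism(2)
        List<String> words = Arrays.asList("hello", "world", "hello", "you", "you");
        for (String word : words) {
            // Deterministic: every occurrence of a word goes to the same subtask,
            // so that subtask alone holds the word's running count.
            System.out.println(subtaskFor(word, parallelism) + "> " + word);
        }
    }
}
```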

          Article link: https://www.haomeiwen.com/subject/odxbhltx.html