Flink Getting-Started Example: WordCount Stream Processing

Author: Anthons | Published: 2021-03-29 14:51

1. Dependencies in the Maven project's pom.xml

    <properties>
        <flink.version>1.9.1</flink.version>
    </properties>
    <!-- Flink dependencies -->
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink CEP library: not used by this WordCount example and can be omitted -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

2. Test data

Input file: hello.txt (stored at src/main/resources/hello.txt in this project)

hello world
hello flink
hello spark
hello scala
how are you
fine thank you
and you
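Because the job keeps a running sum, the last value printed for each word should equal that word's total count in hello.txt. As a sanity check, here is a minimal plain-Java sketch (no Flink involved) that computes those totals from the same seven lines, splitting on a single space exactly as the Flink job does. The class name `ExpectedCounts` is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ExpectedCounts {
    // The seven lines of hello.txt, copied from the test data above
    static final List<String> LINES = List.of(
            "hello world",
            "hello flink",
            "hello spark",
            "hello scala",
            "how are you",
            "fine thank you",
            "and you");

    // Split each line on a single space (as the Flink job does) and total each word
    static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> counts = new LinkedHashMap<>(); // keeps first-seen order
        for (String line : lines) {
            for (String word : line.split(" ")) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(count(LINES));
    }
}
```

Running `main` prints {hello=4, world=1, flink=1, spark=1, scala=1, how=1, are=1, you=3, fine=1, thank=1, and=1}: 11 distinct words and 16 words in total, so a correct streaming run should end with (hello,4) and (you,3).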

3. Flink WordCount, Java version

package com.cn.wc;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
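        // Set the parallelism (number of parallel subtasks per operator)
        env.setParallelism(2);
        // Absolute path of hello.txt on the author's machine; adjust it to your own project location
        String inputPath = "C:\\workstation\\maven_project\\flink_wordcount\\src\\main\\resources\\hello.txt";

        // DataStreamSource extends SingleOutputStreamOperator, which in turn extends DataStream
        // Read the file line by line to simulate a data stream
        DataStreamSource<String> inputDataStream = env.readTextFile(inputPath);

        // Alternatively, read a text stream from a socket (host and port, here localhost:9000, can be changed)
        // DataStreamSource<String> inputDataStream2 = env.socketTextStream("localhost", 9000);

        // Transform the stream: split each line into (word, 1) pairs, then keep a running count per word
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = inputDataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = s.split(" ");
                for (String word : words) {
                    collector.collect(new Tuple2<String, Integer>(word, 1));
                }
            }
        })
        .keyBy(0) // group records by the word (field 0): batch jobs use groupBy, streaming jobs use keyBy
        .sum(1);  // running sum of field 1 (the count) for each word
        result.print(); // only registers a print sink; nothing is printed at this point
        // Trigger execution of the streaming job
        env.execute();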
    }
}
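To see why the output contains intermediate values such as (hello,1) and (hello,2) rather than one final total per word, the flatMap, keyBy(0), sum(1) pipeline can be imitated in plain Java: every (word, 1) record updates a per-word running total, and the updated total is emitted immediately. This is a single-threaded sketch under a hypothetical class name, `RollingSumSketch`; it ignores parallelism, so its emission order follows the input order rather than the interleaving of the two subtasks in the output below.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RollingSumSketch {
    // Imitates flatMap -> keyBy(word) -> sum(count): emits "(word,runningTotal)" for every input word
    static List<String> process(List<String> lines) {
        Map<String, Integer> state = new HashMap<>(); // one running total per key, playing the role of keyed state
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {               // flatMap: one (word, 1) per token
                int total = state.merge(word, 1, Integer::sum); // sum(1): add 1 to this word's total
                emitted.add("(" + word + "," + total + ")");    // emit the updated total right away
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("hello world", "hello flink", "how are you", "and you");
        process(lines).forEach(System.out::println);
    }
}
```

With the four sample lines in `main` it prints (hello,1), (world,1), (hello,2), (flink,1), (how,1), (are,1), (you,1), (and,1), (you,2), one per line: each occurrence of a word produces a new record carrying the total so far, which is the same incremental pattern the Flink job prints.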

4. Output

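Note: every time a new record arrives for a word, the job emits that word's updated count, so each word is printed once per occurrence with its running total. The `N>` prefix is the 1-based index of the parallel subtask that printed the line (parallelism is 2 here), and the interleaving of lines can differ from run to run.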
2> (how,1)
1> (hello,1)
2> (you,1)
2> (fine,1)
1> (hello,2)
2> (you,2)
1> (hello,3)
2> (and,1)
1> (spark,1)
1> (hello,4)
2> (you,3)
1> (scala,1)
2> (world,1)
1> (are,1)
1> (thank,1)
2> (flink,1)
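In the output, every hello record carries the prefix 1> and every you record carries 2>, because keyBy routes all records with the same key to the same subtask. The sketch below illustrates that idea with a deliberately simplified rule, hash code modulo parallelism; Flink's real assignment first hashes keys into key groups, so the subtask numbers this sketch produces are not expected to match the run above. The class `KeyPartitionSketch` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class KeyPartitionSketch {
    // Simplified routing rule (NOT Flink's key-group assignment): hash code modulo parallelism
    static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Groups words by the 1-based subtask index they are routed to, like the "N>" print prefix
    static Map<Integer, List<String>> route(List<String> words, int parallelism) {
        Map<Integer, List<String>> bySubtask = new LinkedHashMap<>();
        for (String word : words) {
            int prefix = subtaskFor(word, parallelism) + 1;
            bySubtask.computeIfAbsent(prefix, k -> new ArrayList<>()).add(word);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        List<String> words = List.of("hello", "world", "hello", "you", "hello", "you");
        route(words, 2).forEach((prefix, ws) -> System.out.println(prefix + "> " + ws));
    }
}
```

Whatever the concrete numbers, the property that matters holds: a given key always lands on one subtask, so that subtask alone holds the word's running count and the totals stay correct under parallelism.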

Article link: https://www.haomeiwen.com/subject/odxbhltx.html