美文网首页
《Flink入门》WorCount

《Flink入门》WorCount

作者: 饲养员壹号 | 来源:发表于2019-06-13 20:55 被阅读0次

    package test;

    /**

    * Created by admin on 2019/6/8.

    */

    import org.apache.flink.api.common.functions.FlatMapFunction;

    import org.apache.flink.api.java.utils.ParameterTool;

    import org.apache.flink.streaming.api.datastream.DataStream;

    import org.apache.flink.streaming.api.datastream.DataStreamSource;

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    import org.apache.flink.streaming.api.windowing.time.Time;

    import org.apache.flink.util.Collector;

    public class WordCount {

    public static void main(String[] args)throws Exception {

    //定义socket的端口号

            int port;

            try{

    ParameterTool parameterTool = ParameterTool.fromArgs(args);

                port = parameterTool.getInt("port");

            }catch (Exception e){

    System.err.println("没有指定port参数,使用默认值9000");

                port =9000;

            }

    //获取运行环境

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            //连接socket获取输入的数据

            DataStreamSource text = env.socketTextStream("192.168.191.128", port, "\n");

            //计算数据

            DataStream windowCount = text.flatMap(new FlatMapFunction() {

    public void flatMap(String value, Collector out)throws Exception {

    String[] splits = value.split("\\s");

                    for (String word:splits) {

    out.collect(new WordWithCount(word,1L));

                    }

    }

    })//打平操作,把每行的单词转为<word,count>类型的数据

                    .keyBy("word")//针对相同的word数据进行分组

                    .timeWindow(Time.seconds(2),Time.seconds(1))//指定计算数据的窗口大小和滑动窗口大小

                    .sum("count");

            //把数据打印到控制台

            windowCount.print()

    .setParallelism(1);//使用一个并行度

            //注意:因为flink是懒加载的,所以必须调用execute方法,上面的代码才会执行

            env.execute("streaming word count");

        }

    /**

        * 主要为了存储单词以及单词出现的次数

        */

        public static class WordWithCount{

    public Stringword;

            public long count;

            public WordWithCount(){}

    public WordWithCount(String word, long count) {

    this.word = word;

                this.count = count;

            }

    @Override

            public StringtoString() {

    return "WordWithCount{" +

    "word='" +word +'\'' +

    ", count=" +count +

    '}';

            }

    }

    }

    相关文章

      网友评论

          本文标题:《Flink入门》WorCount

          本文链接:https://www.haomeiwen.com/subject/ggmefctx.html