美文网首页
《Flink入门》WorCount

《Flink入门》WorCount

作者: 饲养员壹号 | 来源:发表于2019-06-13 20:55 被阅读0次

package test;

/**

* Created by admin on 2019/6/8.

*/

import org.apache.flink.api.common.functions.FlatMapFunction;

import org.apache.flink.api.java.utils.ParameterTool;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.windowing.time.Time;

import org.apache.flink.util.Collector;

public class WordCount {

public static void main(String[] args)throws Exception {

//定义socket的端口号

        int port;

        try{

ParameterTool parameterTool = ParameterTool.fromArgs(args);

            port = parameterTool.getInt("port");

        }catch (Exception e){

System.err.println("没有指定port参数,使用默认值9000");

            port =9000;

        }

//获取运行环境

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //连接socket获取输入的数据

        DataStreamSource text = env.socketTextStream("192.168.191.128", port, "\n");

        //计算数据

        DataStream windowCount = text.flatMap(new FlatMapFunction() {

public void flatMap(String value, Collector out)throws Exception {

String[] splits = value.split("\\s");

                for (String word:splits) {

out.collect(new WordWithCount(word,1L));

                }

}

})//打平操作,把每行的单词转为<word,count>类型的数据

                .keyBy("word")//针对相同的word数据进行分组

                .timeWindow(Time.seconds(2),Time.seconds(1))//指定计算数据的窗口大小和滑动窗口大小

                .sum("count");

        //把数据打印到控制台

        windowCount.print()

.setParallelism(1);//使用一个并行度

        //注意:因为flink是懒加载的,所以必须调用execute方法,上面的代码才会执行

        env.execute("streaming word count");

    }

/**

    * 主要为了存储单词以及单词出现的次数

    */

    public static class WordWithCount{

public Stringword;

        public long count;

        public WordWithCount(){}

public WordWithCount(String word, long count) {

this.word = word;

            this.count = count;

        }

@Override

        public StringtoString() {

return "WordWithCount{" +

"word='" +word +'\'' +

", count=" +count +

'}';

        }

}

}

相关文章

网友评论

      本文标题:《Flink入门》WorCount

      本文链接:https://www.haomeiwen.com/subject/ggmefctx.html