美文网首页
flink基础wordcount demo-java版

flink基础wordcount demo-java版

作者: DuLaGong | 来源:发表于2019-05-13 21:35 被阅读0次

    import org.apache.flink.api.common.functions.FlatMapFunction;

    import org.apache.flink.api.java.tuple.Tuple2;

    import org.apache.flink.api.java.utils.ParameterTool;

    import org.apache.flink.streaming.api.datastream.DataStream;

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    import org.apache.flink.streaming.api.windowing.time.Time;

    import org.apache.flink.util.Collector;

    public class FlinkFirstApp2 {

    public static void main(String[] args)throws Exception{

    final int port;

            final String hostname;

            try{

    final ParameterTool params=ParameterTool.fromArgs(args);

                port=params.getInt("port",9999);

                hostname=params.get("hostname","192.168.21.138");

            }catch (Exception e){

    System.err.println("No port or hostname specified.Please run check your parameters");

    return;

            }

    final StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream lines=env.socketTextStream(hostname,port);

            DataStream> result=lines.flatMap(new FlatMapFunction>() {

    public void flatMap(String s, Collector> collector){

    String[] datas=s.split(" ");

                    for (String data:datas){

    collector.collect(new Tuple2(data,1));

                    }

    }

    }).keyBy(0)//以Tuple的第一个字段分组

            .timeWindow(Time.seconds(4),Time.seconds(2))//每隔2秒算4秒

            .sum(1);//对Tuple的第二个字段求和

            result.print().setParallelism(1);

            env.execute("FlinkFirstApp");

        }

    }

    /**
     * Word/count pair shaped as a plain Java object (POJO).
     *
     * <p>Author's checklist for a class to be recognized as a POJO (the original
     * note also mentions the Lombok plugin as a way to generate the boilerplate):
     * <ol>
     *   <li>the class is public;</li>
     *   <li>it has a no-argument constructor;</li>
     *   <li>its fields are exposed through getters and setters;</li>
     *   <li>serialization (some things need to be serializable).</li>
     * </ol>
     */
    public class WC {

        /** The word being counted; {@code null} until set when built with the no-arg constructor. */
        private String word;

        /** Number of occurrences recorded for {@link #word}. */
        private long count;

        /**
         * Creates a fully initialized word/count pair.
         *
         * @param word  the word
         * @param count the number of occurrences
         */
        public WC(String word, long count) {
            this.word = word;
            this.count = count;
        }

        /** No-argument constructor; leaves {@code word} as {@code null} and {@code count} as 0. */
        public WC() {
        }

        /** @return the word, possibly {@code null} */
        public String getWord() {
            return word;
        }

        /** @return the occurrence count */
        public long getCount() {
            return count;
        }

        /** @param word the word to store */
        public void setWord(String word) {
            this.word = word;
        }

        /** @param count the occurrence count to store */
        public void setCount(long count) {
            this.count = count;
        }

        /** @return a string of the form {@code WC{word='w', count=n}} */
        @Override
        public String toString() {
            return "WC{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }

    相关文章

      网友评论

          本文标题:flink基础wordcount demo-java版

          本文链接:https://www.haomeiwen.com/subject/urrdaqtx.html