美文网首页
flink基础wordcount demo-java版

flink基础wordcount demo-java版

作者: DuLaGong | 来源:发表于2019-05-13 21:35 被阅读0次

import org.apache.flink.api.common.functions.FlatMapFunction;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.api.java.utils.ParameterTool;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.windowing.time.Time;

import org.apache.flink.util.Collector;

public class FlinkFirstApp2 {

public static void main(String[] args)throws Exception{

final int port;

        final String hostname;

        try{

final ParameterTool params=ParameterTool.fromArgs(args);

            port=params.getInt("port",9999);

            hostname=params.get("hostname","192.168.21.138");

        }catch (Exception e){

System.err.println("No port or hostname specified.Please run check your parameters");

return;

        }

final StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream lines=env.socketTextStream(hostname,port);

        DataStream> result=lines.flatMap(new FlatMapFunction>() {

public void flatMap(String s, Collector> collector){

String[] datas=s.split(" ");

                for (String data:datas){

collector.collect(new Tuple2(data,1));

                }

}

}).keyBy(0)//以Tuple的第一个字段分组

        .timeWindow(Time.seconds(4),Time.seconds(2))//每隔2秒算4秒

        .sum(1);//对Tuple的第二个字段求和

        result.print().setParallelism(1);

        env.execute("FlinkFirstApp");

    }

}

/**
 * Word/count pair intended to be usable as a POJO type.
 *
 * <p>To be recognized as a POJO the class follows these rules (the original
 * note mentions the Lombok plugin, presumably as a way to generate the
 * boilerplate):
 * <ol>
 *   <li>the class is public;</li>
 *   <li>it has a no-argument constructor;</li>
 *   <li>every field has a getter and a setter;</li>
 *   <li>serialization: some members may need to be serializable.</li>
 * </ol>
 */
public class WC {

    /** The word being counted. */
    private String word;

    /** Number of occurrences recorded for {@link #word}. */
    private long count;

    /**
     * Creates a pair with the given word and count.
     *
     * @param word the word
     * @param count the number of occurrences
     */
    public WC(String word, long count) {
        this.word = word;
        this.count = count;
    }

    /** No-argument constructor; leaves {@code word} as {@code null} and {@code count} as 0. */
    public WC() {
    }

    /** @return the word */
    public String getWord() {
        return word;
    }

    /** @return the count */
    public long getCount() {
        return count;
    }

    /** @param word the word to set */
    public void setWord(String word) {
        this.word = word;
    }

    /** @param count the count to set */
    public void setCount(long count) {
        this.count = count;
    }

    /** @return a string of the form {@code WC{word='<word>', count=<count>}} */
    @Override
    public String toString() {
        return "WC{"
                + "word='" + word + '\''
                + ", count=" + count
                + '}';
    }
}

相关文章

网友评论

      本文标题:flink基础wordcount demo-java版

      本文链接:https://www.haomeiwen.com/subject/urrdaqtx.html