import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class FlinkFirstApp2 {
public static void main(String[] args)throws Exception{
final int port;
final String hostname;
try{
final ParameterTool params=ParameterTool.fromArgs(args);
port=params.getInt("port",9999);
hostname=params.get("hostname","192.168.21.138");
}catch (Exception e){
System.err.println("No port or hostname specified.Please run check your parameters");
return;
}
final StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
DataStream lines=env.socketTextStream(hostname,port);
DataStream> result=lines.flatMap(new FlatMapFunction>() {
public void flatMap(String s, Collector> collector){
String[] datas=s.split(" ");
for (String data:datas){
collector.collect(new Tuple2(data,1));
}
}
}).keyBy(0)//以Tuple的第一个字段分组
.timeWindow(Time.seconds(4),Time.seconds(2))//每隔2秒算4秒
.sum(1);//对Tuple的第二个字段求和
result.print().setParallelism(1);
env.execute("FlinkFirstApp");
}
}
/**
 * Word-count record: a word together with the number of times it was seen.
 *
 * <p>The class follows the POJO conventions listed in the original author's notes:
 * <ol>
 *   <li>the class is public;</li>
 *   <li>it has a public no-argument constructor;</li>
 *   <li>every field is reachable through a getter and a setter
 *       (the notes mention that the Lombok plugin could generate these);</li>
 *   <li>the notes also list serialization as a requirement for some uses;
 *       this class does not itself declare {@code Serializable}.</li>
 * </ol>
 */
public class WC {

    /** The counted word; {@code null} until set. */
    private String word;

    /** Number of occurrences of {@link #word}. */
    private long count;

    /**
     * Creates a record with the given word and count.
     *
     * @param word  the counted word
     * @param count number of occurrences
     */
    public WC(String word, long count) {
        this.word = word;
        this.count = count;
    }

    /** Creates an empty record ({@code word == null}, {@code count == 0}). */
    public WC() {
    }

    /** @return the counted word, possibly {@code null} */
    public String getWord() {
        return word;
    }

    /** @return the number of occurrences */
    public long getCount() {
        return count;
    }

    /** @param word the counted word */
    public void setWord(String word) {
        this.word = word;
    }

    /** @param count the number of occurrences */
    public void setCount(long count) {
        this.count = count;
    }

    /** @return a string of the form {@code WC{word='<word>', count=<count>}} */
    @Override
    public String toString() {
        return "WC{" +
                "word='" + word + '\'' +
                ", count=" + count +
                '}';
    }
}
// Reader comments (web page footer captured along with the code; not part of the program)