windows 开发的话,我们可以下载nc,把其中的nc.exe 拷贝到我们 C:\Users\当前登录的用户 下就可以。
然后 cmd 输入以下命令,-L 为监听,p是端口,监听7777,下载地址 https://eternallybored.org/misc/netcat/
nc -Lp 7777
启动以下程序就可以模拟实时stream
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.12.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.12.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.12.2</version>
</dependency>
</dependencies>
public class EventData{
private Integer id;
private Long eventTime;
private String data;
private Integer num;
public EventData(){
};
public Integer getNum() {
return num;
}
public void setNum(Integer num) {
this.num = num;
}
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public Long getEventTime() {
return eventTime;
}
public void setEventTime(Long eventTime) {
this.eventTime = eventTime;
}
public String getData() {
return data;
}
public void setData(String data) {
this.data = data;
}
@Override
public String toString() {
return "EventData{" +
"id=" + id +
", eventTime=" + eventTime +
", data='" + data + '\'' +
", num=" + num +
'}';
}
}
package com.example.demo;
import com.example.demo.bean.EventData;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @author big uncle
* @date 2021/6/5 15:36
* @module
**/
public class EventTime {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 监听本地IP
DataStream<String> dataStream = env.socketTextStream("192.168.200.58", 7777);
// 数据转换
dataStream.map(new MapFunction<String, EventData>() {
@Override
public EventData map(String value) throws Exception {
String[] strs = value.split(",");
EventData eventData = new EventData();
eventData.setId(Integer.valueOf(strs[0]));
eventData.setEventTime(Long.valueOf(strs[1]));
eventData.setData(strs[2]);
eventData.setNum(Integer.valueOf(strs[3]));
return eventData;
}
}).print();
env.execute("test");
}
}
测试数据
1,1623051400,test data,1
1,1623051401,test data,1
1,1623051402,test data,1
1,1623051405,test data,3
1,1623051406,test data,3
1,1623051409,test data,3
1,1623051410,test data,5
网友评论