执行环境
创建一个执行环境,表示当前执行程序的上下文,如果程序是独立调用的则此方法返回本地执行环境,如果从命令行客户端调用程序提交到集群,则此方法返回此集群的执行环境,也就是是说,getExecutionEnvironment(),会根据查询运行的方式决定返回什么样的运行环境。是最常用的一种创建执行环境的方式。
# 流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
#批处理
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
1.读取的数据源-Elements
DataStreamSource<Integer> integerDataStreamSource = env.fromElements(1, 2, 2, 5, 44);
2.读取的数据源-Collection
public class SensorReading {
private String id;
private Long timestamp;
private Double temperature;
public SensorReading() {
}
public SensorReading(String id, Long timestamp, Double temperature) {
this.id = id;
this.timestamp = timestamp;
this.temperature = temperature;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public Long getTimestamp() {
return timestamp;
}
public void setTimestamp(Long timestamp) {
this.timestamp = timestamp;
}
public Double getTemperature() {
return temperature;
}
public void setTemperature(Double temperature) {
this.temperature = temperature;
}
@Override
public String toString() {
return "SensorReading{" +
"id='" + id + '\'' +
", timestamp=" + timestamp +
", temperature=" + temperature +
'}';
}
DataStreamSource<SensorReading> sensorDataStream = env.fromCollection(Arrays.asList(
new SensorReading("sensor_1", 1547718199L, 35.8),
new SensorReading("sensor_6", 1547718201L, 15.8),
new SensorReading("sensor_7", 1547718202L, 37.8),
new SensorReading("sensor_10", 1547718203L, 38.8)
));
3.读取的数据源-File
//从文件读取
DataStreamSource<String> stringDataStreamSource = env.readTextFile("/path/to/hello.txt");
4.从socket文本流读取数据
DataStream<String> inputDataStream = env.socketTextStream(localhost,7777);
最终执行
sensorDataStream.print("sensor");
integerDataStreamSource.print("int");
stringDataStreamSource.print("file");
env.execute("my job name");
网友评论