State
Connector
Kafka Consumer
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "hmaster:9092");
properties.setProperty("zookeeper.connect", "hmaster:2181");
properties.setProperty("group.id", "test");
FlinkKafkaConsumer myConsumer = new FlinkKafkaConsumer("wechat", new SimpleStringSchema(), properties);
HDFS Connector
String textPath = "hdfs://172.22.5.130:8020/apps/hive/warehouse/datahandle.db/o5040_sp_zid_201901290819/O5040_sp_zid_201901290819__00bc5e10_3bff_4492_a61e_299996813e6d";
DataSet<Tuple2<LongWritable, Text>> input = env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat(),
LongWritable.class, Text.class, textPath));
自定义Source
定义一个类并实现SourceFunction或者ParaliseFunction、RichParaliseFunction
并调用env.addSource()方法
显示的类一定要指定泛型类型,否则报错
算子
Map: 一对一的映射,一个元素生成一个元素
flatMap: 一对多的映射,一个元素生成若干个元素
reduce: 上次计算的元素结果与本次收到的元素的合并
filter: 返回true保留元素,返回false过滤元素
sum: 汇总计算
keyBy: 按照某个域值的hashCode做分区
Project: 获取Tuple中值域的子集
Iterate: 用于迭代循环操作,两个filter,第一个filter控制闭合,第二个filter控制数据流的输出(如果没有第二个filter操作,每次迭代的结果都会输出,感觉这个策略不是很友好)
broadcast(): 作用于数据流,用于将数据流分发到多个下游服务器,下有服务器中的Operator共享数据
rebalance(): 用于解决数据倾斜问题(解决的方式是生成多个线程,处理同一条数据)
IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map (/*do something*/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
@Override
public boolean filter(Long value) throws Exception {
return value > 0;
}
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
@Override
public boolean filter(Long value) throws Exception {
return value <= 0;
}
});
disableOperatorChaining()用来禁止算子的链式操作(在同一个线程种的操作,用于资源的均衡)
Event Time
Processing time: 算子运行的时间,物理机的时间,归整。
Event time: 从事件记录获取到的时间,由数据源生成。
Ingestion time: 事件记录进入Flink的事件。

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
用来设置使用哪种事件特性。
Monitors
- 通过web UI查看Metrics、检查点、背压等;
- Metrics通过RichFunction监控,可以是Counter,Guage,Histogram等。
网友评论