本专题将会从0到1实现告警处理流程,并会讲解实现过程中使用到的Flink中的技术。
1 Flink的工作方式
使用Flink常用的方式是将Flink作为管道和管道之间的处理器,Flink从源中读取数据,进行逻辑计算后,将结果写入到目的,这里的源和目的可以是同一类系统,例如,都是kafka。Flink内置的和扩展的Connectors:https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/。
Source -> Transform -> Sink
2 Flink中的无界流数据源
Flink的API对于数据处理来说可以分为两大类:DataStream和DataSet。DataStream用于处理无界流,例如,socket、kafka,DataSet用于处理有界流,例如,对象集合、文件。
2.1 socket
socket数据源可以处理通过套接字传输来的数据:
socketTextStream(String hostname, int port);
socketTextStream(String hostname, int port, String delimiter);
socketTextStream(String hostname, int port, String delimiter, long maxRetry);
hostname是套接字的主机名/IP,port是套接字的端口,delimiter是数据之间的分隔符,默认是"\n"
,maxRetry是最大等待时间,当读取的套接字关闭后,会尝试重连多少次,每隔一秒尝试一次,默认值是0,如果是0表明不重连,如果小于0表明一直重连。
DEMO如下:
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Job {
public static void main(String[] args) throws Exception {
// 创建流执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 创建socket流
DataStream<String> dataStream = env.socketTextStream("localhost", 8888, "\n");
// 打印流
dataStream.print();
}
}
将上面的程序编译打包生成jar包后上传到服务器。这里需要连接socket,先使用nc工具监听8888端口:nc -l 8888
,然后提交flink任务。
当在nc里面输入内容后,flink的程序就可以拿到数据。
2.2 Kafka
Kafka应该是跟Flink最匹配的搭档,由于采集数据量巨大,很多系统都会将数据采集后写入Kafka,然后一个程序从kafka读取数据后落地到后端存储,另一个程序通过Flink从Kafka中读,进行逻辑处理。
第一步,添加kafka依赖,这里要注意flink和kafka的版本。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
其中,flink.version为flink的版本,scala.version为scala的版本。
第二步,构建kafka的配置
// 定义kafka的配置
// bootstrap.servers:kafka brokers的地址,以逗号分割
// group.id:消费者组的ID
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "kafka_brokers_list");
kafkaProps.setProperty("group.id", "group_id_of_consumer");
第三步,读取数据,进行解析,由于存储到kafka的数据是二进制的,需要进行反序列化,而客户端写入kafka中的数据最常用的方式就是JSON数据,因此,我们重点讨论如何将kafka中的JSON数据解析成便于flink处理的数据。
flink提供了JSONKeyValueDeserializationSchema
,该类可以将kafka中的数据转换为ObjectNode,其实就相当于是个Map。假设kafka中的数据是:{"name": "luo", "age": 17, "phone": { "home": 123, "work": 456}}
, 然后可以通过ObjectNode.get("value").get("name")
获取到名字luo,通过Object.get("value").get("phone").get("home")
就可以获取到家里的电话123。
flink中方便处理的数据类型有两种:Tuple和对象。因此,将数据用JSONKeyValueDeserializationSchema反序列化,再用map操作将数据转换成Tuple或者对象。
将数据转换成对象的实现方式:
// 传递JSONKeyValueDeserializationSchema的对象,该对象有一个参数
// 表明是否携带kafka的一些元数据
DataStreamSource<ObjectNode> kafkaStream =
env.addSource(new FlinkKafkaConsumer<>("topic", new JSONKeyValueDeserializationSchema(false),
kafkaProps));
// 在map的处理函数里面从ObjectNode对象中获取对应的字段,然后生成对象
kafkaStream.map(new MapFunction<ObjectNode, SwitchMetric>() {
@Override
public SwitchMetric map(ObjectNode jsonNodes) throws Exception {
String name = jsonNodes.get("value").get("name").textValue();
String addr = jsonNodes.get("value").get("addr").textValue();
return new People(name, addr);
}
});
将数据转换成Tuple的实现方式:
DataStreamSource<ObjectNode> kafkaStream =
env.addSource(new FlinkKafkaConsumer<>("topic", new JSONKeyValueDeserializationSchema(false),
kafkaProps));
kafkaStream.map(new MapFunction<ObjectNode, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(ObjectNode jsonNodes) throws Exception {
String name = jsonNodes.get("value").get("name").textValue();
String addr = jsonNodes.get("value").get("addr").textValue();
return Tuple2.of(name, addr);
}
});
除了上面的先使用JSONKeyValueDeserializationSchema反序列化,再使用map操作转换成合适的类型之外,还可以使用自定义的DeserializationSchema对数据进行处理:
public static class MySchema implements KafkaDeserializationSchema<Tuple2<String, String>> {
private final ObjectMapper mapper = new ObjectMapper();
@Override
public Tuple5<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
// 反序列化的主函数:先将数据转换成Map,然后在返回时构造新的Tuple
// 当然,也可以在这里直接将json字符串解析成对象,不过就需要引入新的处理json的包
Map<String,Object> t = mapper.readValue(record.value(), Map.class);
return new Tuple2.of(String.valueOf(t.get("name")), String.valueOf(t.get("addr")));
}
@Override
public boolean isEndOfStream(Tuple2<String, String> nextElement) {
return false;
}
@Override
public TypeInformation<Tuple2<String, String>> getProducedType() {
return new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
}
}
3 自定义数据源
flink支持自定义数据源,数据源就是可以源源不断地返回数据(对于流而言),因此,数据源可以说就是一个死循环,这个死循环会持续不断地输出数据。如下例所示,MySource实现了SourceFunction接口,里面有两个方法需要实现:
DataStreamSource<People> stream = env.addSource(new MySource());
stream.print();
public static class MySource implements SourceFunction<People> {
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<People> sourceContext) {
while(isRunning) {
sourceContext.collect(new People("name", "home"));
}
}
@Override
public void cancel() {
this.isRunning = false;
}
}
4 小结
本文介绍了如何从数据源中读取数据,然后转换成对象或者元组以方便后续的处理,然后介绍了自定义数据源并创建了一个简单的数据源。
5 参考文档
- Apache Kafka Connector:https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/kafka.html
网友评论