美文网首页
基于Flink的实时告警实现(2):读取数据源

基于Flink的实时告警实现(2):读取数据源

作者: luofengmacheng | 来源:发表于2021-03-23 22:48 被阅读0次

本专题将会从0到1实现告警处理流程,并会讲解实现过程中使用到的Flink中的技术。

1 Flink的工作方式

使用Flink常用的方式是将Flink作为管道和管道之间的处理器,Flink从源中读取数据,进行逻辑计算后,将结果写入到目的,这里的源和目的可以是同一类系统,例如,都是kafka。Flink内置的和扩展的Connectors:https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/

Source -> Transform -> Sink

2 Flink中的无界流数据源

Flink的API对于数据处理来说可以分为两大类:DataStream和DataSet。DataStream用于处理无界流,例如,socket、kafka,DataSet用于处理有界流,例如,对象集合、文件。

2.1 socket

socket数据源可以处理通过套接字传输来的数据:

socketTextStream(String hostname, int port);
socketTextStream(String hostname, int port, String delimiter);
socketTextStream(String hostname, int port, String delimiter, long maxRetry);

hostname是套接字的主机名/IP,port是套接字的端口,delimiter是数据之间的分隔符,默认是"\n",maxRetry是最大等待时间,当读取的套接字关闭后,会尝试重连多少次,每隔一秒尝试一次,默认值是0,如果是0表明不重连,如果小于0表明一直重连。

DEMO如下:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Job {

    public static void main(String[] args) throws Exception {
        // 创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 创建socket流
        DataStream<String> dataStream = env.socketTextStream("localhost", 8888, "\n");
        // 打印流
        dataStream.print();
    }
}

将上面的程序编译打包生成jar包后上传到服务器。这里需要连接socket,先使用nc工具监听8888端口:nc -l 8888,然后提交flink任务。

当在nc里面输入内容后,flink的程序就可以拿到数据。

2.2 Kafka

Kafka应该是跟Flink最匹配的搭档,由于采集数据量巨大,很多系统都会将数据采集后写入Kafka,然后一个程序从kafka读取数据后落地到后端存储,另一个程序通过Flink从Kafka中读,进行逻辑处理。

第一步,添加kafka依赖,这里要注意flink和kafka的版本。

<dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.version}</artifactId>
        <version>${flink.version}</version>
 </dependency>

其中,flink.version为flink的版本,scala.version为scala的版本。

第二步,构建kafka的配置

// 定义kafka的配置
// bootstrap.servers:kafka brokers的地址,以逗号分割
// group.id:消费者组的ID
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "kafka_brokers_list");
kafkaProps.setProperty("group.id", "group_id_of_consumer");

第三步,读取数据,进行解析,由于存储到kafka的数据是二进制的,需要进行反序列化,而客户端写入kafka中的数据最常用的方式就是JSON数据,因此,我们重点讨论如何将kafka中的JSON数据解析成便于flink处理的数据。

flink提供了JSONKeyValueDeserializationSchema,该类可以将kafka中的数据转换为ObjectNode,其实就相当于是个Map。假设kafka中的数据是:{"name": "luo", "age": 17, "phone": { "home": 123, "work": 456}}, 然后可以通过ObjectNode.get("value").get("name")获取到名字luo,通过Object.get("value").get("phone").get("home")就可以获取到家里的电话123。

flink中方便处理的数据类型有两种:Tuple和对象。因此,将数据用JSONKeyValueDeserializationSchema反序列化,再用map操作将数据转换成Tuple或者对象。

将数据转换成对象的实现方式:

// 传递JSONKeyValueDeserializationSchema的对象,该对象有一个参数
// 表明是否携带kafka的一些元数据
DataStreamSource<ObjectNode> kafkaStream =
        env.addSource(new FlinkKafkaConsumer<>("topic", new JSONKeyValueDeserializationSchema(false),
                kafkaProps));

// 在map的处理函数里面从ObjectNode对象中获取对应的字段,然后生成对象
kafkaStream.map(new MapFunction<ObjectNode, SwitchMetric>() {
    @Override
    public SwitchMetric map(ObjectNode jsonNodes) throws Exception {
        String name = jsonNodes.get("value").get("name").textValue();
        String addr = jsonNodes.get("value").get("addr").textValue();
        return new People(name, addr);
    }
});

将数据转换成Tuple的实现方式:

DataStreamSource<ObjectNode> kafkaStream =
        env.addSource(new FlinkKafkaConsumer<>("topic", new JSONKeyValueDeserializationSchema(false),
                kafkaProps));

kafkaStream.map(new MapFunction<ObjectNode, Tuple2<String, String>>() {
    @Override
    public Tuple2<String, String> map(ObjectNode jsonNodes) throws Exception {
        String name = jsonNodes.get("value").get("name").textValue();
        String addr = jsonNodes.get("value").get("addr").textValue();
        return Tuple2.of(name, addr);
    }
});

除了上面的先使用JSONKeyValueDeserializationSchema反序列化,再使用map操作转换成合适的类型之外,还可以使用自定义的DeserializationSchema对数据进行处理:

public static class MySchema implements KafkaDeserializationSchema<Tuple2<String, String>> {

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public Tuple5<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        // 反序列化的主函数:先将数据转换成Map,然后在返回时构造新的Tuple
        // 当然,也可以在这里直接将json字符串解析成对象,不过就需要引入新的处理json的包
        Map<String,Object> t = mapper.readValue(record.value(), Map.class);
        
       return new Tuple2.of(String.valueOf(t.get("name")), String.valueOf(t.get("addr")));
    }

    @Override
    public boolean isEndOfStream(Tuple2<String, String> nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Tuple2<String, String>> getProducedType() {
        return new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
    }
}

3 自定义数据源

flink支持自定义数据源,数据源就是可以源源不断地返回数据(对于流而言),因此,数据源可以说就是一个死循环,这个死循环会持续不断地输出数据。如下例所示,MySource实现了SourceFunction接口,里面有两个方法需要实现:

DataStreamSource<People> stream = env.addSource(new MySource());
stream.print();

public static class MySource implements SourceFunction<People> {

    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<People> sourceContext) {
        while(isRunning) {
            sourceContext.collect(new People("name", "home"));
        }
    }

    @Override
    public void cancel() {
        this.isRunning = false;
    }
}

4 小结

本文介绍了如何从数据源中读取数据,然后转换成对象或者元组以方便后续的处理,然后介绍了自定义数据源并创建了一个简单的数据源。

5 参考文档

相关文章

网友评论

      本文标题:基于Flink的实时告警实现(2):读取数据源

      本文链接:https://www.haomeiwen.com/subject/qdmdcltx.html