美文网首页
基于Flink的实时告警实现(2):读取数据源

基于Flink的实时告警实现(2):读取数据源

作者: luofengmacheng | 来源:发表于2021-03-23 22:48 被阅读0次

    本专题将会从0到1实现告警处理流程,并会讲解实现过程中使用到的Flink中的技术。

    1 Flink的工作方式

    使用Flink常用的方式是将Flink作为管道和管道之间的处理器,Flink从源中读取数据,进行逻辑计算后,将结果写入到目的,这里的源和目的可以是同一类系统,例如,都是kafka。Flink内置的和扩展的Connectors:https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/

    Source -> Transform -> Sink
    

    2 Flink中的无界流数据源

    Flink的API对于数据处理来说可以分为两大类:DataStream和DataSet。DataStream用于处理无界流,例如,socket、kafka,DataSet用于处理有界流,例如,对象集合、文件。

    2.1 socket

    socket数据源可以处理通过套接字传输来的数据:

    socketTextStream(String hostname, int port);
    socketTextStream(String hostname, int port, String delimiter);
    socketTextStream(String hostname, int port, String delimiter, long maxRetry);
    

    hostname是套接字的主机名/IP,port是套接字的端口,delimiter是数据之间的分隔符,默认是"\n",maxRetry是最大等待时间,当读取的套接字关闭后,会尝试重连多少次,每隔一秒尝试一次,默认值是0,如果是0表明不重连,如果小于0表明一直重连。

    DEMO如下:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    public class Job {
    
        public static void main(String[] args) throws Exception {
            // 创建流执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 创建socket流
            DataStream<String> dataStream = env.socketTextStream("localhost", 8888, "\n");
            // 打印流
            dataStream.print();
        }
    }
    

    将上面的程序编译打包生成jar包后上传到服务器。这里需要连接socket,先使用nc工具监听8888端口:nc -l 8888,然后提交flink任务。

    当在nc里面输入内容后,flink的程序就可以拿到数据。

    2.2 Kafka

    Kafka应该是跟Flink最匹配的搭档,由于采集数据量巨大,很多系统都会将数据采集后写入Kafka,然后一个程序从kafka读取数据后落地到后端存储,另一个程序通过Flink从Kafka中读,进行逻辑处理。

    第一步,添加kafka依赖,这里要注意flink和kafka的版本。

    <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
     </dependency>
    

    其中,flink.version为flink的版本,scala.version为scala的版本。

    第二步,构建kafka的配置

    // 定义kafka的配置
    // bootstrap.servers:kafka brokers的地址,以逗号分割
    // group.id:消费者组的ID
    Properties kafkaProps = new Properties();
    kafkaProps.setProperty("bootstrap.servers", "kafka_brokers_list");
    kafkaProps.setProperty("group.id", "group_id_of_consumer");
    

    第三步,读取数据,进行解析,由于存储到kafka的数据是二进制的,需要进行反序列化,而客户端写入kafka中的数据最常用的方式就是JSON数据,因此,我们重点讨论如何将kafka中的JSON数据解析成便于flink处理的数据。

    flink提供了JSONKeyValueDeserializationSchema,该类可以将kafka中的数据转换为ObjectNode,其实就相当于是个Map。假设kafka中的数据是:{"name": "luo", "age": 17, "phone": { "home": 123, "work": 456}}, 然后可以通过ObjectNode.get("value").get("name")获取到名字luo,通过Object.get("value").get("phone").get("home")就可以获取到家里的电话123。

    flink中方便处理的数据类型有两种:Tuple和对象。因此,将数据用JSONKeyValueDeserializationSchema反序列化,再用map操作将数据转换成Tuple或者对象。

    将数据转换成对象的实现方式:

    // 传递JSONKeyValueDeserializationSchema的对象,该对象有一个参数
    // 表明是否携带kafka的一些元数据
    DataStreamSource<ObjectNode> kafkaStream =
            env.addSource(new FlinkKafkaConsumer<>("topic", new JSONKeyValueDeserializationSchema(false),
                    kafkaProps));
    
    // 在map的处理函数里面从ObjectNode对象中获取对应的字段,然后生成对象
    kafkaStream.map(new MapFunction<ObjectNode, SwitchMetric>() {
        @Override
        public SwitchMetric map(ObjectNode jsonNodes) throws Exception {
            String name = jsonNodes.get("value").get("name").textValue();
            String addr = jsonNodes.get("value").get("addr").textValue();
            return new People(name, addr);
        }
    });
    

    将数据转换成Tuple的实现方式:

    DataStreamSource<ObjectNode> kafkaStream =
            env.addSource(new FlinkKafkaConsumer<>("topic", new JSONKeyValueDeserializationSchema(false),
                    kafkaProps));
    
    kafkaStream.map(new MapFunction<ObjectNode, Tuple2<String, String>>() {
        @Override
        public Tuple2<String, String> map(ObjectNode jsonNodes) throws Exception {
            String name = jsonNodes.get("value").get("name").textValue();
            String addr = jsonNodes.get("value").get("addr").textValue();
            return Tuple2.of(name, addr);
        }
    });
    

    除了上面的先使用JSONKeyValueDeserializationSchema反序列化,再使用map操作转换成合适的类型之外,还可以使用自定义的DeserializationSchema对数据进行处理:

    public static class MySchema implements KafkaDeserializationSchema<Tuple2<String, String>> {
    
        private final ObjectMapper mapper = new ObjectMapper();
    
        @Override
        public Tuple5<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
            // 反序列化的主函数:先将数据转换成Map,然后在返回时构造新的Tuple
            // 当然,也可以在这里直接将json字符串解析成对象,不过就需要引入新的处理json的包
            Map<String,Object> t = mapper.readValue(record.value(), Map.class);
            
           return new Tuple2.of(String.valueOf(t.get("name")), String.valueOf(t.get("addr")));
        }
    
        @Override
        public boolean isEndOfStream(Tuple2<String, String> nextElement) {
            return false;
        }
    
        @Override
        public TypeInformation<Tuple2<String, String>> getProducedType() {
            return new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        }
    }
    

    3 自定义数据源

    flink支持自定义数据源,数据源就是可以源源不断地返回数据(对于流而言),因此,数据源可以说就是一个死循环,这个死循环会持续不断地输出数据。如下例所示,MySource实现了SourceFunction接口,里面有两个方法需要实现:

    DataStreamSource<People> stream = env.addSource(new MySource());
    stream.print();
    
    public static class MySource implements SourceFunction<People> {
    
        private volatile boolean isRunning = true;
    
        @Override
        public void run(SourceContext<People> sourceContext) {
            while(isRunning) {
                sourceContext.collect(new People("name", "home"));
            }
        }
    
        @Override
        public void cancel() {
            this.isRunning = false;
        }
    }
    

    4 小结

    本文介绍了如何从数据源中读取数据,然后转换成对象或者元组以方便后续的处理,然后介绍了自定义数据源并创建了一个简单的数据源。

    5 参考文档

    相关文章

      网友评论

          本文标题:基于Flink的实时告警实现(2):读取数据源

          本文链接:https://www.haomeiwen.com/subject/qdmdcltx.html