美文网首页flink
Flink 从 0 到 1 学习 —— Apache Flink

Flink 从 0 到 1 学习 —— Apache Flink

作者: tracy_668 | 来源:发表于2021-01-05 08:04 被阅读0次

    [TOC]
    来源于http://www.54tianzhisheng.cn/2018/10/13/flink-introduction/

    Flink 是一种流式计算框架,为什么我会接触到 Flink 呢?

    因为我目前在负责的是监控平台的告警部分,负责采集到的监控数据会直接往 kafka 里塞,然后告警这边需要从 kafka topic 里面实时读取到监控数据,并将读取到的监控数据做一些 聚合/转换/计算 等操作,然后将计算后的结果与告警规则的阈值进行比较,然后做出相应的告警措施(钉钉群、邮件、短信、电话等)。画了个简单的图如下:

    image.png

    目前告警这块的架构是这样的结构,刚进公司那会的时候,架构是所有的监控数据直接存在 ElasticSearch 中,然后我们告警是去 ElasticSearch 中搜索我们监控指标需要的数据,幸好 ElasticSearch 的搜索能力够强大。但是你有没有发现一个问题,就是所有的监控数据从采集、采集后的数据做一些 计算/转换/聚合、再通过 Kafka 消息队列、再存进 ElasticSearch 中,再而去 ElasticSearch 中查找我们的监控数据,然后做出告警策略。整个流程对监控来说看起来很按照常理,但是对于告警来说,如果中间某个环节出了问题,比如 Kafka 消息队列延迟、监控数据存到 ElasticSearch 中写入时间较长、你的查询姿势写的不对等原因,这都将导致告警从 ElasticSearch 查到的数据是有延迟的。也许是 30 秒、一分钟、或者更长,这样对于告警来说这无疑将导致告警的消息没有任何的意义。

    为什么这么说呢?为什么需要监控告警平台呢?无非就是希望我们能够尽早的发现问题,把问题给告警出来,这样开发和运维人员才能够及时的处理解决好线上的问题,以免给公司造成巨大的损失。

    更何况现在还有更多的公司在做那种提前预警呢!这种又该如何做呢?需要用大数据和机器学习的技术去分析周期性的历史数据,然后根据这些数据可以整理出来某些监控指标的一些周期性(一天/七天/一月/一季度/一年)走势图,这样就大概可以绘图出来。然后根据这个走势图,可以将当前时间点的监控指标的数据使用量和走势图进行对比,在快要达到我们告警规则的阈值时,这时就可以提前告一个预警出来,让运维提前知道预警,然后提前查找问题,这样就能够提早发现问题所在,避免损失,将损失降到最小!当然,这种也是我打算做的,应该可以学到不少东西的。

    于是乎,我现在就在接触流式计算框架 Flink,类似的还有常用的 Spark 等。

    数据集类型有哪些呢:

    • 无穷数据集:无穷的持续集成的数据集合
    • 有界数据集:有限不会改变的数据集合

    数据运算模型有哪些呢:

    • 流式:只要数据一直在产生,计算就持续地进行
    • 批处理:在预先定义的时间内运行计算,当完成时释放计算机资源

    Flink 它可以处理有界的数据集、也可以处理无界的数据集、它可以流式的处理数据、也可以批量的处理数据。

    Flink 是什么 ?

    image.png image.png image.png

    从下至上,Flink 整体结构


    image.png

    1、部署:Flink 支持本地运行、能在独立集群或者在被 YARN 或 Mesos 管理的集群上运行, 也能部署在云上。

    2、运行:Flink 的核心是分布式流式数据引擎,意味着数据以一次一个事件的形式被处理。

    3、API:DataStream、DataSet、Table、SQL API。

    4、扩展库:Flink 还包括用于复杂事件处理,机器学习,图形处理和 Apache Storm 兼容性的专用代码库。

    Flink 数据流编程模型

    抽象级别
    Flink 提供了不同的抽象级别以开发流式或批处理应用。

    image.png
    • 最底层提供了有状态流。它将通过 过程函数(Process Function)嵌入到 DataStream API 中。它允许用户可以自由地处理来自一个或多个流数据的事件,并使用一致、容错的状态。除此之外,用户可以注册事件时间和处理事件回调,从而使程序可以实现复杂的计算。
    • DataStream / DataSet API 是 Flink 提供的核心 API ,DataSet 处理有界的数据集,DataStream 处理有界或者无界的数据流。用户可以通过各种方法(map / flatmap / window / keyby / sum / max / min / avg / join 等)将数据进行转换 / 计算。
    • Table API 是以 表 为中心的声明式 DSL,其中表可能会动态变化(在表达流数据时)。Table API 提供了例如 select、project、join、group-by、aggregate 等操作,使用起来却更加简洁(代码量更少)。

    你可以在表与 DataStream/DataSet 之间无缝切换,也允许程序将 Table API 与 DataStream 以及 DataSet 混合使用。

    • Flink 提供的最高层级的抽象是 SQL 。这一层抽象在语法与表达能力上与 Table API 类似,但是是以 SQL查询表达式的形式表现程序。SQL 抽象与 Table API 交互密切,同时 SQL 查询可以直接在 Table API 定义的表上执行。

    Flink 程序与数据流结构

    image.png

    Flink 应用程序结构就是如上图所示:

    1、Source: 数据源,Flink 在流处理和批处理上的 source 大概有 4 类:基于本地集合的 source、基于文件的 source、基于网络套接字的 source、自定义的 source。自定义的 source 常见的有 Apache kafka、Amazon Kinesis Streams、RabbitMQ、Twitter Streaming API、Apache NiFi 等,当然你也可以定义自己的 source。

    1. 2、Transformation:数据转换的各种操作,有 Map / FlatMap / Filter / KeyBy / Reduce / Fold / Aggregations / Window / WindowAll / Union / Window join / Split / Select / Project 等,操作很多,可以将数据转换计算成你想要的数据。

    3、Sink:接收器,Flink 将转换计算后的数据发送的地点 ,你可能需要存储下来,Flink 常见的 Sink 大概有如下几类:写入文件、打印出来、写入 socket 、自定义的 sink 。自定义的 sink 常见的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等,同理你也可以定义自己的 sink。

    为什么选择 Flink?

    Flink 是一个开源的分布式流式处理框架:

    ①提供准确的结果,甚至在出现无序或者延迟加载的数据的情况下。

    ②它是状态化的容错的,同时在维护一次完整的的应用状态时,能无缝修复错误。

    ③大规模运行,在上千个节点运行时有很好的吞吐量和低延迟。

    更早的时候,我们讨论了数据集类型(有界 vs 无穷)和运算模型(批处理 vs 流式)的匹配。Flink 的流式计算模型启用了很多功能特性,如状态管理,处理无序数据,灵活的视窗,这些功能对于得出无穷数据集的精确结果是很重要的。

    分布式运行
    flink 作业提交架构流程可见下图:

    image.png

    1、Program Code:我们编写的 Flink 应用程序代码

    2、Job Client:Job Client 不是 Flink 程序执行的内部部分,但它是任务执行的起点。 Job Client 负责接受用户的程序代码,然后创建数据流,将数据流提交给 Job Manager 以便进一步执行。 执行完成后,Job Client 将结果返回给用户

    image.png

    3、Job Manager:主进程(也称为作业管理器)协调和管理程序的执行。 它的主要职责包括安排任务,管理checkpoint ,故障恢复等。机器集群中至少要有一个 master,master 负责调度 task,协调 checkpoints 和容灾,高可用设置的话可以有多个 master,但要保证一个是 leader, 其他是 standby; Job Manager 包含 Actor system、Scheduler、Check pointing 三个重要的组件

    4、Task Manager:从 Job Manager 处接收需要部署的 Task。Task Manager 是在 JVM 中的一个或多个线程中执行任务的工作节点。 任务执行的并行性由每个 Task Manager 上可用的任务槽决定。 每个任务代表分配给任务槽的一组资源。 例如,如果 Task Manager 有四个插槽,那么它将为每个插槽分配 25% 的内存。 可以在任务槽中运行一个或多个线程。 同一插槽中的线程共享相同的 JVM。 同一 JVM 中的任务共享 TCP 连接和心跳消息。Task Manager 的一个 Slot 代表一个可用线程,该线程具有固定的内存,注意 Slot 只对内存隔离,没有对 CPU 隔离。默认情况下,Flink 允许子任务共享 Slot,即使它们是不同 task 的 subtask,只要它们来自相同的 job。这种共享可以有更好的资源利用率。

    image.png

    安装以及demo

    安装 Flink
    2、在 Mac OS X 上安装 Flink 是非常方便的。推荐通过 homebrew 来安装。

    brew install apache-flink

    3、检查安装:

    flink --version

    Version: 1.10.1, Commit ID: c5915cf
    
    

    /usr/local/Cellar/apache-flink/1.10.1/libexec/bin

    ./start-cluster.sh

    接着就可以进入 web 页面(http://localhost:8081/) 查看

    image.png

    demo

    1、新建一个 maven 项目

    image.png

    创建一个 SocketTextStreamWordCount 文件,加入以下代码:

    package com.zhisheng.flink;
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;
    
    /**
     * Created by zhisheng_tian on 2018/9/18
     */
    public class SocketTextStreamWordCount {
        public static void main(String[] args) throws Exception {
            //参数检查
            if (args.length != 2) {
                System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>");
                return;
            }
    
            String hostname = args[0];
            Integer port = Integer.parseInt(args[1]);
    
    
            // set up the streaming execution environment
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            //获取数据
            DataStreamSource<String> stream = env.socketTextStream(hostname, port);
    
            //计数
            SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stream.flatMap(new LineSplitter())
                    .keyBy(0)
                    .sum(1);
    
            sum.print();
    
            env.execute("Java WordCount from SocketTextStream Example");
        }
    
        public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {
                String[] tokens = s.toLowerCase().split("\\W+");
    
                for (String token: tokens) {
                    if (token.length() > 0) {
                        collector.collect(new Tuple2<String, Integer>(token, 1));
                    }
                }
            }
        }
    }
    

    接着进入工程目录,使用以下命令打包。

    mvn clean package -Dmaven.test.skip=true
    
    

    然后我们开启监听 9000 端口:

    nc -l 9000

    最后进入 flink 安装目录 bin 下执行以下命令跑程序:

    flink run -c com.tracy.SocketTextStreamWordCount /Users/xxx/xxx/project/flinkstudy/target/flink-study-1.0-SNAPSHOT.jar 127.0.0.1 9000
    
    

    执行完上述命令后,我们可以在 webUI 中看到正在运行的程序:

    image.png

    我们可以在 nc 监听端口中输入 text,比如:

    flinkstudy xxx$ nc -l 9000
    hello flink
    good niha
    nice 
    very good
    
    

    然后我们通过 tail 命令看一下输出的 log 文件,来观察统计结果。进入目录 apache-flink/1.10.1/libexec/log,执行以下命令:

    tail -f flink-xxx-taskexecutor-0-xxx.local.out

    (hello,1)
    (flink,1)
    (good,1)
    (niha,1)
    (nice,1)
    (very,1)
    (good,2)
    
    

    相关文章

      网友评论

        本文标题:Flink 从 0 到 1 学习 —— Apache Flink

        本文链接:https://www.haomeiwen.com/subject/bkouoktx.html