美文网首页Flink
Flink-1.12(四)Flink API

Flink-1.12(四)Flink API

作者: _大叔_ | 来源:发表于2021-06-09 18:50 被阅读0次

    Flink 开发一个简单的应用程序只需要构建环境、构建数据源、构建数据处理方案、构建数据输出及执行程序这五个步骤,但每个步骤都有对应其他强大的API,所以本文一一举例学习。

    构建环境

    流处理

    StreamExecutionEnvironment env = null;
    // 构建流环境,如果在本地则创建本地环境,如果是集群,则创建集群环境
    env  = StreamExecutionEnvironment.getExecutionEnvironment();
    // 创建本地执行环境并设置并行数
    env = StreamExecutionEnvironment.createLocalEnvironment(3);
    // 创建远程执行环境,jobmanager的IP,端口,并行度,运行程序的位置
    env = StreamExecutionEnvironment.createRemoteEnvironment("10.xxx.xx.103",6123,5,"D:/test/abc.jar");
    

    第三种方式可以直接从本地代码中构建与远程集群的Flink JobManager 的RPC连接,通过指定应用程序所在的jar包,将运行程序远程拷贝到 JobManager 节点上,然后将Flink 应用运行在远程的环境中,本地程序相当于一个客户端。

    批处理

    ExecutionEnvironment env = null;
    // 构建流环境,如果在本地则创建本地环境,如果是集群,则创建集群环境
    env  = ExecutionEnvironment.getExecutionEnvironment();
    // 创建本地执行环境并设置并行数
    env = ExecutionEnvironment.createLocalEnvironment(3);
    // 创建远程执行环境,jobmanager的IP,端口,并行度,运行程序的位置
    env = ExecutionEnvironment.createRemoteEnvironment("10.xxx.xx.103",6123,5,"D:/test/abc.jar");
    

    构建数据源

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 从集合中读取数据
            DataStream<Integer> ds = env.fromCollection(Arrays.asList(1,2,3,4,5,6));
            // 直接读取数据
            DataStream<Integer> ds1 = env.fromElements(1,2,3,4,5);
            // 从文件读取
            DataStream<String> ds2 = env.readTextFile("D:\\workspace\\spring\\middleware\\flink\\flink-test\\src\\main\\resources\\hello.txt");
            // 从 kafka 读取
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "10.240.30.104:9092");
            properties.setProperty("group.id", "test");
            properties.setProperty("enable.auto.commit", "true");
            properties.setProperty("auto.commit.interval.ms", "0");
            FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<String>("topic1", new SimpleStringSchema(),properties);
            DataStream<String> ds3 = env.addSource(myConsumer);
            // 自定义数据源
            DataStream<String> ds4 = env.addSource(new CustomizeSource());
    

    自定义数据源

    package com.example.demo;
    
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    
    /**
     * @author big uncle
     * @date 2021/6/3 13:54
     * @module
     **/
    public class CustomizeSource implements SourceFunction<String> {
    
        private boolean running = true;
    
        /**
         * 读取数据
        **/
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            String str = "a-b-c:";
            int i =0;
            while (running){
                i++;
                ctx.collect(str+i);
                Thread.sleep(1000);
            }
        }
    
        /**
         * 关闭
        **/
        @Override
        public void cancel() {
            running = false;
        }
    }
    

    Transform

    map fliter flatmap

        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> dataStream = env.fromElements("a,b,c,d,e,1,2,3,4");
            // 读取值是 "a,b,c,d,e,1,2,3,4" 转变为 一个个的元素
            dataStream.flatMap(new FlatMapFunction<String, Object>() {
                @Override
                public void flatMap(String s, Collector<Object> collector) throws Exception {
                    String[] str = s.split(",");
                    for(String ss : str) {
                        collector.collect(ss);
                    }
                }
            })
            // 为 true 才会输出,过滤掉不是 a 的值
            .filter(i -> i.equals("a"))
            // 转黄给 a 添加 字符串 0
            .map(i -> i+"0")
            // 输出结果为 a0
            .print();
            env.execute("a");
        }
    

    key

    所有的聚合操作只能在 key 分组(分区)之后。sum()、min()、max()、minBy()、maxBy()、reduce() 比较器

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<Integer> dataStream = env.fromElements(10,27,3,7,1,50,22,19);
            
            dataStream.keyBy(new KeySelector<Integer, Object>() {
                @Override
                // 把所有数据放到一个分区 0
                public Object getKey(Integer integer) throws Exception {
                    return 0;
                }
            })
            // 对 分区进行聚合,聚合下标为 0,注意这里是下标,最后输出数据139,会显示累加值
            .sum(0).print();
            env.execute("test");
        }
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<Integer> dataStream = env.fromElements(10,27,3,7,1,50,22,19);
    
            // keyBy 可以根据位置,也可以根据字段
            dataStream.keyBy(new KeySelector<Integer, Object>() {
                @Override
                // 把所有数据放到一个分区 0
                public Object getKey(Integer integer) throws Exception {
                    return 0;
                }
            })
            // 注意这里是下标,也可以是字段名
            .min(0).print();
            env.execute("test");
        }
    

    min()、minBy() ,针对元祖和对象而言,min 会对比某个字段的大小,但不会更新其他字段值,minBy() 会更新其他字段值。

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<Integer> dataStream = env.fromElements(10,27,3,7,1,50,22,19);
    
            // keyBy 可以根据位置,也可以根据字段
            dataStream.keyBy(new KeySelector<Integer, Object>() {
                @Override
                // 把所有数据放到一个分区 0
                public Object getKey(Integer integer) throws Exception {
                    return 0;
                }
            })
            .reduce(new ReduceFunction<Integer>() {
                @Override
                public Integer reduce(Integer t1, Integer t2) throws Exception {
                    return t2 - t1 > 0 ? t2 : t1;
                }
            }).print();
            env.execute("test");
        }
    

    reduce 可以比较两个值,更自由一些

    split 和 Select (OutputTag)Connect和CoMap以及Union

    从代码中可以看出帮我们收集了所有数据,我们可以在做其他操作,多用于做数据标签。

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<Integer> dataStream = env.fromElements(10,27,3,7,1,50,22,19);
    
            final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
    
            SingleOutputStreamOperator<Object> stream = dataStream.process(new ProcessFunction<Integer, Object>() {
    
                @Override
                public void processElement(Integer value, Context ctx, Collector<Object> out) throws Exception {
                    if(value > 10) {
                        out.collect(value);
                    }
                    ctx.output(outputTag,"其他类型的流"+value.toString());
                }
            });
            stream.print("原始数据进行过滤后:");
            DataStream<String> sideOutputStream = stream.getSideOutput(outputTag);
            sideOutputStream.print("标签处理:");
    
            // 合流
            sideOutputStream.connect(stream).flatMap(new CoFlatMapFunction<String,Object,Integer>() {
    
                // 第一个流如何处理
                @Override
                public void flatMap1(String value, Collector out) throws Exception {
                    String str = value.toString();
                    str = str.replaceAll("其他类型的流","");
                    out.collect(Integer.valueOf(str));
                }
                // 第二流如何处理
                @Override
                public void flatMap2(Object value, Collector out) throws Exception {
                    out.collect(Integer.valueOf(value.toString()));
                }
    
            }).keyBy(new KeySelector<Integer, Object>() {
                @Override
                public Object getKey(Integer value) throws Exception {
                    return 0;
                }
            }).sum(0).print("合并后输出:");
    
            env.execute("test");
        }
    
    标签处理::1> 其他类型的流3
    标签处理::2> 其他类型的流7
    标签处理::3> 其他类型的流1
    标签处理::7> 其他类型的流10
    合并后输出::6> 1
    合并后输出::6> 8
    合并后输出::6> 11
    合并后输出::6> 21
    原始数据进行过滤后::8> 27
    原始数据进行过滤后::6> 19
    标签处理::8> 其他类型的流27
    原始数据进行过滤后::5> 22
    原始数据进行过滤后::4> 50
    标签处理::5> 其他类型的流22
    标签处理::6> 其他类型的流19
    标签处理::4> 其他类型的流50
    合并后输出::6> 48
    合并后输出::6> 75
    合并后输出::6> 97
    合并后输出::6> 119
    合并后输出::6> 138
    合并后输出::6> 157
    合并后输出::6> 207
    合并后输出::6> 257
    
    

    该方法只能在以下处理时使用。

    flink 1.12 没有 Split 和 Select,换成了 OutputTag

    Union 要求多条流合并,必须是相同的数据类型,比较简单,这里就不说了。

    分区

    keyBy 默认是按照 hash 分区,除了KeyBy 还有其他的分区方式。

    • broadcast() 广播,给下游分区全部广播同一份数据
    • shuffle() 随机把当前数据发配个下游分区
    • foeward() 只放在当前分区做计算
    • rebalance() 轮询给下游分区发数据
    • rescale() 给相同分组的下游分区轮询发数据
    • global() 丢给下游分区的第一个实例
    • partitionCustom() 用户自定义重分区方式

    Sink 输出

    kafka

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<Integer> dataStream = env.fromElements(10,27,3,7,1,50,22,19);
            dataStream.addSink(new FlinkKafkaProducer("topic-test",new SimpleStringSchema(),new Properties()));
            env.execute("test");
        }
    

    自定义

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<Integer> dataStream = env.fromElements(10,27,3,7,1,50,22,19);
            dataStream.addSink(new MySink()).setParallelism(1);
            env.execute("test");
        }
    
        public static class MySink extends RichSinkFunction<Integer> {
            // 不建议 在open 和 clos 写类似数据库连接 把握不好..死翘翘,但可以在 open 和 clous 检查连接状态,或有链接池 获取也行
            @Override
            public void open(Configuration parameters) throws Exception {
                System.out.println("open 有多少数据呗调用多少次");
            }
    
            @Override
            public void close() throws Exception {
                System.out.println("close 有多少数据呗调用多少次");
            }
    
            @Override
            public void invoke(Integer value, Context context) throws Exception {
                // 输出数据 可以用来写 到 redis kafka  hdfs 等
                System.out.println(value);
            }
        }
    

    想要直到 flink 支持更多的 source 或 sink 可以去 maven库里 搜索 flink connector

    window

    window 就是将无限流切割为有限流的一种方式,它会将流数据分发到有限大小的桶(bucket)中进行分析。window 不是等有数据才去做窗口,而是有一个桶,等数据,桶可以存在多个。只要桶不关闭,哪怕晚迟到的数据,依然可以被分配到对应的桶里。

    基本类型

    时间窗口(Time Window)

    按时间段做为窗口,如8:01~8:02 为一个窗口
    时间窗口分为:滚动时间窗口(TumBlingWindows)、滑动时间窗口(SlidingWindows)、会话窗口(Session Windows)

    • 滚动时间窗口:时间是对齐的,如8:00~9:00 下一个窗口就是 9:00~10:00,数据不会重叠(不重复数据),如果数据刚好在整点,按照 [ ) 左开右闭,会属于新时间。


    • 滑动时间窗口:窗口长度固定,可以有重叠(重复数据)


    • 会话窗口:一定时间没有收到数据会话结束,有新的数据来,开启新的会话。session gap 为最小的间隔。


    计数窗口(Count Window)

    按多少各数做为一个窗口,如10个数 为一个窗口。
    计数窗口分为:滚动计数窗口、滑动计数窗口

    代码演示

    如果想要调用时间窗口,必须在 keyBy 之后,但也可以直接使用 windowALL() 方法,该方法把所有数据放到一个分区,并行度为一。

        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<Integer> dataStream = env.fromElements(10,27,3,7,1,50,22,19);
            // 滚动窗口,可以看这个方法的实现类
            dataStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(15)));
            // session 窗口
            dataStream.keyBy(0).window(EventTimeSessionWindows.withGap(Time.minutes(1)));
            // 滚动窗口
            dataStream.keyBy(0).timeWindow(Time.seconds(15));
            // 滑动窗口
            dataStream.keyBy(0).timeWindow(Time.seconds(20),Time.seconds(5));
            // 滑动窗口
            dataStream.keyBy(0).window(SlidingProcessingTimeWindows.of(Time.seconds(20),Time.seconds(5)));
            // 滚动计数窗口
            dataStream.keyBy(0).countWindow(10);
            // 滑动计数窗口
            dataStream.keyBy(0).countWindow(10,3);
        }
    

    窗口函数

    增量聚合函数

    每条数据到来进行计算,保持一个简单的状态。一定会等到时间点(窗口结束点)到才会输出结果。这种窗口可以做一些max、min等比较,累计的函数

    • ResuceFunction
    • AggregateFunction
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> dataStream = env.socketTextStream("192.168.200.58",7777);
            // 滑动窗口
            dataStream.map(i -> Integer.valueOf(i)).keyBy(new KeySelector<Integer, Object>() {
                @Override
                public Object getKey(Integer value) throws Exception {
                    return 0;
                }
            }).window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    //               .reduce(new ReduceFunction<Integer>() {
    //                   // 比较两个数大小等 可以实现 max min sum 等操作
    //                   @Override
    //                   public Integer reduce(Integer value1, Integer value2) throws Exception {
    //                       return null;
    //                   }
    //               });
                    // 实现一个计数器,统计有多少个数据
                    .aggregate(new AggregateFunction<Integer, Integer, Integer>() {
                        // 创建一个累加器,初始值
                        @Override
                        public Integer createAccumulator() {
                            return 0;
                        }
    
                        // 累加规则
                        @Override
                        public Integer add(Integer value, Integer accumulator) {
                            return accumulator + 1;
                        }
    
                        // 输出结果
                        @Override
                        public Integer getResult(Integer accumulator) {
                            return accumulator;
                        }
    
                        // merge一般使用的是 session window 做合并操作
                        @Override
                        public Integer merge(Integer a, Integer b) {
                            return null;
                        }
                    }).print();
            env.execute("test");
        }
    
    全窗口函数

    先把窗口所有数据收集起来,等到计算的时候会便利所有数据。这种窗口可以做一些中位数或平均数等操作。

    • ProcessWindowFunction
    • WindowFunction
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> dataStream = env.socketTextStream("192.168.200.58",7777);
            // 滑动窗口
            dataStream.map(i -> Integer.valueOf(i)).keyBy(new KeySelector<Integer, Object>() {
                @Override
                public Object getKey(Integer value) throws Exception {
                    return 0;
                }
            }).window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // 攒齐5s 的数据,求平均值
                    // 参数1 输入类型,2 输出类型,3 key的类型,4 就是个TimeWindow
                    .apply(new WindowFunction<Integer, Integer, Object, TimeWindow>() {
                        // 参数1 key的类型,2 就是window,3 所有输入的数据,4 输出收集器
                        @Override
                        public void apply(Object o, TimeWindow window, Iterable<Integer> input, Collector<Integer> out) throws Exception {
                            List<Integer> list = IteratorUtils.toList(input.iterator());
                            int count = list.size();
                            int sum = list.stream().reduce((i,y) -> i+y).get();
                            out.collect(sum/count);
                        }
                    }).print();
    
            env.execute("test");
        }
    

    把 apply 换成 process 是一样的。

                    .process(new ProcessWindowFunction<Integer, Integer, Object, TimeWindow>() {
                        // context 会拿到的信息比 window 多,包含了 window
                        @Override
                        public void process(Object o, Context context, Iterable<Integer> elements, Collector<Integer> out) throws Exception {
                            List<Integer> list = IteratorUtils.toList(elements.iterator());
                            int count = list.size();
                            int sum = list.stream().reduce((i,y) -> i+y).get();
                            out.collect(sum/count);
                        }
                    }).print();
    

    在滑动时间窗口,或滑动计数中,如下:

    我们会想滑动计数,应该是10个数,来2个滑动一次,但实际流操作中,当你输入2个数,就会输出一次,如下:

    假设我们的程序是算平均值,第一次滑动是 (27+10)/2 第二次滑动则是 (27+10+3+7)/4 依此类推。这种方式是等补齐滑动窗口的长度,才会全额计算,就是10个一算。

    其他可选API
    • trigger() 触发器:定义windows什么时候关闭,触发计算并输出结果,分两步的原因是,考虑数据会迟到,迟到的数据到windows关闭的时间,可以先输出结果,在等等认为会迟到的数据,然后关闭。
    • evictor 移除器:定义某些数据的逻辑。
    • allowedLateness() 允许处理迟到的数据,5秒后输出结果,等1分钟才关闭窗口,但1分钟内该窗口会实时更新数据,不断输出新逻辑数据。
    window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                    .allowedLateness(Time.minutes(1));
    
    • sideOutputLateData() 将迟到的数据放入侧输出流
    • getSideOutput() 获取侧输出流
            // 标记输出流(侧输出流)
            OutputTag<Integer> outputTag = new OutputTag<Integer>("late"){};
    
            SingleOutputStreamOperator<Integer> ds = dataStream.map(i -> Integer.valueOf(i)).keyBy(new KeySelector<Integer, Object>() {
                @Override
                public Object getKey(Integer value) throws Exception {
                    return 0;
                }
            }).window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                    .allowedLateness(Time.minutes(1))
                    // 如果1分钟后还有迟到数据,就放到一个侧输出流里,给一个标记,也可以不搭配 allowedLateness() 使用
                    .sideOutputLateData(outputTag)
                    .sum(0);
    
            ds.getSideOutput(outputTag).print();
    

    这种等的方式,迟到数据底属于 1窗口 还是属于 2窗口呢?那就需要和数据本身的时间作比较了,所以以上只是API的调用,但不建议这样使用,针对迟到数据我们一定会用迟到数据的本身时间,来决定他在哪个窗口。

    ProcessFunction API

    ProcessFunction 可以访问时间戳、watermark以及注册定时事件。还可以输出特定的一些事件,例如超时事件等。ProcessFunction用来构建事件驱动的应用以及实现自定义的业务逻辑(使用之前的window函数和转换算子、Richfunction都无法实现)。例如,flink sql 就是使用ProcessFunction 实现的。

    flink 提供了8个 ProcessFunction

    • ProcessFunction
    • KeyedProcessFunction 分组之后得到KeyedStream,调用该 ProcessFunction
    • CoProcessFunction
    • ProcessJooinFunction
    • BroadcastProcessFunction
    • KeyedBroadcastProcessFunction
    • ProcessWindowFunction
    • ProcessAllWindowFunction

    KeyedProcessFunction

    public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            DataStream<String> dataStream = env.socketTextStream("192.168.200.58", 7777);
            // 数据转换
            DataStream<EventData> stream = dataStream.map(new MapFunction<String, EventData>() {
                @Override
                public EventData map(String value) throws Exception {
                    String[] strs = value.split(",");
                    return new EventData(
                            strs[0],
                            Long.valueOf(strs[1]),
                            String.valueOf((Long.valueOf(strs[1])-5)),
                            Integer.valueOf(strs[3])
                    );
                }
            });
            stream.keyBy(new KeySelector<EventData, String>() {
                @Override
                public String getKey(EventData value) throws Exception {
                    return value.getId();
                }
            }).process(new MyProcess()).print();
            env.execute("test");
        }
    
        public static class MyProcess extends KeyedProcessFunction<String,EventData,Integer> {
            @Override
            public void processElement(EventData value, Context ctx, Collector<Integer> out) throws Exception {
                out.collect(value.hashCode());
                // context
                ctx.timestamp(); //获取数据时间戳
                ctx.getCurrentKey();// 获取key
                //ctx.output();//测输出流,可以通过构造函数吧 测输出流传入
                ctx.timerService().currentProcessingTime();// 获取处理时间(属于时间语义的一种)
                ctx.timerService().currentWatermark();// 获取水位(事件时间)(属于时间语义的一种)
                ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 1000);//处理时间
                // 事件时间,让定时器 延时10s 处理
                ctx.timerService().registerEventTimeTimer((value.getEventTime() + 10) * 1000L);
                // 删除处理定时器,根据相同的时间为标准,有多个定时器,每个定时器的时间肯定不同,根据时间进行删除
                ctx.timerService().deleteProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 1000);
                ctx.timerService().deleteEventTimeTimer((value.getEventTime() + 10) * 1000L);
            }
            // 触发定时器 timestamp,就是触发时间
            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception {
                super.onTimer(timestamp, ctx, out);
                ctx.getCurrentKey();
    //            ctx.output();
                // 查看基于什么类型的时间
                ctx.timeDomain();
            }
        }
    

    (1)处理时间——调⽤ ctx.timerService().registerProcessingTimeTimer()注册;onTimer()在系统时间戳达到Timer设定的时间戳时触发。
    (2)事件时间——调⽤ ctx.timerService().registerEventTimeTimer()注册;onTimer()在Flink内部⽔印达到或超过Timer设定的时间戳时触发。

    ProcessFunction 相关视频

    相关文章

      网友评论

        本文标题:Flink-1.12(四)Flink API

        本文链接:https://www.haomeiwen.com/subject/nekurltx.html