2022-05-22-Flink-46 (Part 5)

Author: 冰菓_ | Published 2022-05-22 13:00

    1. Data Partitioning

    Shuffle (random) and round-robin (rebalance/rescale) partitioning, broadcast, and global partitioning
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
    
    public class richRescale {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<Integer> source = env.addSource(new RichParallelSourceFunction<Integer>() {
                @Override
                public void run(SourceContext<Integer> ctx) throws Exception {
                    for (int i = 1; i < 10; i++) {
                        if (i % 2 == getRuntimeContext().getIndexOfThisSubtask()) {
                            System.out.println(getRuntimeContext().getIndexOfThisSubtask());
                            ctx.collect(i);
                        }
                    }
                }
    
                @Override
                public void cancel() {
    
                }
            });
    
            // rescale: round-robin only within groups of downstream subtasks
            source.setParallelism(2).rescale().print().setParallelism(4);
            // rebalance: round-robin across all downstream subtasks
            source.setParallelism(2).rebalance().print().setParallelism(4);
            // broadcast: every record is sent to every downstream subtask
            source.broadcast().print().setParallelism(4);
            // global: every record is sent to the first downstream subtask
            source.global().print().setParallelism(4);
    
    
            env.execute();
        }
    }
    
    Custom repartitioning (a sketch follows below)
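
    The original notes only name custom repartitioning. A minimal sketch, assuming the same Flink version as the example above (the even/odd routing and the class name are illustrative assumptions): partitionCustom takes a Partitioner that maps a key to a downstream channel, plus a KeySelector that extracts the key.

    import org.apache.flink.api.common.functions.Partitioner;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class customPartition {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // route even numbers to channel 0 and odd numbers to channel 1
            env.fromElements(1, 2, 3, 4, 5, 6, 7, 8)
                    .partitionCustom(new Partitioner<Integer>() {
                        @Override
                        public int partition(Integer key, int numPartitions) {
                            return key % 2;
                        }
                    }, new KeySelector<Integer, Integer>() {
                        @Override
                        public Integer getKey(Integer value) throws Exception {
                            return value;
                        }
                    })
                    .print().setParallelism(4);

            env.execute();
        }
    }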

    2. Sink

    Writing to the file system
    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
    
    import java.util.concurrent.TimeUnit;
    
    public class toFile {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<Event> source = env.fromElements(
                    new Event("小米", "0001", 100000L),
                    new Event("小红", "0002", 200000L),
                    new Event("小黄", "0003", 200000L)
    
            );
            StreamingFileSink<String> build = StreamingFileSink.<String>forRowFormat(new Path("./output"), new SimpleStringEncoder<>("UTF-8"))
                    .withRollingPolicy(
                            DefaultRollingPolicy.builder()
                                    .withMaxPartSize(1024 * 1024 * 1024)                   // roll part files at 1 GB
                                    .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))   // roll every 15 minutes
                                    .withInactivityInterval(TimeUnit.MINUTES.toMillis(15)) // roll after 15 minutes of inactivity
                                    .build()
                    ).build();
            source.map(Event::toString).addSink(build);
    
            env.execute();
        }
    }
    
    Writing to Kafka

    In real production environments it is common to need to write the data processed by Flink quickly into a distributed, high-throughput, highly available message broker that can guarantee exactly-once semantics, so that other applications can consume the processed data. Kafka is Flink's ideal partner here: Flink can not only consume data from Kafka but also write processed data back to Kafka, with high throughput, data safety, and exactly-once guarantees.
    Flink integrates with several Kafka versions, such as 0.11.x, 1.x, and 2.x. Since Flink 1.9 the Kafka 2.2 client is used, so the Kafka version used here is 2.2.2, together with the latest API.
    The following example writes data to Kafka. First define a class that implements the KafkaSerializationSchema interface with a type parameter; String means the data written to Kafka is of type String. The class specifies the serialization schema for the data written to Kafka: override the serialize method to turn each record into a byte array and wrap it in a ProducerRecord to return.

    import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
    import org.apache.kafka.clients.producer.ProducerRecord;

    import javax.annotation.Nullable;
    import java.nio.charset.Charset;

    // custom serialization schema for writing String records to Kafka
    public class KafkaStringSerializationSchema implements KafkaSerializationSchema<String> {
        private String topic;
        private String charset;
        // constructors take the target topic and the charset; UTF-8 by default
        public KafkaStringSerializationSchema(String topic) {
            this(topic, "UTF-8");
        }
        public KafkaStringSerializationSchema(String topic, String charset) {
            this.topic = topic;
            this.charset = charset;
        }
        // called to serialize each record
        @Override
        public ProducerRecord<byte[], byte[]> serialize(
                String element, @Nullable Long timestamp) {
            // convert the record to a byte array
            byte[] bytes = element.getBytes(Charset.forName(charset));
            // wrap it in a ProducerRecord
            return new ProducerRecord<>(topic, bytes);
        }
    }
    

    Then put the Kafka-related parameters into a Properties object and create a FlinkKafkaProducer, passing the target topic name, the Kafka serialization schema, the Properties, and the Semantic for writing to Kafka as constructor arguments. Finally, call addSink and pass the FlinkKafkaProducer to it. Note that although the code below specifies EXACTLY_ONCE semantics, this cannot actually be achieved without enabling checkpointing.

    DataStream<String> dataStream = …
    // the Kafka topic to write to
    String topic = "test";
    // Kafka-related parameters
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "node-1:9092,node-2:9092,node-3:9092");
    // create the FlinkKafkaProducer
    FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
            topic, // target topic
            new KafkaStringSerializationSchema(topic), // serialization schema for writing to Kafka
            properties, // Kafka parameters
            FlinkKafkaProducer.Semantic.EXACTLY_ONCE // write with EXACTLY_ONCE semantics
    );
    // add the Kafka sink
    dataStream.addSink(kafkaProducer);
    
    Writing to Redis
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.12</artifactId>
        <version>1.1-SNAPSHOT</version>
    </dependency>
    

    Define a class (or a static inner class) that implements RedisMapper, with a type parameter, here Tuple2<String, Integer>, i.e. the type of the data written to Redis, and implement its three methods. The first, getCommandDescription, returns a RedisCommandDescription instance whose constructor specifies the write command, HSET here, and the additionalKey, i.e. the outer key of the HASH value. The second, getKeyFromData, supplies the field key inside the HASH value; the third, getValueFromData, supplies the field value inside the HASH value.

    public static class RedisWordCountMapper implements RedisMapper<Tuple2<String, Integer>> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            // write command: the value is of HASH type; specify the outer key's name
            return new RedisCommandDescription(RedisCommand.HSET, "WORD_COUNT");
        }
        @Override
        public String getKeyFromData(Tuple2<String, Integer> data) {
            return data.f0; // the field key inside the HASH value
        }
        @Override
        public String getValueFromData(Tuple2<String, Integer> data) {
            return data.f1.toString(); // the field value inside the HASH value
        }
    }
    DataStream<Tuple2<String, Integer>> result = wordAndOne.keyBy(0).sum(1);
    // Redis connection parameters, such as host and port
    FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPassword("123456").build();

    // write the result to Redis
    result.addSink(new RedisSink<>(conf, new RedisWordCountMapper()));
    
    Writing to Elasticsearch
    Writing to MySQL
    Writing to a custom Sink (see the sketch below)
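
    The three sink targets above are placeholders in the original notes. As a hedged sketch of the "custom Sink" case only, a RichSinkFunction can manage its own connection lifecycle; the JDBC URL, credentials, and table name below are illustrative assumptions, and the Event fields (user, url, timestamp) are inferred from the examples elsewhere in this article.

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;

    // minimal custom sink: open a JDBC connection once per subtask,
    // write each Event, and close the connection on shutdown
    public class MySqlSink extends RichSinkFunction<Event> {
        private Connection connection;
        private PreparedStatement statement;

        @Override
        public void open(Configuration parameters) throws Exception {
            // URL, user, and password are placeholders
            connection = DriverManager.getConnection(
                    "jdbc:mysql://localhost:3306/test", "root", "123456");
            statement = connection.prepareStatement(
                    "INSERT INTO events (user, url, ts) VALUES (?, ?, ?)");
        }

        @Override
        public void invoke(Event value, Context context) throws Exception {
            statement.setString(1, value.user);
            statement.setString(2, value.url);
            statement.setLong(3, value.timestamp);
            statement.executeUpdate();
        }

        @Override
        public void close() throws Exception {
            statement.close();
            connection.close();
        }
    }

    It would then be attached like any other sink: source.addSink(new MySqlSink());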

    3. Time and Windows

    Time semantics
    1. Processing time
    2. Event time
    Watermarks

    In practice, when the data volume is large, many records end up carrying the same timestamp, so extracting a timestamp and inserting a watermark for every single record is very wasteful; the strategy instead is to insert a watermark periodically.

    Generating watermarks periodically in an out-of-order stream

    If, for processing efficiency when large amounts of data arrive at once, we likewise generate watermarks periodically, out-of-order data will cause computation errors: a late record falls into a window it does not belong to, so the earlier window loses data and the later window counts records that are not its own.
    To let windows collect late data correctly, we can wait a while, say two seconds: the timestamp of the watermark to insert is the maximum timestamp of the data seen so far minus 2 seconds. (We assume records are delayed by at most two seconds; if a record is delayed longer than that, its window can still lose it.) A sketch of this logic follows.
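
    A minimal sketch of such a periodic generator (this hand-written version approximates what the built-in forBoundedOutOfOrderness strategy shown later does; the 2-second delay is the assumption from above):

    import org.apache.flink.api.common.eventtime.Watermark;
    import org.apache.flink.api.common.eventtime.WatermarkGenerator;
    import org.apache.flink.api.common.eventtime.WatermarkOutput;

    // periodic generator: the watermark trails the largest timestamp seen so far by 2 seconds
    public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<Event> {
        private final long delay = 2000L;                       // assumed maximum delay
        private long maxTimestamp = Long.MIN_VALUE + delay + 1; // guard against underflow

        @Override
        public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
            // only track the maximum timestamp here; emission happens on the timer
            maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            // called every setAutoWatermarkInterval() milliseconds (200 ms by default)
            output.emitWatermark(new Watermark(maxTimestamp - delay - 1));
        }
    }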

    Properties of watermarks
    1. A watermark is a marker inserted into the data stream and can be thought of as a special record
    2. A watermark carries a timestamp indicating the current progress of event time
    3. Watermarks are generated from the timestamps of the data
    4. Watermark timestamps must increase monotonically, so that a task's event-time clock always moves forward
    5. A watermark can be given a delay to handle out-of-order data correctly
    6. A watermark Watermark(t) asserts that event time in the stream has reached timestamp t, i.e. all data with timestamps up to t has arrived, and no record with timestamp t' <= t will appear afterwards
    Watermark generation strategies
    1. Generating watermarks for an ordered stream
    import DAY2.Event;
    import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    public class demo1 {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
            env.getConfig().setAutoWatermarkInterval(100L);
            env.fromElements(
                    new Event("小米", "0001", 100000L),
                    new Event("小红", "0002", 200000L),
                    new Event("小黄", "0003", 200000L)
            ).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                @Override
                public long extractTimestamp(Event element, long recordTimestamp) {
                    return element.timestamp;
                }
            })).print();
    
            env.execute();
        }
    }
    
    2. Generating watermarks for an out-of-order stream
    import DAY2.Event;
    import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.time.Duration;
    
    public class demo2 {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
            env.getConfig().setAutoWatermarkInterval(100L);
            env.fromElements(
                    new Event("小米", "0001", 100000L),
                    new Event("小红", "0002", 200000L),
                    new Event("小黄", "0003", 200000L)
            ).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofMinutes(1L)).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                @Override
                public long extractTimestamp(Event element, long recordTimestamp) {
                    return element.timestamp;
                }
            })).print();
    
            env.execute();
        }
    }
    
    1. Assigner with periodic watermarks
    2. Assigner with punctuated watermarks (a sketch follows below)
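
    A minimal sketch of a punctuated generator (emitting on every event for illustration, which is usually too aggressive in production; a real implementation would emit only on special marker records):

    import org.apache.flink.api.common.eventtime.Watermark;
    import org.apache.flink.api.common.eventtime.WatermarkGenerator;
    import org.apache.flink.api.common.eventtime.WatermarkOutput;

    // punctuated generator: emit a watermark directly from onEvent
    // and leave the periodic callback empty
    public class PunctuatedGenerator implements WatermarkGenerator<Event> {

        @Override
        public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
            output.emitWatermark(new Watermark(eventTimestamp));
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            // nothing to do: watermarks are emitted per event above
        }
    }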
    Watermark propagation

    4. Windows

    Basic window concepts

    A window is a means of slicing an unbounded stream into finite chunks for processing

    Window classification
    1. Time windows and count windows
    2. Tumbling windows, sliding windows, and session windows (which merge)
    3. Global windows (require a custom trigger)
    Window API types
    1. Non-Keyed Windows
    2. Keyed Windows
    Window functions
    1. Incremental aggregation functions
      Reduce function (reduce) and aggregate function (aggregate)
    
    import DAY2.Event;
    import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.ConfigConstants;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.RestOptions;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    
    import java.time.Duration;
    import java.util.Random;
    
    public class reduceWindow {
    
        public static void main(String[] args) throws Exception {
            Configuration conf= new Configuration();
            conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
            // custom web UI port
            conf.setInteger(RestOptions.PORT, 8050);
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
            // the default watermark interval is 200 ms; set to 1000 ms here
            env.getConfig().setAutoWatermarkInterval(1000L);
            env.setParallelism(1);
    
    
            // randomly generated event data
    
            DataStreamSource<Event> addSource = env.addSource(new SourceFunction<Event>() {
                private Boolean flag = true;
    
                @Override
                public void run(SourceContext<Event> ctx) throws Exception {
                    Random random = new Random();
                    String[] uid = new String[]{"小", "中", "大"};
                    String[] url = new String[]{"./click", "./show", "./slide"};
                    while (flag) {
                        ctx.collect(new Event(uid[random.nextInt(uid.length)], url[random.nextInt(url.length)], System.currentTimeMillis()));
                        Thread.sleep(100L);
                    }
                }
    
                @Override
                public void cancel() {
                    flag = false;
                }
            });
    
            // out-of-order stream: allow 2 seconds of out-of-orderness
            SingleOutputStreamOperator<Event> outputStreamOperator = addSource.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2L)).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                @Override
                public long extractTimestamp(Event element, long recordTimestamp) {
                    return element.timestamp;
                }
            }));
    
            // map and keyBy
            outputStreamOperator.map(data -> Tuple2.of(data.user,1L)).returns(Types.TUPLE(Types.STRING,Types.LONG))
                    .keyBy(data -> data.f0)
                    // sliding window
                    //.window(SlidingEventTimeWindows.of(Time.milliseconds(1000L), Time.seconds(100L)))
                    // tumbling window
                    .window(TumblingEventTimeWindows.of(Time.seconds(1)))
                    .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                        @Override
                        public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                            return Tuple2.of(value1.f0,value1.f1+ value2.f1);
                        }
                    }).print();
    
            env.execute();
    
        }
    }
    
    
    Aggregate function (aggregate)
    
    import DAY2.Event;
    import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.configuration.ConfigConstants;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.RestOptions;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    
    import java.time.Duration;
    import java.util.Date;
    import java.util.Random;
    
    public class windowAggr_AVG {
    
        public static void main(String[] args) throws Exception {
            Configuration conf= new Configuration();
            conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
            // custom web UI port
            conf.setInteger(RestOptions.PORT, 8050);
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
            // the default watermark interval is 200 ms; set to 1000 ms here
            env.getConfig().setAutoWatermarkInterval(1000L);
            env.setParallelism(1);
    
    
            // randomly generated event data
    
            DataStreamSource<Event> addSource = env.addSource(new SourceFunction<Event>() {
                private Boolean flag = true;
    
                @Override
                public void run(SourceContext<Event> ctx) throws Exception {
                    Random random = new Random();
                    String[] uid = new String[]{"小", "中", "大"};
                    String[] url = new String[]{"./click", "./show", "./slide"};
                    while (flag) {
                        ctx.collect(new Event(uid[random.nextInt(uid.length)], url[random.nextInt(url.length)], System.currentTimeMillis()));
                        Thread.sleep(200L);
                    }
                }
    
                @Override
                public void cancel() {
                    flag = false;
                }
            });
    
            // out-of-order stream: allow 2 seconds of out-of-orderness
            SingleOutputStreamOperator<Event> outputStreamOperator = addSource.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2L)).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                @Override
                public long extractTimestamp(Event element, long recordTimestamp) {
                    return element.timestamp;
                }
            }));
    
    
            outputStreamOperator.keyBy(data -> data.user)
                    .window(TumblingEventTimeWindows.of(Time.seconds(2)))
                    .aggregate(new AggregateFunction<Event, Tuple3<Long,Integer,String>, Tuple2<String,Date>>() {
                        @Override
                        public Tuple3<Long, Integer, String> createAccumulator() {
                        // initialize the accumulator: (sum of timestamps, count, user)
                            return Tuple3.of(0L,0,"");
                        }
    
                        @Override
                        public Tuple3<Long, Integer, String> add(Event value, Tuple3<Long, Integer, String> accumulator) {
                        // incremental accumulator update: add this event's timestamp and bump the count
                            return Tuple3.of(accumulator.f0+value.timestamp , accumulator.f1+1,value.user);
                        }
    
                        @Override
                        public Tuple2<String, Date> getResult(Tuple3<Long, Integer, String> accumulator) {
                            return Tuple2.of(accumulator.f2, new Date(accumulator.f0 / accumulator.f1));
                        }
                    // merge is required for session windows
                        @Override
                        public Tuple3<Long, Integer, String> merge(Tuple3<Long, Integer, String> a, Tuple3<Long, Integer, String> b) {
                            return Tuple3.of(a.f0+b.f0,a.f1+b.f1,b.f2);
                        }
                    }).print();
    
            env.execute();
    
    
        }
    }
    
    
    2. Full window functions
      Unlike incremental aggregation functions, a full window function first collects the window's data and buffers it internally, and only takes it out for computation when the window is about to emit its result
      Window function (WindowFunction)
      Process window function (ProcessWindowFunction)
    
    import DAY2.Event;
    import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.ConfigConstants;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.RestOptions;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    
    import java.sql.Timestamp;
    import java.time.Duration;
    import java.util.HashSet;
    import java.util.Random;
    
    public class processWindow {
    
        public static void main(String[] args) throws Exception {
            Configuration conf= new Configuration();
            conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
            // custom web UI port
            conf.setInteger(RestOptions.PORT, 8050);
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
            // the default watermark interval is 200 ms
            env.getConfig().setAutoWatermarkInterval(200L);
            env.setParallelism(1);
    
    
            // randomly generated event data
    
            DataStreamSource<Event> addSource = env.addSource(new SourceFunction<Event>() {
                private Boolean flag = true;
    
                @Override
                public void run(SourceContext<Event> ctx) throws Exception {
                    Random random = new Random();
                    String[] uid = new String[]{"小", "中", "大"};
                    String[] url = new String[]{"./click", "./show", "./slide"};
                    while (flag) {
                        ctx.collect(new Event(uid[random.nextInt(uid.length)], url[random.nextInt(url.length)], System.currentTimeMillis()));
                        Thread.sleep(1000L);
                    }
                }
    
                @Override
                public void cancel() {
                    flag = false;
                }
            });
            addSource.print();
            // watermark strategy: zero tolerance for out-of-orderness here
            SingleOutputStreamOperator<Event> outputStreamOperator = addSource.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                @Override
                public long extractTimestamp(Event element, long recordTimestamp) {
                    return element.timestamp;
                }
            }));
    
            // map and keyBy (all records share one key, so each firing covers the whole stream)
            outputStreamOperator.map(data -> Tuple2.of(data.user,1L)).returns(Types.TUPLE(Types.STRING,Types.LONG))
                    .keyBy(data -> true)
                    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                    .process(new ProcessWindowFunction<Tuple2<String, Long>, String, Boolean, TimeWindow>() {
                        @Override
                        public void process(Boolean aBoolean, ProcessWindowFunction<Tuple2<String, Long>, String, Boolean, TimeWindow>.Context context, Iterable<Tuple2<String, Long>> elements, Collector<String> out) throws Exception {
                            HashSet<String> uidSet = new HashSet<>();
                            for (Tuple2<String, Long> element : elements) {
                                uidSet.add(element.f0);
                            }
                            long start = context.window().getStart();
                            long end = context.window().getEnd();
                            out.collect("window: distinct users = " + uidSet.size() + "  " + new Timestamp(start) + " ~ " + new Timestamp(end));
    
                        }
                    }).print();
    
            env.execute();
        }
    }
    

    How the configured watermark delay affects output order

    3. Combining the two kinds of window functions (see the sketch after this list)
    Other window APIs
    Handling late data
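
    A sketch of the combination, assuming stream is the DataStream<Tuple2<String, Long>> produced by the map in the reduce example above (imports as in that example): the AggregateFunction pre-aggregates incrementally, and the ProcessWindowFunction receives the single pre-aggregated value plus window metadata when the window fires. An allowedLateness call is included to illustrate the late-data item.

    // combine an incremental AggregateFunction with a ProcessWindowFunction
    stream.keyBy(data -> data.f0)
            .window(TumblingEventTimeWindows.of(Time.seconds(10)))
            // keep window state 5 extra seconds so late records can update the result
            .allowedLateness(Time.seconds(5))
            .aggregate(
                    new AggregateFunction<Tuple2<String, Long>, Long, Long>() {
                        @Override
                        public Long createAccumulator() { return 0L; }

                        @Override
                        public Long add(Tuple2<String, Long> value, Long accumulator) { return accumulator + value.f1; }

                        @Override
                        public Long getResult(Long accumulator) { return accumulator; }

                        @Override
                        public Long merge(Long a, Long b) { return a + b; }
                    },
                    new ProcessWindowFunction<Long, String, String, TimeWindow>() {
                        @Override
                        public void process(String key, ProcessWindowFunction<Long, String, String, TimeWindow>.Context context, Iterable<Long> elements, Collector<String> out) throws Exception {
                            // the iterable holds exactly one element: the pre-aggregated count
                            Long count = elements.iterator().next();
                            out.collect(key + " -> " + count + "  [" + new Timestamp(context.window().getStart()) + ", " + new Timestamp(context.window().getEnd()) + ")");
                        }
                    })
            .print();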
