Flink Operators in Practice: Advanced


Author: Woople | Published: 2019-11-12 10:14


    This article introduces the basic usage of Window Join, Window CoGroup and Interval Join.

    DataStream Transformations Window

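    Window operators are among the most important operators in Flink, and using them well requires first understanding how windows work. This article approaches the API from a practical angle; for a detailed explanation of the underlying mechanisms, read the Windows page of the official documentation first. The examples below use Tumbling Windows to demonstrate some common usage patterns. The ProcessingTime-based examples also apply to EventTime, provided the stream has timestamps and watermarks assigned and the window assigner is switched to its event-time counterpart (for example, TumblingEventTimeWindows instead of TumblingProcessingTimeWindows).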
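    To make the tumbling-window idea concrete, here is a plain-Java sketch (no Flink dependency) of how a timestamp is mapped to the window that contains it. It assumes the start-of-window formula that Flink uses internally in `TimeWindow.getWindowStartWithOffset`, reproduced here from memory; the class `TumblingWindowSketch` and its methods are hypothetical. The same mapping applies whether the timestamp is the processing time at arrival or the event time carried by the record.

```java
/** Hypothetical helper illustrating tumbling-window assignment; not part of the Flink API. */
final class TumblingWindowSketch {

    /**
     * Returns the start of the tumbling window [start, start + size) that contains
     * the given timestamp. Assumes timestamp >= 0 and 0 <= offset < size, so the
     * left operand of % is always positive.
     */
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }

    /** Exclusive end of the window that contains the given timestamp. */
    static long windowEnd(long timestamp, long offset, long size) {
        return windowStart(timestamp, offset, size) + size;
    }

    public static void main(String[] args) {
        long size = 5_000L; // 5-second windows, in milliseconds
        long[] timestamps = {0L, 4_999L, 5_000L, 7_300L, 12_000L};
        for (long ts : timestamps) {
            System.out.println(ts + " -> [" + windowStart(ts, 0L, size) + ", "
                    + windowEnd(ts, 0L, size) + ")");
        }
        // prints:
        // 0 -> [0, 5000)
        // 4999 -> [0, 5000)
        // 5000 -> [5000, 10000)
        // 7300 -> [5000, 10000)
        // 12000 -> [10000, 15000)
    }
}
```

    Note that elements at 4999 ms and 5000 ms end up in different windows even though they are only 1 ms apart: tumbling windows never overlap, so each element belongs to exactly one window.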

    Basic usage based on ProcessingTime

    Window Join

    Definition
    Transformation: DataStream,DataStream → DataStream
    Description: Join two data streams on a given key and a common window.
    Explanation

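    Joins the elements of two streams that have the same key and fall into the same window. It is an inner join: every matching pair of elements produces one result, and elements without a partner in that window produce nothing.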
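    These semantics can be illustrated without Flink. The sketch below simulates a tumbling-window inner join in plain Java over records with explicit timestamps: within each window, every left element is paired with every right element that has the same key. The `Rec` type, the method names and the sample values are hypothetical, and the sketch ignores Flink runtime concerns such as state, watermarks and triggers.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory simulation of a tumbling-window inner join. */
final class WindowJoinSketch {

    /** Stand-in for a Tuple2<String, Integer> element plus its timestamp in milliseconds. */
    static final class Rec {
        final String key;
        final int value;
        final long timestamp;

        Rec(String key, int value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Start of the tumbling window containing ts (assumes ts >= 0 and offset 0). */
    static long windowStart(long ts, long size) {
        return ts - ts % size;
    }

    /**
     * Emits "(key,leftValue,rightValue)" for every left/right pair that shares a key
     * and falls into the same window, like the Tuple3 built by the article's JoinFunction.
     */
    static List<String> join(List<Rec> left, List<Rec> right, long size) {
        List<String> out = new ArrayList<>();
        for (Rec l : left) {
            for (Rec r : right) {
                boolean sameKey = l.key.equals(r.key);
                boolean sameWindow = windowStart(l.timestamp, size) == windowStart(r.timestamp, size);
                if (sameKey && sameWindow) {
                    out.add("(" + l.key + "," + l.value + "," + r.value + ")");
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> left = new ArrayList<>();
        left.add(new Rec("baz", 10, 1_000L));
        left.add(new Rec("baz", 11, 3_000L));
        left.add(new Rec("bar", 12, 2_000L));

        List<Rec> right = new ArrayList<>();
        right.add(new Rec("baz", 20, 4_000L)); // same window [0, 5000) as both left "baz"
        right.add(new Rec("bar", 21, 6_000L)); // key matches, but window [5000, 10000) differs
        right.add(new Rec("foo", 22, 1_500L)); // no "foo" on the left, so it is dropped

        join(left, right, 5_000L).forEach(System.out::println);
        // prints (baz,10,20) then (baz,11,20)
    }
}
```

    Because the result is the cross product per key and window, two left elements and one right element with the same key in one window yield two output records, which is the same pattern visible in the sample output of the Flink example.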

    Example
    Code
    import org.apache.commons.lang3.RandomUtils;
    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    
    public class WindowJoinDemo {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
            env.setParallelism(1);
            // Two independent random sources producing (key, value) pairs
            DataStream<Tuple2<String, Integer>> orangeStream = env.addSource(new DataSource());
            DataStream<Tuple2<String, Integer>> greenStream = env.addSource(new DataSource());
            // Join both streams on the key within 5-second tumbling processing-time windows
            DataStream<Tuple3<String, Integer, Integer>> joinedStream = runWindowJoin(orangeStream, greenStream, 5);
            joinedStream.print("join");
            env.execute("Windowed Join Demo");
        }
    
        public static DataStream<Tuple3<String, Integer, Integer>> runWindowJoin(
                DataStream<Tuple2<String, Integer>> grades,
                DataStream<Tuple2<String, Integer>> salaries,
                long windowSize) {
    
            return grades.join(salaries)
                    .where(new NameKeySelector())      // key of the first stream
                    .equalTo(new NameKeySelector())    // key of the second stream
                    .window(TumblingProcessingTimeWindows.of(Time.seconds(windowSize)))
                    .apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>>() {
    
                        // Called once for every pair of elements with the same key in the same window
                        @Override
                        public Tuple3<String, Integer, Integer> join(
                                Tuple2<String, Integer> first,
                                Tuple2<String, Integer> second) {
                            return new Tuple3<>(first.f0, first.f1, second.f1);
                        }
                    });
        }
    
        // Uses the first tuple field (f0) as the join key
        private static class NameKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
            @Override
            public String getKey(Tuple2<String, Integer> value) {
                return value.f0;
            }
        }
    
        // Emits 10 to 19 random (key, value) pairs, sleeping 1 to 4 seconds between elements
        private static class DataSource extends RichParallelSourceFunction<Tuple2<String, Integer>> {
            private volatile boolean running = true;
    
            @Override
            public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
                int bound = 50;
                String[] keys = new String[]{"foo", "bar", "baz"};
    
                final long numElements = RandomUtils.nextLong(10, 20);
                int i = 0;
                while (running && i < numElements) {
                    Thread.sleep(RandomUtils.nextLong(1, 5) * 1000L);
                    Tuple2<String, Integer> data = new Tuple2<>(keys[RandomUtils.nextInt(0, 3)], RandomUtils.nextInt(0, bound));
                    ctx.collect(data);
                    System.out.println(Thread.currentThread().getId() + "-send data:" + data);
                    i++;
                }
            }
    
            @Override
            public void cancel() {
                running = false;
            }
        }
    }
    
    Output
    59-send data:(bar,49)
    58-send data:(bar,44)
    58-send data:(foo,2)
    59-send data:(baz,34)
    58-send data:(baz,2)
    59-send data:(baz,29)
    join> (baz,34,2)
    join> (baz,29,2)
    
    Explanation

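    Both streams carry Tuple2<String, Integer> elements with randomly generated values. The window size is 5 seconds, and the two streams are joined on the key (field f0) of each element. Only elements that share a key and fall into the same window are joined, and each matching pair produces one output record: in the output above, the two baz elements with values 34 and 29 from one source were each paired with (baz,2) from the other source. Because the windows are based on processing time and the sources sleep for random intervals, the output differs from run to run.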

    The complete article is available on Yuque.


        Article link: https://www.haomeiwen.com/subject/agcfictx.html