美文网首页flink入门大数据Flink
flink学习之八-keyby&reduce

flink学习之八-keyby&reduce

作者: AlanKim | 来源:发表于2019-03-17 08:12 被阅读250次

    上文学习了简单的map、flatmap、filter,在这里开始继续看keyBy及reduce

    keyBy

    先看定义,通过keyBy,DataStream→KeyedStream。

    逻辑上将流分区为不相交的分区。具有相同Keys的所有记录都分配给同一分区。在内部,keyBy()是使用散列分区实现的。指定键有不同的方法。

    此转换返回KeyedStream,其中包括使用被Keys化状态所需的KeyedStream

    dataStream.keyBy("someKey") // Key by field "someKey"
    dataStream.keyBy(0) // Key by the first element of a Tuple(数组)
        
    

    注意 如果出现以下情况,则类型不能成为关键

    1. 它是POJO类型但不覆盖hashCode()方法并依赖于Object.hashCode()实现。
    2. 它是任何类型的数组。

    看段代码:

    public class KeyByTestJob {
    
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
            env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L))
                    .keyBy(0) // 以数组的第一个元素作为key
                    .map((MapFunction<Tuple2<Long, Long>, String>) longLongTuple2 -> "key:" + longLongTuple2.f0 + ",value:" + longLongTuple2.f1)
                    .print();
    
            env.execute("execute");
        }
    }
    

    运行后,结果如下:

    3> key:1,value:5
    3> key:1,value:7
    3> key:1,value:2
    4> key:2,value:3
    4> key:2,value:4
    

    可以看到,前面的 3> 和 4> 输出 本身是个分组,而且顺序是从先输出key=1的tuple数组,再输出key=2的数组。

    也就是说,keyby类似于sql中的group by,将数据进行了分组。后面基于keyedSteam的操作,都是组内操作。

    断点看了下keyedStream的结构:

    keyedStream.png

    可以看到,包含了keyType、keySelector,以及转换后的PartitionTransformation,也就是已经做了分区了。后续的所有操作都是按照分区内数据来处理的。

    reduce

    reduce表示将数据合并成一个新的数据,返回单个的结果值,并且 reduce 操作每处理一个元素总是创建一个新值。而且reduce方法不能直接应用于SingleOutputStreamOperator对象,也好理解,因为这个对象是个无限的流,对无限的数据做合并,没有任何意义哈!

    所以reduce需要针对分组或者一个window(窗口)来执行,也就是分别对应于keyBy、window/timeWindow 处理后的数据,根据ReduceFunction将元素与上一个reduce后的结果合并,产出合并之后的结果。

    在上面代码的基础上修改:

    public class KeyByTestJob {
    
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
            env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L))
                    .keyBy(0) // 以数组的第一个元素作为key
                    .reduce((ReduceFunction<Tuple2<Long, Long>>) (t2, t1) -> new Tuple2<>(t1.f0, t2.f1 + t1.f1)) // value做累加
                    .print();
    
            env.execute("execute");
        }
    }
    
    3> (1,5)
    3> (1,12)
    3> (1,14)
    4> (2,3)
    4> (2,7)
    

    可以看到,分组后,每次有一个数组进来,都会产生新的数据,依然是按照分组来输出的。

    如果改下reduce中的实现:

    ReduceFunction<Tuple2<Long, Long>>) (t2, t1) -> new Tuple2<>(t1.f0 + t2.f0, t2.f1 + t1.f1)
    

    那么输出就是:

    2019-01-22 12:04:56.083 [Keyed Reduce -> Sink: Print to Std. Out (2/4)] INFO  org.apache.flink.runtime.taskmanager.Task - Keyed Reduce -> Sink: Print to Std. Out (2/4) (7117b0831e59cae2201e6f7097356214) switched from RUNNING to FINISHED.
    2019-01-22 12:04:56.083 [Keyed Reduce -> Sink: Print to Std. Out (2/4)] INFO  org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Keyed Reduce -> Sink: Print to Std. Out (2/4) (7117b0831e59cae2201e6f7097356214).
    2019-01-22 12:04:56.083 [Keyed Reduce -> Sink: Print to Std. Out (2/4)] INFO  org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Keyed Reduce -> Sink: Print to Std. Out (2/4) (7117b0831e59cae2201e6f7097356214) [FINISHED]
    4> (2,3)
    4> (4,7)
    
    ...
    
    2019-01-22 12:04:56.118 [flink-akka.actor.default-dispatcher-4] INFO  o.a.flink.runtime.executiongraph.ExecutionGraph - Keyed Reduce -> Sink: Print to Std. Out (2/4) (7117b0831e59cae2201e6f7097356214) switched from RUNNING to FINISHED.
    2019-01-22 12:04:56.122 [flink-akka.actor.default-dispatcher-4] INFO  o.a.flink.runtime.executiongraph.ExecutionGraph - Keyed Reduce -> Sink: Print to Std. Out (1/4) (0fdc49eb18050efa3acec361978f3e93) switched from RUNNING to FINISHED.
    2019-01-22 12:04:56.125 [flink-akka.actor.default-dispatcher-4] INFO  o.a.flink.runtime.executiongraph.ExecutionGraph - Keyed Reduce -> Sink: Print to Std. Out (4/4) (1607b502ab2791f2f567c61da214bd82) switched from RUNNING to FINISHED.
    3> (1,5)
    3> (2,12)
    3> (3,14)
    

    可以看到输出结果,一方面是是key-reduce的状态,从RUNNING迁移到FINISHED;另一方面是按组输出了最终的reduce值。

    聚合

    KeyedStream→DataStream

    在被Keys化数据流上滚动聚合。min和minBy之间的差异是min返回最小值,而minBy返回该字段中具有最小值的数据元(max和maxBy类似)。

    ---TODO 这里存疑,因为返回的数据始终是数据源,难道是我写错了什么?SingleOutputStreamOperator<Tuple2>改成SingleOutputStreamOperator<Long> 也是一样的结果,等待后续继续验证。

    keyedStream.sum(0);
    keyedStream.sum("key");
    keyedStream.min(0);
    keyedStream.min("key");
    keyedStream.max(0);
    keyedStream.max("key");
    keyedStream.minBy(0);
    keyedStream.minBy("key");
    keyedStream.maxBy(0);
    keyedStream.maxBy("key");
    

    继续在上面代码的基础上做实验:

    sum
    public class KeyByTestJob {
    
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
            KeyedStream keyedStream =  env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L))
                    .keyBy(0) // 以数组的第一个元素作为key
                    ;
    
            SingleOutputStreamOperator<Tuple2> sumStream = keyedStream.sum(0);
            sumStream.addSink(new PrintSinkFunction<>());
    
            env.execute("execute");
        }
    

    对第一个元素(位置0)做sum,结果如下:

    3> (1,5)
    3> (2,5)
    3> (3,5)
    ...
    4> (2,3)
    2019-01-22 21:27:07.401 [flink-akka.actor.default-dispatcher-3] INFO  o.a.flink.runtime.executiongraph.ExecutionGraph - Source: Collection Source (1/1) (f3368fedb9805b1e59f4443252a2fb2b) switched from RUNNING to FINISHED.
    4> (4,3)
    

    可以看到,对第一个数据(也就是key)做了累加,然后value以第一个进来的数据为准。

    如过改成keyedStream.sum(1); 也就是针对第二个元素求和,得到的结果如下:

    4> (2,3)
    4> (2,7)
    ...
    3> (1,5)
    3> (1,12)
    2019-01-23 10:50:47.498 [flink-akka.actor.default-dispatcher-5] INFO  o.a.flink.runtime.executiongraph.ExecutionGraph - Source: Collection Source (1/1) (df09751c6722a5942b058a1300ae9fb3) switched from RUNNING to FINISHED.
    3> (1,14)
    
    min
    SingleOutputStreamOperator<Tuple2> sumStream = keyedStream.min(1);
    

    得到的输出结果是:

    3> (1,5)  -- 第一组 第一个数据到的结果
    3> (1,5)  -- 第一组 第二个数据到的结果
    4> (2,3)  -- 第二组 第一个数据到的结果
    4> (2,3)  -- 第二组 第二个数据到的结果
    3> (1,2)  -- 第一组 第三个数据到的结果
    

    这里顺序有点乱,不过没问题,数据按照顺序一个一个的过来,然后计算当前数据过来时有最小value的数据。

    minBy
    SingleOutputStreamOperator<Tuple2> sumStream = keyedStream.minBy(1);
    
    3> (1,5)
    3> (1,5)
    4> (2,3)
    3> (1,2)
    4> (2,3)
    

    类似的,只是组间打印的顺序有区别而已。

    max
    SingleOutputStreamOperator<Tuple2> sumStream = keyedStream.max(1);
    
    3> (1,5)
    4> (2,3)
    3> (1,7)
    4> (2,4)
    3> (1,7)
    
    

    按照顺序,取最大的数据

    maxBy
    SingleOutputStreamOperator<Tuple2> sumStream = keyedStream.maxBy(1);
    
    
    3> (1,5)
    4> (2,3)
    3> (1,7)
    4> (2,4)
    3> (1,7)
    
    

    有一点要牢记,数据是一直流过来的,这些聚合方法都是在每次收到新的数据之后,重新计算/比较得出来的结果,而不是只有一个最终结果。

    相关文章

      网友评论

        本文标题:flink学习之八-keyby&reduce

        本文链接:https://www.haomeiwen.com/subject/jttouqtx.html