Operators

Author: 小C菜鸟 | Published: 2018-01-20 19:39

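    Operators transform one or more DataStreams into a new DataStream. Programs can combine multiple transformations into sophisticated dataflow topologies.

    This section describes the basic transformations, the effective physical partitioning after applying them, and gives insight into Flink's operator chaining.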

    DataStream Transformations

    Transformation Description
    Map
    DataStream → DataStream
    Takes one element and produces one element. A map function that doubles the values of the input stream:
    <pre> DataStream<Integer> dataStream = //...
    dataStream.map(new MapFunction<Integer, Integer>() {
        @Override
        public Integer map(Integer value) throws Exception {
            return 2 * value;
        }
    }); </pre>
    FlatMap
    DataStream → DataStream
    Takes one element and produces zero, one, or more elements. A flatMap function that splits sentences into words:
    <pre> <code> dataStream.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {
            for (String word : value.split(" ")) {
                out.collect(word);
            }
        }
    });</code> </pre>
    Filter
    DataStream → DataStream
    Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:
    <pre> dataStream.filter(new FilterFunction<Integer>() {
        @Override
        public boolean filter(Integer value) throws Exception {
            return value != 0;
        }
    }); </pre>
    KeyBy
    DataStream → KeyedStream
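    Logically partitions a stream into disjoint partitions, each partition containing elements of the same key. Internally, this is implemented with hash partitioning. See keys for how to specify keys. This transformation returns a KeyedStream.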
    <pre> <code>dataStream.keyBy("someKey") // Key by field "someKey"
    dataStream.keyBy(0) // Key by the first element of a Tuple</code></pre>
    Note: A type cannot be a key if:
    1. It is a POJO type that does not override hashCode() and relies on the Object.hashCode() implementation.
    2. It is an array of any type.
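The restrictions above follow from how hash partitioning works. The following is a minimal plain-Java sketch, not Flink code: the class `KeyPartitioningSketch` and the helper `partitionFor` are hypothetical names, and Flink's internal key assignment is more involved than a simple modulo. It only illustrates that routing by hash code requires equal keys to produce equal hash codes, which arrays (relying on Object.hashCode()) do not guarantee.

```java
import java.util.Arrays;

final class KeyPartitioningSketch {
    // Hypothetical helper: map a key to one of numPartitions partitions by its hash code.
    static int partitionFor(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        // Equal strings have equal hash codes, so they always land in the same partition.
        System.out.println(partitionFor("user-1", 4) == partitionFor(new String("user-1"), 4));

        // Arrays inherit Object.hashCode(): equal contents do not imply equal hash codes.
        int[] a = {1, 2, 3};
        int[] b = {1, 2, 3};
        System.out.println(Arrays.equals(a, b));          // contents are equal
        System.out.println(a.hashCode() == b.hashCode()); // usually false
    }
}
```

With a key type like this array, two records that "look" identical could be routed to different partitions, which is why such types are rejected as keys.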
    Reduce
    KeyedStream → DataStream
    A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.
    A reduce function that creates a stream of partial sums:
    <pre> <code> keyedStream.reduce(new ReduceFunction<Integer>() {
        @Override
        public Integer reduce(Integer value1, Integer value2) throws Exception {
            return value1 + value2;
        }
    }); </code></pre>
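To make the "rolling" behaviour concrete, here is a small plain-Java simulation (no Flink dependency). The class `RollingReduceSketch` and its method are hypothetical names chosen for illustration; the sketch assumes records arrive as (key, value) pairs and keeps one reduced value per key, emitting the updated value after every record.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

final class RollingReduceSketch {
    // Emits one output per input: the running reduction of all values seen so far for that key.
    static List<Integer> rollingReduce(List<Map.Entry<String, Integer>> input,
                                       BinaryOperator<Integer> reducer) {
        Map<String, Integer> lastReduced = new HashMap<>(); // per-key state
        List<Integer> emitted = new ArrayList<>();
        for (Map.Entry<String, Integer> record : input) {
            // First value for a key is stored as-is; later values are combined with the stored one.
            Integer updated = lastReduced.merge(record.getKey(), record.getValue(), reducer);
            emitted.add(updated);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> input = List.of(
                Map.entry("a", 1), Map.entry("b", 10), Map.entry("a", 2), Map.entry("a", 3));
        System.out.println(rollingReduce(input, Integer::sum)); // [1, 10, 3, 6]
    }
}
```

Note that the partial sums for key "a" (1, 3, 6) are interleaved with the output for key "b"; each key has its own running state.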
    Fold
    KeyedStream → DataStream
    A "rolling" fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value.
    When applied to the sequence (1,2,3,4,5), the fold function emits "start-1", "start-1-2", "start-1-2-3", ...
    <pre> <code> DataStream<String> result = keyedStream.fold("start", new FoldFunction<Integer, String>() {
        @Override
        public String fold(String current, Integer value) {
            return current + "-" + value;
        }
    }); </code></pre>
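The emitted sequence can be reproduced with a short plain-Java simulation. This is only a sketch under the assumption of a single key; `RollingFoldSketch` and `rollingFold` are hypothetical names and not part of Flink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

final class RollingFoldSketch {
    // Starts from an initial accumulator and emits the accumulator after each element.
    static <T, R> List<R> rollingFold(R initial, List<T> input, BiFunction<R, T, R> folder) {
        List<R> emitted = new ArrayList<>();
        R current = initial;
        for (T value : input) {
            current = folder.apply(current, value);
            emitted.add(current);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> out = rollingFold("start", List.of(1, 2, 3),
                (current, value) -> current + "-" + value);
        System.out.println(out); // [start-1, start-1-2, start-1-2-3]
    }
}
```

Unlike reduce, the accumulator type (String here) may differ from the element type (Integer), which is why fold needs an explicit initial value.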
    Aggregations
    KeyedStream → DataStream
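    Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (the same holds for max and maxBy).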
    <pre> <code> keyedStream.sum(0);
    keyedStream.sum("key");
    keyedStream.min(0);
    keyedStream.min("key");
    keyedStream.max(0);
    keyedStream.max("key");
    keyedStream.minBy(0);
    keyedStream.minBy("key");
    keyedStream.maxBy(0);
    keyedStream.maxBy("key"); </code></pre>
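The distinction between min and minBy can be illustrated on a finite list with plain Java streams. This is a sketch only: the `Reading` type and both method names are hypothetical, and the real aggregations are rolling (they emit an updated result per element) rather than one result per list.

```java
import java.util.Comparator;
import java.util.List;

final class MinVersusMinBySketch {
    // Hypothetical element type with a field to aggregate on.
    static final class Reading {
        final String sensor;
        final int temperature;

        Reading(String sensor, int temperature) {
            this.sensor = sensor;
            this.temperature = temperature;
        }
    }

    // "min"-style result: only the smallest value of the field.
    static int minTemperature(List<Reading> readings) {
        return readings.stream().mapToInt(r -> r.temperature).min().getAsInt();
    }

    // "minBy"-style result: the whole element that carries the smallest field value.
    static Reading minByTemperature(List<Reading> readings) {
        return readings.stream()
                .min(Comparator.comparingInt((Reading r) -> r.temperature))
                .get();
    }

    public static void main(String[] args) {
        List<Reading> readings = List.of(
                new Reading("s1", 21), new Reading("s2", 17), new Reading("s3", 19));
        System.out.println(minTemperature(readings));          // 17
        System.out.println(minByTemperature(readings).sensor); // s2
    }
}
```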
    Window
    KeyedStream → WindowedStream
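    Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See Windows for a complete description of windows.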
    <pre> <code> dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data </code></pre>
    WindowAll
    DataStream → AllWindowedStream
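    Windows can be defined on regular DataStreams. Windows group all the stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds). See Windows for a complete description of windows.
    WARNING: This is in many cases a non-parallel transformation. All records will be gathered in one task for the windowAll operator.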
    <pre> <code> dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data</code></pre>
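As a rough illustration of what a 5-second tumbling window does, the plain-Java sketch below assigns timestamps to fixed, non-overlapping windows. It is a simplification under stated assumptions: timestamps are non-negative milliseconds, there is no window offset, and the names `TumblingWindowSketch`, `windowStart`, and `assign` are hypothetical rather than Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class TumblingWindowSketch {
    // Start of the fixed-size window containing the timestamp (non-negative timestamps, no offset).
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    // Groups timestamps by the start of the window they fall into.
    static Map<Long, List<Long>> assign(List<Long> timestamps, long sizeMillis) {
        Map<Long, List<Long>> windows = new TreeMap<>();
        for (long ts : timestamps) {
            windows.computeIfAbsent(windowStart(ts, sizeMillis), k -> new ArrayList<>()).add(ts);
        }
        return windows;
    }

    public static void main(String[] args) {
        System.out.println(assign(List.of(1_000L, 4_999L, 5_000L, 12_300L), 5_000L));
        // {0=[1000, 4999], 5000=[5000], 10000=[12300]}
    }
}
```

Each element belongs to exactly one window, and a timestamp equal to a window boundary (5000 here) opens the next window.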
    Window Apply
    WindowedStream → DataStream
    AllWindowedStream → DataStream
    Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window.
    Note: If you are using a windowAll transformation, you need to use an AllWindowFunction instead.
    <pre> <code> windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {
        public void apply (Tuple tuple, Window window, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception {
            int sum = 0;
            for (Tuple2<String, Integer> t : values) {
                sum += t.f1;
            }
            out.collect(sum);
        }
    });
    // applying an AllWindowFunction on non-keyed window stream
    allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() {
        public void apply (Window window, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception {
            int sum = 0;
            for (Tuple2<String, Integer> t : values) {
                sum += t.f1;
            }
            out.collect(sum);
        }
    }); </code></pre>
    Window Reduce
    WindowedStream → DataStream
    Applies a reduce function to the window and returns the reduced value.
    <pre> <code> windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {
        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
            return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
        }
    }); </code></pre>
    Window Fold
    WindowedStream → DataStream
    Applies a fold function to the window and returns the folded value. When applied to the sequence (1,2,3,4,5), the example function folds the sequence into the string "start-1-2-3-4-5":
    <pre> <code> windowedStream.fold("start", new FoldFunction<Integer, String>() {
        public String fold(String current, Integer value) {
            return current + "-" + value;
        }
    }); </code></pre>
    Aggregations on windows
    WindowedStream → DataStream
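    Aggregates the contents of a window. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (the same holds for max and maxBy).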
    <pre> <code> windowedStream.sum(0);
    windowedStream.sum("key");
    windowedStream.min(0);
    windowedStream.min("key");
    windowedStream.max(0);
    windowedStream.max("key");
    windowedStream.minBy(0);
    windowedStream.minBy("key");
    windowedStream.maxBy(0);
    windowedStream.maxBy("key"); </code></pre>
    Union
    DataStream* → DataStream
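    Union of two or more data streams, creating a new stream containing all the elements from all the streams. Note: If you union a data stream with itself, you will get each element twice in the resulting stream.
    <pre> <code> dataStream.union(otherStream1, otherStream2, ...); </code></pre>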
    Window Join
    DataStream,DataStream → DataStream
    Joins two data streams on a given key and a common window.
    <pre> <code> dataStream.join(otherStream)
        .where(<key selector>).equalTo(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.seconds(3)))
        .apply (new JoinFunction () {...}); </code></pre>
    Window CoGroup
    DataStream,DataStream → DataStream
    Cogroups two data streams on a given key and a common window.
    <pre> <code> dataStream.coGroup(otherStream)
        .where(0).equalTo(1)
        .window(TumblingEventTimeWindows.of(Time.seconds(3)))
        .apply (new CoGroupFunction () {...}); </code></pre>
    Connect
    DataStream,DataStream → ConnectedStreams
    "Connects" two data streams retaining their types. Connect allows for shared state between the two streams.
    <pre> <code> DataStream<Integer> someStream = //...
    DataStream<String> otherStream = //...
    ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);</code></pre>
    CoMap, CoFlatMap
    ConnectedStreams → DataStream
    Similar to map and flatMap on a connected data stream.
    <pre> <code> connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
        @Override
        public Boolean map1(Integer value) {
            return true;
        }
        @Override
        public Boolean map2(String value) {
            return false;
        }
    });
    connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
        @Override
        public void flatMap1(Integer value, Collector<String> out) {
            out.collect(value.toString());
        }
        @Override
        public void flatMap2(String value, Collector<String> out) {
            for (String word : value.split(" ")) {
                out.collect(word);
            }
        }
    });</code></pre>
    Split
    DataStream → SplitStream
    Splits the stream into two or more streams according to some criterion.
    <pre> <code> SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
        @Override
        public Iterable<String> select(Integer value) {
            List<String> output = new ArrayList<String>();
            if (value % 2 == 0) {
                output.add("even");
            }
            else {
                output.add("odd");
            }
            return output;
        }
    }); </code></pre>
    Select
    SplitStream → DataStream
    Selects one or more streams from a split stream.
    <pre> <code> SplitStream<Integer> split;
    DataStream<Integer> even = split.select("even");
    DataStream<Integer> odd = split.select("odd");
    DataStream<Integer> all = split.select("even","odd"); </code></pre>
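The interplay of split and select can be simulated in plain Java as tagging followed by lookup. This is a sketch, not Flink code: `SplitSelectSketch`, `split`, and `select` are hypothetical helpers operating on finite lists, and selecting several names here simply concatenates the tagged lists rather than preserving arrival order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class SplitSelectSketch {
    // "split": tag each element with one or more output names.
    static <T> Map<String, List<T>> split(List<T> input, Function<T, List<String>> selector) {
        Map<String, List<T>> outputs = new HashMap<>();
        for (T value : input) {
            for (String name : selector.apply(value)) {
                outputs.computeIfAbsent(name, k -> new ArrayList<>()).add(value);
            }
        }
        return outputs;
    }

    // "select": collect the elements of the named outputs (concatenated per name in this sketch).
    static <T> List<T> select(Map<String, List<T>> outputs, String... names) {
        List<T> selected = new ArrayList<>();
        for (String name : names) {
            selected.addAll(outputs.getOrDefault(name, List.of()));
        }
        return selected;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> outputs =
                split(List.of(1, 2, 3, 4), v -> List.of(v % 2 == 0 ? "even" : "odd"));
        System.out.println(select(outputs, "even"));        // [2, 4]
        System.out.println(select(outputs, "odd"));         // [1, 3]
        System.out.println(select(outputs, "even", "odd")); // [2, 4, 1, 3]
    }
}
```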
    Iterate
    DataStream → IterativeStream → DataStream
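    Creates a "feedback" loop in the flow by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream. See iterations for a complete description.
    <pre> <code> IterativeStream<Long> iteration = initialStream.iterate();
    DataStream<Long> iterationBody = iteration.map(/* do something */);
    DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
        @Override
        public boolean filter(Long value) throws Exception {
            return value > 0;
        }
    });
    iteration.closeWith(feedback);
    DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
        @Override
        public boolean filter(Long value) throws Exception {
            return value <= 0;
        }
    }); </code></pre>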
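The feedback loop can be pictured with a queue-based plain-Java simulation. This is a sketch under assumptions: the iteration body is taken to be "subtract 1" (the original leaves it unspecified), the input is finite, and `FeedbackLoopSketch` and `run` are hypothetical names unrelated to the Flink API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class FeedbackLoopSketch {
    // Applies the assumed iteration body (subtract 1) until a value is no longer greater than 0.
    static List<Long> run(List<Long> initial) {
        Deque<Long> pending = new ArrayDeque<>(initial); // initial input plus fed-back elements
        List<Long> output = new ArrayList<>();
        while (!pending.isEmpty()) {
            long result = pending.poll() - 1; // iteration body (assumption)
            if (result > 0) {
                pending.add(result);          // feedback channel: goes around the loop again
            } else {
                output.add(result);           // forwarded downstream
            }
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(3L, 1L, -2L))); // [0, -3, 0]
    }
}
```

The element 3 circulates through the loop three times before it leaves, which is why its result appears last in the output.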
    Extract Timestamps
    DataStream → DataStream
    Extracts timestamps from records in order to work with windows that use event time semantics. See Event Time.
    <pre> <code> stream.assignTimestamps (new TimeStampExtractor() {...}); </code></pre>

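    The following transformations are available on data streams of Tuples:

    Transformation Description
    Project
    DataStream → DataStream
    Selects a subset of fields from the tuples.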
    <pre> <code> DataStream<Tuple3<Integer, Double, String>> in = // [...]
    DataStream<Tuple2<String, Integer>> out = in.project(2,0); </code></pre>

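    Physical partitioning

    Flink also gives low-level control (if desired) over the exact stream partitioning after a transformation, via the following functions.

    Transformation Description
    Custom partitioning
    DataStream → DataStream
    Uses a user-defined Partitioner to select the target task for each element.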
    <pre> <code> dataStream.partitionCustom(partitioner, "someKey");
    dataStream.partitionCustom(partitioner, 0); </code></pre>
    Random partitioning
    DataStream → DataStream
    Partitions elements randomly according to a uniform distribution.
    <pre> <code> dataStream.shuffle(); </code></pre>
    Rebalancing (Round-robin partitioning)
    DataStream → DataStream
    Partitions elements round-robin, creating equal load per partition. Useful for performance optimization in the presence of data skew.
    <pre> <code> dataStream.rebalance(); </code></pre>
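Round-robin distribution can be sketched in a few lines of plain Java. The class `RoundRobinSketch` and its `rebalance` method are hypothetical and operate on a finite list; the sketch always starts at partition 0, which is an assumption and not a statement about Flink's behaviour.

```java
import java.util.ArrayList;
import java.util.List;

final class RoundRobinSketch {
    // Hands elements to partitions 0, 1, ..., n-1, 0, 1, ... in turn.
    static <T> List<List<T>> rebalance(List<T> input, int numPartitions) {
        List<List<T>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        int next = 0; // assumption: start with the first partition
        for (T value : input) {
            partitions.get(next).add(value);
            next = (next + 1) % numPartitions;
        }
        return partitions;
    }

    public static void main(String[] args) {
        System.out.println(rebalance(List.of("a", "b", "c", "d", "e"), 2)); // [[a, c, e], [b, d]]
    }
}
```

Because the assignment ignores element content, partition sizes differ by at most one regardless of how skewed the keys are.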
    Rescaling
    DataStream → DataStream
    Partitions elements, round-robin, to a subset of downstream operations. This is useful if you want to have pipelines where you, for example, fan out from each parallel instance of a source to a subset of several mappers to distribute load but don't want the full rebalance that rebalance() would incur. This would require only local data transfers instead of transferring data over the network, depending on other configuration values such as the number of slots of TaskManagers.

    The subset of downstream operations to which the upstream operation sends elements depends on the degree of parallelism of both the upstream and downstream operation. For example, if the upstream operation has parallelism 2 and the downstream operation has parallelism 6, then one upstream operation would distribute elements to three downstream operations while the other upstream operation would distribute to the other three downstream operations. If, on the other hand, the downstream operation has parallelism 2 while the upstream operation has parallelism 6 then three upstream operations would distribute to one downstream operation while the other three upstream operations would distribute to the other downstream operation.

    In cases where the different parallelisms are not multiples of each other, one or several downstream operations will have a differing number of inputs from upstream operations.

    [Figure: visualization of the connection pattern in the above example]
    <pre> <code> dataStream.rescale(); </code></pre>
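The connection pattern from the example (parallelism 2 to 6, and 6 to 2) can be computed with a small plain-Java sketch. It is an illustration under assumptions: it only handles parallelisms that are multiples of each other, it assumes each upstream instance is wired to a contiguous block of downstream instances, and `RescaleSketch.targets` is a hypothetical helper rather than Flink API.

```java
import java.util.ArrayList;
import java.util.List;

final class RescaleSketch {
    // Downstream instance indices that one upstream instance sends to (multiples only).
    static List<Integer> targets(int upstreamIndex, int upstreamParallelism, int downstreamParallelism) {
        int larger = Math.max(upstreamParallelism, downstreamParallelism);
        int smaller = Math.min(upstreamParallelism, downstreamParallelism);
        if (larger % smaller != 0) {
            throw new IllegalArgumentException("sketch only covers parallelisms that are multiples");
        }
        List<Integer> result = new ArrayList<>();
        if (downstreamParallelism >= upstreamParallelism) {
            // Fan-out: each upstream instance gets its own block of downstream instances.
            int fanOut = downstreamParallelism / upstreamParallelism;
            for (int i = 0; i < fanOut; i++) {
                result.add(upstreamIndex * fanOut + i);
            }
        } else {
            // Fan-in: several upstream instances share one downstream instance.
            int fanIn = upstreamParallelism / downstreamParallelism;
            result.add(upstreamIndex / fanIn);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(targets(0, 2, 6)); // [0, 1, 2]
        System.out.println(targets(1, 2, 6)); // [3, 4, 5]
        System.out.println(targets(4, 6, 2)); // [1]
    }
}
```

Within its block of targets, an upstream instance would then distribute elements round-robin, so no element ever needs to reach an instance outside that block.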
    Broadcasting
    DataStream → DataStream
    Broadcasts elements to every partition.
    <pre> <code> dataStream.broadcast(); </code></pre>

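    Task chaining and resource groups

    Chaining two subsequent transformations means co-locating them within the same thread for better performance. Flink by default chains operators if this is possible (e.g., two subsequent map transformations). The API gives fine-grained control over chaining if desired.
    Use StreamExecutionEnvironment.disableOperatorChaining() if you want to disable chaining in the whole job. For more fine-grained control, the following functions are available. Note that these functions can only be used right after a DataStream transformation, as they refer to the previous transformation. For example, you can use someStream.map(...).startNewChain(), but you cannot use someStream.startNewChain().
    A resource group is a slot in Flink, see slots. You can manually isolate operators in separate slots if desired.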

    Transformation Description
    Start new chain
    Begins a new chain, starting with this operator. The two mappers will be chained, and filter will not be chained to the first mapper.
    <pre> <code> someStream.filter(...).map(...).startNewChain().map(...); </code></pre>
    Disable chaining
    Does not chain the map operator.
    <pre> <code> someStream.map(...).disableChaining(); </code></pre>
    Set slot sharing group
    Sets the slot sharing group of an operation. Flink will put operations with the same slot sharing group into the same slot while keeping operations that don't have the slot sharing group in other slots. This can be used to isolate slots. The slot sharing group is inherited from input operations if all input operations are in the same slot sharing group. The name of the default slot sharing group is "default"; operations can explicitly be put into this group by calling slotSharingGroup("default").
    <pre> <code> someStream.filter(...).slotSharingGroup("name");</code></pre>


        Article link: https://www.haomeiwen.com/subject/vfotaxtx.html