美文网首页flink大数据
Flink DataStream 算子 Map、FlatMap、

Flink DataStream 算子 Map、FlatMap、

作者: tracy_668 | 来源:发表于2021-01-08 09:31 被阅读0次

[TOC]
数据转换将数据流从一种形式转换为另一种形式,也就是说输入可以是一个或多个数据流,输出也可以是零,或一个或多个数据流。Flink1.7对transform另起一个新的名字“Operators ”--Operators transform 。程序可以将多个transform组合成复杂的数据流拓扑。

1.Map

Map [DataStream->DataStream]

Map: 一对一转换,即一条转换成另一条。

输入一个元素并生成一个元素。 一个map函数,它将输入流的值加倍:
dataStream.map { x => x * 2 }

package com.bigdata.flink.dataStreamMapOperator;

import com.bigdata.flink.beans.UserAction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

/**
 * Summary:
 *      Map: 一对一转换
 */
public class DataStreamMapOperator {
    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 输入: 用户行为。某个用户在某个时刻点击或浏览了某个商品,以及商品的价格。
        DataStreamSource<UserAction> source = env.fromCollection(Arrays.asList(
                new UserAction("userID1", 1293984000, "click", "productID1", 10),
                new UserAction("userID2", 1293984001, "browse", "productID2", 8),
                new UserAction("userID1", 1293984002, "click", "productID1", 10)
        ));

        // 转换: 商品的价格乘以8
        SingleOutputStreamOperator<UserAction> result = source.map(new MapFunction<UserAction, UserAction>() {
            @Override
            public UserAction map(UserAction value) throws Exception {

                int newPrice = value.getProductPrice() * 8;
                return new UserAction(value.getUserID(), value.getEventTime(), value.getEventType(), value.getProductID(), newPrice);
            }
        });

        // 输出: 输出到控制台
        // UserAction(userID=userID1, eventTime=1293984002, eventType=click, productID=productID1, productPrice=80)
        // UserAction(userID=userID1, eventTime=1293984000, eventType=click, productID=productID1, productPrice=80)
        // UserAction(userID=userID2, eventTime=1293984001, eventType=browse, productID=productID2, productPrice=64)
        result.print();

        env.execute();
    }
}

2.FlatMap

转换:DataStream → DataStream

FlatMap [DataStream->DataStream]
FlatMap: 一行变零到多行。如下,将一个句子(一行)分割成多个单词(多行)。

输入一个元素并生成零个,一个或多个元素。 将句子分割为单词的flatmap函数:

dataStream.flatMap { str => str.split(" ") }

package com.bigdata.flink.dataStreamFlatMapOperator;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Summary:
 *      FlatMap: 一行变任意行(0~多行)
 */
public class DataStreamFlatMapOperator {
  public static void main(String[] args) throws Exception{
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

      // 输入: 英文电影台词
      DataStreamSource<String> source = env
              .fromElements(
                      "You jump I jump",
                      "Life was like a box of chocolates"
              );

      // 转换: 将包含chocolates的句子转换为每行一个单词
      SingleOutputStreamOperator<String> result = source.flatMap(new FlatMapFunction<String, String>() {
          @Override
          public void flatMap(String value, Collector<String> out) throws Exception {
              if(value.contains("chocolates")){
                  String[] words = value.split(" ");
                  for (String word : words) {
                      out.collect(word);
                  }
              }
          }
      });

      // 输出: 输出到控制台
      // Life
      // was
      // like
      // a
      // box
      // of
      // chocolates
      result.print();

      env.execute();
  }
}

Filter [DataStream->DataStream]

DataStream → DataStream
计算每个元素的布尔函数,并保留函数返回true的元素。 过滤掉零值的过滤器,通俗来讲就是过滤掉等于0的元素,转换成新的数据流

dataStream.filter { _ != 0 }

4.KeyBy

转换:DataStream → KeyedStream
KeyBy [DataStream->KeyedStream]
KeyBy: 按指定的Key对数据重分区。将同一Key的数据放到同一个分区。

逻辑分区流分为不同的分区。 具有相同key的所有记录都分配给同一分区。 在内部,keyBy()是使用hash分区实现的。 指定key有不同的方法。此Transformations返回KeyedStream,

注意:

  • 分区结果和KeyBy下游算子的并行度强相关。如下游算子只有一个并行度,不管怎么分,都会分到一起。
  • 对于POJO类型,KeyBy可以通过keyBy(fieldName)指定字段进行分区。
  • 对于Tuple类型,KeyBy可以通过keyBy(fieldPosition)指定字段进行分区。
  • 对于一般类型,如上, KeyBy可以通过keyBy(new KeySelector {...})指定字段进行分区。

Reduce [KeyedStream->DataStream]

Reduce: 基于ReduceFunction进行滚动聚合,并向下游算子输出每次滚动聚合后的结果。
注意: Reduce会输出每一次滚动聚合的结果。

相关文章

网友评论

    本文标题:Flink DataStream 算子 Map、FlatMap、

    本文链接:https://www.haomeiwen.com/subject/mlugoktx.html