写给大忙人的Flink的Data Types

作者: shengjk1 | 来源:发表于2020-03-16 18:33 被阅读0次

    一.Flink 中 Data Type 组成

    • 基本数据类型:java 的 8 中基本数据类型加上它们各自的包装类型,在加上 void , String, Date,BigDecimal, BigInteger.

    • 基本数据类型的数据和 Object 类型的数组

    • 复合类型
      1.Flink Java Tuples
      2. scala case classes
      3. Row
      4. POJOs: 如果要被 Flink 识别的也允许按 name 引用的话,需要复符合一定的规则(否则的话,会被当做泛型处理)
      1). 这个类是 pulic 的并且没有非静态的内部类。
      2). 得有一个没有参数的 pulic 构造器
      3).所有非静态的非 transient 的属性(包括所有的父类)都必须是 pulic 或者符合 java beans 命名规范的 getter setter 方法。

    • 辅助类型 (集合类、Option、Either 等)

    • 泛型:不会被 Flink 自带的序列化器序列化,而被是 Kryo

    二、Flink 是如何处理 Data Type 的
    首先Flink会根据自身的序列化器进行序列化,如果不行,则默认回退到 Kryo 序列化器进行序列化。

    可能碰到的问题,如下:

    • Registering subtypes
      如果方法签名是父类,而返回或者使用的是子类,也就是所谓的协变返回类型关于协变返回类型。让 Flink 知道所有的子类可以在一定的程度上提高性能。
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.registerType(KuduTableDesc.class);
    
    • Registering custom serializers
      虽然 Flink 自己序列化不了的会给 Kryo,但是 Kryo 也不能很好的处理掉所有的类型,这个时候就要自定义序列化器了。
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
    env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, MyCustomSerializer.class);
    
    • Adding Type Hints
      Flink 可能无法推断出泛型的类型,仅仅在 Java Api 中时必要的。
    DataSet<SomeType> result = dataSet
        .map(new MyGenericNonInferrableFunction<Long, SomeType>())
            .returns(SomeType.class);
            
    DataSet<SomeType> result = dataSet
        .map(new MyGenericNonInferrableFunction<Long, SomeType>())
            .returns(new TypeHint<SomeType.class});
    
    • Manually creating a TypeInformation
      Flink 可能无法推断出泛型的类型时
    TypeInformation<String> info = TypeInformation.of(String.class);
    
    TypeInformation<Tuple2<String, Double>> info = TypeInformation.of(new TypeHint<Tuple2<String, Double>>(){});
    

    三、常见的 returns 的使用

    .returns(Types.TUPLE(Types.INT,Types.INT))
    .returns(Types.STRING)
    .returns(TypeInformation.of(String.class))
    .returns(new TypeHint<Tuple2<String, String>>(){})
    .returns(TypeInformation.of(new TypeHint<Tuple2<ConsumerRecord, String>>() {}))
    .returns(SomeType.class)
    

    相关文章

      网友评论

        本文标题:写给大忙人的Flink的Data Types

        本文链接:https://www.haomeiwen.com/subject/lwtoehtx.html