美文网首页Flink学习教程
flink学习笔记-支持的数据类型

flink学习笔记-支持的数据类型

作者: 大数据研习社 | 来源:发表于2019-03-27 10:08 被阅读0次

说明:本文为《Flink大数据项目实战》学习笔记,想通过视频系统学习Flink这个最火爆的大数据计算框架的同学,推荐学习课程:

Flink大数据项目实战:http://t.cn/EJtKhaz

4.支持的数据类型

Flink对DataSet和DataStream中可使用的元素类型添加了一些约束。原因是系统可以通过分析这些类型来确定有效的执行策略和选择不同的序列化方式。

有7中不同的数据类型:

1.Java Tuple和 Scala Case类;

2.Java POJO;

3.基本类型;

4.通用类;

5.值;

6.Hadoop Writables;

7.特殊类型

4.1Java Tuple

Tuple是包含固定数量各种类型字段的复合类。Flink Java API提供了Tuple1-Tuple25。Tuple的字段可以是Flink的任意类型,甚至嵌套Tuple。

访问Tuple属性的方式有以下两种:

1.属性名(f0,f1…fn)

2.getField(int pos)

4.2Scala Case类

Scala的Case类(以及Scala的Tuple,实际是Case class的特殊类型)是包含了一定数量多种类型字段的组合类型。Tuple字段通过他们的1-offset名称定位,例如 _1代表第一个字段。Case class 通过字段名称获得:

case class WordCount(word: String, count: Int)

val input = env.fromElements(

    WordCount("hello", 1),

    WordCount("world", 2)) // Case Class Data Set

input.keyBy("word")// key by field expression "word"

val input2 = env.fromElements(("hello", 1), ("world", 2)) // Tuple2 Data Set

input2.keyBy(0, 1) // key by field positions 0 and 1

4.3POJOs

Java和Scala的类在满足下列条件时,将会被Flink视作特殊的POJO数据类型专门进行处理:

1.是公共类;

2.无参构造是公共的;

3.所有的属性都是可获得的(声明为公共的,或提供get,set方法);

4.字段的类型必须是Flink支持的。Flink会用Avro来序列化任意的对象。

Flink会分析POJO类型的结构获知POJO的字段。POJO类型要比一般类型好用。此外,Flink访问POJO要比一般类型更高效。

public class WordWithCount {

    public String word;

    public int count;

    public WordWithCount() {}

    public WordWithCount(String word, int count) { this.word = word; this.count = count; }

}

DataStream<WordWithCount> wordCounts = env.fromElements(

    new WordWithCount("hello", 1),

    new WordWithCount("world", 2));

wordCounts.keyBy("word");

4.4基本类型

Flink支持Java和Scala所有的基本数据类型,比如 Integer,String,和Double。

4.5一般通用类

Flink支持大多数的Java,Scala类(API和自定义)。包含不能序列化字段的类在增加一些限制后也可支持。遵循Java Bean规范的类一般都可以使用。

所有不能视为POJO的类Flink都会当做一般类处理。这些数据类型被视作黑箱,其内容是不可见的。通用类使用Kryo进行序列/反序列化。

4.6值类型Values

通过实现org.apache.flinktypes.Value接口的read和write方法提供自定义代码来进行序列化/反序列化,而不是使用通用的序列化框架。

Flink预定义的值类型与原生数据类型是一一对应的(例如:ByteValue, ShortValue, IntValue, LongValue, FloatValue, DoubleValue, StringValue, CharValue, BooleanValue)。这些值类型作为原生数据类型的可变变体,他们的值是可以改变的,允许程序重用对象从而缓解GC的压力。

4.7 Hadoop的Writable类

它实现org.apache.hadoop.Writable接口的类型,该类型的序列化逻辑在write()和readFields()方法中实现。

4.8特殊类型

Flink比较特殊的类型有以下两种:

1.Scala的 Either、Option和Try。

2.Java ApI有自己的Either实现。

4.9类型擦除和类型推理

注意:本小节内容仅针对Java

Java编译器在编译之后会丢弃很多泛型类型信息。这在Java中称为类型擦除。这意味着在运行时,对象的实例不再知道其泛型类型。

例如,在JVM中,DataStream<String>和DataStream<Long>的实例看起来是相同的。

List<String> l1 = new ArrayList<String>();

List<Integer> l2 = new ArrayList<Integer>();

System.out.println(l1.getClass() == l2.getClass());

泛型:一种较为准确的说法就是为了参数化类型,或者说可以将类型当作参数传递给一个类或者是方法。

Flink的Java API会试图去重建(可以做类型推理)这些被丢弃的类型信息,并将它们明确地存储在数据集以及操作中。你可以通过DataStream.getType()方法来获取类型,这个方法将返回一个TypeInformation的实例,这个实例是Flink内部表示类型的方式。

5.累加器和计数器

5.1累加器和计数器

计数器是最简单的累加器。

内置累加器主要包含以下几类:

1.IntCounter, LongCounter和 DoubleCounter

2.Histogram(柱状图)

5.2如何使用累加器

第一步:在自定义的转换操作里创建累加器对象:

private IntCounter numLines = new IntCounter();

第二步:注册累加器对象,通常是在rich function的open()方法中。这里你还需要定义累加器的名字getRuntimeContext().addAccumulator(“num-lines”, this.numLines);

第三步:在operator函数的任何地方使用累加器,包括在open()和close()方法中

this.numLines.add(1);

第四步:结果存储在JobExecutionResult里:

JobExecutionResult JobExecutionResult =env.execute("Flink Batch Java API Skeleton")

myJobExecutionResult.getAccumulatorResult("num-lines")

为了实现你自己的累加器,我们需要实现Accumulator接口,如果你想让你自定义的累加器需要被Flink所收录,请创建一个提交请求。可以选择实现Accumulator或者SimpleAccumulator。

1.Accumulator是最灵活的:它定义了需要进行累加的值的类型V以及最后结果的类型R,例如:对于一个histogram,v是数值类型的而R是一个histogram。

2.SimpleAccumulator则是在进行累计数据类型和返回的数据类型一致的情况下使用的,例如计数器。

(7)DataSream API

1.执行计划Graph

Flink通过Stream API (Batch API同理)开发的应用,底层有四层执行计划,我们首先来看Flink的四层执行计划如下图所示。

通过Stream API开发的Flink应用,底层首先转换为StreamGraph,然后再转换为JobGraph,接着转换为ExecutionGraph,最后生成“物理执行图”。

StreamGraph

1.根据用户代码生成最初的图

2.它通过类表示程序的拓扑结构

3.它是在client端生成

JobGraph

1.优化streamgraph

2.将多个符合条件的Node chain在一起

3.在client端生成,然后交给JobManager

ExecutionGraph

JobManger根据JobGraph 并行化生成ExecutionGraph

物理执行图

实际执行图,不可见

1.1 StreamGraph

StreamGraph

通过Stream API提交的文件,首先会被翻译成StreamGraph。StreamGraph的生成的逻辑是在StreamGraphGenerate类的generate方法。而这个generate的方法又会在StreamExecutionEnvironment.execute方法被调用。

1.env中存储 List<StreamTransformation<?> ,里面存储了各种算子操作。

2.StreamTransformation(是一个类)

a)它描述DataStream之间的转化关系 。

b)它包含了StreamOperator/UDF 。

c)它包含了很多子类,比如OneInputTransformation/TwoInputTransform/ SourceTransformation/ SinkTransformation/ SplitTransformation等。

3.StreamNode/StreamEdge

StreamNode(算子)/StreamEdge(算子与算子之间的联系)是通过StreamTransformation来构造。

1.2 StreamGraph转JobGraph

1.3 JobGraph

从StreamGraph到JobGraph转换过程中,内部角色也会进行转换

1.StreamNode->JobVertex:StreamNode转换为JobVertex

2.StreamEdge->JobEdge:StreamEdge转换为JobEdge

3.将符合条件的StreamNode chain成一个JobVertex(顶点)

a)没有禁用Chain

b)上下游算子并行度一致

c)下游算子的入度为1(也就是说下游节点没有来自其他节点的输入)

d)上下游算子在同一个slot group下游节点的 chain 策略为 ALWAYS(可以与上下游链接,

map、flatmap、filter等默认是ALWAYS)

e)上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source默认是HEAD)

f)上下游算子之间没有数据shuffle (数据分区方式是 forward)

4.根据group指定JobVertex所属SlotSharingGroup

5.配置checkpoint策略

6.配置重启策略

1.4 JobGraph -> ExecutionGraph



1.5 ExecutionGraph

从JobGraph转换ExecutionGraph的过程中,内部会出现如下的转换。

1.ExecutionJobVertex <- JobVertex:JobVertex转换为ExecutionJobVertex 。

2.ExecutionVertex(比如map)可以并发多个任务。

3.ExecutionEdge <- JobEdge:JobEdge转换为ExecutionEdge。

4.ExecutionGraph是一个2维结构。

5.根据2维结构分发对应Vertex到指定slot 。

2. DataStreamContext

Flink通过StreamExecutionEnvironment.getExecutionEnvironment()方法获取一个执行环境,Flink引用是在本地执行,还是以集群方式执行,系统会自动识别。如果是本地执行会调用createLocalEnvironment()方法,如果是集群执行会调用createExecutionEnvironment()。

3.数据源(DataSource)

Flink数据源可以有两种实现方式:

1.内置数据源

a)基于文件

b)基于Socket

c)基于Collection

2.自定义数据源

a)实现SourceFunction(非并行的)

b)实现ParallelSourceFunction

c)继承RichParallelSourceFunction

public class SimpleSourceFunction implements ParallelSourceFunction<Long> {

private long num = 0L;

private volatile boolean isRunning = true;

@Override

public void run(SourceContext<Long> sourceContext) throws Exception {

while (isRunning) {

sourceContext.collect(num); num++;

Thread.sleep(10000);

}

}

@Override

public void cancel() {

isRunning = false;

}

}

4. Transformation

Transformation(Operators/操作符/算子):可以将一个或多个DataStream转换为新的DataStream。

5. DataSink

Flink也包含两类Sink:

1.常用的sink会在后续的connectors中介绍。

2.自定义Sink

自定义Sink可以实现SinkFunction 接口,也可以继承RichSinkFunction。

6.流式迭代运算(Iterations)

简单理解迭代运算:

当前一次运算的输出作为下一次运算的输入(当前运算叫做迭代运算)。不断反复进行某种运算,直到达到某个条件才跳出迭代(是不是想起了递归)

流式迭代运算:

1.它没有最大迭代次数

2.它需要通过split/filter转换操作指定流的哪些部分数据反馈给迭代算子,哪些部分数据被转发到下游DataStream

3.基本套路

1)基于输入流构建IterativeStream(迭代头)

2)定义迭代逻辑(map fun等)

3)定义反馈流逻辑(从迭代过的流中过滤出符合条件的元素组成的部分流反馈给迭代头进行重复计算的逻辑)

4)调用IterativeStream的closeWith方法可以关闭一个迭代(也可表述为定义了迭代尾)

5)定义“终止迭代”的逻辑(符合条件的元素将被分发给下游而不用于进行下一次迭代)

4.流式迭代运算实例

问题域:输入一组数据,我们对他们分别进行减1运算,直到等于0为止.

import org.apache.flink.api.common.functions.FilterFunction;

import org.apache.flink.api.common.functions.MapFunction;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.datastream.IterativeStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**

 * @Author: lifei

* @Date: 2018/12/16下午6:43

 */

public class IterativeStreamJob {

    public static void main(String[] args) throws Exception {

//输入一组数据,我们对他们分别进行减1运算,直到等于0为止

        final StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Long> input=env.generateSequence(0,100);//1,2,3,4,5

//基于输入流构建IterativeStream(迭代头)

        IterativeStream<Long> itStream=input.iterate();

//定义迭代逻辑(map fun等)

        DataStream<Long> minusOne=itStream.map(new MapFunction<Long, Long>() {

            @Override

            public Long map(Long value) throws Exception {

                return value-1;

            }

        });

//定义反馈流逻辑(从迭代过的流中过滤出符合条件的元素组成的部分流反馈给迭代头进行重复计算的逻辑)

        DataStream<Long> greaterThanZero=minusOne.filter(new FilterFunction<Long>() {

            @Override

            public boolean filter(Long value) throws Exception {

                return value>0;

            }

        });

//调用IterativeStream的closeWith方法可以关闭一个迭代(也可表述为定义了迭代尾)

        itStream.closeWith(greaterThanZero);

//定义“终止迭代”的逻辑(符合条件的元素将被分发给下游而不用于进行下一次迭代)

        DataStream<Long> lessThanZero=minusOne.filter(new FilterFunction<Long>() {

            @Override

            public boolean filter(Long value) throws Exception {

                return value<=0;

            }

        });

        lessThanZero.print();

        env.execute("IterativeStreamJob");

    }

}

7. Execution参数

Controlling Latency(控制延迟)

1.默认情况下,流中的元素并不会一个一个的在网络中传输(这会导致不必要的网络流量消耗),而是缓存起来,缓存的大小可以在Flink的配置文件、 ExecutionEnvironment、设置某个算子上进行配置(默认100ms)。

1)好处:提高吞吐

2)坏处:增加了延迟

2.如何把握平衡

1)为了最大吞吐量,可以设置setBufferTimeout(-1),这会移除timeout机制,缓存中的数据一满就会被发送

2)为了最小的延迟,可以将超时设置为接近0的数(例如5或者10ms)

3)缓存的超时不要设置为0,因为设置为0会带来一些性能的损耗

3.其他更多的Execution参数后面会有专题讲解

8.调试

对于具体开发项目,Flink提供了多种调试手段。Streaming程序发布之前最好先进行调试,看看是不是能按预期执行。为了降低分布式流处理程序调试的难度,Flink提供了一些列方法:

1.本地执行环境

2.Collection Data Sources

3.Iterator Data Sink

本地执行环境:

本地执行环境不需要刻意创建,可以断点调试

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

DataStream<String> lines = env.addSource(/* some source */);

env.execute();

Collection Data Sources:

Flink提供了一些Java 集合支持的特殊数据源来使得测试更加容易,程序测试成功后,将source和sink替换成真正source和sink即可。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

env.fromElements(1, 2, 3, 4, 5);

env.fromCollection(Collection);

env.fromCollection(Iterator, Class);

env.generateSequence(0, 1000)

Iterator Data Sink:

Flink提供一个特殊的sink来收集DataStream的结果

DataStream<Tuple2<String, Integer>> myResult = ...

Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult)

(8)Operators串烧

1. DataStream Transformation

1.1 DataStream转换关系



上图标识了DataStream不同形态直接的转换关系,也可以看出DataStream主要包含以下几类:

1.keyby就是按照指定的key分组

2.window是一种特殊的分组(基于时间)

3.coGroup

4.join Join是cogroup 的特例

5.Connect就是松散联盟,类似于英联邦

1.2 DataStream

DataStream是 Flink 流处理 API 中最核心的数据结构。它代表了一个运行在多个分区上的并行流。

一个DataStream可以从 StreamExecutionEnvironment 通过env.addSource(SourceFunction) 获得。

1.3 map&flatMap

含义:数据映射(1进1出和1进n出)

转换关系:DataStream→ DataStream

使用场景:

ETL时删减计算过程中不需要的字段

案例1:

public class TestMap {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Long> input=env.generateSequence(0,10);

        DataStream plusOne=input.map(new MapFunction<Long, Long>() {

            @Override

            public Long map(Long value) throws Exception {

                System.out.println("--------------------"+value);

                return value+1;

            }

        });

        plusOne.print();

        env.execute();

    }

案例2:

public class TestFlatmap {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> input=env.fromElements(WORDS);

        DataStream<String> wordStream=input.flatMap(new FlatMapFunction<String, String>() {

            @Override

            public void flatMap(String value, Collector<String> out) throws Exception {

                String[] tokens = value.toLowerCase().split("\\W+");

                for (String token : tokens) {

                    if (token.length() > 0) {

                        out.collect(token);

                    }

                }

            }

        });

        wordStream.print();

        env.execute();

    }

    public static final String[] WORDS = new String[] {

            "To be, or not to be,--that is the question:--",

            "Whether 'tis nobler in the mind to suffer",

            "The slings and arrows of outrageous fortune",

            "And by opposing end them?--To die,--to sleep,--",

            "Be all my sins remember'd."

    };

}

如右上图所示,DataStream各个算子会并行运行,算子之间是数据流分区。如 Source 的第一个并行实例(S1)和 flatMap() 的第一个并行实例(m1)之间就是一个数据流分区。而在 flatMap() 和 map() 之间由于加了 rebalance(),它们之间的数据流分区就有3个子分区(m1的数据流向3个map()实例)。这与 Apache Kafka 是很类似的,把流想象成 Kafka Topic,而一个流分区就表示一个 Topic Partition,流的目标并行算子实例就是 Kafka Consumers。

1.4 filter

含义:数据筛选(满足条件event的被筛选出来进行后续处理),根据FliterFunction返回的布尔值来判断是否保留元素,true为保留,false则丢弃

转换关系:DataStream→ DataStream

使用场景:

过滤脏数据、数据清洗等

案例:

public class TestFilter {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Long> input=env.generateSequence(-5,5);

        input.filter(new FilterFunction<Long>() {

            @Override

            public boolean filter(Long value) throws Exception {

                return value>0;

            }

        }).print();

        env.execute();

    }

}

1.5 keyBy

含义:

根据指定的key进行分组(逻辑上把DataStream分成若干不相交的分区,key一样的event会被划分到相同的partition,内部采用hash分区来实现)

转换关系:DataStream→ KeyedStream

限制:

1.可能会出现数据倾斜,可根据实际情况结合物理分区来解决(后面马上会讲到)

2.Key的类型限制:

1)不能是没有覆盖hashCode方法的POJO

2)不能是数组

使用场景:

1.分组(类比SQL中的分组)

案例:

public class TestKeyBy {

    public static void main(String[] args) throws Exception {

//统计各班语文成绩最高分是谁

        final StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple4<String,String,String,Integer>> input=env.fromElements(TRANSCRIPT);

        KeyedStream<Tuple4<String,String,String,Integer>,Tuple> keyedStream = input.keyBy("f0");

        keyedStream.maxBy("f3").print();

        env.execute();

    }

    public static final Tuple4[] TRANSCRIPT = new Tuple4[] {

Tuple4.of("class1","张三","语文",100),

Tuple4.of("class1","李四","语文",78),

Tuple4.of("class1","王五","语文",99),

Tuple4.of("class2","赵六","语文",81),

Tuple4.of("class2","钱七","语文",59),

Tuple4.of("class2","马二","语文",97)

    };

}

1.6 KeyedStream

KeyedStream用来表示根据指定的key进行分组的数据流。

一个KeyedStream可以通过调用DataStream.keyBy()来获得。

在KeyedStream上进行任何transformation都将转变回DataStream。

在实现中,KeyedStream是把key的信息写入到了transformation中。

每个event只能访问所属key的状态,其上的聚合函数可以方便地操作和保存对应key的状态。

1.7 reduce&fold& Aggregations

分组之后当然要对分组之后的数据也就是KeyedStream进行各种聚合操作啦(想想SQL)。

KeyedStream→ DataStream

对于KeyedStream的聚合操作都是滚动的(rolling,在前面的状态基础上继续聚合),千万不要理解为批处理时的聚合操作(DataSet,其实也是滚动聚合,只不过他只把最后的结果给了我们)。

案例1:

public class TestReduce {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple4<String,String,String,Integer>> input=env.fromElements(TRANSCRIPT);

        KeyedStream<Tuple4<String,String,String,Integer>,Tuple> keyedStream = input.keyBy(0);

        keyedStream.reduce(new ReduceFunction<Tuple4<String, String, String, Integer>>() {

            @Override

            public Tuple4<String, String, String, Integer> reduce(Tuple4<String, String, String,

Integer> value1, Tuple4<String, String, String, Integer> value2) throws Exception {

                value1.f3+=value2.f3;

                return value1;

            }

        }).print();

        env.execute();

    }

    public static final Tuple4[] TRANSCRIPT = new Tuple4[] {

Tuple4.of("class1","张三","语文",100),

Tuple4.of("class1","李四","语文",78),

Tuple4.of("class1","王五","语文",99),

Tuple4.of("class2","赵六","语文",81),

Tuple4.of("class2","钱七","语文",59),

Tuple4.of("class2","马二","语文",97)

    };

}

案例2:

public class TestFold {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple4<String,String,String,Integer>> input=env.fromElements(TRANSCRIPT);

        DataStream<String> result =input.keyBy(0).fold("Start", new FoldFunction<Tuple4<String,String,String,Integer>,String>() {

            @Override

            public String fold(String accumulator, Tuple4<String, String, String, Integer> value) throws Exception {

                return accumulator + "=" + value.f1;

            }

        });

        result.print();

        env.execute();

    }

    public static final Tuple4[] TRANSCRIPT = new Tuple4[] {

Tuple4.of("class1","张三","语文",100),

Tuple4.of("class1","李四","语文",78),

Tuple4.of("class1","王五","语文",99),

Tuple4.of("class2","赵六","语文",81),

Tuple4.of("class2","钱七","语文",59),

Tuple4.of("class2","马二","语文",97)

    };

}

1.8 Interval join

KeyedStream,KeyedStream→ DataStream

在给定的周期内,按照指定的key对两个KeyedStream进行join操作,把符合join条件的两个event拉到一起,然后怎么处理由用户你来定义。

key1 == key2 && e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound

场景:把一定时间范围内相关的分组数据拉成一个宽表

案例:

public class TestIntervalJoin {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<Transcript> input1=env.fromElements(TRANSCRIPTS).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Transcript>() {

            @Override

            public long extractAscendingTimestamp(Transcript element) {

                return element.time;

            }

        });

        DataStream<Student> input2=env.fromElements(STUDENTS).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Student>() {

            @Override

            public long extractAscendingTimestamp(Student element) {

  return element.time;

            }

        });

        KeyedStream<Transcript,String>  keyedStream=input1.keyBy(new KeySelector<Transcript, String>() {

            @Override

            public String getKey(Transcript value) throws Exception {

                return value.id;

            }

        });

        KeyedStream<Student,String>  otherKeyedStream=input2.keyBy(new KeySelector<Student, String>() {

            @Override

            public String getKey(Student value) throws Exception {

                return value.id;

            }

        });

        //e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound

        // key1 == key2 && leftTs - 2 < rightTs < leftTs + 2

        keyedStream.intervalJoin(otherKeyedStream)

                .between(Time.milliseconds(-2), Time.milliseconds(2))

                .upperBoundExclusive()

                .lowerBoundExclusive()

                .process(new ProcessJoinFunction<Transcript, Student, Tuple5<String,String,String,String,Integer>>() {

                    @Override

                    public void processElement(Transcript transcript, Student student, Context ctx, Collector<Tuple5<String, String, String, String, Integer>> out) throws Exception {

                        out.collect(Tuple5.of(transcript.id,transcript.name,student.class_,transcript.subject,transcript.score));

                    }

                }).print();

        env.execute();

    }

.P

  public static final Transcript[] TRANSCRIPTS = new Transcript[] {

new Transcript("1","张三","语文",100,System.currentTimeMillis()),

new Transcript("2","李四","语文",78,System.currentTimeMillis()),

new Transcript("3","王五","语文",99,System.currentTimeMillis()),

new Transcript("4","赵六","语文",81,System.currentTimeMillis()),

new Transcript("5","钱七","语文",59,System.currentTimeMillis()),

new Transcript("6","马二","语文",97,System.currentTimeMillis())

    };

    public static final Student[] STUDENTS = new Student[] {

new Student("1","张三","class1",System.currentTimeMillis()),

new Student("2","李四","class1",System.currentTimeMillis()),

new Student("3","王五","class1",System.currentTimeMillis()),

new Student("4","赵六","class2",System.currentTimeMillis()),

new Student("5","钱七","class2",System.currentTimeMillis()),

new Student("6","马二","class2",System.currentTimeMillis())

    };

    private static class Transcript{

        private String id;

        private String name;

        private String subject;

        private int score;

        private long time;

        public Transcript(String id, String name, String subject, int score, long time) {

            this.id = id;

            this.name = name;

            this.subject = subject;

            this.score = score;

            this.time = time;

        }

        public String getId() {

            return id;

        }

        public void setId(String id) {

            this.id = id;

        }

        public String getName() {

            return name;

 }

        public void setName(String name) {

            this.name = name;

        }

        public String getSubject() {

            return subject;

        }

        public void setSubject(String subject) {

            this.subject = subject;

        }

        public int getScore() {

            return score;

        }

        public void setScore(int score) {

            this.score = score;

        }

        public long getTime() {

            return time;

        }

        public void setTime(long time) {

            this.time = time;

        }

    }

    private static class Student{

        private String id;

        private String name;

        private String class_;

        private long time;

        public Student(String id, String name, String class_, long time) {

            this.id = id;

            this.name = name;

            this.class_ = class_;

            this.time = time;

        }

   public String getId() {

            return id;

        }

        public void setId(String id) {

            this.id = id;

        }

        public String getName() {

            return name;

        }

        public void setName(String name) {

            this.name = name;

        }

        public String getClass_() {

            return class_;

        }

        public void setClass_(String class_) {

            this.class_ = class_;

        }

        public long getTime() {

            return time;

        }

        public void setTime(long time) {

            this.time = time;

        }

    }

}

相关文章

  • flink学习笔记-支持的数据类型

    说明:本文为《Flink大数据项目实战》学习笔记,想通过视频系统学习Flink这个最火爆的大数据计算框架的同学,推...

  • Flink基础系列18-支持的数据类型

    一.Flink支持的数据类型 Flink流应用程序处理的是以数据对象表示的事件流。所以在Flink内部,我们需要能...

  • 03-flink编程模型

    03-flink编程模型 Flink编程接口 根据数据类型分为两大类: 支持批计算的接口DataSet API 支...

  • Apache Flink——Flink 支持的数据类型

    一、Flink 的类型系统 Flink 作为一个分布式处理框架,处理的是以数据对象作为元素的流。如果用水流来类比,...

  • Flink支持的数据类型

    Flink流应用程序处理的是以数据对象表示的事件流。所以在Flink内部,我们需要能够处理这些对象。它们需要被序列...

  • Flink10:Flink支持的数据类型

    Flink流应用程序处理的是以数据对象表示的事件流。所以在Flink内部,我们需要能够处理这些对象。它们需要被序列...

  • Flink 流流关联( Interval Join)总结

    Flink对流流JOIN的支持 Flink对于join的支持有多种支持,可参考 Flink Join类型, 本文主...

  • Flink(10) 支持的数据类型

    简介 Flink 流应用程序处理的是以数据对象表示的事件流,所有我们需要能够处理这些对象,他们需要被序列化和反序列...

  • MySQL数据类型

    MySQL学习笔记(2) mysql支持的数据类型 mysql数值类型 关于每个类型的详细信息可以通过 ? int...

  • Flink学习笔记:Connectors概述

    本文为《Flink大数据项目实战》学习笔记,想通过视频系统学习Flink这个最火爆的大数据计算框架的同学,推荐学习...

网友评论

    本文标题:flink学习笔记-支持的数据类型

    本文链接:https://www.haomeiwen.com/subject/kmyavqtx.html