- map 算子 接收 t o t是传入参数的数据类型 o是接收的数据类型
如果返回是Tuple Tuple中也要指定泛型
//todo 此时数据流内是一个一个 单词, 将单词转换成 tuple元组
SingleOutputStreamOperator<Tuple2<String, Integer>> wordTupleDS = wordDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return Tuple2.of(value, 1);
}
});
-
keyby
在Flink中如果是批处理,分组使用函数:groupBy
,从Flink 1.12以后开始,由于流批一体,无论是流计算还是批处理,分组函数:keyBy
。使用:
如果是元组 可以指定下标
如果是 javabean 可以指定对象属性
指定select 选择器
官方建议 使用 lambda 表达式
image.png
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
*
* 使用keyby算子 对数据流分组
* @author ado
*/
public class TransformatKeyByDemo {
public static void main(String[] args) throws Exception {
// 1. 执行环境-env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 2. 数据源-source
// 使用 可变参数 数据源 传入元素为tuple 元组对象
DataStreamSource<Tuple2<String, Integer>> inputDataStream = env.fromElements(
Tuple2.of("spark", 1), Tuple2.of("flink", 1), Tuple2.of("hadoop", 1),
Tuple2.of("spark", 1), Tuple2.of("flink", 1), Tuple2.of("hadoop", 1)
);
// 3. 数据转换-transformation
// keyby算子有三种使用方式
//todo 数据为元组时,指定key下标
SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = inputDataStream.keyBy(0).sum(1);
// ds1.printToErr();
//todo 数据为 javabean对象时 指定属性
SingleOutputStreamOperator<Tuple2<String, Integer>> ds2 = inputDataStream.keyBy("f0").sum("f1");
ds2.printToErr();
//todo 数据为 keyselector接口对象时 (最标准)
// IN 是传入的元素 key 是元素key的类型
SingleOutputStreamOperator<Tuple2<String, Integer>> ds3 = inputDataStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
// z这个方法其实就是 获取key
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
}).sum(1);
ds3.printToErr();
// todo 官方建议使用 lmbad表达式
inputDataStream.keyBy((Tuple2<String, Integer> value) ->{
return value.f0;
} ).sum(1);
inputDataStream.keyBy(value -> value.f0);
// 4. 数据终端-sink
// 5. 触发执行-execute
env.execute("TransformatKeyByDemo");
}
}
reduce 算子
-
对分组流中每个组内数据的聚合,来一条数据,聚合一条数据
比sum功能全面 -
需要指定 两个变量 tmp item
tmp 表示聚合的中间变量
itrem 表示要聚合的元素
第一次计算的时候 只会赋值 不会计算 -
todo: KeyedStream使用reduce算子对分组流中各个组内数据聚合时,要求组内数据聚合结果类型与流中数据类型相同
package cn.itcast.flink.transformation;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* 测试reduce 算子
* reduce 的返回值必须跟传入的数据流 结果一致
* reduce 接收两个参数 一个是tmp 聚合中间变量 一个是 iteam 要聚合的元素
* // 第一次计算的时候 只会赋值 不会计算
* System.out.println("tmp = " + tmp + ", item = " + item);
* @author ado
*/
public class TransformationReduceDemo {
public static void main(String[] args) throws Exception {
// 1. 执行环境-env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 数据源-source
DataStreamSource<Tuple2<String, Integer>> inputDataStream = env.fromElements(
Tuple2.of("flink", 1), Tuple2.of("spark", 1), Tuple2.of("flink", 1),
Tuple2.of("flink", 1), Tuple2.of("flink", 1), Tuple2.of("spark", 1)
);
// 3. 数据转换-transformation
KeyedStream<Tuple2<String, Integer>, Tuple> stream = inputDataStream.keyBy(0);
SingleOutputStreamOperator<Tuple2<String, Integer>> reduceDS= stream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> tmp,
Tuple2<String, Integer> item) throws Exception {
// 第一次计算的时候 只会赋值 不会计算
System.out.println("tmp = " + tmp + ", item = " + item);
Integer historyValue = tmp.f1;
Integer nowValue = item.f1;
historyValue += nowValue;
return Tuple2.of(tmp.f0, historyValue);
}
});
// 4. 数据终端-sink
reduceDS.printToErr();
// 5. 触发执行-execute
env.execute("TransformationReduceDemo");
}
}
- max/maxby
-
max或min:
只会求出最大或最小的那个字段
,其他的字段不管 -
maxBy或minBy:
会求出最大或最小的那个字段和对应的其他的字段
package cn.itcast.flink.transformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
*
* 测试Flink 流式计算 max min /maxby minby 算子
* 功能 求分组后的 最大最小值
* - **max或min**:`只会求出最大或最小的那个字段`,其他的字段不管
* - **maxBy或minBy**:`会求出最大或最小的那个字段和对应的其他的字段`
* @author ado
*/
public class TransformationMaxMinDemo {
public static void main(String[] args) throws Exception {
// 1. 执行环境-env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 2. 数据源-source
DataStreamSource<Tuple3<String, String, Integer>> inputDS = env.fromElements(
Tuple3.of("上海", "浦东新区", 777),
Tuple3.of("上海", "闵行区", 999),
Tuple3.of("上海", "杨浦区", 666),
Tuple3.of("北京", "东城区", 567),
Tuple3.of("北京", "西城区", 987),
Tuple3.of("上海", "静安区", 888),
Tuple3.of("北京", "海淀区", 9999)
);
// 3. 数据转换-transformation
inputDS.keyBy(0).max(2).printToErr();
inputDS.keyBy(0).maxBy(2).print();
// 4. 数据终端-sink
// 5. 触发执行-execute
env.execute("TransformationMaxMinDemo");
}
}
union collect 算子
-
union
函数:可以合并多个同类型的数据流
,并生成同类型的数据流,即可以将多个DataStream[T]合并为一个新的DataStream[T]。 -
connect
函数:与union函数功能类似,用来连接两个数据流
,且2个数据流数据类型可不一样
-
connect只能连接两个数据流,union可以连接多个数据流;
-
connect所连接的两个数据流的数据类型可以不一致,union所连接的两个数据流的数据类型必须一致;
todo:2个流进行连接connect,数据类型可以不相同,通常应用于大表数据流关联小表数据流 当将2个流连接以后,得到连接流,必须调用map、flatMap等算子,对流中数据处理,否则不能输出
package cn.itcast.flink.transformation;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import scala.Tuple2;
/**
* @author ado
*/
public class TransformationUnionConnectDemo {
public static void main(String[] args) throws Exception {
// 1. 执行环境-env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 2. 数据源-source
DataStream<String> dataStream01 = env.fromElements("A", "B", "C", "D");
DataStream<String> dataStream02 = env.fromElements("aa", "bb", "cc", "dd");
DataStream<Integer> dataStream03 = env.fromElements(1, 2, 3, 4);
// 3. 数据转换-transformation
// todo: 2个流进行union,要求流中数据类型必须相同
//将两个流的数据合到一个流中 列如 DS1(a,b,c ) +DS2(A,B,C) -> DS3(a,b,c,A,B,C)
DataStream<String> unionDS = dataStream01.union(dataStream02);
unionDS.print();
// 4. 数据终端-sink
/*
todo:2个流进行连接connect,数据类型可以不相同,通常应用于大表数据流关联小表数据流
当将2个流连接以后,得到连接流,必须调用map、flatMap等算子,对流中数据处理,否则不能输出
*/
ConnectedStreams<String, Integer> connectDS = dataStream01.connect(dataStream03);
SingleOutputStreamOperator<String> output = connectDS.map(new CoMapFunction<String, Integer, String>() {
// COMAP接收的是三个参数 1 2是两个流的输入参数 3 是out 输出
@Override
public String map1(String value) throws Exception {
return value;
}
@Override
public String map2(Integer value) throws Exception {
return value+"";
}
});
output.printToErr("connect");
// 5. 触发执行-execute
env.execute("TransformationUnionConnectDemo");
}
}
网友评论