Flink提供:侧边输出SideOutput
方式,可以将1个流进行侧边输出多个流。不影响主要流的方式
![](https://img.haomeiwen.com/i27732789/6a23ed1862968070.png)
1 使用场景
在处理数据的时候,有时候想对不同情况的数据进行不同的处理,那么就需要把数据流进行分流。可以在主数据流上产生出任意数量额外的侧输出流。
与flatMap的不同
- SIdeOutput 是将一个流,侧边输出多个流,不影响主流
- flatMap 处理完成后还是一个流,且该流的数据类型都是相同的
- SideOutput输出结果流的数据类型不需要与主数据流的类型一致,不同侧输出流的类型也可以不同。
2 使用
以下以 获取 数据流中的奇 偶数为例子
2-1 定义标签
// todo 使用OutputTag 定义标签 传入的long类型
// 子类对象
OutputTag<Long> oddTag = new OutputTag<Long>("side-odd"){}; //定义偶数标签
OutputTag<Long> evenTag = new OutputTag<Long>("side-even"){}; //定义奇数标签
2-2 调用DataStream中process
底层处理函数
按照判断逻辑给数据打标签,因为底层api内可以传入一个context上下文对象,可以上下文对象对数据打标签,常用的底层api
- ProcessFunction
- CoProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
//todo 使用datastream 中最底层API process算子 对流中每条数据进行处理
//T输入是 Long类型 O 输出是String
SingleOutputStreamOperator<String> processDataStream = inputDataStream.process(new ProcessFunction<Long, String>() {
@Override
public void processElement(Long value, Context ctx, Collector<String> out) throws Exception {
// 将其平方 为了测试
double pow = Math.pow(value, 2);
out.collect(pow + "");
//如果满足条件 就给这条数据使用 ctx 上下文对象 打上标签
if (value % 2 == 0) ctx.output(evenTag, value);
else ctx.output(oddTag, value);
}
});
2-3 获取对应标签的DataStream
使用 getSideOutput(标签名)
获取
//todo 根据标签获取侧边流
processDataStream.getSideOutput(oddTag).print("odd");
DataStream<Long> evenSideDS = processDataStream.getSideOutput(oddTag);
evenSideDS.print("envn");
代码
package transformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
/**
* Flink 流计算中转换算状况:使用侧边输出流进行分割主流中数据,进行打标签,到侧边输出(相当于分割子流)
* @author xuanyu
*/
public class TransformationSideOutputDemo {
public static void main(String[] args) throws Exception {
// 1. 执行环境-env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1) ;
// 2. 数据源-source
DataStreamSource<Long> inputDataStream = env.fromSequence(1, 10);
// 3. 数据转换-transformation
// step1. 定义输出标签:奇数标签和偶数标签
OutputTag<Long> oddTag = new OutputTag<Long>("side-odd"){} ;
OutputTag<Long> evenTag = new OutputTag<Long>("side-even"){} ;
// todo: process算子(属于DataStream API中提供最底层API)对流中每条数据处理
SingleOutputStreamOperator<String> processDataStream = inputDataStream.process(
new ProcessFunction<Long, String>() {
@Override
public void processElement(Long value, Context ctx, Collector<String> out) throws Exception {
// value -> 数据流中每条条数据,ctx -> 表示程序运行时上下文对象, out:收集器,将数据集发送到下游数据处理
double powValue = Math.pow(value, 2);
out.collect(powValue + "");
// step2. 判断流中数据是否符合条件,如果符合条件,打上对应标签,并且输出
if(value % 2 == 0){
// 此时数据属于偶数,打上偶数标签
ctx.output(evenTag, value);
}else{
// 此时数据属于奇数
ctx.output(oddTag, value);
}
}
}
);
// 4. 数据终端-sink
processDataStream.printToErr();
// step3. 依据标签获取侧边输出流
DataStream<Long> oddStream = processDataStream.getSideOutput(oddTag);
oddStream.printToErr("odd");
DataStream<Long> evenStream = processDataStream.getSideOutput(evenTag);
evenStream.printToErr("even");
// 5. 触发执行-execute
env.execute("TransformationSideOutputDemo");
}
}
网友评论