2.5 Transform 算子之-----select
使用场景:
将数据流处理之后, 给流打一个标记,然后根据标记将流输出出来,就用 select 算子。
package com.wudl.core;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http2.Http2Exception;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
/**
* TODO
* <p>
* 将数据流进行后,如何从流中将不同的标记取出来,这时就需要用select 算子
*
* @author wudl
* @version 1.0
* @date 2020/12/23 15:29
*/
/**
 * Demonstrates the split/select transform: tag each element of the stream
 * with a label inside an {@link OutputSelector}, then pull each tagged
 * sub-stream back out with {@code select(label)}.
 *
 * Bug fixed: the middle branch used {@code value > 3 || value < 7}, which is
 * true for every value that fails the first test, so the "10" branch was
 * unreachable and elements >= 7 were mis-tagged "4". The intended range
 * check is {@code value > 3 && value < 7}.
 *
 * @author wudl
 * @version 1.0
 * @date 2020/12/23 15:29
 */
public class TransformSelect {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Integer> inputDs = env.fromCollection(Arrays.asList(1, 2, 4, 5, 7, 8, 9));
        SplitStream<Integer> splitStream = inputDs.split(new OutputSelector<Integer>() {
            @Override
            public Iterable<String> select(Integer value) {
                if (value <= 3) {
                    return Arrays.asList("3");       // values <= 3
                } else if (value > 3 && value < 7) { // was ||, which made this branch always true
                    return Arrays.asList("4");       // values in (3, 7)
                } else {
                    return Arrays.asList("10");      // values >= 7
                }
            }
        });
        /**
         * Retrieve each tagged sub-stream by the label assigned above and
         * print it, using the same label as the print prefix.
         */
        splitStream.select("3").print("3");
        splitStream.select("4").print("4");
        splitStream.select("10").print("10");
        env.execute();
    }
}
2.6Transform 算子之 --connect
特点:
两个流进行合并成为一个新的流(两个流的数据类型可以不一样)
使用场景:
对账功能,比如第三方交易信息和订单支付做对比。
package com.wudl.core;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import java.util.Arrays;
/**
* TODO
* 两个流进行合并成为一条流 ---数据类型可以不一致
* @author wudl
* @version 1.0
* @date 2020/12/23 16:13
*/
/**
 * Demonstrates connect: joins two streams of DIFFERENT element types into a
 * single {@link ConnectedStreams}, then maps both sides to a common String
 * output via a {@link CoMapFunction} (map1 handles the Integer side, map2
 * the String side).
 *
 * @author wudl
 * @version 1.0
 * @date 2020/12/23 16:13
 */
public class TreansformConnect {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Two sources with different element types — connect() permits this.
        DataStreamSource<Integer> numbers = env.fromCollection(Arrays.asList(1, 2, 5, 6, 7, 9));
        DataStreamSource<String> words = env.fromCollection(Arrays.asList("hadoop", "java", "flink"));

        ConnectedStreams<Integer, String> connected = numbers.connect(words);

        // Both map methods emit String, so the merged stream is DataStream<String>.
        DataStream<String> merged = connected.map(new CoMapFunction<Integer, String, String>() {
            @Override
            public String map1(Integer value) throws Exception {
                return String.valueOf(value);
            }

            @Override
            public String map2(String value) throws Exception {
                return value;
            }
        });

        merged.print();
        env.execute();
    }
}
2.7Transform 算子之 --union
特点:
将两个流合并成为一个新的流(需要注意的是,两个流的数据类型必须保持一致)
场景:
合并多个同类型的数据来源,比如把多台机器产生的同格式日志流合并成一条流统一处理。(对账类的双流对比场景应使用 connect,因为 union 要求两个流的数据类型一致。)
package com.wudl.core;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
/**
* TODO
* 算子 union 将两个流合并成为一个新的流(需要注意的是, 这两个流的数据类型必须要保持一致)
* @author wudl
* @version 1.0
* @date 2020/12/23 17:37
*/
/**
 * Demonstrates union: merges two streams of the SAME element type into one
 * new stream. Unlike connect, union requires both inputs to share one
 * element type (here String).
 *
 * @author wudl
 * @version 1.0
 * @date 2020/12/23 17:37
 */
public class TransformUnion {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> first = env.fromCollection(Arrays.asList("hbase", "clickhouse"));
        DataStreamSource<String> second = env.fromCollection(Arrays.asList("hadoop", "spark", "flink"));

        // The merged stream keeps the shared element type (String).
        DataStream<String> merged = first.union(second);
        merged.print();
        env.execute();
    }
}
网友评论