Flink Transform Operators

Author: wudl | Published 2020-12-23 19:38

    2.5 Transform Operator: select

    Use case:
    After processing a data stream, you tag its elements (with split) and then pull sub-streams back out by those tags; extracting a tagged sub-stream is what the select operator is for.
    
    package com.wudl.core;
    
    import org.apache.flink.streaming.api.collector.selector.OutputSelector;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SplitStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import java.util.Arrays;
    /**
     * After a stream has been tagged with split, the select operator pulls
     * the differently tagged elements back out as separate streams.
     *
     * @author wudl
     * @version 1.0
     * @date 2020/12/23 15:29
     */
    public class TransformSelect {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            DataStreamSource<Integer> inputDs = env.fromCollection(Arrays.asList(1, 2, 4, 5, 7, 8, 9));
            SplitStream<Integer> splitStream = inputDs.split(new OutputSelector<Integer>() {
                @Override
                public Iterable<String> select(Integer value) {
                    if (value <= 3) {
                        return Arrays.asList("3");
                    } else if (value > 3 && value < 7) {
                        return Arrays.asList("4");
                    } else {
                        return Arrays.asList("10");
                    }
                }
            });
            /**
             * Pull the sub-streams back out by tag: each tag name assigned in
             * split identifies a sub-stream, and passing that name to select
             * retrieves it.
             */
            splitStream.select("3").print("3");
            splitStream.select("4").print("4");
            splitStream.select("10").print("10");
            env.execute();
    
        }
    }
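
    Note: the split/select API shown above was deprecated in later Flink releases and removed in Flink 1.12; side outputs cover the same use case there. Below is a minimal sketch of that alternative, reusing the input collection from the example above. The tag names "low"/"mid" and the class name TransformSideOutput are illustrative, not part of the original code.

    package com.wudl.core;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    import java.util.Arrays;

    public class TransformSideOutput {

        // anonymous subclasses so the generic type survives type erasure
        private static final OutputTag<Integer> LOW = new OutputTag<Integer>("low") {};
        private static final OutputTag<Integer> MID = new OutputTag<Integer>("mid") {};

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            DataStream<Integer> inputDs = env.fromCollection(Arrays.asList(1, 2, 4, 5, 7, 8, 9));

            // elements <= 3 go to the "low" side output, 3 < x < 7 to "mid",
            // everything else stays on the main output
            SingleOutputStreamOperator<Integer> mainDs = inputDs
                    .process(new ProcessFunction<Integer, Integer>() {
                        @Override
                        public void processElement(Integer value, Context ctx, Collector<Integer> out) {
                            if (value <= 3) {
                                ctx.output(LOW, value);
                            } else if (value < 7) {
                                ctx.output(MID, value);
                            } else {
                                out.collect(value);
                            }
                        }
                    });

            mainDs.getSideOutput(LOW).print("low");
            mainDs.getSideOutput(MID).print("mid");
            mainDs.print("high");

            env.execute();
        }
    }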
    
    

    2.6 Transform Operator: connect

    Characteristics:
    connect combines two streams into one new stream (the two streams may carry different data types).
    Use case:
    Reconciliation, e.g. matching third-party transaction records against order payments.
    
    package com.wudl.core;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.ConnectedStreams;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.CoMapFunction;
    
    import java.util.Arrays;
    
    /**
     * connect merges two streams into a single new stream; the data types of
     * the two input streams do not have to match.
     *
     * @author wudl
     * @version 1.0
     * @date 2020/12/23 16:13
     */
    public class TransformConnect {
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            DataStreamSource<Integer> inputDs01 = env.fromCollection(Arrays.asList(1, 2, 5, 6, 7, 9));
            DataStreamSource<String> inputDs02 = env.fromCollection(Arrays.asList("hadoop", "java", "flink"));
            ConnectedStreams<Integer, String> connecteDs = inputDs01.connect(inputDs02);
    
            DataStream<String> mapDs = connecteDs.map(new CoMapFunction<Integer, String, String>() {
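                // map1 handles elements from the first stream (Integer),
                // map2 handles elements from the second stream (String)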
                @Override
                public String map1(Integer value) throws Exception {
                    return value+"";
                }
    
                @Override
                public String map2(String value) throws Exception {
                    return value;
                }
            });
            mapDs.print();
            env.execute();
    
        }
    }
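
    For the reconciliation use case mentioned above, the two connected streams are usually keyed by a shared id so that matching records are processed by the same task. A minimal sketch of that pattern follows; the (orderId, amount) / (orderId, channel) tuples and the class name TransformConnectKeyed are hypothetical illustrations, not part of the original example.

    package com.wudl.core;

    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.ConnectedStreams;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.CoMapFunction;

    import java.util.Arrays;

    public class TransformConnectKeyed {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            // (orderId, amount) from the order system
            DataStream<Tuple2<String, Double>> orders = env.fromCollection(Arrays.asList(
                    Tuple2.of("order-1", 10.0), Tuple2.of("order-2", 20.0)));
            // (orderId, channel) from the third-party payment platform
            DataStream<Tuple2<String, String>> payments = env.fromCollection(Arrays.asList(
                    Tuple2.of("order-1", "wechat"), Tuple2.of("order-2", "alipay")));

            // key both sides by orderId so records of the same order are processed together
            ConnectedStreams<Tuple2<String, Double>, Tuple2<String, String>> connected =
                    orders.connect(payments).keyBy(
                            new KeySelector<Tuple2<String, Double>, String>() {
                                @Override
                                public String getKey(Tuple2<String, Double> t) { return t.f0; }
                            },
                            new KeySelector<Tuple2<String, String>, String>() {
                                @Override
                                public String getKey(Tuple2<String, String> t) { return t.f0; }
                            });

            connected.map(new CoMapFunction<Tuple2<String, Double>, Tuple2<String, String>, String>() {
                @Override
                public String map1(Tuple2<String, Double> order) {
                    return "order side: " + order;
                }

                @Override
                public String map2(Tuple2<String, String> pay) {
                    return "payment side: " + pay;
                }
            }).print();

            env.execute();
        }
    }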
    
    

    2.7 Transform Operator: union

    Characteristics:
    union merges two streams into one new stream (note that the two streams must have the same data type).
    Use case:
    Reconciliation, e.g. matching third-party transaction records against order payments.
    
    package com.wudl.core;
    
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.util.Arrays;
    
    /**
     * The union operator merges two streams into one new stream (note that
     * the data types of the two streams must be identical).
     *
     * @author wudl
     * @version 1.0
     * @date 2020/12/23 17:37
     */
    public class TransformUnion {
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            DataStreamSource<String> inputDs01 = env.fromCollection(Arrays.asList("hbase", "clickhouse"));
            DataStreamSource<String> inputDs02 = env.fromCollection(Arrays.asList("hadoop", "spark", "flink"));
            DataStream<String> union = inputDs01.union(inputDs02);
            union.print();
            env.execute();
    
    
        }
    }
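
    One more note: union is variadic, so any number of streams of the same type can be merged in a single call. A small sketch follows; the third stream and the class name TransformUnionMulti are illustrative additions, not from the original post.

    package com.wudl.core;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class TransformUnionMulti {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            DataStreamSource<String> ds01 = env.fromElements("hbase", "clickhouse");
            DataStreamSource<String> ds02 = env.fromElements("hadoop", "spark", "flink");
            DataStreamSource<String> ds03 = env.fromElements("kafka", "hive");

            // union accepts any number of streams, as long as they share one data type
            DataStream<String> union = ds01.union(ds02, ds03);
            union.print();

            env.execute();
        }
    }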
    
    
