美文网首页
Flink Transform 算子

Flink Transform 算子

作者: wudl | 来源:发表于2020-12-23 19:38 被阅读0次

2.5 Transform 算子之-----select

使用场景:
将数据流处理之后,给流打一个标记,然后根据标记将对应的流取出来,就用 select 算子。
package com.wudl.core;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http2.Http2Exception;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
/**
 * Demonstrates the {@code split}/{@code select} transformation: each element of
 * the input stream is tagged with one or more string labels by an
 * {@link OutputSelector}, and the labeled sub-streams are then retrieved
 * individually via {@code select(label)}.
 *
 * @author wudl
 * @version 1.0
 * @date 2020/12/23 15:29
 */
public class TransformSelect {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 1 keeps the printed output in a deterministic order.
        env.setParallelism(1);
        DataStreamSource<Integer> inpputDs = env.fromCollection(Arrays.asList(1, 2, 4, 5, 7, 8, 9));
        SplitStream<Integer> splitStream = inpputDs.split(new OutputSelector<Integer>() {
            @Override
            public Iterable<String> select(Integer value) {
                if (value <= 3) {
                    // values 1..3 are tagged "3"
                    return Arrays.asList("3");
                } else if (value < 7) {
                    // BUG FIX: the original condition was (value > 3 || value < 7),
                    // which is always true, so the "10" branch below was unreachable.
                    // Here value > 3 is already guaranteed by the first branch,
                    // so this tags 4..6 with "4".
                    return Arrays.asList("4");
                } else {
                    // values >= 7 are tagged "10"
                    return Arrays.asList("10");
                }
            }
        });
        // Pull each tagged sub-stream back out by the label assigned above;
        // the print prefix matches the label for easy identification.
        splitStream.select("3").print("3");
        splitStream.select("4").print("4");
        splitStream.select("10").print("10");
        env.execute();
    }
}

2.6Transform 算子之 --connect

特点: 
两个流进行合并成为一个新的流(两个流的数据类型可以不一样)
使用场景:
对账功能,比如第三方交易信息和订单支付做对比。
package com.wudl.core;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

import java.util.Arrays;

/**
 * Demonstrates the {@code connect} transformation: joins two streams whose
 * element types may differ into a single {@link ConnectedStreams}, then maps
 * both sides onto a common type ({@code String}) with a {@link CoMapFunction}.
 *
 * @author wudl
 * @version 1.0
 * @date 2020/12/23 16:13
 */
public class TreansformConnect {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // The two sources deliberately carry different element types.
        DataStreamSource<Integer> numbers = env.fromCollection(Arrays.asList(1, 2, 5, 6, 7, 9));
        DataStreamSource<String> words = env.fromCollection(Arrays.asList("hadoop", "java", "flink"));

        // connect() keeps both element types side by side; the CoMapFunction
        // below converts each side to String, yielding one homogeneous stream.
        ConnectedStreams<Integer, String> connected = numbers.connect(words);
        DataStream<String> unified = connected.map(new CoMapFunction<Integer, String, String>() {
            @Override
            public String map1(Integer value) throws Exception {
                // Integer side: render the number as its decimal string.
                return value + "";
            }

            @Override
            public String map2(String value) throws Exception {
                // String side: already the target type, pass through unchanged.
                return value;
            }
        });
        unified.print();
        env.execute();

    }
}

2.7Transform 算子之 --union

特点:
将两个流合并成为一个新的流(需要注意的是,两个流的数据类型必须保持一致)
场景:
对账功能,比如第三方交易信息和订单支付做对比。
package com.wudl.core;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

/**
 * Demonstrates the {@code union} transformation: merges two streams of the
 * SAME element type into a single stream (unlike {@code connect}, union
 * requires identical types on both inputs).
 *
 * @author wudl
 * @version 1.0
 * @date 2020/12/23 17:37
 */
public class TransformUnion {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Both sources must share the element type (String) for union to apply.
        DataStreamSource<String> firstStream = env.fromCollection(Arrays.asList("hbase", "clickhouse"));
        DataStreamSource<String> secondStream = env.fromCollection(Arrays.asList("hadoop", "spark", "flink"));

        // union() simply interleaves the elements of both inputs.
        DataStream<String> merged = firstStream.union(secondStream);
        merged.print();
        env.execute();

    }
}

相关文章

网友评论

      本文标题:Flink Transform 算子

      本文链接:https://www.haomeiwen.com/subject/azuunktx.html