美文网首页flinkFlink
flink算子:union和connect

flink算子:union和connect

作者: 王吉吉real | 来源:发表于2021-01-29 22:33 被阅读0次

    在合并数据流时,可以使用union和connect两种算子,两者的使用方式如下。

    union

    1、可以合并两个以上的数据流;
    2、合并的各实时流数据类型必须相同;
    3、合并的结果也是一个同类型数据流,两个DataStream合并结果为DataStream数据流,两个DataSet合并结果为DataSet流;

    union方法代码如下,结果是创建了一个新的数据流

    public final DataStream<T> union(DataStream<T>... streams) {
        List<StreamTransformation<T>> unionedTransforms = new ArrayList<>();
        unionedTransforms.add(this.transformation);
    
        for (DataStream<T> newStream : streams) {
            if (!getType().equals(newStream.getType())) {   //判断数据类型是否一致
                throw new IllegalArgumentException("Cannot union streams of different types: " + getType() + " and " + newStream.getType());
            }
            unionedTransforms.add(newStream.getTransformation());
        }
        //构建新的数据流
        return new DataStream<>(this.environment, new UnionTransformation<>(unionedTransforms));//通过使用 UnionTransformation 将多个 StreamTransformation 合并起来
    }
    

    使用方法:

    //数据流 1 和 2
    final DataStream<Integer> stream1 = env.addSource(...);
    final DataStream<Integer> stream2 = env.addSource(...);
    //union
    stream1.union(stream2)
    

    connect

    1、只能用于连接两个DataStream流,不能用于DataSet;
    2、连接的两个数据流数据类型可以不同
    3、连接的结果为一个ConnectedStreams流
    4、连接后两个流可以使用不同的处理方法,两个流可以共享状态;
    5、连接后可以使用CoMapFunction或CoFlatMapFunction等方法进行处理

    如果连接的两个流是DataStream的话,那么连接后的数据流为 ConnectedStreams,也是新创建的:

    public <R> ConnectedStreams<T, R> connect(DataStream<R> dataStream) {
        return new ConnectedStreams<>(environment, this, dataStream);
    }
    

    如果连接的数据流是一个 BroadcastStream(广播数据流),那么连接后的数据流是一个 BroadcastConnectedStream。

    public <R> BroadcastConnectedStream<T, R> connect(BroadcastStream<R> broadcastStream) {
        return new BroadcastConnectedStream<>(
                environment, this, Preconditions.checkNotNull(broadcastStream), 
                broadcastStream.getBroadcastStateDescriptor());
    }
    

    具体使用:

    //1、连接 DataStream
    DataStream<Tuple2<Long, Long>> src1 = env.fromElements(new Tuple2<>(0L, 0L));
    DataStream<Tuple2<Long, Long>> src2 = env.fromElements(new Tuple2<>(0L, 0L));
    ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connected = src1.connect(src2);
    
    //2、连接 BroadcastStream
    DataStream<Tuple2<Long, Long>> src1 = env.fromElements(new Tuple2<>(0L, 0L));
    final BroadcastStream<String> broadcast = srcTwo.broadcast(utterDescriptor);
    BroadcastConnectedStream<Tuple2<Long, Long>, String> connect = src1.connect(broadcast);
    

    相关文章

      网友评论

        本文标题:flink算子:union和connect

        本文链接:https://www.haomeiwen.com/subject/fsictltx.html