美文网首页Flink文档翻译Spark & Flink
Flink的Side Output(侧输出)

Flink的Side Output(侧输出)

作者: 写Bug的张小天 | 来源:发表于2017-06-12 18:15 被阅读493次

    除了从DataStream操作的结果中获取主数据流之外,你还可以产生任意数量额外的侧输出结果流。侧输出结果流的数据类型不需要与主数据流的类型一致,不同侧输出流的类型也可以不同。当您想要拆分数据流时(通常必须复制流),然后从每个流过滤出您不想拥有的数据,此操作将非常有用。
    当使用侧输出流时,你首先得定义一个OutputTag,这个OutputTag将用来标识一个侧输出流:
    Java 代码:

    // this needs to be an anonymous inner class, so that we can analyze the type
    OutputTag<String> outputTag = new OutputTag<String>("side-output") {};
    

    Scala代码:

    val outputTag = OutputTag[String]("side-output")
    

    注意,OutputTag是根据侧输出流所包含的元素的类型来输入的。
    数据发送到侧输出流只能从一个ProcessFunction中发出,你可以使用Context参数来发送数据到一个通过OutputTag标记的侧输出流中:
    Java 代码:

    DataStream<Integer> input = ...;
    
    final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
    
    SingleOutputStreamOperator<Integer> mainDataStream = input
      .process(new ProcessFunction<Integer, Integer>() {
    
          @Override
          public void processElement(
              Integer value,
              Context ctx,
              Collector<Integer> out) throws Exception {
            // 将数据发送到常规输出中
            out.collect(value);
    
            // 将数据发送到侧输出中
            ctx.output(outputTag, "sideout-" + String.valueOf(value));
          }
        });
    

    Scala代码:

    val input: DataStream[Int] = ...
    val outputTag = OutputTag[String]("side-output")
    
    val mainDataStream = input
      .process(new ProcessFunction[Int, Int] {
        override def processElement(
            value: Int,
            ctx: ProcessFunction[Int, Int]#Context,
            out: Collector[Int]): Unit = {
          // 将数据发送到常规输出中
          out.collect(value)
    
          // 将数据发送到侧输出中
          ctx.output(outputTag, "sideout-" + String.valueOf(value))
        }
      })
    

    你可以在DataStream操作的结果中使用getSideOutput(OutputTag)来获取侧输出,这里为您提供一个DataStream类型,用于输出端输出流的结果:
    Java 代码:

    final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
    
    SingleOutputStreamOperator<Integer> mainDataStream = ...;
    
    DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);
    

    Scala代码:

    val outputTag = OutputTag[String]("side-output")
    
    val mainDataStream = ...
    
    val sideOutputStream: DataStream[String] = mainDataStream.getSideOutput(outputTag)
    

    相关文章

      网友评论

        本文标题:Flink的Side Output(侧输出)

        本文链接:https://www.haomeiwen.com/subject/wbolqxtx.html