美文网首页
任务链(Operator Chains)

任务链(Operator Chains)

作者: yayooo | 来源:发表于2019-08-23 00:01 被阅读0次

任务链是Flink的一种优化技术。
目的:减少本地通信的开销。
使用任务链优化的条件:
1)两个或多个算子设为相同的并行度。
2)通过本地转发(local forward)的方式进行连接

如图:相同并行度的one-to-one操作,(key aggregation和sink算子)形成一个task,原来的算子成为了这个整体的子任务subTask.

条件两者缺一不可:
并行度相同
one-to-one操作

没有关闭任务链合并时
package com.atguigu.testChaining

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object StreamWordCount {
  def main(args: Array[String]): Unit = {
    //从外部命令中获取参数
    val params: ParameterTool = ParameterTool.fromArgs(args)

    val host: String = params.get("host")
    val port: Int = params.getInt("port")

    //创建流处理环境
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment



    //接收socket文本流
    val textDstream: DataStream[String] = environment.socketTextStream(host,port)

    //flatMap和Map需要引用的隐式转换
    import org.apache.flink.api.scala._
    val dataStream: DataStream[(String, Int)] = textDstream.flatMap(_.split("\\s"))
      .filter(_.nonEmpty)
      .map((_,1)).keyBy(0).sum(1)

    dataStream.print().setParallelism(1)

    //启动executor,执行任务
    environment.execute("Socket stream word count")

  }

}

没有关闭任务链合并时
关闭整个任务过程中任务链的合并过程 disableOperatorChaining

    //创建流处理环境
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // 关闭整个任务过程中任务链的合并过程
    environment.disableOperatorChaining()

关闭整个任务过程中任务链的合并过程

从图中可知,整个过程没有任务链合并,每个算子都是一个整体。

关闭某个算子得任务链的合并过程 disableChaining

关闭filter算子的合并任务链


    //flatMap和Map需要引用的隐式转换
    import org.apache.flink.api.scala._
    val dataStream: DataStream[(String, Int)] = textDstream.flatMap(_.split("\\s"))
      .filter(_.nonEmpty).disableChaining()
      .map((_,1)).keyBy(0).sum(1)
关闭filter算子的任务链合并
startNewChain 前后切开成两个任务
    //flatMap和Map需要引用的隐式转换
    import org.apache.flink.api.scala._
    val dataStream: DataStream[(String, Int)] = textDstream.flatMap(_.split("\\s"))
      .filter(_.nonEmpty).startNewChain()
      .map((_,1)).keyBy(0).sum(1)

startNewChain

map到keyby()一定会重分区。

相关文章

网友评论

      本文标题:任务链(Operator Chains)

      本文链接:https://www.haomeiwen.com/subject/dzznsctx.html