
任务链是Flink的一种优化技术。
目的:减少本地通信的开销。
使用任务链优化的条件:
1)两个或多个算子设为相同的并行度。
2)通过本地转发(local forward)的方式进行连接
如图:相同并行度的one-to-one操作,(key aggregation和sink算子)形成一个task,原来的算子成为了这个整体的子任务subTask.
条件两者缺一不可:
并行度相同
one-to-one操作
没有关闭任务链合并时
package com.atguigu.testChaining
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
object StreamWordCount {
def main(args: Array[String]): Unit = {
//从外部命令中获取参数
val params: ParameterTool = ParameterTool.fromArgs(args)
val host: String = params.get("host")
val port: Int = params.getInt("port")
//创建流处理环境
val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//接收socket文本流
val textDstream: DataStream[String] = environment.socketTextStream(host,port)
//flatMap和Map需要引用的隐式转换
import org.apache.flink.api.scala._
val dataStream: DataStream[(String, Int)] = textDstream.flatMap(_.split("\\s"))
.filter(_.nonEmpty)
.map((_,1)).keyBy(0).sum(1)
dataStream.print().setParallelism(1)
//启动executor,执行任务
environment.execute("Socket stream word count")
}
}

关闭整个任务过程中任务链的合并过程 disableOperatorChaining
//创建流处理环境
val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// 关闭整个任务过程中任务链的合并过程
environment.disableOperatorChaining()

从图中可知,整个过程没有任务链合并,每个算子都是一个整体。
关闭某个算子得任务链的合并过程 disableChaining
关闭filter算子的合并任务链
//flatMap和Map需要引用的隐式转换
import org.apache.flink.api.scala._
val dataStream: DataStream[(String, Int)] = textDstream.flatMap(_.split("\\s"))
.filter(_.nonEmpty).disableChaining()
.map((_,1)).keyBy(0).sum(1)

startNewChain 前后切开成两个任务
//flatMap和Map需要引用的隐式转换
import org.apache.flink.api.scala._
val dataStream: DataStream[(String, Int)] = textDstream.flatMap(_.split("\\s"))
.filter(_.nonEmpty).startNewChain()
.map((_,1)).keyBy(0).sum(1)

map到keyby()一定会重分区。
网友评论