Chains 概念
- Flink会尽可能地将operator的subtask链接(chain)在一起形成一个task pipline。每个task pipline在一个线程中执行
优点:它能减少线程之间的切换,减少消息的序列化/反序列化,减少数据在缓冲区的交换,减少了延迟的同时提高整体的吞吐量。
image.png - isChainable条件,满足以下所有条件才能进行chain优化,从而chain到一个JobVertex中
StreamingJobGraphGenerator#createJobGraph
源码中实现了streamGraph->jobGraph转换
JobGraph 的关键在于将多个 StreamNode 优化为一个 JobVertex, 对应的 StreamEdge 则转化为 JobEdge, 并且 JobVertex 和 JobEdge 之间通过 IntermediateDataSet (中间数据集)形成一个生产者和消费者的连接关系。
- 上下游的并行度一致
- 下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入)
- 上下游节点都在同一个 slot group 中(下面会解释 slot group)
- 下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter等默认是ALWAYS)
- 上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source默认是HEAD)
- 两个节点间数据分区方式是 forward(该分区器将记录转发给在本地运行的下游的(归属于subtask)的operattion)
- 用户没有禁用 chain
- flink 分区器:
- GlobalPartitioner,全局分区器,默认选择了索引为0的channel进行输出,数据倾斜。
- ForwardPartitioner,该分区器将记录转发给在本地运行的下游的(归属于subtask)的operator
- ShufflePartitioner,该分区器会在所有output channel中选择一个随机的进行输出。
public class ShufflePartitioner<T> extends StreamPartitioner<T>
@Override
public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
int numberOfOutputChannels) {
returnArray[0] = random.nextInt(numberOfOutputChannels);
return returnArray;
}
- HashPartitioner
hash分区器,该分区器对key进行hash后计算得到channel索引。它通过构造器获得KeySelector的实例(该实例用来获取当前记录的key)。 - BroadcastPartitioner
广播分区器,用于将该记录广播给下游的所有的subtask - RebalancePartitioner
重平衡分区器,正儿八经的解决数据倾斜的神器,所有数据都会采用被均衡的通过轮询的方式分配给所有下游channel - RescalePartitioner
根据平行度对数据进行分区,数据回被平行1分2给下游channel,不存在轮询round-robin
task manager solt概念
- solt 是用来对taskmanager内存进行平均分配的,每个solt内存都相同,加起来和等于taskmanager可用内存,但是仅仅对内存做了隔离,并没有对cpu进行隔离
- 一个task manager 可有多个solt,yarn模式下可以使用
-ys,--yarnslots <arg> Number of slots per TaskManager
指定 -
每个 slot 都能跑由多个连续 task 组成的一个 pipeline
image.png
槽位共享组 SlotSharingGroup 与 CoLocationGroup(迭代流使用)
- 默认情况下,Flink 允许subtasks共享slot,条件是它们都来自同一个Job的不同operator的子任务。结果可能一个slot持有该job的整个pipeline
- 在同一个JVM进程中所有的task solt,可以共享TCP连接(基于多路复用)和心跳消息,可以减少数据的网络传输。也能共享一些数据结构,一定程度上减少了每个task的消耗。
image.png
解释:我们将 WordCount 的并行度从之前的2个增加到6个(Source并行度仍为1),并开启slot共享(所有operator都在default共享组),将得到如上图所示的slot分布图。首先,我们不用去计算这个job会其多少个task,总之该任务最终会占用6个slots(最高并行度为6)。其次,我们可以看到 io密集型操作 keyAggregation/sink 被平均地分配到各个 TaskManager。 -
SlotSharingGroup
是Flink中用来实现slot共享的类,它尽可能地让subtasks共享一个slot。相应的,还有一个CoLocationGroup
类用来强制将 subtasks 放到同一个 slot 中。CoLocationGroup
主要用于迭代流中,用来保证迭代头与迭代尾的第i个subtask能被调度到同一个TaskManager上。这里我们不会详细讨论CoLocationGroup
的实现细节。
flink计算资源管理
- 请对照源码看图
- flink 在ResourceManager 已经实现了动态资源管理,ResourceManager的内部类ResourceActionsImpl,可以做到allocateResource,releaseResource动态资源管理,比如yarn中这两个方法的具体实现就涉及到启动新的 container 和释放已经申请的 container
private class ResourceActionsImpl implements ResourceActions {
@Override
public void releaseResource(InstanceID instanceId, Exception cause) {
validateRunsInMainThread();
ResourceManager.this.releaseResource(instanceId, cause);
}
@Override
public Collection<ResourceProfile> allocateResource(ResourceProfile resourceProfile) {
validateRunsInMainThread();
return startNewWorker(resourceProfile);
}
@Override
public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause) {
validateRunsInMainThread();
JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);
if (jobManagerRegistration != null) {
jobManagerRegistration.getJobManagerGateway().notifyAllocationFailure(allocationId, cause);
}
}
}
- flink 偏好位置信息生成依据
flink source task 所在节点,并作为consumer task的偏好位置,consumer task优先调度到source task所在节点
gets the location preferences of the vertex's current task execution, as determined by the locationsof the predecessors from which it receives input data.
ExecutionVertex.java
/**
* Gets the location preferences of the vertex's current task execution, as determined by the locations
* of the predecessors from which it receives input data.
* If there are more than MAX_DISTINCT_LOCATIONS_TO_CONSIDER different locations of source data, this
* method returns {@code null} to indicate no location preference.
*
* @return The preferred locations based in input streams, or an empty iterable,
* if there is no input-based preference.
*/
public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocationsBasedOnInputs() {
// otherwise, base the preferred locations on the input connections
if (inputEdges == null) {
return Collections.emptySet();
}
else {
Set<CompletableFuture<TaskManagerLocation>> locations = new HashSet<>(getTotalNumberOfParallelSubtasks());
Set<CompletableFuture<TaskManagerLocation>> inputLocations = new HashSet<>(getTotalNumberOfParallelSubtasks());
// go over all inputs
for (int i = 0; i < inputEdges.length; i++) {
inputLocations.clear();
ExecutionEdge[] sources = inputEdges[i];
if (sources != null) {
// go over all input sources
for (int k = 0; k < sources.length; k++) {
// look-up assigned slot of input source
CompletableFuture<TaskManagerLocation> locationFuture = sources[k].getSource().getProducer().getCurrentTaskManagerLocationFuture();
// add input location
inputLocations.add(locationFuture);
// inputs which have too many distinct sources are not considered
if (inputLocations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
inputLocations.clear();
break;
}
}
}
// keep the locations of the input with the least preferred locations
if (locations.isEmpty() || // nothing assigned yet
(!inputLocations.isEmpty() && inputLocations.size() < locations.size())) {
// current input has fewer preferred locations
locations.clear();
locations.addAll(inputLocations);
}
}
return locations.isEmpty() ? Collections.emptyList() : locations;
}
}
image.png
总结:
- 建议将 Number of slots per TaskManager 数设置为operator中最高并行度
- flink默认开启solt共享可以充分利用cpu和内存资源
参考
https://blog.jrwang.me/2019/flink-source-code-jobgraph/
http://chenyuzhao.me/2017/02/09/flink-scheduler/
https://blog.jrwang.me/2019/flink-source-code-resource-manager/#scheduler-%E5%92%8C-slotsharingmanager
网友评论