前言
这篇文章其实是好几个月之前发过的。今天很忙,可以预见没空写新东西了,但是又不想就这么跳过去,于是从近一段时间写的文章里挑一篇阅读量最低的,重新贴一次吧。
答案放在前面
将YARN配置项中的yarn.scheduler.fair.assignmultiple
参数设为false,或者手动设定yarn.scheduler.fair.max.assign
参数的值为一个较小的正数(如3或4)。
提出问题
我们在有20个节点(之前的规模,现在是40个节点)的专用集群上运行所有Flink流式作业。为与其他大数据集群保持一致,采用的框架版本为CDH 5.13.3自带的Hadoop 2.6.0。
在开发过程中发现,Flink on YARN作业的TaskManager经常分布不均匀,集中在少量节点上。如启动了10个TaskManager的作业,可能只有3个节点分配有Container,并且分配比例是7:2:1。久而久之使得各节点的负载很倾斜,不利于平稳运行。
分析问题
以生产环境下通用的公平调度器(FairScheduler)为例,从源码入手,简单看看YARN到底是如何分配Container的。
随NodeManager心跳分配
NodeManager每隔yarn.resourcemanager.nodemanagers.heartbeat-interval-ms
参数规定的时间(社区版默认为1秒,CDH版默认为100毫秒)向ResourceManager发送心跳事件NodeUpdateSchedulerEvent。FairScheduler会处理这个事件。
@Override
public void handle(SchedulerEvent event) {
switch (event.getType()) {
// ...
case NODE_UPDATE:
if (!(event instanceof NodeUpdateSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
nodeUpdate(nodeUpdatedEvent.getRMNode());
break;
// ...
}
}
在nodeUpdate()方法中,会看到从NodeManager拉取并处理Container的更新信息,包含新创建的Container与已经完成的Container,然后调用attemptScheduling()方法尝试调度分配Container。
private synchronized void nodeUpdate(RMNode nm) {
long start = getClock().getTime();
if (LOG.isDebugEnabled()) {
LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterResource);
}
eventLog.log("HEARTBEAT", nm.getHostName());
FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID());
List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
for(UpdatedContainerInfo containerInfo : containerInfoList) {
newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
completedContainers.addAll(containerInfo.getCompletedContainers());
}
// Processing the newly launched containers
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
containerLaunchedOnNode(launchedContainer.getContainerId(), node);
}
// Process completed containers
for (ContainerStatus completedContainer : completedContainers) {
ContainerId containerId = completedContainer.getContainerId();
LOG.debug("Container FINISHED: " + containerId);
completedContainer(getRMContainer(containerId),
completedContainer, RMContainerEventType.FINISHED);
}
if (continuousSchedulingEnabled) {
if (!completedContainers.isEmpty()) {
attemptScheduling(node);
}
} else {
attemptScheduling(node);
}
long duration = getClock().getTime() - start;
fsOpDurations.addNodeUpdateDuration(duration);
}
可见,如果同时启用了后台线程分配(continuousSchedulingEnabled),那么随心跳分配就只有在已经完成的Container列表不为空时才会有效。下面来看看后台线程分配。
后台线程分配
如果yarn.scheduler.fair.continuous-scheduling-enabled
参数为true(默认true),就会启动一个后台线程,以yarn.scheduler.fair.continuous-scheduling-sleep-ms
参数的间隔(5毫秒)来分配。部分代码如下。
private class ContinuousSchedulingThread extends Thread {
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
continuousSchedulingAttempt();
Thread.sleep(getContinuousSchedulingSleepMs());
} catch (InterruptedException e) {
LOG.warn("Continuous scheduling thread interrupted. Exiting.", e);
return;
}
}
}
}
void continuousSchedulingAttempt() throws InterruptedException {
long start = getClock().getTime();
List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
// Sort the nodes by space available on them, so that we offer
// containers on emptier nodes first, facilitating an even spread. This
// requires holding the scheduler lock, so that the space available on a
// node doesn't change during the sort.
synchronized (this) {
Collections.sort(nodeIdList, nodeAvailableResourceComparator);
}
// iterate all nodes
for (NodeId nodeId : nodeIdList) {
FSSchedulerNode node = getFSSchedulerNode(nodeId);
try {
if (node != null && Resources.fitsIn(minimumAllocation,
node.getAvailableResource())) {
attemptScheduling(node);
}
} catch (Throwable ex) {
LOG.error("Error while attempting scheduling for node " + node +
": " + ex.toString(), ex);
if ((ex instanceof YarnRuntimeException) &&
(ex.getCause() instanceof InterruptedException)) {
// AsyncDispatcher translates InterruptedException to
// YarnRuntimeException with cause InterruptedException.
// Need to throw InterruptedException to stop schedulingThread.
throw (InterruptedException)ex.getCause();
}
}
}
long duration = getClock().getTime() - start;
fsOpDurations.addContinuousSchedulingRunDuration(duration);
}
在continuousSchedulingAttempt()方法中,会先把所有节点按照可用资源的量排序,使得Container可以优先分配到比较空的节点上去。然后遍历排好序的节点,当节点的资源量满足Container的需求时,调用attemptScheduling()方法尝试调度分配Container。
调度分配Container
@VisibleForTesting
synchronized void attemptScheduling(FSSchedulerNode node) {
if (rmContext.isWorkPreservingRecoveryEnabled()
&& !rmContext.isSchedulerReadyForAllocatingContainers()) {
return;
}
// Assign new containers...
// 1. Check for reserved applications
// 2. Schedule if there are no reservations
FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
if (reservedAppSchedulable != null) {
Priority reservedPriority = node.getReservedContainer().getReservedPriority();
if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node)) {
// Don't hold the reservation if app can no longer use it
LOG.info("Releasing reservation that cannot be satisfied for application "
+ reservedAppSchedulable.getApplicationAttemptId()
+ " on node " + node);
reservedAppSchedulable.unreserve(reservedPriority, node);
reservedAppSchedulable = null;
} else {
// Reservation exists; try to fulfill the reservation
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to fulfill reservation for application "
+ reservedAppSchedulable.getApplicationAttemptId()
+ " on node: " + node);
}
node.getReservedAppSchedulable().assignReservedContainer(node);
}
}
if (reservedAppSchedulable == null) {
// No reservation, schedule at queue which is farthest below fair share
int assignedContainers = 0;
while (node.getReservedContainer() == null) {
boolean assignedContainer = false;
if (!queueMgr.getRootQueue().assignContainer(node).equals(
Resources.none())) {
assignedContainers++;
assignedContainer = true;
}
if (!assignedContainer) { break; }
if (!assignMultiple) { break; }
if ((assignedContainers >= maxAssign) && (maxAssign > 0)) { break; }
}
}
updateRootQueueMetrics();
}
该方法首先检查当前是否有为本Application预留的Container资源,如果有且在当前节点上可调度,就直接分配之。如果没有预留资源,就尝试在YARN队列中分配Container。注意最后的那个while循环,可以得知:
- 如果assignMultiple(对应
yarn.scheduler.fair.assignmultiple
参数)为true,那么在成功分配一个Container后不会停止,继续尝试在当前节点上分配; - 在上一条的条件下,最多会连续分配maxAssign(对应
yarn.scheduler.fair.max.assign
参数)个Container后停止。
Hadoop默认的yarn.scheduler.fair.assignmultiple
为false,亦即一次调度只分配一个Container。但是CDH将这个参数默认设为了true,并且yarn.scheduler.fair.max.assign
默认为-1,表示不限制,所以会导致一次调度在单个节点上分配较多的Container。
CDH的帮助文档表示该选项有助于在大量小任务的情况下增加集群吞吐量,但很显然对于我们的情境是适得其反,故果断将yarn.scheduler.fair.assignmultiple
设为false。经过实测,Container分配过于集中的问题不复存在,且性能没有受到影响。
如果确实担心有潜在影响的话,可以仍然将上一个参数保持为true,但是将yarn.scheduler.fair.max.assign
设为较小的值。Cloudera Manager界面中未提供这个参数的显式设定方法,所以我们得手动将其写入yarn-site.xml的高级配置(YARN Service Advanced Configuration Snippet (Safety Valve) for yarn-site.xml)字段中。
解决问题
民那晚安晚安。
网友评论