美文网首页
flink 计算资源 solt、Operator Chains、

flink 计算资源 solt、Operator Chains、

作者: 邵红晓 | 来源:发表于2019-10-11 14:46 被阅读0次

    Chains 概念

    • Flink会尽可能地将operator的subtask链接(chain)在一起形成一个task pipline。每个task pipline在一个线程中执行
      优点:它能减少线程之间的切换,减少消息的序列化/反序列化,减少数据在缓冲区的交换,减少了延迟的同时提高整体的吞吐量。
      image.png
    • isChainable条件,满足以下所有条件才能进行chain优化,从而chain到一个JobVertex中
      StreamingJobGraphGenerator#createJobGraph源码中实现了streamGraph->jobGraph转换
      JobGraph 的关键在于将多个 StreamNode 优化为一个 JobVertex, 对应的 StreamEdge 则转化为 JobEdge, 并且 JobVertex 和 JobEdge 之间通过 IntermediateDataSet (中间数据集)形成一个生产者和消费者的连接关系。
    1. 上下游的并行度一致
    2. 下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入)
    3. 上下游节点都在同一个 slot group 中(下面会解释 slot group)
    4. 下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter等默认是ALWAYS)
    5. 上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source默认是HEAD)
    6. 两个节点间数据分区方式是 forward(该分区器将记录转发给在本地运行的下游的(归属于subtask)的operattion)
    7. 用户没有禁用 chain
    • flink 分区器:
    1. GlobalPartitioner,全局分区器,默认选择了索引为0的channel进行输出,数据倾斜。
    2. ForwardPartitioner,该分区器将记录转发给在本地运行的下游的(归属于subtask)的operator
    3. ShufflePartitioner,该分区器会在所有output channel中选择一个随机的进行输出。
    public class ShufflePartitioner<T> extends StreamPartitioner<T> 
    @Override
        public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
                int numberOfOutputChannels) {
            returnArray[0] = random.nextInt(numberOfOutputChannels);
            return returnArray;
        }
    
    1. HashPartitioner
      hash分区器,该分区器对key进行hash后计算得到channel索引。它通过构造器获得KeySelector的实例(该实例用来获取当前记录的key)。
    2. BroadcastPartitioner
      广播分区器,用于将该记录广播给下游的所有的subtask
    3. RebalancePartitioner
      重平衡分区器,正儿八经的解决数据倾斜的神器,所有数据都会采用被均衡的通过轮询的方式分配给所有下游channel
    4. RescalePartitioner
      根据平行度对数据进行分区,数据回被平行1分2给下游channel,不存在轮询round-robin

    task manager solt概念

    • solt 是用来对taskmanager内存进行平均分配的,每个solt内存都相同,加起来和等于taskmanager可用内存,但是仅仅对内存做了隔离,并没有对cpu进行隔离
    • 一个task manager 可有多个solt,yarn模式下可以使用 -ys,--yarnslots <arg> Number of slots per TaskManager指定
    • 每个 slot 都能跑由多个连续 task 组成的一个 pipeline


      image.png

    槽位共享组 SlotSharingGroup 与 CoLocationGroup(迭代流使用)

    • 默认情况下,Flink 允许subtasks共享slot,条件是它们都来自同一个Job的不同operator的子任务。结果可能一个slot持有该job的整个pipeline
    • 在同一个JVM进程中所有的task solt,可以共享TCP连接(基于多路复用)和心跳消息,可以减少数据的网络传输。也能共享一些数据结构,一定程度上减少了每个task的消耗。
      image.png
      解释:我们将 WordCount 的并行度从之前的2个增加到6个(Source并行度仍为1),并开启slot共享(所有operator都在default共享组),将得到如上图所示的slot分布图。首先,我们不用去计算这个job会其多少个task,总之该任务最终会占用6个slots(最高并行度为6)。其次,我们可以看到 io密集型操作 keyAggregation/sink 被平均地分配到各个 TaskManager。
    • SlotSharingGroup是Flink中用来实现slot共享的类,它尽可能地让subtasks共享一个slot。相应的,还有一个 CoLocationGroup类用来强制将 subtasks 放到同一个 slot 中。CoLocationGroup主要用于迭代流中,用来保证迭代头与迭代尾的第i个subtask能被调度到同一个TaskManager上。这里我们不会详细讨论CoLocationGroup的实现细节。

    flink计算资源管理

    • 请对照源码看图
    • flink 在ResourceManager 已经实现了动态资源管理,ResourceManager的内部类ResourceActionsImpl,可以做到allocateResource,releaseResource动态资源管理,比如yarn中这两个方法的具体实现就涉及到启动新的 container 和释放已经申请的 container
    private class ResourceActionsImpl implements ResourceActions {
    
            @Override
            public void releaseResource(InstanceID instanceId, Exception cause) {
                validateRunsInMainThread();
    
                ResourceManager.this.releaseResource(instanceId, cause);
            }
    
            @Override
            public Collection<ResourceProfile> allocateResource(ResourceProfile resourceProfile) {
                validateRunsInMainThread();
                return startNewWorker(resourceProfile);
            }
    
            @Override
            public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause) {
                validateRunsInMainThread();
    
                JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);
                if (jobManagerRegistration != null) {
                    jobManagerRegistration.getJobManagerGateway().notifyAllocationFailure(allocationId, cause);
                }
            }
        }
    
    • flink 偏好位置信息生成依据
      flink source task 所在节点,并作为consumer task的偏好位置,consumer task优先调度到source task所在节点
      gets the location preferences of the vertex's current task execution, as determined by the locationsof the predecessors from which it receives input data.
      ExecutionVertex.java
    /**
         * Gets the location preferences of the vertex's current task execution, as determined by the locations
         * of the predecessors from which it receives input data.
         * If there are more than MAX_DISTINCT_LOCATIONS_TO_CONSIDER different locations of source data, this
         * method returns {@code null} to indicate no location preference.
         *
         * @return The preferred locations based in input streams, or an empty iterable,
         *         if there is no input-based preference.
         */
        public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocationsBasedOnInputs() {
            // otherwise, base the preferred locations on the input connections
            if (inputEdges == null) {
                return Collections.emptySet();
            }
            else {
                Set<CompletableFuture<TaskManagerLocation>> locations = new HashSet<>(getTotalNumberOfParallelSubtasks());
                Set<CompletableFuture<TaskManagerLocation>> inputLocations = new HashSet<>(getTotalNumberOfParallelSubtasks());
    
                // go over all inputs
                for (int i = 0; i < inputEdges.length; i++) {
                    inputLocations.clear();
                    ExecutionEdge[] sources = inputEdges[i];
                    if (sources != null) {
                        // go over all input sources
                        for (int k = 0; k < sources.length; k++) {
                            // look-up assigned slot of input source
                            CompletableFuture<TaskManagerLocation> locationFuture = sources[k].getSource().getProducer().getCurrentTaskManagerLocationFuture();
                            // add input location
                            inputLocations.add(locationFuture);
                            // inputs which have too many distinct sources are not considered
                            if (inputLocations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
                                inputLocations.clear();
                                break;
                            }
                        }
                    }
                    // keep the locations of the input with the least preferred locations
                    if (locations.isEmpty() || // nothing assigned yet
                            (!inputLocations.isEmpty() && inputLocations.size() < locations.size())) {
                        // current input has fewer preferred locations
                        locations.clear();
                        locations.addAll(inputLocations);
                    }
                }
    
                return locations.isEmpty() ? Collections.emptyList() : locations;
            }
        }
    
    image.png

    总结:

    1. 建议将 Number of slots per TaskManager 数设置为operator中最高并行度
    2. flink默认开启solt共享可以充分利用cpu和内存资源
      参考
      https://blog.jrwang.me/2019/flink-source-code-jobgraph/
      http://chenyuzhao.me/2017/02/09/flink-scheduler/
      https://blog.jrwang.me/2019/flink-source-code-resource-manager/#scheduler-%E5%92%8C-slotsharingmanager

    相关文章

      网友评论

          本文标题:flink 计算资源 solt、Operator Chains、

          本文链接:https://www.haomeiwen.com/subject/zbmtmctx.html