美文网首页
20171103周报

20171103周报

作者: Bill_Lin | 来源:发表于2017-11-03 08:55 被阅读23次

    本周工作内容:

    • 继续研究下FLink transfer latency,主要是学习相关的生产者消费者通信机制和数据传输方式,了解相关接口来编写测试topology。
    • 按项目的要求重新部署了集群环境,期间浪费了不少时间,不熟练
    • 了解项目功能模块,尝试编写测试样例
    • 整理下掌握的FLink总体架构与知识点

    下面对FLink进程间通信与数据传输所形成的执行图进行一个分析。
    FLink的执行图,分为四层:StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行图。

    • StreamGraph:是根据用户通过 Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构。
    • JobGraph:StreamGraph经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点,这样可以减少数据在节点之间流动所需要的序列化/反序列化/传输消耗。
    • ExecutionGraph:JobManager 根据 JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。
    • 物理执行图:JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据结构。用SocketTextStreamWordCount举例,四层执行图的演变过程如下图所示。
    FLink执行图

    最后涉及到底层代码实现的物理执行图,有两个核心的概念ResultPartition和InputGate,下面分别进行分析。

    ResultPartition

    在最后演变出来的物理图中可以看到,FLink的Runtime中,使用结果分区(ResultPartition)来表示任务的子任务实例所生产的数据,在执行图中等价于jobmanager的IntermediateResultPartition。IntermediateResultPartition主要用于JobManager组织作业图的一种逻辑数据结构,ResultPartition是运行时的一种逻辑概念,两者处于不同的层面。任何想消费ResultPartition的任务,最终都是请求ResultPartition的某个ResultSubPartition。而请求要么是同一TaskManager中的本地请求,要么是来自另外一个TaskManager中的消费子任务实例发起的远程请求。

    其中,结果分区编号(ResultPartitionID)用来标识ResultPartition。ResultPartitionID关联着IntermediateResultPartitionID(也即调度时的分区编号)和ExecutionAttemptID(部署时的生产者子任务实例编号)。在任务失败时,单靠IntermediateResultPartitionID无法鉴别ResultPartition,必须结合ExecutionAttemptID一起鉴别。
    结果分区编号(ResultPartitionID)用来标识ResultPartition。在任务失败时,单靠IntermediateResultPartitionID无法鉴别ResultPartition,必须结合ExecutionAttemptID一起鉴别。

    InputGate

    一个InputGate包含多个输入通道(InputChannel),输入通道用于请求ResultSubpartitionView,并从中消费数据。

    所谓的ResultSubpartitionView是由ResultSubpartition所创建的用于供消费者任务消费数据的视图对象。
    对于每个InputChannel,消费的生命周期会经历如下的方法调用过程:

    requestSubpartition:请求ResultSubpartition;
    getNextBuffer:获得下一个Buffer;
    releaseAllResources:释放所有的相关资源;
    以getNextBufferOrEvent方法为主线来分析SingleInputGate类。SingleInputGate是消费ResultPartition的实体.

    public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
        //如果已接收到所有EndOfPartitionEvent事件,则说明每个ResultSubpartition中的数据都被消费完成
        if (hasReceivedAllEndOfPartitionEvents) {
            return null;
        }
    
        //触发所有的输入通道向ResultSubpartition发起请求
        requestPartitions();
    
        InputChannel currentChannel = null;
        //阻塞并循环等待有可获取数据的通道可用
        while (currentChannel == null) {
            if (isReleased) {
                throw new IllegalStateException("Released");
            }
    
            //从阻塞队列中请求队首的输入通道,阻塞两秒钟,如果没有获取到则不断请求,直到获取到一个输入通道位置
            currentChannel = inputChannelsWithData.poll(2, TimeUnit.SECONDS);
        }
    
        //从输入通道中获得下一个Buffer
        final Buffer buffer = currentChannel.getNextBuffer();
    
        if (buffer == null) {
            throw new IllegalStateException("Bug in input gate/channel logic: input gate got " +
                "notified by channel about available data, but none was available.");
        }
    
        //如果该Buffer是用户数据,则构建BufferOrEvent对象并返回
        if (buffer.isBuffer()) {
            return new BufferOrEvent(buffer, currentChannel.getChannelIndex());
        }
        //否则把它当作事件来处理
        else {
            final AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
    
            //如果获取到的是标识某ResultSubpartition已经生产完数据的事件
            if (event.getClass() == EndOfPartitionEvent.class) {
                //对获取该ResultSubpartition的通道进行标记
                channelsWithEndOfPartitionEvents.set(currentChannel.getChannelIndex());
                //如果所有信道都被标记了,置全部通道获取数据完成
                if (channelsWithEndOfPartitionEvents.cardinality() == numberOfInputChannels) {
                    hasReceivedAllEndOfPartitionEvents = true;
                }
                //对外发出ResultSubpartition已被消费的通知同时释放资源
                currentChannel.notifySubpartitionConsumed();
                currentChannel.releaseAllResources();
            }
            //以事件来构建BufferOrEvent对象
            return new BufferOrEvent(event, currentChannel.getChannelIndex());
        }
    }
    

    InputChannel根据ResultPartitionLocation提供了三种实现:

    LocalInputChannel:用于请求同实例中生产者任务所生产的ResultSubpartitionView的输入通道;
    RemoteInputChannel:用于请求远程生产者任务所生产的ResultSubpartitionView的输入通道;
    UnknownInputChannel:一种用于占位目的的输入通道,需要占位通道是因为暂未确定相对于生产者任务位置,但最终要么被替换为RemoteInputChannel,要么被替换为LocalInputChannel。

    小结:结果子分区中存有真正的数据,会根据结果分区来被消费端消费,不同位置的消费方式不一样,有LOCAL、REMOTE和UNKOWN三种。管理结果分区的是结果分区管理器ResultPartitionManager,一个NetworkEnvironment下只有一个。

    相关文章

      网友评论

          本文标题:20171103周报

          本文链接:https://www.haomeiwen.com/subject/vgpapxtx.html