美文网首页
20171103周报

20171103周报

作者: Bill_Lin | 来源:发表于2017-11-03 08:55 被阅读23次

本周工作内容:

  • 继续研究下FLink transfer latency,主要是学习相关的生产者消费者通信机制和数据传输方式,了解相关接口来编写测试topology。
  • 按项目的要求重新部署了集群环境,期间浪费了不少时间,不熟练
  • 了解项目功能模块,尝试编写测试样例
  • 整理下掌握的FLink总体架构与知识点

下面对FLink进程间通信与数据传输所形成的执行图进行一个分析。
FLink的执行图,分为四层:StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行图。

  • StreamGraph:是根据用户通过 Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构。
  • JobGraph:StreamGraph经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点,这样可以减少数据在节点之间流动所需要的序列化/反序列化/传输消耗。
  • ExecutionGraph:JobManager 根据 JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。
  • 物理执行图:JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据结构。用SocketTextStreamWordCount举例,四层执行图的演变过程如下图所示。
FLink执行图

最后涉及到底层代码实现的物理执行图,有两个核心的概念ResultPartition和InputGate,下面分别进行分析。

ResultPartition

在最后演变出来的物理图中可以看到,FLink的Runtime中,使用结果分区(ResultPartition)来表示任务的子任务实例所生产的数据,在执行图中等价于jobmanager的IntermediateResultPartition。IntermediateResultPartition主要用于JobManager组织作业图的一种逻辑数据结构,ResultPartition是运行时的一种逻辑概念,两者处于不同的层面。任何想消费ResultPartition的任务,最终都是请求ResultPartition的某个ResultSubPartition。而请求要么是同一TaskManager中的本地请求,要么是来自另外一个TaskManager中的消费子任务实例发起的远程请求。

其中,结果分区编号(ResultPartitionID)用来标识ResultPartition。ResultPartitionID关联着IntermediateResultPartitionID(也即调度时的分区编号)和ExecutionAttemptID(部署时的生产者子任务实例编号)。在任务失败时,单靠IntermediateResultPartitionID无法鉴别ResultPartition,必须结合ExecutionAttemptID一起鉴别。
结果分区编号(ResultPartitionID)用来标识ResultPartition。在任务失败时,单靠IntermediateResultPartitionID无法鉴别ResultPartition,必须结合ExecutionAttemptID一起鉴别。

InputGate

一个InputGate包含多个输入通道(InputChannel),输入通道用于请求ResultSubpartitionView,并从中消费数据。

所谓的ResultSubpartitionView是由ResultSubpartition所创建的用于供消费者任务消费数据的视图对象。
对于每个InputChannel,消费的生命周期会经历如下的方法调用过程:

requestSubpartition:请求ResultSubpartition;
getNextBuffer:获得下一个Buffer;
releaseAllResources:释放所有的相关资源;
以getNextBufferOrEvent方法为主线来分析SingleInputGate类。SingleInputGate是消费ResultPartition的实体.

public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
    //如果已接收到所有EndOfPartitionEvent事件,则说明每个ResultSubpartition中的数据都被消费完成
    if (hasReceivedAllEndOfPartitionEvents) {
        return null;
    }

    //触发所有的输入通道向ResultSubpartition发起请求
    requestPartitions();

    InputChannel currentChannel = null;
    //阻塞并循环等待有可获取数据的通道可用
    while (currentChannel == null) {
        if (isReleased) {
            throw new IllegalStateException("Released");
        }

        //从阻塞队列中请求队首的输入通道,阻塞两秒钟,如果没有获取到则不断请求,直到获取到一个输入通道位置
        currentChannel = inputChannelsWithData.poll(2, TimeUnit.SECONDS);
    }

    //从输入通道中获得下一个Buffer
    final Buffer buffer = currentChannel.getNextBuffer();

    if (buffer == null) {
        throw new IllegalStateException("Bug in input gate/channel logic: input gate got " +
            "notified by channel about available data, but none was available.");
    }

    //如果该Buffer是用户数据,则构建BufferOrEvent对象并返回
    if (buffer.isBuffer()) {
        return new BufferOrEvent(buffer, currentChannel.getChannelIndex());
    }
    //否则把它当作事件来处理
    else {
        final AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());

        //如果获取到的是标识某ResultSubpartition已经生产完数据的事件
        if (event.getClass() == EndOfPartitionEvent.class) {
            //对获取该ResultSubpartition的通道进行标记
            channelsWithEndOfPartitionEvents.set(currentChannel.getChannelIndex());
            //如果所有信道都被标记了,置全部通道获取数据完成
            if (channelsWithEndOfPartitionEvents.cardinality() == numberOfInputChannels) {
                hasReceivedAllEndOfPartitionEvents = true;
            }
            //对外发出ResultSubpartition已被消费的通知同时释放资源
            currentChannel.notifySubpartitionConsumed();
            currentChannel.releaseAllResources();
        }
        //以事件来构建BufferOrEvent对象
        return new BufferOrEvent(event, currentChannel.getChannelIndex());
    }
}

InputChannel根据ResultPartitionLocation提供了三种实现:

LocalInputChannel:用于请求同实例中生产者任务所生产的ResultSubpartitionView的输入通道;
RemoteInputChannel:用于请求远程生产者任务所生产的ResultSubpartitionView的输入通道;
UnknownInputChannel:一种用于占位目的的输入通道,需要占位通道是因为暂未确定相对于生产者任务位置,但最终要么被替换为RemoteInputChannel,要么被替换为LocalInputChannel。

小结:结果子分区中存有真正的数据,会根据结果分区来被消费端消费,不同位置的消费方式不一样,有LOCAL、REMOTE和UNKOWN三种。管理结果分区的是结果分区管理器ResultPartitionManager,一个NetworkEnvironment下只有一个。

相关文章

  • 20171103周报

    本周工作内容: 继续研究下FLink transfer latency,主要是学习相关的生产者消费者通信机制和数据...

  • 20171103

    140公交车上,今天的任务没完成。回去继续背,还有36个主观题。 接到了一张健身房和一张减肥中心的广告。 已经慢慢...

  • 20171103

    【感恩有你】20171103学习力践行记录D174 早晨:《手指摇》-点点窝窝 晚上:古诗《静夜思》《山居秋暝》 ...

  • 20171103

    2017年7月14日入职,至今已有三个半月。从一开始满怀热情,对未来充满憧憬,想象经过自己的努力,将来能够有一番作...

  • 20171103

    人的灵魂和矩阵具有类似的性质,是一种变化,运动,透过某些表象(基)才能反映它的特质,但无法直接认识它。矩阵其实也是...

  • 20171103

    先改需要补充算例和后处理的地方单纯改文字的东西可以后面改 现在要补充算例或者后处理的主要是 第二章非定常时间步选取...

  • 20171103

    你是怎样一个人,你很多时候是不知道的,往往是你在与人交往过程中,尤其是亲密的关系中,你才看清了你自己。有时你会看到...

  • 20171103

    如果我爱你,就珍惜在一起的每一天

  • 20171103

    标题是时间是昨天,科二挂了还是很难过的!不因为别的就是太可惜了真的可惜,晚上聊天还要被怼!我现在想存钱想把字写好!

  • 20171103

    【蹦蹦跳跳皮皮猴】20171103学习力践行d23 中午陪小朋友一起玩,我拿了刚买了小学生古诗集,背起了《卜算子,...

网友评论

      本文标题:20171103周报

      本文链接:https://www.haomeiwen.com/subject/vgpapxtx.html