美文网首页
单进程版流计算实现说明

单进程版流计算实现说明

作者: quiterr | 来源:发表于2017-05-19 09:27 被阅读0次

    一、概念

    单进程版与Storm版的流计算实现有许多相似的概念,其中最重要的包括:Topology、Spout、Bolt。Topology是一个由Spout节点和Bolt节点组成的有向无环图(或称有方向的树),一个轻应用可以有一个或多个Topology。Spout是数据的来源,它是Topology的根节点,每个Topology只有一个Spout。Bolt是真正负责处理业务逻辑的节点。

    Storm将父节点传输给子节点的数据称为Tuple,单进程版的流计算与此对应的概念是BoltParameter。一组Tuple或BoltParameter组成Stream,Stream是一个抽象概念,没有具体的实现。

    单进程版相当于Storm版的简化,有一些Storm版的概念这里不会有,比如:

    • 分组(grouping)
    • Worker
    • Task
    • Reliability

    上面列出的概念基本都与Storm的并行处理有关,单进程版的流计算不存在并发问题,也就没有这些概念。

    在编写Storm版的流计算程序时,很多非业务逻辑的功能是Storm负责维护和处理的,而单进程版的程序需要自己来处理,这引入了一些新的概念:

    • SpoutProcessor,这个类负责维护Spout节点
    • BoltProcessor,这个类负责维护Bolt节点

    下文将详细解释SpoutProcessor和BoltProcessor。

    二、Topology的实现

    之前提到Topology树是由Spout和Bolt组成,实际上Spout和Bolt里边是业务逻辑相关的定义,真正让它们组成一棵树的是SpoutProcessor和BoltProcessor,这两个类都实现了ProcessNode接口,该接口定义如下:

    public interface ProcessNode {
    
        /**
         * 提取本处理结点的名字
         */
        String getName();
        /**
         * 提取本结点的Id,Id用来记录节点间的关系,全局唯一
         */
        String getId();
        /**
         * 向本处理结点增加一个儿子结点
         */
        void addchild(ProcessNode child);
        /**
         * 调用本处理结点的处理逻辑对数据进行处理
         */
        void run(Object param);
    
    }
    

    SpoutProcesser的定义:

    public class SpoutProcessor implements ProcessNode {
    
        private static Logger logger = LoggerFactory.getLogger(SpoutProcessor.class);
        /**
         * 本拓扑待处理的数据队列
         */
        KafkaDataQueue queue = new KafkaDataQueue();
        /**
         * 数据生成器
         */
        private StreamSpout spout;
        /**
         * 本处理器对应的下一代处理器
         */
        private LinkedList<ProcessNode> childrens;
    //后面的代码省略
    

    SpoutProcesser的第一个变量是KafkaDataQueue,这个数据队列由KafkaThread类负责写入数据(它的数据来源是Kafka队列),由ProcessThread负责读取数据并处理。关于ProcessThread等线程类后边详细介绍。
    SpoutProcesser的第二个变量是StreamSpout,这就是Storm中Spout的对应实现,是拓扑树的根节点。
    SpoutProcesser第三个变量:private LinkedList<ProcessNode> childrens;,这个变量记录了它的下一级节点有哪些,这是组成Topology树的关键。

    public class BoltProcessor implements ProcessNode {
    
        /**
         * 本处理器对应的Bolt
         */
        private Bolt bolt;
    
        /**
         * 本处理器对应的下一代处理器
         */
        private LinkedList<ProcessNode> childrens;
    //后面的代码省略
    

    BoltProcessor也是一样,维护了Bolt的下一级节点列表。

    三、线程

    单进程版流计算有四个线程:

    • KafkaThread,如前所述,它负责读取Kafka队列,并把数据放到SpoutProcesser的KafkaDataQueue。每个SpoutProcesser都会收到一份完全相同的数据的拷贝,有点类似于Storm的AllGrouping分组方式。全局只有一个KafkaThread。
    • ProcessThread,它通过调用SpoutProcesser的实例来处理KafkaDataQueue中的数据,每个Topology对应一个ProcessThread。
    • OutputThread,负责把流计算的结果存储到MongoDB,全局只有一个OutputThread。
    • ShutdownHookThread,当进程退出时,如kill -15 程序退出时,把未处理的kafka数据退回kafka队列,把已经处理生成的结果存入mongo,减少数据丢失。ShutdownListener类实现了ApplicationListener接口,当监听到ContextClosedEvent事件时启动ShutdownHookThread。

    四、流计算业务逻辑的实现

    关于Groovy脚本是如何在Java程序中运行的,可以参考《Groovy脚本使用方法》、《baas系统脚本说明》。这篇文档主要介绍StreamSpout、ConvertBolt和StatBolt的实现(AlarmBolt与ConvertBolt类似,不再重复介绍)。

    (一)StreamSpout

    StreamSpout是根节点的实现,它实现了Spout接口:

    public interface Spout {
        /**
         * 得到本Spout的名字
         */
        String getName();
        /**
         * 得到本Spout的Id
         */
        String getId();
        /**
         * 准备本Bolt
         */
        void prepare();
        /**
         * 提取设备档案操作对象
         */
        IArchives getArchives();
    
        /**
         * 设置设备档案操作对象
         */
        void setArchives(IArchives archives);
    
        /**
         * 执行Spout内部的判断逻辑,判别是否应该交由本Spout进行处理
         */
        BoltParameter execute(SpoutParameter spoutParameter);
    }
    

    prepare方法只在初始化的时候执行一次,它负责做一些准备工作。
    execute方法每收到一个数据就会运行一次,处理真正的业务逻辑,数据通过参数BoltParameter传递进来,通过返回BoltParameter传递给下一级节点。
    getArchives方法返回一个IArchives接口,通过这个接口提供的方法可以获取设备档案。

    StreamSpout的定义(核心片段,非完整代码):

    public class StreamSpout implements Spout {
        /**
         * 本Spout的配置
         */
        private SpoutConfig config;
    
        /**
         * 访问设备档案的对象
         */
        private Archives archives;
    
        @Override
        public void prepare() {
    
        }
    
        @Override
        public BoltParameter execute(SpoutParameter spoutParameter) {
    //省略具体实现
        }
    
    //省略后面的代码
    

    第一个成员变量是SpoutConfig,这就是用户在轻应用平台上所做的配置,由单进程流计算的入口类Executor从数据库中读取填充。
    第二个成员变量Archives,这个对象包含一个deviceId成员变量,当StreamSpout收到数据时,execute方法会给deviceId赋值,在后面的节点中将用来获取档案信息。
    StreamSpout的prepare方法目前为空,没有任何准备工作要做。
    execute方法首先判断数据流名称是否和用户配置的一致,然后构造BoltParameter对象(由流计算的上下文、内置输入对象、档案操作对象组成),返回该对象给下一级节点。

    (二)ConvertBolt

    ConvertBolt的定义(核心片段,非完整代码):

    public class ConvertBolt extends BaseBolt {
        /**
         * 本Bolt的配置
         */
        private ConvertBoltConfig config;
        /**
         * 脚本对象
         */
        private IConvertProcess process;
        /**
         * 把数据保存到Mongo的队列
         */
        private OutputQueue outputQueue;
    //省略非业务逻辑私有变量
    
        @Override
        public void prepare() {
    //省略具体实现
        }
    
        @Override
        public List<BoltParameter> execute(BoltParameter parameter) {
     //省略具体实现
        }
    }
    

    ConvertBolt继承了BaseBolt,后者非常简单,不影响整体理解,细节请阅读源代码。
    第一个成员变量是ConvertBoltConfig,和SpoutConfig一样,这是用户在轻应用平台上所做的配置,由单进程流计算的入口类Executor从数据库中读取填充。
    第二个成员变量IConvertProcess,用来引用Groovy脚本的实例,执行Groovy脚本的时候用到。
    第三个成员变量OutputQueue,OutputThread会将这个队列的数据存储到MongoDB。
    prepare方法只在初始化的时候执行一次,它负责做一些准备工作,例如解析ConvertBoltConfig并加载Groovy脚本。
    execute方法每收到一个数据就会运行一次,处理真正的业务逻辑,数据通过参数BoltParameter传递进来,通过返回List<BoltParameter>传递给下一级节点。

    (三)StatBolt

    StatBolt的定义(核心片段,非完整代码):

    public class StatBolt extends BaseBolt {
        /**
         * 统计单元的配置
         */
        private StaticsBoltConfig config;
    
        /**
         * 统计脚本对象
         */
        private IStatProcess statProcessor;
    
        /**
         * 统计缓存对象
         */
        private Caches caches;
    
        /**
         * 统计过程中访问Redis的对象,用于保存和提取中间结果
         */
        private RedisClient redisClient;
    
        /**
         * 把数据存储到Mongo中的队列
         */
        private OutputQueue outputQueue;
    
        @Override
        public void prepare() {
    //省略具体实现
        }
    
        @Override
        public List<BoltParameter> execute(BoltParameter parameter) {
     //省略具体实现
        }
    }
    

    StatBolt和ConvertBolt结构基本一致,相同的部分不再重复说明。
    成员变量Caches是内置的统计计算所需的缓存类,在prepare方法中初始化,它实现了ICaches接口,包括单个设备统计用到的group函数和全局统计用到的group(Object group)。虽然名为Caches,但它更多的是作为计算所需的内置对象,实际的缓存功能是由它内部的RedisClient类实现。
    execute方法判断是否到达输出时间(上一次输出时间可通过Caches获取),如果到达则执行输出脚本,如果没有到达则执行计算脚本。

    五、缓存的实现

    采用了两级缓存机制:Redis和Ehcache,前者是远程缓存,后者是本地缓存。事实上正是由于远程缓存性能不够好才引入了本地缓存,但另一方面,如果只使用本地缓存,程序意外终止时会丢失数据,所以两者结合使用。
    CacheUtils类提供了Ehcache的访问,RedisClient类除了提供了Redis的访问,还包含CacheUtils的调用并提供其他类所需的接口方法。

    缓存的细节信息见下表:

    缓存类型 最大数量级 key值构成 Ehcache缓存名字 备注
    设备档案 设备数量 arch-、archiveId、deviceId archiveCache
    单个统计中间结果 设备数量*统计节点数量 cache-、statId、deviceId midStatCache 单个统计时缓存数量远大于全局统计
    单个统计的上一次输出时间 设备数量*统计节点数量 stat_last_time-、statId、deviceId lastStatOutputCache 单个统计时缓存数量远大于全局统计
    全局统计中间结果 档案字段数量*统计节点数量 cache-、statId、group midStatCache group是档案字段的值
    全局统计的上一次输出时间 档案字段数量*统计节点数量 stat_last_time-、statId、group lastStatOutputCache group是档案字段的值
    上一次告警输出的时间(数据来源为非统计节点) 设备数量*告警节点数量 alarm_last-、alarmId、deviceId lastAlarmCache
    上一次告警输出的时间(数据来源为单个统计节点) 设备数量*告警节点数量 alarm_last-、alarmId、deviceId lastAlarmCache key值看起来和上一行相同,但alarmId实际会不一样。
    上一次告警输出的时间(数据来源为全局统计节点) 档案字段数量*告警节点数量 alarm_last-、alarmId、group lastAlarmCache group是档案字段的值
    全局统计的维度值列表 档案字段数量*统计节点数量 statId statDimensionsCache value是Set<String>,Set里边放的group;单线程版本的流计算是存储在Redis

    六、程序的入口:Executor类

    Executor类的init方法是单进程版流计算程序的入口,init方法主要做了两件事:创建Topology树和启动各个线程,当init方法执行完毕之后,单进程流计算程序就开始读取Kafka队列中的数据并处理,这个过程将会一直运行下去,只能用户手动停止。

    相关文章

      网友评论

          本文标题:单进程版流计算实现说明

          本文链接:https://www.haomeiwen.com/subject/ngjpzttx.html