硕士毕业于西安电子科技大学,曾就职于阿里云存储部门,主要从事存储服务相关功能的设计与开发工作。于2016年加入七牛云,主要负责流式计算与离线计算服务pipeline的架构和开发工作。目前pipeline承载公司每天超过千亿、超过百TB的数据处理。
今天的分享主要围绕七牛在最近一年时间里面开发的大数据平台进行展开,目前我们的平台已经承载了公司核心业务的运营;关于我们的产品,主要会从一个场景展开进行介绍,当中包含了我们在设计过程中遇到的挑战以及解决方案。也欢迎大家基于这些问题和我们展开交流与讨论。
场景.产品
对于运维人员来说,在进行每日常规的线上运维时,日志当中的一天内访问量的波动、线上错误分布、其他业务指标这些数据对于运维人员来说并非是一个透明的过程,那么如何将这些东西做到可视化,或是将这些数据收集起来做统一的处理分析,其实是一个比较复杂和较难实现的过程。这就是所谓的运维日志分析,也是我们之前所提及的场景。关于我们产品解决场景的细节,在下面将会进一步进行分析。我们以Nginx-log为例对我们的Pandora产品进行叙述。
数据接入Pandora—logkit配置运行
任何数据分析的第一步都是数据接入。Pandora开发的数据接入工具logkit,可以帮助用户将数据打入Pandora平台内;在最开始需要下载logkit,配置并运行(图1)
图1logkit工具支持多种数据源,比如对Nginx-log、kafka数据进行采集,并打入我们的数据处理平台当中。下面对图 1 进行详解,首先,我们需要查看日志格式,包括日志格式的名称。在图 1 中,我们明确了日志存储的路径以及格式。最后进入配置文件,将需要进行配置的信息进行配置,并指明数据需要打入存放的路径,如果需要打到某一个消息队列中时,需要对密钥进行配置并运行它,那么此时这个数据才会采集收录到我们的平台当中。
日志检索
图2图 2 所示是一个比较直观的可视化界面,它支持拖拽,页面左侧可以看到“数据源”与“日志检索”这两项内容,配置好的 logkit 运行之后,所有数据都会打入“数据源”中。页面右侧则显示了数据源中每个字段的名称、格式等信息。
图3图 3 所示是“日志检索”的内容显示页面,通过“日志检索”我们可以清晰的查看一些业务逻辑,在搜索框中填入你的查询条件,就可以进行全文检索,当需要查看过去某个时刻响应超过3s的所有请求,那么通过“日志检索”页面也可以清楚的查询并显示出来。图 3 仅仅是展示了一个全文搜索的状态,在功能页面还可以查看相关数据分布的柱状图。
日志聚合
图4如图 4 所示,打入到数据源里面的数据,可以通过一段SQL以每分钟为粒度进行计算聚合。可以做聚合的内容很多,如来自某个IP的请求数量,也可以是别的一些相关操作,聚合结束之后,数据便会再次回流到我们的数据源当中。简单来说,我们通过一次计算将数据重新回流到数据源用于下一环节的分析处理,计算、回流的过程是可以不断去进行级联的,可以实现很多相对比较复杂的数据处理。
数据回流至平台
图5上面提及的数据回流至数据源是一种处理方式,用户搭建自己的一套HTTP服务,数据通过HTTP的接口回流至其自己系统内是另外一种数据处理方式。通过这种方式回流的数据,用户可以将分析结果在自己平台进行沉淀,操作页面如图 5 所示。
实时数据展示与监控
图6图 6 所示直观展现了我们的监控页面,监控服务需要开通之后再进行Grafana页面配置,页面的基本配置在我们官方文档中都有提供,用户可以直接下载导入。
图7图 7 展示的是对 Nginx 的日志进行分析之后的数据展示图。左上角橙色的框(visits为0)显示可总访问量,右上角绿色的柱状图则是在过去一段时间内发生的请求数以及响应时间,右下角的饼状图显示了相关用户访问的占比量。这些图的样式及位置都可以进行配置。
架构设计
图8图 8 所示展示了Pandora的业务架构。数据通过Portal/Logkit/SDK/API可以导入我们的平台,进入消息队列当中,消息队列当中的数据可以经过计算反复在计算任务和消息队列之间进行流动,当然,这些数据也可以直接导出。导出后的数据经过下游系统(日志检索/时序数据等)处理最终可以生成数据报表,以上就是数据的整个流向。
Pipeline设计目标及技术选型
每个系统在最初设计时都会拟定设计目标以及相应的需要解决的问题。下面先讲一下我们的设计目标,首先这个系统必须支持数据快速接入、高吞吐量、低延迟;其次作为一个云服务,它必须支持海量用户并发访问以及必须支持海量消息队列;要提供实时计算与离线计算的框架满足计算需求;最终它必须是可视化的操作满足用户操作需求。在设计目标提出之后,我们要对选型进行规划,我们需要选择具备高吞吐量的存储系统,当然目前七牛的存储系统无疑是最满足需求的;其次我们需要强大灵活的大数据处理引擎;最后开发人员必须保证最终设计的产品是可以快速迭代开发的。基于这些要求,我们很轻易选择了相应的技术支撑,使用Kafka来满足我们的对海量消息队列设计的需求;使用Spark作为计算引擎;语言选型上则选用我们底蕴积淀深厚的Golang,最终,在确定这几种技术选型之后,我们便开始搭建系统。
图9图 9 所示,是我们Pipeline的整体架构设计,它负责pandora中数据的接入和处理。数据通过Logkit等方式导入到数据接入层,也就是apiserver。通过apiserver的数据会进入到消息队列里面,之后通过计算引擎的读取和回写操作,最终导入到下游系统中(LogDB/TSDB/HTTP/七牛云存储)我们今天着重关注绿色箭头指引的数据流方向,会提及里面相关的重点进行详解。在整个数据流流动过程中,有几个因素可能会决定这个系统的效率,比如稳定性、性能等。所以我将从用户到消息队列,经过计算任务再返回到消息队列,最终导出数据这整个过程来讲解。
数据接入层
图10图 10 所示显示的是数据接入层。数据通过apiserver导入,调度器用来管理一些用户消息队列的源数据,其中包括数据以何种形式写入到消息队列当中去。logkit这个工具之所以放在这里,不是因为数据会通过apisever流向logkit最终再流向消息队列,而是因为它可以采集各种形式的数据,在这里我们用它采集系统审计日志与监控信息。它很容易进行管理和配置。
容器化
图11在最开始设计这个系统时,扩容是一个比较困扰我们的问题。因为接入的基本是内部用户,接入速度比较快,所以一周之内需要扩容至少一到两次,这在运维上是一个比较重的负担。之后我们采用了容器的方案,因为整个数据接入层是一个无状态的组件,所以我们将它容器化,使用我们的容器云产品解决。如图 11 所示,每一个pod中,我们都将apisever与logkit布局在一起,通过监控数据,我们将每个容器包括这个集群整体的信息全部都汇总在了这个调度器当中。调度器里面承载着整个集群负载及资源总量这些信息,可以及时根据这些信息动态的实现扩容缩容。
数据写入优化
图12图 12 所示是对数据写入进行优化的过程。第一代数据写入流程,采用了串行的方式进行,数据导入之后是一行一行进行解析,全部解析之后再将数据写入到消息队列当中,但是这种方式的处理效率是非常低效的。所以我们利用go语言的特性,采用了line channel,数据源源不断进入channel,然后会在channel下游起多个parser,并行的对数据进行解析。也就是说我们利用channel将处理变成了并发的过程,最终提高了CPU的利用率,降低了用户响应的延迟率,大大优化了性能。
计算
图13如图 13 所示,我们的计算基于Spark 实现,提供了一个比较简单的SQL,对用户屏蔽了底层细节。
导出优化
图14数据流入整个系统中,在系统中不管是做计算还是存储,这些经过处理的数据如果需要发挥作用,都要流入到下游系统中,所以“导出数据”这个过程起到的是一个连接上下游,承上启下的作用。图 14 是这个系统的总架构图,因为当时并未对导出服务做细粒度的任务切分,并且单台server也处理不了过大的用户任务,所以在高峰期时,会导致延迟增大,基于此,我们经过一个月的开发最终推出了一个全新的版本。
图15如图 15 所示,是经过改进后的整体架构图。图的顶层是我们的master,用它来控制所有任务的调度管理。所有任务都是经由调度器转发给master,由master来评估每一台机器上的负载,之后再根据机器本身的一些状态(CPU使用率、网络带宽、执行任务的情况)去做相应的调度,除此之外我们还将任务做了更细粒度的切分。
调度方法的设计首要考虑到的就是面向资源,其次需要充分利用异构机器,并且能满足自动调整。面向资源大家都能够理解,充分利用异构的机器,是因为我们机器规格众多,所能解决的任务强度不一致,我们需要充分利用该机器的资源,而不能让其在处理任务时,有"机器资源"不足或浪费的情况发生;至于自动调整,就是可以保证在面对用户量突增或者突减这种突发情况发生时,我们具备自动调整任务分布的能力,其最终的目的也是为了充分利用资源。
任务分配
图16图 16 是任务分配的过程图。假设最初任务(T1-T7)都相对均匀的分布在三台机器上,此时又有另外两个任务(T8-T9)进入,那么我们就需要寻找一些相对比较空闲的机器(S1 或 S2)优先将这两个任务分配给他们。这只是针对一个相对比较均衡的情况做的一个调整。
自动调整
图17 图18当然也会有不均衡的情况产生(图 17-18)那么此时就需要我们去做一些自动调整,比如有一个用户删除了其很多任务,那么此时的S1与S2相对S3会比较空闲,那么此时我们就需要通过server向master上报心跳,这个内容包括对资源的占用以及任务的分布情况,根据结果对比较空闲的机器做一个调整,保持一个相对平衡的状态。
水平扩展
图19图 19 是进行水平扩展时会产生的一个问题。所有机器目前都处于一个比较繁忙的状态,此时如果过来一个新的任务(T13),但是前12个任务已经全部分布在这三台机器上面处理,腾不出空闲的机器处理新增任务,那么此时就会需要对机器进行扩容。
图20如图 20 所示,在前三台机器都处于“忙碌”状态时,我们需要新增server4,一旦启动S4,它会向master汇报心跳,然后master就会感知到这个任务的存在以及S4的存在,重新对整个资源分布使用情况做一次评估,将T13分配到比较空闲的S4上,甚至可以将在S1、S2、S3等待处理的任务分配到S4上。
资源隔离
图21实际上,不单单是对任务进行自动调整均衡分担机器处理压力是非常重要的,对于一些比较特殊的任务,如何保证这个用户流量在突增时不会影响到其他相对较小的用户,或是当数据导出到云存储进行压缩时(压缩的过程非常耗费CPU资源)如何保证它不会影响其他任务,这些都是我们需要处理的问题。针对这些问题我们提出了资源隔离的概念(图 21)将机器和任务进行隔离,提供调度组(调度组中是相近的一组机器或者是一类任务)功能,通过对他们物理上的隔离,达到相互之间互不影响,并对资源进行充分利用。
master高可用
图22 图23综上我们可以看出我们的系统是一对多的状态(一个master对多个server)那么在这种情况下,如何解决在出现单点故障时仍然保证服务的高可用。如图22到图23所示,是我们设计的一个核心所在,我们可以在图中看到最底端是一个zookeeper集群,我们通过对一个临时文件的创建来模拟一个锁,多台机器可以同时去抢占这把锁,抢占成功的master会成为一个主master,没有抢占成功的则会作为一个备份,在平时会空闲,但一旦S1丢锁,master2就会抢占锁,接过整个调度任务以及一些集群管理任务,这就是master高可用思路。
Server高可用
图24server高可用,我们也是采用类似的思路。我们将master视作一个高可用节点,每一个server都需要向master汇报心跳,心跳的内容包含了机器本身的存活以及相应任务的执行情况。如图 24 所示,master一旦感知到S3宕机,那么此时就会将S3上执行的两个任务(T5-T6)都调走,并且它会认为S1与S2是相对比较合适的选择,并且会将这两个任务调去相应的server上,这样就完成了server的高可用目标。
系统级水平扩展
图25 图26最开始有提及我们的整个消息队列是使用kafka实现的,kafka其实也是有上限的,在最开始我们也是采用了kafka单个集群(图 25)后来发现,一旦业务量上来,消息队列数据一旦多到一定程度,系统会发生雪崩。所以我们对单个集群做了一个扩展(图 26)将单个kafka集群直接拆成多个集群,让每一个kafka集群都保持一个相对比较小的规模,这样性能方面就会得到很大的提升,图 26 所示就是经过扩展后的情况,由三个kafka提供的信息会汇总到我们的调度器上,调度器通过压力或者是消息队列的数量,对用户新创建的任务以及新的数据源进行分配,分配至合适的kafka集群中。
上下游协议优化
图27在实践中还是会出现上下游之间性能较低的情况。在最开始,我们采用Json来做上下游的数据传输,可是在日志检索时暴露出的问题就是,这样做对网络消耗很大,于是我们决定采用Protobuf进行上下游数据传输。图 27 展示了使用Json与Protobuf时,从序列化、反序列化角度进行对比的数据结果展示,从图中可以看出,使用Protobuf消耗的时间都更短,尤其在反序列化时,它的CPU消耗降低了将近一个数量级。因此,采用这种方式,不管从集群计算资源利用还是从网络带宽提升上都将效率提升了数倍。
流水线处理
图28至于对流水线的处理,最开始的设计其实是一个串行的操作,导出服务从消息队列当中拉取数据,经过处理之后做一个推送,持续这样的工作过程,其中处理操作是很快的,但是拉取和推送相对就很慢,这样的一个过程,执行效率其实是很低的,并且由于每种操作的处理时间不一样,有的快有的慢,就导致在监控图上监察到网络的趋势图是时高时低的,也就导致了利用率的降低。鉴于此,我们优化了流水线操作,采用并行化操作(图 28)结果显示,这样做的结果是推送和拉取效率都会比上面一种方式高。
Golang GC
我们的整个语言选型是采用Golang的,而它是一个带GC的语言,实际上出现的情况还是很多的,系统当中会有1/10的时间是不干活而在进行垃圾回收的。因此在代码层面我们做了一些改进,一个是sync.Pool的使用能够降低垃圾回收的频率;其次是重用对象,将一个对象尽可能重复使用,这样一来,每一次GC的量就会减小。之后我们对Golang进行了版本升级,升级至1.8版本后我们再查看了一下GC的耗时,发现提升了将近两个数量级。这就是代码层面的优化。
有限资源假设
最后叙述一下我们关于资源假设方面进行的一个优化,也就是要建立起一个有限资源假设的概念。前段时间由于数据接入量比较大,我们需要自己进行运维,突发接入的客户,会使系统轻易就被跑满,此时我们会想办法加机器或者是说在调度上去做一些调整和优化,但是这样终究不是一个长久的办法。于是我们会在刚开始去做一个资源有限假设,就是要在最开始评估出来资源有限情况下我们该如何去做。比如需要提前预估出10M带宽所能对应多少用户的任务,这个必须是有一个数据存在的,并且在这个基础上我们需要做一个资源的预估和集群资源的规划。根据这个预估的数据的情况,去划定一个水位标准,超过水位标准之后,再考虑是不是需要进行扩容。客户这边也需要沟通清楚我们现有的处理能力,这样,才能保证整个集群/服务是处于一个比较健康的状态。
成果
上面我们提及了我们的架构实现以及整体的一个优化,目前的成果就是:我们支撑了万亿级的数据点,并且每天可以处理几百TB的数据,以及支持海量用户。我们的系统目前保持很低的延迟,较高的处理效率;由于我们实现了自动化运维,所以人力成本也大大减少,我们的期望就是可以在写代码时不被运维事情干扰;至于可用性,目前已经达到3个9(99.9%)的成效。
网友评论