实时计算在点评的使用场景
类别一:Dashboard、实时DAU、新激活用户数、实时交易额等
♦ Dashboard类:北斗(报表平台)、微信(公众号)和云图(流量分析)等
♦ 实时DAU:包括主APP(Android/iPhone/iPad)、团APP、周边快查、PC、M站
♦ 新激活用户数:主APP
♦ 实时交易额:闪惠/团购交易额
以报表平台为例,下图是一张APP UV的实时曲线图,它以分钟级别粒度展现了 实时的DAU数据和曲线。
从图中可以看见一个尖点,这个尖点就是当天push过后带来的用户,这样可以看到实时的运营效率。
类别二:搜索、推荐、安全等
以搜索为例:用户在点评的每一步有价值的操作(包括:搜索、点击、浏览、购买、收藏等),都将实时、智能地影响搜索结果排序,从而显著提升用户搜索体验、搜索转化率。
某用户 搜索“ 火锅 ”,当他 在搜索结果页 点击了“ 重庆高老九火锅 ”后, 再次刷新搜索结果列表时,该商户的排序就会提升到顶部 。
再结合其他的一些实时反馈的个性化推荐策略,最终使团购的交易额有了明显的增加,转化率提升了2个多点。
实时计算在业界的使用场景
场景1:阿里JStorm
♦ 双11实时交易数据
场景2:360Storm
♦ 抢票软件验证码自动识别:大家用360浏览器在12306上买票的时候,验证码自动识别是在Storm上计算完成的。
♦ 网盘图片缩略图生成:360网盘的缩略图也是实时生成出来的,这样可以节约大量的文件数量和存储空间。
♦ 实时入侵检测
♦ 搜索热词推荐
场景3:腾讯TDProcess
分布式K/V存储引擎TDEngine和支持数据流计算的TDProcess,TDProcess是基于Storm的计算引擎,提供了通用的计算模型,如Sum、Count、PV/UV计算和TopK统计等。
场景4:京东Samza
整个业务主要应用订单处理,实时分析统计出待定区域中订单各个状态的量:待定位、待派工、待拣货、待发货、待配送、待妥投等。
点评如何构建实时计算平台
点评的实时计算平台是一个端到端的方案,从下面的平台架构图,可以看出整体架构是一个比较长的过程,包括了数据源、数据的传输通道、计算、存储和对外服务等。
实时计算平台首先解决的问题是,数据怎么获取,如何拿到那些数据。现在做到了几乎所有点评线上产生的数据都可以毫秒级拿到,封装对应的数据输入源Spout。
通过Blackhole支持日志类实时获取,包括打点日志、业务Log、Nginx日志等。 整合Puma Client第一时间获取数据库数据变更。整合Swallow获取应用消息。Blackhole是团队开发的类Kafka系统,主要目标是批量从业务方拉取日志时做到数据的完整性和一致性,然后也提供了实时的消费能力。Puma是以MySQL binlog为基础开发的,这样可以实时拿到数据库的update、delete、insert操作。
Swallow是点评的MQ系统。通过整合各种传输通道,并且封装相应的Spout,做业务开发的同学就完全不用关心数据怎样可靠获取,只需要写自己的业务逻辑就可以了。解决了数据和传输问题后,计算过程则在Storm中完成。
如果在Storm计算过程中或计算出结果后,需要与外部存储系统交互,也提供了一个data-service服务 ,通过点评的RPC框架提供接口,用户不用关心实际Redis/HBase这些系统的细节和部署情况, 以及这个数据到底是在Redis还是HBase中的,可以根据SLA来做自动切换;
同时计算的结果也是通过data-service服务,再反馈到线上系统。就拿刚刚搜索结果的例子,搜索业务在用户再次搜索的时候会根据userId请求一次data-service,然后拿到这个用户的最近浏览记录,并重新排序结果,返回给用户。
这样的好处就是实时计算业务和线上其他业务完全解耦,实时计算这边出现问题,不会导致线上业务出现问题。
Storm基础知识简单介绍
Apache Storm( http://storm.apache.org/)是由Twitter开源的分布式实时计算系统。Storm可以非常容易、可靠地处理无限的数据流。对比Hadoop的批处理,Storm是个实时的、分布式以及具备高容错的计算系统。Storm可以使用何编程语言进行开发。
Storm的集群表面上看和Hadoop的集群非常像,但是在Hadoop上面运行的是MapReduce的Job,而在Storm上面运行的是Topology。
Storm和Hadoop一个非常关键的区别是Hadoop的MapReduce Job最终会结束,而Storm的Topology会一直运行(除非显式地杀掉)。
Storm基本概念:
Nimbus和Supervisor之间的通讯是依靠ZooKeeper来完成,并且Nimbus进程和Supervisor都是快速失败(fail-fast)和无状态的。可以用kill-9来杀死Nimbus和Supervisor进程,然后再重启它们,它们可以继续工作。
在Storm中,Spout是Topology中产生源数据流的组件。通常Spout获取从Kafka、MQ等的数据,然后调用nextTuple函数,发射数据出去供Bolt消费。
图中的Spout就发射出去了两条数据流。而Bolt是在Topology中接受Spout的数据,然后执行处理的组件。Bolt在接收到消息后会调用execute函数,用户可以在其中执行自己想要的操作。
为什么用Storm呢,因为Storm有它的优点:
易用性
只要遵守Topology,Spout,Bolt的编程规范即可开发出一个扩展性极好的应用,像底层RPC,Worker之间冗余,数据分流之类的操作,开发者完全不用考虑。
扩展性
当某一级处理单元速度不够时,直接配置一下并发数,即可线性扩展性能。
健壮性
当Worker失效或机器出现故障时, 自动分配新的Worker替换失效Worker。
准确性
采用Acker机制,保证数据不丢失。采用事务机制,保证数据准确性。刚刚介绍了一些Storm的基础概念和特性,再用一张比较完整的图来回顾一下整个Storm的体系架构:
Storm提交一个作业的时候,是通过Thrift的Client执行相应的命令来完成。Nimbus针对该Topology建立本地的目录,Nimbus中的调度器根据Topology的配置计算Task,并把Task分配到不同的Worker上,调度的结果写入Zookeeper中。
Zookeeper上建立assignments节点,存储Task和Supervisor中Worker的对应关系。在Zookeeper上创建workerbeats节点来监控Worker的心跳。Supervisor去Zookeeper上获取分配的Tasks信息,启动一个或者多个Worker来执行。
每个Worker上运行多个Task,Task由Executor来具体执行。Worker根据Topology信息初始化建立Task之间的连接,相同Worker内的Task通过DisrupterQueue来通信,不同Worker间默认采用Netty来通信,然后整个Topology就运行起来了。
如何保证业务运行可靠性
首先Storm自身有很多容错机制,也加了很多监控信息,方便业务同学监控自己的业务状态。
在Storm上,遇到的一个很基本的问题就是,各个业务是运行的Worker会跑在同一台物理机上。曾经有位同学就在自己的Worker中起了200多个线程来处理json,结果就是这台机器的CPU都被他的Worker吃光了,其他的业务也跟着倒霉。
因此也使用CGroup做了每个Worker的资源隔离,主要限制了CPU和Memory的使用。相对而言JStorm在很多方面要完善一些,JStorm自己就带资源隔离。对应监控来说,基本的主机维度的监控在ganglia上可以看见,比如现在集群的运行状况。下图是现在此时的集群的网络和负载:
这些信息并不能保证业务就OK,因此将Storm上的很多监控信息和点评的开源监控系统Cat集成在了一起,从Cat上可以看见更多的业务运行状态信息。
比如在Cat中我可以看见整个集群的TPS,现在已经从30多万降下来了。 然后我可以设置若干的报警规则, 如:连续N分钟降低了50%可以报警。然后也监控了各个业务Topology的TPS、Spout输入、Storm的可用Slot等的变化。
这个图就是某个业务的TPS信息, 如果TPS同比或者环比出现问题,也可以报警给业务方。
Storm使用经验分享
1.使用组件的并行度代替线程池
Storm自身是一个分布式、多线程的框架,对每个Spout和Bolt,都可以设置其并发度;它也支持通过rebalance命令来动态调整并发度,把负载分摊到多个Worker上。
如果自己在组件内部采用线程池做一些计算密集型的任务,比如JSON解析,有可能使得某些组件的资源消耗特别高,其他组件又很低,导致Worker之间资源消耗不均衡,这种情况在组件并行度比较低的时候更明显。
比如某个Bolt设置了1个并行度,但在Bolt中又启动了线程池,这样导致的一种后果就是,集群中分配了这个Bolt的Worker进程可能会把机器的资源都给消耗光了,影响到其他Topology在这台机器上的任务的运行。如果真有计算密集型的任务,可以把组件的并发度设大,Worker的数量也相应提高,让计算分配到多个节点上。
为了避免某个Topology的某些组件把整个机器的资源都消耗光的情况,除了不在组件内部启动线程池来做计算以外,也可以通过CGroup控制每个Worker的资源使用量。
2.不要用DRPC批量处理大数据
RPC提供了应用程序和StormTopology之间交互的接口,可供其他应用直接调用,使用Storm的并发性来处理数据,然后将结果返回给调用的客户端。这种方式在数据量不大的情况下,通常不会有问题,而当需要处理批量大数据的时候,问题就比较明显了。
(1)处理数据的Topology在超时之前可能无法返回计算的结果。
(2)批量处理数据,可能使得集群的负载短暂偏高,处理完毕后,又降低回来,负载均衡性差。
批量处理大数据不是Storm设计的初衷,Storm考虑的 是时效性和批量之间的均衡,更多地看中前者。需要准实时地处理大数据量,可以考虑Spark Stream等批量框架。
3.不要在Spout中处理耗时的操作
Spout中nextTuple方法会发射数据流,在启用Ack的情况下,fail方法和ack方法会被触发。
需要明确一点,在Storm中Spout是单线程(JStorm的Spout分了3个线程,分别执行nextTuple方法、fail方法和ack方法)。如果nextTuple方法非常耗时,某个消息被成功执行完毕后,Acker会给Spout发送消息,Spout若无法及时消费,可能造成ACK消息超时后被丢弃,然后Spout反而认为这个消息执行失败了,造成逻辑错误。反之若fail方法或者ack方法的操作耗时较多,则会影响Spout发射数据的量,造成Topology吞吐量降低。
4.注意fieldsGrouping的数据均衡性
fieldsGrouping是根据一个或者多个Field对数据进行分组,不同的目标Task收到不同的数据,而同一个Task收到的数据会相同。
假设某个Bolt根据用户ID对数据进行fieldsGrouping,如果某一些用户的数据特别多,而另外一些用户的数据又比较少,那么就可能使得下一级处理Bolt收到的数据不均衡,整个处理的性能就会受制于某些数据量大的节点。可以加入更多的分组条件或者更换分组策略,使得数据具有均衡性。
5.优先使用localOrShuffleGrouping
localOrShuffleGrouping是指如果目标Bolt中的一个或者多个Task和当前产生数据的Task在同一个Worker进程里面,那么就走内部的线程间通信,将Tuple直接发给在当前Worker进程的目的Task。否则,同shuffleGrouping。
localOrShuffleGrouping的数据传输性能优于shuffleGrouping,因为在Worker内部传输,只需要通过Disruptor队列就可以完成,没有网络开销和序列化开销。因此在数据处理的复杂度不高,而网络开销和序列化开销占主要地位的情况下,可以优先使用localOrShuffleGrouping来代替shuffleGrouping。
6.设置合理的MaxSpoutPending值
在启用Ack的情况下,Spout中有个RotatingMap用来保存Spout已经发送出去,但还没有等到Ack结果的消息。RotatingMap的最大个数是有限制的,为p*num-tasks。其中p是topology.max.spout.pending值,也就是MaxSpoutPending(也可以由TopologyBuilder在setSpout通过setMaxSpoutPending方法来设定),num-tasks是Spout的Task数。如果不设置MaxSpoutPending的大小或者设置得太大,可能消耗掉过多的内存导致内存溢出,设置太小则会影响Spout发射Tuple的速度。
7.设置合理的Worker数
Worker数越多,性能越好?先看一张Worker数量和吞吐量对比的曲线(来源于JStorm文档:
https://github.com/alibaba/jstorm/tree/master/docs/ 0.9.4.1jstorm性能测试.docx)。
从图可以看出,在12个Worker的情况下,吞吐量最大,整体性能最优。这是由于一方面,每新增加一个Worker进程,都会将一些原本线程间的内存通信变为进程间的网络通信,这些进程间的网络通信还需要进行序列化与反序列化操作,这些降低了吞吐率。
另一方面,每新增加一个Worker进程,都会额外地增加多个线程(Netty发送和接收线程、心跳线程、SystemBolt线程以及其他系统组件对应的线程等),这些线程切换消耗了不少CPU,sys系统CPU消耗占比增加,在CPU总使用率受限的情况下,降低了业务线程的使用效率。
8.平衡吞吐量和时效性
Storm的数据传输默认使用Netty。在数据传输性能方面,有如下的参数可以调整:
storm.messaging.netty.server_worker_threads和storm.messaging.netty.client_worker_threads分别为接收消息线程和发送消息线程的数量。
netty.transfer.batch.size是指每次 Netty Client向 Netty Server发送的数据的大小,如果需要发送的Tuple消息大于netty.transfer.batch.size,则Tuple消息会按照netty.transfer.batch.size进行切分,然后多次发送。
storm.messaging.netty.buffer_size为每次批量发送的Tuple序列化之后的TaskMessage消息的大storm.messaging.netty.flush.check.interval.ms表示当有TaskMessage需要发送的时候, Netty Client检查可以发送数据的频率。
降低storm.messaging.netty.flush.check.interval.ms的值,可以提高时效性。增加netty.transfer.batch.size和storm.messaging.netty.buffer_size的值,可以提升网络传输的吐吞量,使得网络的有效载荷提升(减少TCP包的数量,并且TCP包中的有效数据量增加),通常时效性就会降低一些。因此需要根据自身的业务情况,合理在吞吐量和时效性直接的平衡。
除了这些参数,怎么找到Storm中性能的瓶颈,可以通过如下的一些途径来进行:
在Storm的UI中,对每个Topology都提供了相应的统计信息,其中有3个参数对性能来说参考意义比较明显,包括Execute latency、Process latency和Capacity。
分别看一下这3个参数的含义和作用。
(1)Execute latency:消息的平均处理时间,单位为毫秒。
(2)Process latency:消息从收到到被ack掉所花的时间,单位为毫秒。如果没有启用Acker机制,那么Process latency的值为0。
(3)Capacity:计算公式为Capacity = Bolt或者Executor调用execute方法处理的消息数量 * 消息平均执行时间 /时间区间。这个值越接近1,说明Bolt或者Executor基本一直在调用execute方法,因此并行度不够,需要扩展这个组件的Executor数量。为了在Storm中达到高性能,在设计和开发Topology的时候,需要注意以下原则。
(1)模块和模块之间解耦,模块之间的层次清晰,每个模块可以独立扩展,并且符合流水线的原则。
(2)无状态设计,无锁设计,水平扩展支持。
(3)为了达到高的吞吐量,延迟会加大;为了低延迟,吞吐量可能降低,需要在二者之间平衡。
(4)性能的瓶颈永远在热点,解决热点问题。
(5)优化的前提是测量,而不是主观臆测。收集相关数据,再动手,事半功倍。
关于计算框架的后续问题
目前Hadoop/Hive专注于离线分析业务,每天点评有1.6万个离线分析任务。Storm专注于实时业务,实时每天会处理100亿+条的数据。
在这两个框架目前有很大的gap,一个是天级别,一个是秒级别,然后有大量的业务是准实时的,比如分钟级别。因此会使用Spark来做中间的补充。
Spark Streaming + Spark SQL也能够降低很大的开发难度。相对而言,目前Storm的学习和开发成本还是偏高。要做一个10万+TPS的业务在Storm上稳定运行,需要对Storm了解比较深入才能做到,不然会发现有这样或者那样的问题。
网友评论