前景回顾
在还是2015的时候,公司主推 Storm,当时的流式计算似乎是storm的天下, 而现在 Spark Streaming 和 Flink 似乎成为啦主流, 但是Storm凭借其轻量级、易编程,对于体谅不大的业务来说; 应该是首选方案
应用场景
- 商品、媒体画像数据流
公司前期业务是以营销为主,在合作厂商平台上进行js布码,这样客户有浏览行为和点击行为等操作,就会实时产生数据到数据平台当中,随后经过数据流实时处理,实时生成推荐列表,这样客户就可以实时看到生成推荐的商品,从而达到提高点击率和购买率,达到较好效果体验
- 舆情Sass服务的数据预处理
采集数据写入kafka,而后再经过Storm处理,最终再写入持久化数据库;这样似乎成为啦一个标配
核心组件
- Nimbus: 用来进行资源分配和任务调度的,对任务进行监控
- Supervisor:一个Supervisor对应一个物理机,它是当前机器上的管理者,介绍Nimbus分配的任务,按需来启动自己的Worker,而Workder的数量是能通过配置文件来配置的
- Worker-slots:执行具体的任务的组件,其中任务类型有两种,一种是Spout任务,另一种是Bolt任务,一个Worker中可能有多个Spout任务和Bolt任务;
- Executors: Executors are threads in a Worker process.
- Task:worker中每一个spout/bolt的线程称为一个task。每个Task属于某个组件并发度中的一个,一个Task本质上是一个线程。Task 是 Storm 的最小的执行单位,Task 是逻辑概念,不同于 Worker 和 Executor 需要
创建进程或线程。Task 是需要 Executor 来运行,一个 Executor 可以包含一个或者多个 Task。
用户定义的 Spout 或者 Bolt 都会对应到相应的 Task 上,并由 Executor 来执行相应的在 Spout
或者 Bolt 中定义的业务逻辑。Task 由 Executor 在 mk-executor 通过 defn mk-task
[ executor-data task-id] 调用创建出来。
默认情况下Task=executor=thread(A Task is an instance of a Bolt or Spout. The number of Tasks is almost always equal to the number of Executors.)
并行度的问题
在topo运行时,真正起作用的是以下组件:
- Worker processes
- Executors (threads)
- Tasks
Task是真正的数据执行单元(bolt/spout),一个Executor 可以执行多个同类型的Task,满足:#threads ≤ #tasks
eg:topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2) .setNumTasks(4) .shuffleGrouping("blue-spout");
每个线程执行两个Task
slot的设置
slot(worker) 是真正的工作进程,由于Storm也是基于JVM的;所以一个Java进程涉及的堆内存大小、垃圾回收器的选择也是要面临的问题。默认堆内存大小是768M;所以这个要依据具体的使用场景进行调优;还有一个使用的经验值是尽可能为每个slot 分配小的堆内存,这个整个storm集群所能拥有较多的执行资源,且每一个slot不会过度浪费,或者负载沉重
spout size的设置
比如上有对接的是kafka,某一个topic 具有三个partition,理论上spout size设置只要是大于0即可,但是spout的消费线程是和partition 一一对应的,设置过多也是浪费资源;一般建议设置为分区数目;之前有将 spout size设置为1,会轮询消费kafka 分区,但是会出现分区消费不均匀的情况
max.spout.pending
一个 Spout Task 中处于 pending 状态的最大的 Tuple 数量。该配
置应用于单个Task,而不是整个 Spout或Topology,可在 Topology
中进行覆盖
/**
* The maximum number of tuples that can be pending on a spout task at any given time.
* This config applies to individual tasks, not to spouts or topologies as a whole.
*
* A pending tuple is one that has been emitted from a spout but has not been acked or failed yet.
* Note that this config parameter has no effect for unreliable spouts that don't tag
* their tuples with a message id.
*/
public static final String TOPOLOGY_MAX_SPOUT_PENDING="topology.max.spout.pending";
/**
* A class that implements a strategy for what to do when a spout needs to wait. Waiting is
* triggered in one of two conditions:
*
* 1. nextTuple emits no tuples
* 2. The spout has hit maxSpoutPending and can't emit any more tuples
*/
public static final String TOPOLOGY_SPOUT_WAIT_STRATEGY="topology.spout.wait.strategy";
通常情况下 spout 的发射速度会快于下游的 bolt 的消费速度,当下游的 bolt 还有 TOPOLOGY_MAX_SPOUT_PENDING 个 tuple 没有消费完时,spout 会停下来等待,该配置作用于 spout 的每个 task。
偏移量的设置
public boolean forceFromStart = false;//是否强制从Kafka中offset最小的开始读起
public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();//从何时的offset时间开始读,默认为最旧的offset
以上参数是对于第一次消费kafka topic起效,如果已经消费过;重启topo会从zk的偏移量开始读取
Storm UI当中的一些性能指标
- Complete latency (ms): The average time a Tuple "tree" takes to be completely processed by the Topology. A value of 0 is expected if no acking is done.
- Capacity (last 10m): If this is around 1.0, the corresponding Bolt is running as fast as it can, so you may want to increase the Bolt's parallelism. This is (number executed * average execute latency) / measurement time.
- Execute latency (ms):The average time a Tuple spends in the execute method. The execute method may complete without sending an Ack for the tuple.
- Process latency (ms):The average time it takes to Ack a Tuple after it is first received. Bolts that join, aggregate or batch may not Ack a tuple until a number of other Tuples have been received.
Execute latency,Process latency是处理消息的时效性,而Capacity则表示处理能力是否已经饱和。从这3个参数可以知道Topology的瓶颈所在。
动态加载外部配置文件
对于数据流的开发都会涉及一大堆配置参数,通常需要有一个配置文件,推荐使用yaml;在新版本Storm中;提供啦Flux方式:http://storm.apache.org/releases/1.0.6/flux.html
storm jar mytopology.jar org.apache.storm.flux.Flux --local config.yaml
或者
storm jar mytopology.jar org.apache.storm.flux.Flux --remote config.yaml
在 0.9x 版本中,不想将 配置文件加载到jar包里面,就需要在Topo提交前;读取本地配置文件,进行分发即可
Config config = new Config();
config.setDebug(isDebug);
config.setMaxSpoutPending(maxSpoutPending);
config.setMessageTimeoutSecs(yamlEntity.getMessageTimeout());
config.setNumWorkers(workerNums);
config.setNumAckers(ackNums);
/***
yamlEntity 就是配置文件对应的bean,并且一定要支持序列化
*/
config.put("yamlConfig", JSON.toJSONString(yamlEntity));
/**
*Submit topology
*/
TopologySubmitUtils.submitTopology(yamlEntity.getTopologyName(), config,topologyBuilder.createTopology());
关于预警
毕竟Storm UI是静态的,无法达到预警的目的,所以可以使用rest Api:
集群信息概览:
http://ip:port/api/v1/cluster/summary
{
stormVersion: "0.9.7",
nimbusUptime: "532d 3h 29m 33s",
supervisors: 4,
slotsTotal: 200,
slotsUsed: 88,
slotsFree: 112,
executorsTotal: 423,
tasksTotal: 423
}
Topo列表概览:
http://ip:port/api/v1/topology/summary
{
topologies: [
{
id: ----------",
encodedId: "---------",
name: "-----",
status: "ACTIVE",
uptime: "86d 3h 47m 45s",
tasksTotal: 11,
workersTotal: 5,
executorsTotal: 11
}
]
}
关于代码的一些建议
- execute 方法 中要细化 try{......} catch();不要使用一个大而全的异常捕获,期望是每一个异常的可能;都要有对应的处理策略;
- 可以多加数据埋点;eg:比如异常数据写入Kafka topic;从而达到监控的目的
- 应该考虑容错,也就是对于调用外部服务出现异常情况,要有重试机制,可以增加类似离线数据流的分支
网友评论