美文网首页
回顾下storm的一些知识点

回顾下storm的一些知识点

作者: A_You | 来源:发表于2019-06-07 21:46 被阅读0次

    前景回顾

    在还是2015的时候,公司主推 Storm,当时的流式计算似乎是storm的天下, 而现在 Spark Streaming 和 Flink 似乎成为啦主流, 但是Storm凭借其轻量级、易编程,对于体谅不大的业务来说; 应该是首选方案

    应用场景

    • 商品、媒体画像数据流
      公司前期业务是以营销为主,在合作厂商平台上进行js布码,这样客户有浏览行为和点击行为等操作,就会实时产生数据到数据平台当中,随后经过数据流实时处理,实时生成推荐列表,这样客户就可以实时看到生成推荐的商品,从而达到提高点击率和购买率,达到较好效果体验
    • 舆情Sass服务的数据预处理
      采集数据写入kafka,而后再经过Storm处理,最终再写入持久化数据库;这样似乎成为啦一个标配

    核心组件

    • Nimbus: 用来进行资源分配和任务调度的,对任务进行监控
    • Supervisor:一个Supervisor对应一个物理机,它是当前机器上的管理者,介绍Nimbus分配的任务,按需来启动自己的Worker,而Workder的数量是能通过配置文件来配置的
    • Worker-slots:执行具体的任务的组件,其中任务类型有两种,一种是Spout任务,另一种是Bolt任务,一个Worker中可能有多个Spout任务和Bolt任务;
    • Executors: Executors are threads in a Worker process.
    • Task:worker中每一个spout/bolt的线程称为一个task。每个Task属于某个组件并发度中的一个,一个Task本质上是一个线程。Task 是 Storm 的最小的执行单位,Task 是逻辑概念,不同于 Worker 和 Executor 需要
      创建进程或线程。Task 是需要 Executor 来运行,一个 Executor 可以包含一个或者多个 Task。
      用户定义的 Spout 或者 Bolt 都会对应到相应的 Task 上,并由 Executor 来执行相应的在 Spout
      或者 Bolt 中定义的业务逻辑。Task 由 Executor 在 mk-executor 通过 defn mk-task
      [ executor-data task-id] 调用创建出来。
      默认情况下Task=executor=thread(A Task is an instance of a Bolt or Spout. The number of Tasks is almost always equal to the number of Executors.)

    并行度的问题

    在topo运行时,真正起作用的是以下组件:

    • Worker processes
    • Executors (threads)
    • Tasks
      Task是真正的数据执行单元(bolt/spout),一个Executor 可以执行多个同类型的Task,满足:#threads ≤ #tasks
      eg:topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2) .setNumTasks(4) .shuffleGrouping("blue-spout");
      每个线程执行两个Task

    slot的设置

    slot(worker) 是真正的工作进程,由于Storm也是基于JVM的;所以一个Java进程涉及的堆内存大小、垃圾回收器的选择也是要面临的问题。默认堆内存大小是768M;所以这个要依据具体的使用场景进行调优;还有一个使用的经验值是尽可能为每个slot 分配小的堆内存,这个整个storm集群所能拥有较多的执行资源,且每一个slot不会过度浪费,或者负载沉重

    spout size的设置

    比如上有对接的是kafka,某一个topic 具有三个partition,理论上spout size设置只要是大于0即可,但是spout的消费线程是和partition 一一对应的,设置过多也是浪费资源;一般建议设置为分区数目;之前有将 spout size设置为1,会轮询消费kafka 分区,但是会出现分区消费不均匀的情况

    max.spout.pending

    一个 Spout Task 中处于 pending 状态的最大的 Tuple 数量。该配
    置应用于单个Task,而不是整个 Spout或Topology,可在 Topology
    中进行覆盖

    /**
         * The maximum number of tuples that can be pending on a spout task at any given time. 
         * This config applies to individual tasks, not to spouts or topologies as a whole. 
         * 
         * A pending tuple is one that has been emitted from a spout but has not been acked or failed yet.
         * Note that this config parameter has no effect for unreliable spouts that don't tag 
         * their tuples with a message id.
         */
        public static final String TOPOLOGY_MAX_SPOUT_PENDING="topology.max.spout.pending"; 
     /**
         * A class that implements a strategy for what to do when a spout needs to wait. Waiting is
         * triggered in one of two conditions:
         * 
         * 1. nextTuple emits no tuples
         * 2. The spout has hit maxSpoutPending and can't emit any more tuples
         */
        public static final String TOPOLOGY_SPOUT_WAIT_STRATEGY="topology.spout.wait.strategy"; 
    

    通常情况下 spout 的发射速度会快于下游的 bolt 的消费速度,当下游的 bolt 还有 TOPOLOGY_MAX_SPOUT_PENDING 个 tuple 没有消费完时,spout 会停下来等待,该配置作用于 spout 的每个 task。

    偏移量的设置

     public boolean forceFromStart = false;//是否强制从Kafka中offset最小的开始读起
     public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();//从何时的offset时间开始读,默认为最旧的offset
    

    以上参数是对于第一次消费kafka topic起效,如果已经消费过;重启topo会从zk的偏移量开始读取

    Storm UI当中的一些性能指标

    • Complete latency (ms): The average time a Tuple "tree" takes to be completely processed by the Topology. A value of 0 is expected if no acking is done.
    • Capacity (last 10m): If this is around 1.0, the corresponding Bolt is running as fast as it can, so you may want to increase the Bolt's parallelism. This is (number executed * average execute latency) / measurement time.
    • Execute latency (ms):The average time a Tuple spends in the execute method. The execute method may complete without sending an Ack for the tuple.
    • Process latency (ms):The average time it takes to Ack a Tuple after it is first received. Bolts that join, aggregate or batch may not Ack a tuple until a number of other Tuples have been received.

    Execute latency,Process latency是处理消息的时效性,而Capacity则表示处理能力是否已经饱和。从这3个参数可以知道Topology的瓶颈所在。

    动态加载外部配置文件

    对于数据流的开发都会涉及一大堆配置参数,通常需要有一个配置文件,推荐使用yaml;在新版本Storm中;提供啦Flux方式:http://storm.apache.org/releases/1.0.6/flux.html

    storm jar mytopology.jar org.apache.storm.flux.Flux --local config.yaml
    或者
    storm jar mytopology.jar org.apache.storm.flux.Flux --remote config.yaml
    

    在 0.9x 版本中,不想将 配置文件加载到jar包里面,就需要在Topo提交前;读取本地配置文件,进行分发即可

    Config config = new Config();
    config.setDebug(isDebug);
    config.setMaxSpoutPending(maxSpoutPending);
    config.setMessageTimeoutSecs(yamlEntity.getMessageTimeout());
    config.setNumWorkers(workerNums);
    config.setNumAckers(ackNums);
    /***
    yamlEntity 就是配置文件对应的bean,并且一定要支持序列化
    */
    config.put("yamlConfig", JSON.toJSONString(yamlEntity));
    /**
      *Submit topology
      */
     TopologySubmitUtils.submitTopology(yamlEntity.getTopologyName(), config,topologyBuilder.createTopology());
    
    

    关于预警

    毕竟Storm UI是静态的,无法达到预警的目的,所以可以使用rest Api:

    集群信息概览:
    http://ip:port/api/v1/cluster/summary
    {
    stormVersion: "0.9.7",
    nimbusUptime: "532d 3h 29m 33s",
    supervisors: 4,
    slotsTotal: 200,
    slotsUsed: 88,
    slotsFree: 112,
    executorsTotal: 423,
    tasksTotal: 423
    }
    Topo列表概览:
    http://ip:port/api/v1/topology/summary
    {
    topologies: [
    {
    id: ----------",
    encodedId: "---------",
    name: "-----",
    status: "ACTIVE",
    uptime: "86d 3h 47m 45s",
    tasksTotal: 11,
    workersTotal: 5,
    executorsTotal: 11
    }
    ]
    }
    

    关于代码的一些建议

    • execute 方法 中要细化 try{......} catch();不要使用一个大而全的异常捕获,期望是每一个异常的可能;都要有对应的处理策略;
    • 可以多加数据埋点;eg:比如异常数据写入Kafka topic;从而达到监控的目的
    • 应该考虑容错,也就是对于调用外部服务出现异常情况,要有重试机制,可以增加类似离线数据流的分支

    相关文章

      网友评论

          本文标题:回顾下storm的一些知识点

          本文链接:https://www.haomeiwen.com/subject/nknpxctx.html