
rebalance and reassign: this article is all you need

Author: 泽林呗 | Published: 2018-11-27 16:05
rebalance

As we know, a Storm rebalance can be triggered from the UI, the command line, or code, and redistributes the workers of a topology.
The client talks to Nimbus through the generated send_rebalance(String name, RebalanceOptions options) and recv_rebalance() methods, which send the request and receive the result.

public void send_rebalance(String name, RebalanceOptions options) throws TException {
    Nimbus.rebalance_args args = new Nimbus.rebalance_args();
    args.set_name(name);
    args.set_options(options);
    this.sendBase("rebalance", args);
}

public void recv_rebalance() throws NotAliveException, InvalidTopologyException, AuthorizationException, TException {
    Nimbus.rebalance_result result = new Nimbus.rebalance_result();
    this.receiveBase(result, "rebalance");
    if (result.e != null) {
        throw result.e;
    } else if (result.ite != null) {
        throw result.ite;
    } else if (result.aze != null) {
        throw result.aze;
    }
}
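
Here is a minimal sketch of how a rebalance might be triggered from client code, going through the Thrift methods above. It is an assumption-laden illustration, not code from this article: the org.apache.storm.* package names are assumed (older releases use backtype.storm.*), and the topology name "my-topology" and component name "word-spout" are made up.

// Hedged sketch: trigger a rebalance through the Nimbus Thrift client.
// Assumed package names (org.apache.storm.*); topology/component names are invented.
import java.util.Collections;
import java.util.Map;

import org.apache.storm.generated.RebalanceOptions;
import org.apache.storm.utils.NimbusClient;
import org.apache.storm.utils.Utils;

public class RebalanceClientSketch {
    public static void main(String[] args) throws Exception {
        Map conf = Utils.readStormConfig();                       // load storm.yaml plus defaults
        NimbusClient nimbus = NimbusClient.getConfiguredClient(conf);

        RebalanceOptions options = new RebalanceOptions();
        options.set_wait_secs(30);                                // delay before the rebalance takes effect
        options.set_num_workers(4);                               // new worker count for the topology
        options.set_num_executors(Collections.singletonMap("word-spout", 8)); // per-component executor override

        // This call ultimately goes through the generated send_rebalance / recv_rebalance shown above.
        nimbus.getClient().rebalance("my-topology", options);
    }
}

The command-line form does the same thing, e.g. storm rebalance my-topology -w 30 -n 4 -e word-spout=8.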

After receiving the rebalance signal, the topology transitions from the active state to the rebalancing state.
rebalance: this actually invokes the rebalance-transition function. As the code shows, the status is first changed to :rebalancing, and a delayed :do-rebalance event is scheduled; do-rebalance is what actually reassigns the tasks.

(defn rebalance-transition [nimbus storm-id status]  
  (fn [time num-workers executor-overrides]  
    (let [delay (if time  
                  time  
                  (get (read-storm-conf (:conf nimbus) storm-id)  
                       TOPOLOGY-MESSAGE-TIMEOUT-SECS))]  
      (delay-event nimbus  
                   storm-id  
                   delay  
                   :do-rebalance)  
      {:type :rebalancing  
       :delay-secs delay  
       :old-status status  
       :num-workers num-workers  
       :executor-overrides executor-overrides  
       })))  

Inside do-rebalance, mk-assignments is called to reassign the tasks:

(defn do-rebalance [nimbus storm-id status storm-base]
  (let [rebalance-options (:topology-action-options storm-base)]
    (.update-storm! (:storm-cluster-state nimbus)
      storm-id
        (-> {}
          (assoc-non-nil :component->executors (:component->executors rebalance-options))
          (assoc-non-nil :num-workers (:num-workers rebalance-options)))))
  (mk-assignments nimbus :scratch-topology-id storm-id))
What is mk-assignments

Its main job is to produce the executor->node+port mapping, i.e. to decide which slot on which node each executor is assigned to (a port stands for a slot; one slot runs one worker process, and one worker contains multiple executor threads).
The full mk-assignments source is not reproduced here; see the mk-assignment source-analysis article listed in the references at the end.
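
To make that mapping concrete, here is a purely illustrative sketch; the class below is not one of Storm's real data structures, and the node names and ports are invented.

// Illustration only: the shape of an assignment, mapping each executor (a range of task ids)
// to a node + port, where a port identifies a slot and one slot runs one worker process.
import java.util.HashMap;
import java.util.Map;

public class AssignmentShapeSketch {
    public static void main(String[] args) {
        // key: executor as "[startTask endTask]", value: "node:port" (a slot)
        Map<String, String> executorToNodePort = new HashMap<>();
        executorToNodePort.put("[1 1]", "supervisor-a:6700");
        executorToNodePort.put("[2 3]", "supervisor-a:6700"); // several executors can share one worker
        executorToNodePort.put("[4 5]", "supervisor-b:6701"); // a different slot on another node
        executorToNodePort.forEach((executor, slot) ->
                System.out.println("executor " + executor + " -> " + slot));
    }
}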

mk-assignments consists of the following steps:

  1. Read the information of all active topologies
  2. Read the current assignments
  3. Based on the topologies and assignments obtained, compute a new assignment for the current topology
Storm's reliability guarantee

In fact, Storm itself already provides a reliability guarantee for this. Roughly, it works as follows:
every message emitted by the spout is tracked by an acker; whether processing succeeds, fails, or times out, the spout is notified. If the spout learns that a message failed or was lost, it re-emits that message.
Combined with the topology rebalance process: the topology is first deactivated and its state is saved, while unprocessed messages are still tracked by the acker.
Once the topology has been reassigned, the spout finds that some of the messages it emitted were never fully processed and re-emits them.
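
As a rough sketch of the spout-side contract that makes this replay possible: emitting with a message id anchors the tuple so the acker can track it, and the ack/fail callbacks tell the spout what to forget or re-emit. The pending map and fetchNextMessage() below are invented for illustration, and the org.apache.storm.* package names are assumed.

// Hedged sketch of a replaying spout; not from the original article.
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class ReplayingSpoutSketch extends BaseRichSpout {
    private SpoutOutputCollector collector;
    // messages emitted but not yet acked, keyed by message id
    private final Map<Object, String> pending = new ConcurrentHashMap<>();

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        String msg = fetchNextMessage();                 // hypothetical data source
        if (msg != null) {
            String msgId = UUID.randomUUID().toString();
            pending.put(msgId, msg);
            collector.emit(new Values(msg), msgId);      // the message id anchors the tuple for the acker
        }
    }

    @Override
    public void ack(Object msgId) {
        pending.remove(msgId);                           // fully processed, forget it
    }

    @Override
    public void fail(Object msgId) {
        String msg = pending.get(msgId);                 // failed or timed out, e.g. around a rebalance
        if (msg != null) {
            collector.emit(new Values(msg), msgId);      // re-emit the same message
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("msg"));
    }

    private String fetchNextMessage() { return null; }   // placeholder for a real queue or log
}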


reassign

In the third step of mk-assignments, the missing-assignment-topologies are identified; these need to be reassigned. (The current logic does not actually use this result; the scheduler makes the same judgment itself, with the same logic.)
A topology counts as missing-assignment if any of the following conditions holds:

  • topology->executors does not contain the topology, meaning it has no assignment information (it is new or a scratch topology)
  • topology->executors != topology->alive-executors, meaning some executors are dead
  • the actual number of workers in topology->scheduler-assignment is smaller than the topology's configured worker count (possibly because there were not enough free slots during the last assignment, or because of dead slots)
        missing-assignment-topologies (->> topologies
                                           .getTopologies
                                           (map (memfn getId))
                                           (filter (fn [t]
                                                     (let [alle (get topology->executors t)
                                                           alivee (get topology->alive-executors t)]
                                                       (or (empty? alle)
                                                           (not= alle alivee)
                                                           (< (-> topology->scheduler-assignment
                                                                  (get t)
                                                                  num-used-workers )
                                                              (-> topologies (.getById t) .getNumWorkers)))))))

The supervisor periodically fetches the topologies, the current assignments, and various heartbeats from ZooKeeper, and uses them as the basis for deciding what to run.
During each periodic synchronization, the supervisor starts new workers or shuts down old ones according to the new assignment, in response to reassignment and load balancing.
Workers periodically refresh their connection information so they know which other workers they should communicate with.
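
That synchronization step can be pictured as a small reconcile loop. The sketch below is conceptual only: the supervisor's real implementation is Clojure, and the "topologyId@port" strings and syncOnce helper are invented, not Storm types.

// Conceptual sketch of one supervisor sync pass: compare the assignment read from ZooKeeper
// with the workers actually running on this node, then start the missing ones and kill the stale ones.
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class SupervisorSyncSketch {
    // "topologyId@port" stands in for a worker slot on this node
    static void syncOnce(Set<String> assignedByZk, Set<String> currentlyRunning) {
        Set<String> toStart = new HashSet<>(assignedByZk);
        toStart.removeAll(currentlyRunning);            // assigned here but not running yet
        Set<String> toKill = new HashSet<>(currentlyRunning);
        toKill.removeAll(assignedByZk);                 // running but no longer assigned here
        toStart.forEach(slot -> System.out.println("start worker " + slot));
        toKill.forEach(slot -> System.out.println("kill worker " + slot));
    }

    public static void main(String[] args) {
        Set<String> assigned = new HashSet<>(Arrays.asList("my-topology@6700", "my-topology@6701"));
        Set<String> running = new HashSet<>(Arrays.asList("my-topology@6700", "old-topology@6702"));
        syncOnce(assigned, running);                    // starts my-topology@6701, kills old-topology@6702
    }
}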

For details, see Lifecycle-of-a-topology.md:

- Nimbus monitors the topology during its lifetime
   - Schedules recurring task on the timer thread to check the topologies [code](https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/nimbus.clj#L623)
   - Nimbus's behavior is represented as a finite state machine [code](https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/nimbus.clj#L98)
   - The "monitor" event is called on a topology every "nimbus.monitor.freq.secs", which calls `reassign-topology` through `reassign-transition` [code](https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/nimbus.clj#L497)
   - `reassign-topology` calls `mk-assignments`, the same function used to assign the topology the first time. `mk-assignments` is also capable of incrementally updating a topology
      - `mk-assignments` checks heartbeats and reassigns workers as necessary
      - Any reassignments change the state in ZK, which will trigger supervisors to synchronize and start/stop workers
So reassign is essentially Nimbus calling mk-assignments again and redistributing the tasks according to load balancing.

References:

mk-assignment源码解析
storm如何分配任务和负载均衡?
Storm中Topology的状态
