
brpc load balancing: source code analysis

Author: fooboo | Published 2019-10-06 11:11

    While analyzing the bthread-related implementation, I noticed that the LoadBalancerWithNaming class shows up in actual usage. Since this part is fairly self-contained, this post analyzes the load-balancing code first; the naming service will be covered next time.

    The brpc/policy/ directory contains the concrete balancing policies. This post only covers RoundRobinLoadBalancer, LocalityAwareLoadBalancer and ConsistentHashingLoadBalancer, all of which inherit from LoadBalancer.
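
    For context, a client picks one of these policies by name when initializing a Channel. Below is a minimal usage sketch; the lb names "rr"/"la"/"c_murmurhash" and the "list://" naming service are the ones commonly documented for brpc, but treat the exact strings and options as assumptions rather than a definitive setup.

        #include <brpc/channel.h>

        int main() {
            brpc::Channel channel;
            brpc::ChannelOptions options;
            options.timeout_ms = 100;
            // The second argument names the load balancer:
            //   "rr"           -> RoundRobinLoadBalancer
            //   "la"           -> LocalityAwareLoadBalancer
            //   "c_murmurhash" -> ConsistentHashingLoadBalancer
            if (channel.Init("list://127.0.0.1:8000,127.0.0.1:8001", "rr", &options) != 0) {
                return -1;
            }
            // ... issue RPCs through protobuf stubs created on this channel ...
            return 0;
        }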

    LoadBalancer

     35 // Select a server from a set of servers (in form of ServerId).
     36 class LoadBalancer : public NonConstDescribable, public Destroyable {
     78     virtual bool AddServer(const ServerId& server) = 0;
     82     virtual bool RemoveServer(const ServerId& server) = 0;
     96     virtual int SelectServer(const SelectIn& in, SelectOut* out) = 0;
     97     //more code...
    107 protected:
    108     virtual ~LoadBalancer() { }
    109 };
    

    RoundRobinLoadBalancer

     32 // This LoadBalancer selects server evenly. Selected numbers of servers(added
     33 // at the same time) are very close.
     34 class RoundRobinLoadBalancer : public LoadBalancer {
     35 public:
     36     bool AddServer(const ServerId& id);
     37     bool RemoveServer(const ServerId& id);
     40     int SelectServer(const SelectIn& in, SelectOut* out);
     41     RoundRobinLoadBalancer* New(const butil::StringPiece&) const;
     45 private:
     46     struct Servers {
     47         std::vector<ServerId> server_list;
     48         std::map<ServerId, size_t> server_map;
     49     };  
     50     struct TLS {
     51         TLS() : stride(0), offset(0) { }
     52         uint32_t stride;
     53         uint32_t offset;
     54     };  
     55     //more code...
     61     butil::DoublyBufferedData<Servers, TLS> _db_servers;
     63 };
    
     81 bool RoundRobinLoadBalancer::AddServer(const ServerId& id) {
     82     return _db_servers.Modify(Add, id);
     83 }
     84 
     85 bool RoundRobinLoadBalancer::RemoveServer(const ServerId& id) {
     86     return _db_servers.Modify(Remove, id);
     87 }
    
     37 bool RoundRobinLoadBalancer::Add(Servers& bg, const ServerId& id) {
     38     if (bg.server_list.capacity() < 128) {
     39         bg.server_list.reserve(128);
     40     }
     41     std::map<ServerId, size_t>::iterator it = bg.server_map.find(id);
     42     if (it != bg.server_map.end()) {
     43         return false;
     44     }
     45     bg.server_map[id] = bg.server_list.size();
     46     bg.server_list.push_back(id);
     47     return true;
     48 }
     49 
     50 bool RoundRobinLoadBalancer::Remove(Servers& bg, const ServerId& id) {
     51     std::map<ServerId, size_t>::iterator it = bg.server_map.find(id);
     52     if (it != bg.server_map.end()) {
     53         const size_t index = it->second;
     54         bg.server_list[index] = bg.server_list.back();
     55         bg.server_map[bg.server_list[index]] = index;
     56         bg.server_list.pop_back();
     57         bg.server_map.erase(it);
     58         return true;
     59     }
     60     return false;
     61 }
    

    The member functions above all perform their updates through DoublyBufferedData, so the newest data is written to the background copy first. The internals of DoublyBufferedData were analyzed earlier in the post on low-level multi-threaded performance optimization.
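
    As a quick reminder of the DoublyBufferedData contract used throughout this post, here is a minimal sketch of the read/modify pattern; the Data type and AddValue modifier are made up for illustration, and the include path is the assumed location inside brpc.

        #include <vector>
        #include <butil/containers/doubly_buffered_data.h>

        typedef std::vector<int> Data;

        // Modifier: applied to the background copy first, then to the old foreground.
        static bool AddValue(Data& bg, const int& v) {
            bg.push_back(v);
            return true;  // non-zero return value means "modified"
        }

        void Demo() {
            butil::DoublyBufferedData<Data> db;
            db.Modify(AddValue, 42);             // writer path: updates both copies in turn

            butil::DoublyBufferedData<Data>::ScopedPtr ptr;
            if (db.Read(&ptr) == 0) {            // reader path: pins the current foreground
                size_t n = ptr->size();          // read without extra locking
                (void)n;
            }                                    // foreground released when ptr goes out of scope
        }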

    Note that although the foreground copy is only read and the background copy is only written, the granularity differs: for the concrete Weight type, reads also modify some of its member variables. So broadly speaking, the data shared between foreground and background may be modified from either side, while other structures are kept independent per copy and do not affect each other, for example:

    148     butil::atomic<int64_t> _total;
    149     butil::DoublyBufferedData<Servers> _db_servers;
    150     std::deque<int64_t> _left_weights;
    151     ServerId2SocketIdMapper _id_mapper;
    
    116     class Servers {
    117     public:
    118         std::vector<ServerInfo> weight_tree;
    119         butil::FlatMap<SocketId, size_t> server_map;
    120         //more code...
    128     };
    

    The core job is still picking an available server; what distinguishes this policy from the others is here:

    107 int RoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
    108     butil::DoublyBufferedData<Servers, TLS>::ScopedPtr s;
    109     if (_db_servers.Read(&s) != 0) {
    110         return ENOMEM;
    111     }
    112     const size_t n = s->server_list.size();
    113     if (n == 0) {
    114         return ENODATA;
    115     }
    116     //more code...
    121     TLS tls = s.tls();
    122     if (tls.stride == 0) {
    123         tls.stride = GenRandomStride();
    124         tls.offset = 0;
    125     }
    126 
    127     for (size_t i = 0; i < n; ++i) {
    128         tls.offset = (tls.offset + tls.stride) % n;
    129         const SocketId id = s->server_list[tls.offset].id;
    130         if (((i + 1) == n  // always take last chance
    131              || !ExcludedServers::IsExcluded(in.excluded, id))
    132             && Socket::Address(id, out->ptr) == 0
    133             && (*out->ptr)->IsAvailable()) {
    134             s.tls() = tls;
    135             return 0;
    136         }
    137     }   
    138     //more code...   
    141     s.tls() = tls;
    142     return EHOSTDOWN;
    143 }
    

    The implementation above reads the foreground data through ScopedPtr: BeginRead takes a lock so a concurrent background update cannot make the read inconsistent, and the destructor calls EndRead when reading is done. The TLS records where the last selection stopped and therefore where this one starts. Since this is round-robin, the next server in the list is always chosen, with the end wrapping around to the beginning, so every server gets an equal chance; an available server is then returned. The health-check related parts are skipped here.
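
    To make the offset/stride walk concrete, here is a standalone sketch of the traversal (GenRandomStride itself is not reproduced; note that the walk visits every slot exactly once per n steps only when gcd(stride, n) == 1, which is what a well-chosen random stride aims for):

        #include <cstdint>
        #include <cstdio>

        // Walk n slots round-robin style with a per-thread offset and stride,
        // mirroring the `tls.offset = (tls.offset + tls.stride) % n` step above.
        void WalkOnce(uint32_t& offset, uint32_t stride, size_t n) {
            for (size_t i = 0; i < n; ++i) {
                offset = (offset + stride) % n;
                printf("pick slot %u\n", offset);  // in brpc this indexes server_list
            }
        }

        int main() {
            uint32_t offset = 0;
            WalkOnce(offset, /*stride=*/7, /*n=*/5);  // gcd(7,5)=1: visits 2,4,1,3,0
            return 0;
        }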

    LocalityAwareLoadBalancer

    LocalityAwareLoadBalancer prefers downstream nodes with lower latency until their latency rises above that of other machines; in other words, "a downstream node's throughput divided by its latency is used as its traffic-splitting weight".

    The algorithm implementation carries plenty of comments; most of them are stripped from the excerpts below, so interested readers should jump to the source directly.

     53 bool LocalityAwareLoadBalancer::Add(Servers& bg, const Servers& fg,
     54                                     SocketId id,
     55                                     LocalityAwareLoadBalancer* lb) {
     56     if (bg.weight_tree.capacity() < INITIAL_WEIGHT_TREE_SIZE) {
     57         bg.weight_tree.reserve(INITIAL_WEIGHT_TREE_SIZE);
     58     }
     59     if (bg.server_map.seek(id) != NULL) {
     60         // The id duplicates.
     61         return false;
     62     }
     63     const size_t* pindex = fg.server_map.seek(id);
     64     if (pindex == NULL) {
     69         const size_t index = bg.weight_tree.size();
     74         int64_t initial_weight = WEIGHT_SCALE;
     75         if (!bg.weight_tree.empty()) {
     76             initial_weight = lb->_total.load(butil::memory_order_relaxed)
     77                 / bg.weight_tree.size();  // use the current average as the initial weight
     78         }
     82         bg.server_map[id] = index;  // record the new node at this index
     83 
     87         ServerInfo info = { id, lb->PushLeft(), new Weight(initial_weight) };  // build the weight entry
     88         bg.weight_tree.push_back(info);
     89 
     93         const int64_t diff = info.weight->volatile_value();
     94         if (diff) {
     95             bg.UpdateParentWeights(diff, index);  // update the weight binary tree
     96             lb->_total.fetch_add(diff, butil::memory_order_relaxed);  // accumulate into _total
     97         }
     98     } else {
    101         bg.server_map[id] = bg.weight_tree.size();
    102         bg.weight_tree.push_back(fg.weight_tree[*pindex]);
    103     }
    104     return true;
    105 }
    
    110     struct ServerInfo {
    111         SocketId server_id;
    112         butil::atomic<int64_t>* left;
    113         Weight* weight;
    114     };
    
    227 bool LocalityAwareLoadBalancer::AddServer(const ServerId& id) {
    228     if (_id_mapper.AddServer(id)) {  // the first time this node is added
    230         return _db_servers.ModifyWithForeground(Add, id.id, this);
    231     } else {
    232         return true;
    233     }
    234 }
    

    The code above adds a new server node, updating the background node list and weights. As the linked document explains: "The lookup in LALB splits traffic by weight. An O(N) method goes like this: compute the sum of all weights, total; generate a random number R in [0, total-1]; walk through the weights one by one until the running sum is not greater than R while adding the next weight makes it greater than R. The method works and is easy to understand, but its performance already degrades badly when N reaches a few hundred. The main factor is cache coherence: LALB is a feedback-based algorithm, information is fed back into LALB when an RPC ends, so the traversed data structure is constantly being modified. This means the foreground's O(N) read has to reload every cacheline. When N reaches several hundred, a single lookup may take on the order of a hundred microseconds, let alone larger N. Since LALB is (to be) brpc's default traffic-splitting algorithm, this overhead is unacceptable.

    "Another approach is a complete binary tree in which every node records the weight sum of its left subtree, so a lookup finishes in O(logN) time. With N = 1024 we jump through memory at most 10 times and the total cost can be kept within 1 microsecond, which is acceptable."

    Back to the source of adding a node: ModifyWithForeground updates the background data first:

    411 template <typename T, typename TLS>
    412 template <typename Fn, typename Arg1, typename Arg2>
    413 size_t DoublyBufferedData<T, TLS>::ModifyWithForeground(
    414     Fn& fn, const Arg1& arg1, const Arg2& arg2) {
    415     WithFG2<Fn, Arg1, Arg2> c(fn, _data, arg1, arg2);
    416     return Modify(c);
    417 }
    
    131     template <typename Fn, typename Arg1, typename Arg2>
    132     struct WithFG2 {
    133         WithFG2(Fn& fn, T* data, const Arg1& arg1, const Arg2& arg2)
    134             : _fn(fn), _data(data), _arg1(arg1), _arg2(arg2) {}
    135         size_t operator()(T& bg) {
    136             return _fn(bg, (const T&)_data[&bg == _data], _arg1, _arg2);
    137         }
    138     private:
    139         Fn& _fn;
    140         T* _data;
    141         const Arg1& _arg1;
    142         const Arg2& _arg2;
    143     };
    
    353     int bg_index = !_index.load(butil::memory_order_relaxed);
    356     const size_t ret = fn(_data[bg_index]);  // modify the background copy
    365     _index.store(bg_index, butil::memory_order_release);  // publish the new foreground index
    366     bg_index = !bg_index;  // now points to the old foreground
    368     // block writer ... until no reader is still reading the old foreground
    377     const size_t ret2 = fn(_data[bg_index]);  // modify the old foreground
    

    For a newly added node the foreground data starts out empty. During the first update, bg is the buffer with index 1 and fg the one with index 0, so pindex == NULL; bg.weight_tree.push_back(info) then appends the node, its position at the end of the vector becomes its index, i.e. bg.server_map[id] = index, and the average weight of the existing nodes is used as initial_weight.

    110     struct ServerInfo {
    111         SocketId server_id;
    112         butil::atomic<int64_t>* left;
    113         Weight* weight;
    114     };
    
    141     // Add a entry to _left_weights.
    142     butil::atomic<int64_t>* PushLeft() {
    143         _left_weights.push_back(0);
    144         return (butil::atomic<int64_t>*)&_left_weights.back();
    145     }
    

    Finally the weight is accumulated into _total and the subtree weight sums of the complete binary tree are updated:

    154 inline void LocalityAwareLoadBalancer::Servers::UpdateParentWeights(
    155     int64_t diff, size_t index) const {
    156     while (index != 0) {
    157         const size_t parent_index = (index - 1) >> 1;
    158         if ((parent_index << 1) + 1 == index) {  // left child
    159             weight_tree[parent_index].left->fetch_add(
    160                 diff, butil::memory_order_relaxed);
    161         }   
    162         index = parent_index;
    163     }   
    164 }
    

    Each node records the weight sum of its left subtree. A simple example: for vector<int> value = {1,4,5,7,8,15,20} laid out as a complete binary tree, 7/8/15/20 are leaves, so the left sums are left[3..6] = 0, left[1] = 7, left[2] = 15 and left[0] = 4 + 7 + 8 = 19.
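
    A small sketch that reproduces this example, building the left-subtree sums of an array-embedded complete binary tree by walking each node up to the root, the same path UpdateParentWeights takes incrementally:

        #include <cstdint>
        #include <cstdio>
        #include <vector>

        // left[i] = weight sum of the left subtree of node i, for a complete binary
        // tree stored in an array (children of node i are 2i+1 and 2i+2).
        std::vector<int64_t> BuildLeftSums(const std::vector<int64_t>& value) {
            std::vector<int64_t> left(value.size(), 0);
            for (size_t index = 0; index < value.size(); ++index) {
                size_t i = index;
                while (i != 0) {                       // climb towards the root
                    const size_t parent = (i - 1) / 2;
                    if (parent * 2 + 1 == i) {         // `index` lies in parent's left subtree
                        left[parent] += value[index];
                    }
                    i = parent;
                }
            }
            return left;
        }

        int main() {
            const std::vector<int64_t> value = {1, 4, 5, 7, 8, 15, 20};
            const std::vector<int64_t> left = BuildLeftSums(value);
            for (size_t i = 0; i < left.size(); ++i) {
                printf("left[%zu] = %lld\n", i, (long long)left[i]);  // 19 7 15 0 0 0 0
            }
            return 0;
        }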

    When the old foreground is updated next, bg is now the buffer with index 0 and fg the one with index 1, so pindex != NULL and the entry is simply copied from the new foreground into the old foreground. The two buffers then share the same Weight object, which requires care when removing nodes later.

    The following removes a node; this part of the implementation is fairly involved:

    236 bool LocalityAwareLoadBalancer::RemoveServer(const ServerId& id) {
    237     if (_id_mapper.RemoveServer(id)) {
    239         return _db_servers.Modify(Remove, id.id, this);
    240     } else {
    241         return true;
    242     }
    243 }
    
    107 bool LocalityAwareLoadBalancer::Remove(
    108     Servers& bg, SocketId id, LocalityAwareLoadBalancer* lb) {
    109     size_t* pindex = bg.server_map.seek(id);
    110     if (NULL == pindex) {
    111         // The id does not exist.
    112         return false;
    113     }
    114     // Save the index and remove mapping from id to the index.
    115     const size_t index = *pindex;
    116     bg.server_map.erase(id);
    117 
    118     Weight* w = bg.weight_tree[index].weight;
    122     const int64_t rm_weight = w->Disable();
    123     if (index + 1 == bg.weight_tree.size()) {
    124         // last node. Removing is eaiser.
    125         bg.weight_tree.pop_back();
    126         if (rm_weight) {
    132             int64_t diff = -rm_weight;
    133             bg.UpdateParentWeights(diff, index);
    134             lb->_total.fetch_add(diff, butil::memory_order_relaxed);
    135         } else {
    136             // the second buffer. clean left stuff.
    137             delete w;
    138             lb->PopLeft();
    139         }
    140     } else {
    141         // Move last node to position `index' to fill the space.
    142         bg.weight_tree[index].server_id = bg.weight_tree.back().server_id;
    143         bg.weight_tree[index].weight = bg.weight_tree.back().weight;
    144         bg.server_map[bg.weight_tree[index].server_id] = index;
    145         bg.weight_tree.pop_back();
    146 
    147         Weight* w2 = bg.weight_tree[index].weight;  // previously back()
    148         if (rm_weight) {
    159             const int64_t add_weight = w2->MarkOld(bg.weight_tree.size());
    160 
    164             const int64_t diff = add_weight - rm_weight;
    165             if (diff) {
    166                 bg.UpdateParentWeights(diff, index);
    167                 lb->_total.fetch_add(diff, butil::memory_order_relaxed);
    168             }
    171         } else {
    175             const std::pair<int64_t, int64_t> p = w2->ClearOld();
    176             // Add the diff to parent nodes of node `index'
    177             const int64_t diff = p.second;
    178             if (diff) {
    179                 bg.UpdateParentWeights(diff, index);
    180             }
    181             // Remove weight from parent nodes of last node.
    182             int64_t old_weight = - p.first - p.second;
    183             if (old_weight) {
    184                 bg.UpdateParentWeights(old_weight, bg.weight_tree.size());
    185             }
    186             lb->_total.fetch_add(- p.first, butil::memory_order_relaxed);
    187             // Clear resources.
    188             delete w;
    189             lb->PopLeft();
    190         }
    191     }
    192     return true;
    193 }
    

    The background is updated first: the Weight* w at the corresponding index is fetched and Disable()d, so later weight/latency updates on RPC completion simply skip it. If it is the last node it is removed directly, i.e. bg.weight_tree.pop_back(). When its weight is non-zero (the first pass of the removal, while the foreground is still in use), and since whatever exists in the background must also exist in the foreground, w cannot be deleted yet; the weight tree is adjusted first:

    126         if (rm_weight) {
    132             int64_t diff = -rm_weight;
    133             bg.UpdateParentWeights(diff, index);
    134             lb->_total.fetch_add(diff, butil::memory_order_relaxed);
    135         } else {
    137             delete w;
    138             lb->PopLeft();
    139         }
    

    After the background is updated, the old foreground goes through the same logic, but this time rm_weight is 0, so w can be freed safely and directly. For removal of the last node the tree adjustment is simple: just subtract the removed weight from the relevant ancestors along that node's path.

    When the node to remove is not the last one, things get somewhat more complicated, though not hard. While updating the background, the implementation moves the last node into position `index`, pops the last node, and adjusts the weight tree by (add_weight - rm_weight). The removed position is also recorded so it can be handled specially when the old foreground is updated, and concurrent weight updates in between still have to be accounted for:

    563 int64_t LocalityAwareLoadBalancer::Weight::MarkOld(size_t index) {
    564     BAIDU_SCOPED_LOCK(_mutex);
    565     const int64_t saved = _weight;
    566     _old_weight = saved;  // weight at the time of removal
    567     _old_diff_sum = 0;    // reset; accumulates changes made after the removal started
    568     _old_index = index;   // the position being removed
    569     return saved;
    570 }
    
    555 int64_t LocalityAwareLoadBalancer::Weight::Disable() {
    556     BAIDU_SCOPED_LOCK(_mutex);
    557     const int64_t saved = _weight;
    558     _base_weight = -1;
    559     _weight = 0;
    560     return saved;
    561 }
    

    A non-zero rm_weight means we are updating the background while the foreground is still in use, so w cannot be deleted; instead the old weight, index and related data are recorded in w, as in the code above. As the comment explains:

    150 // We need to remove the weight of last node from its parent
    151 // nodes and add the weight to parent nodes of node `index'.
    152 // However this process is not atomic. The foreground buffer still
    153 // sees w2 as last node and it may change the weight during the
    154 // process. To solve this problem, we atomically reset the weight
    155 // and remember the preivous index (back()) in _old_index. Later
    156 // change to weight will add the diff to _old_diff_sum if _old_index
    157 // matches the index which SelectServer is from. In this way we
    158 // know the weight diff from foreground before we laterly modify it.
    

    From the foreground's point of view this is still a weight binary tree whose subtree sums are consistent. Because the background is removing a node that is not the last one, directly modifying the w2 weight data shared with the foreground would break those sums and make SelectServer pick a wrong node; the foreground still sees w2 as the intact last node. So add_weight - rm_weight is added on top of the existing sums for now, and this adjustment is undone later when the old foreground is updated, so the sums are never broken.

    161  // Add the weight diff to parent nodes of node `index'. Notice
    162 // that we don't touch parent nodes of last node here because
    163 // foreground is still sending traffic to last node.
    

    When the old foreground is updated, Disable() has already been called, so rm_weight comes back as 0:

    572 std::pair<int64_t, int64_t> LocalityAwareLoadBalancer::Weight::ClearOld() {
    573     BAIDU_SCOPED_LOCK(_mutex);
    574     const int64_t old_weight = _old_weight;
    575     const int64_t diff = _old_diff_sum;
    576     _old_diff_sum = 0;
    577     _old_index = (size_t)-1;
    578     _old_weight = 0;
    579     return std::make_pair(old_weight, diff);
    580 }
    

    Then w2->ClearOld() is called and the weight tree is updated. The reason is that other code paths such as SelectServer also modify the Weight's internals. When the background was modified earlier, the weights of the last node's parents were deliberately left untouched, because updating them might have broken the foreground's weight tree; the deferred work is finished here:

    173             // Reset _old_* fields and get the weight change by SelectServer()
    174             // after MarkOld().
    175             const std::pair<int64_t, int64_t> p = w2->ClearOld();
    176             // Add the diff to parent nodes of node `index'
    177             const int64_t diff = p.second;
    178             if (diff) {
    179                 bg.UpdateParentWeights(diff, index);
    180             }
    181             // Remove weight from parent nodes of last node.
    182             int64_t old_weight = - p.first - p.second;
    183             if (old_weight) {
    184                 bg.UpdateParentWeights(old_weight, bg.weight_tree.size());
    185             }
    186             lb->_total.fetch_add(- p.first, butil::memory_order_relaxed);
    187             // Clear resources.
    188             delete w;
    189             lb->PopLeft();
    
    266 int LocalityAwareLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
    267     butil::DoublyBufferedData<Servers>::ScopedPtr s;
    268     if (_db_servers.Read(&s) != 0) {
    269         return ENOMEM;
    270     }
    271     const size_t n = s->weight_tree.size();
    272     if (n == 0) {
    273         return ENODATA;
    274     }
    275     size_t ntry = 0;
    276     size_t nloop = 0;
    277     int64_t total = _total.load(butil::memory_order_relaxed);
    278     int64_t dice = butil::fast_rand_less_than(total);
    279     size_t index = 0;
    280     int64_t self = 0;
    281     while (total > 0) {
    285         if (++nloop > 10000) {  // guard against an infinite loop
    287             return EHOSTDOWN;
    288         }
    293         const ServerInfo & info = s->weight_tree[index];
    294         const int64_t left = info.left->load(butil::memory_order_relaxed);
    295         if (dice < left) {  // descend into the left subtree
    296             index = index * 2 + 1;
    297             if (index < n) {
    298                 continue;
    299             }
    300         } else if (dice >= left + (self = info.weight->volatile_value())) {  // right subtree
    301             dice -= left + self;
    302             index = index * 2 + 2;
    303             if (index < n) {
    304                 continue;
    305             }
    306         } else if (Socket::Address(info.server_id, out->ptr) == 0
    307                    && (*out->ptr)->IsAvailable()) {
    308             //more code...
    328         } else if (in.changable_weights) {
    329             //more code...
    345         }
    346         total = _total.load(butil::memory_order_relaxed);
    347         dice = butil::fast_rand_less_than(total);
    348         index = 0;
    349     }
    350     return EHOSTDOWN;
    351 }
    

    SelectServer uses the randomly generated dice to decide which node it lands on. Given the weight-sum tree structure explained above: if dice < left the search descends into the left subtree; if dice >= left + (self = info.weight->volatile_value()) it descends into the right subtree, updating index and dice accordingly; otherwise the current node is the pick, i.e. left <= dice < left + self.
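
    Continuing the {1,4,5,7,8,15,20} example from earlier (total = 60, left sums {19,7,15,0,0,0,0}), here is a standalone sketch of the descent, mirroring the loop above without the availability checks:

        #include <cstdint>
        #include <cstdio>
        #include <vector>

        // Descend an array-embedded weight tree: value[i] is node i's own weight,
        // left[i] the weight sum of its left subtree. Returns the chosen index.
        size_t PickByDice(const std::vector<int64_t>& value,
                          const std::vector<int64_t>& left, int64_t dice) {
            const size_t n = value.size();
            size_t index = 0;
            while (index < n) {
                const int64_t l = left[index];
                const int64_t self = value[index];
                if (dice < l) {                 // fall into the left subtree
                    index = index * 2 + 1;
                } else if (dice >= l + self) {  // fall into the right subtree
                    dice -= l + self;
                    index = index * 2 + 2;
                } else {
                    return index;               // l <= dice < l + self: this node
                }
            }
            return n;  // unreachable if dice < total and the sums are consistent
        }

        int main() {
            const std::vector<int64_t> value = {1, 4, 5, 7, 8, 15, 20};
            const std::vector<int64_t> left  = {19, 7, 15, 0, 0, 0, 0};
            // dice = 25: at node 0, 25 >= 19 + 1, go right with dice = 5;
            // at node 2, 5 < 15, go to node 5; there 0 <= 5 < 15, so node 5 is picked.
            printf("picked index %zu\n", PickByDice(value, left, 25));  // prints 5
            return 0;
        }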

    The next two branches handle the case where a node has been found. In the normal case, when the node is available:

    306 if (Socket::Address(info.server_id, out->ptr) == 0 && (*out->ptr)->IsAvailable()) {
    308     if ((ntry + 1) == n  || !ExcludedServers::IsExcluded(in.excluded, info.server_id)) {
    

    If the weights are changeable (does sending a request to a node lower its weight? is the weight adjusted again later? these questions are answered further below), then:

    306         } else if (Socket::Address(info.server_id, out->ptr) == 0
    307                    && (*out->ptr)->IsAvailable()) {
    308             if ((ntry + 1) == n  // Instead of fail with EHOSTDOWN, we prefer
    309                                  // choosing the server again.
    310                 || !ExcludedServers::IsExcluded(in.excluded, info.server_id)) {
    311                 if (!in.changable_weights) {
    312                     return 0;
    313                 }
    314                 const Weight::AddInflightResult r =
    315                     info.weight->AddInflight(in, index, dice - left);
    316                 if (r.weight_diff) {
    317                     s->UpdateParentWeights(r.weight_diff, index);
    318                     _total.fetch_add(r.weight_diff, butil::memory_order_relaxed);
    319                 }
    320                 if (r.chosen) { 
    321                     out->need_feedback = true;
    322                     return 0;
    323                 } 
    324             }
    325             if (++ntry >= n) {
    326                 break;
    327             }
    
    190 inline LocalityAwareLoadBalancer::Weight::AddInflightResult
    191 LocalityAwareLoadBalancer::Weight::AddInflight(
    192     const SelectIn& in, size_t index, int64_t dice) {
    193     BAIDU_SCOPED_LOCK(_mutex);
    194     if (Disabled()) {
    195         AddInflightResult r = { false, 0 };
    196         return r;
    197     }
    198     const int64_t diff = ResetWeight(index, in.begin_time_us);
    199     if (_weight < dice) {
    200         // inflight delay makes the weight too small to choose.
    201         AddInflightResult r = { false, diff };
    202         return r;
    203     }
    204     _begin_time_sum += in.begin_time_us;
    205     ++_begin_time_count;
    206     AddInflightResult r = { true, diff };  // feedback is required when the RPC returns
    207     return r;
    208 }
    
    166 inline int64_t LocalityAwareLoadBalancer::Weight::ResetWeight(
    167     size_t index, int64_t now_us) {
    168     int64_t new_weight = _base_weight;
    169     if (_begin_time_count > 0) {
    170         const int64_t inflight_delay =
    171             now_us - _begin_time_sum / _begin_time_count;
    172         const int64_t punish_latency = 
    173             (int64_t)(_avg_latency * FLAGS_punish_inflight_ratio);
    174         if (inflight_delay >= punish_latency && _avg_latency > 0) {
    175             new_weight = new_weight * punish_latency / inflight_delay;
    176         }
    177     }
    178     if (new_weight < FLAGS_min_weight) {
    179         new_weight = FLAGS_min_weight;
    180     }           
    181     const int64_t old_weight = _weight;
    182     _weight = new_weight;
    183     const int64_t diff = new_weight - old_weight;
    184     if (_old_index == index && diff != 0) {
    185         _old_diff_sum += diff;  // accumulate the change made after MarkOld
    186     }
    187     return diff;
    188 }
    

    If the node is in the process of being removed, Disabled() returns true and the loop simply moves on to the next pick; otherwise the weight is refreshed through ResetWeight. As the end of this post explains, traffic cannot keep being funneled to a node unconditionally, for example one that has just started, and RPCs that get no response and eventually time out also need to push the weight down. This is analyzed in more detail later.

    Continuing with SelectServer: if the chosen node turns out to be unavailable, its _base_weight is lowered towards a certain value:

    328         } else if (in.changable_weights) {
    329             const int64_t diff =
    330                 info.weight->MarkFailed(index, total / n);
    331             if (diff) { 
    332                 s->UpdateParentWeights(diff, index);
    333                 _total.fetch_add(diff, butil::memory_order_relaxed);
    334             }
    335             if (dice >= left + self + diff) {
    336                 dice -= left + self + diff;
    337                 index = index * 2 + 2;
    338                 if (index < n) {
    339                     continue;
    340                 }
    341             }
    342             if (++ntry >= n) {
    343                 break;
    344             }
    345         }
    
    210 inline int64_t LocalityAwareLoadBalancer::Weight::MarkFailed(
    211     size_t index, int64_t avg_weight) {
    212     BAIDU_SCOPED_LOCK(_mutex);
    213     if (_base_weight <= avg_weight) {
    214         return 0;   
    215     }           
    216     _base_weight = avg_weight;
    217     return ResetWeight(index, 0);
    218 }
    

    In the Controller implementation, whenever an RPC finishes (whether it failed with a server-returned error or with a timeout), feedback is sent to the load balancer if needed:

     784     if (need_feedback) {
     785         const LoadBalancer::CallInfo info =
     786             { begin_time_us, peer_id, error_code, c };
     787         c->_lb->Feedback(info);
     788     }
    
    353 void LocalityAwareLoadBalancer::Feedback(const CallInfo& info) {
    354     butil::DoublyBufferedData<Servers>::ScopedPtr s;
    355     if (_db_servers.Read(&s) != 0) {
    356         return;
    357     }
    358     const size_t* pindex = s->server_map.seek(info.server_id);
    359     if (NULL == pindex) {  // the corresponding server has been removed
    360         return;
    361     }
    362     const size_t index = *pindex;
    363     Weight* w = s->weight_tree[index].weight;
    364     const int64_t diff = w->Update(info, index);  // try to update the weight
    365     if (diff != 0) {
    366         s->UpdateParentWeights(diff, index);  // update the weight tree
    367         _total.fetch_add(diff, butil::memory_order_relaxed);
    368     }
    369 }
    
    371 int64_t LocalityAwareLoadBalancer::Weight::Update(
    372     const CallInfo& ci, size_t index) {
    373     const int64_t end_time_us = butil::gettimeofday_us();
    374     const int64_t latency = end_time_us - ci.begin_time_us;
    375     BAIDU_SCOPED_LOCK(_mutex);
    376     if (Disabled()) {
    377         // The weight was disabled and will be removed soon, do nothing
    378         // and the diff is 0.
    379         return 0;
    380     }
    381 
    382     _begin_time_sum -= ci.begin_time_us;
    383     --_begin_time_count;
    384 
    385     if (latency <= 0) {
    386         // time skews, ignore the sample.
    387         return 0;
    388     }
    389     if (ci.error_code == 0) {
    390         // Add a new entry 
    391         TimeInfo tm_info = { latency, end_time_us };
    392         if (!_time_q.empty()) { 
    393             tm_info.latency_sum += _time_q.bottom()->latency_sum;
    394         }
    395         _time_q.elim_push(tm_info);
    

    When the response comes back, nothing is done if the node is Disabled (about to be removed). For a successful request, tm_info.latency_sum += _time_q.bottom()->latency_sum accumulates the latency, which is later used to compute the average latency.

    For requests that failed for other reasons:

    396     } else {
    410         int ndone = 1;
    411         int nleft = 0;
    412         if (ci.controller->max_retry() > 0) {
    413             ndone = ci.controller->retried_count();
    414             nleft = ci.controller->max_retry() - ndone;
    415         }   
    416         const int64_t err_latency =
    417             (nleft * (int64_t)(latency * FLAGS_punish_error_ratio)
    418              + ndone * ci.controller->timeout_ms() * 1000L) / (ndone + nleft);
    419              
    420         if (!_time_q.empty()) {
    421             TimeInfo* ti = _time_q.bottom();
    422             ti->latency_sum += err_latency;
    423             ti->end_time_us = end_time_us;
    424         } else {
    425             // If the first response is error, enlarge the latency as timedout
    426             // since we know nothing about the normal latency yet.
    427             const TimeInfo tm_info = {
    428                 std::max(err_latency, ci.controller->timeout_ms() * 1000L),
    429                 end_time_us
    430             };  
    431             _time_q.push(tm_info);
    432         }
    

    The relevant comment from the source:

    397         // Accumulate into the last entry so that errors always decrease
    398         // the overall QPS and latency.
    399         // Note that the latency used is linearly mixed from the real latency
    400         // (of an errorous call) and the timeout, so that errors that are more
    401         // unlikely to be solved by later retries are punished more.
    402         // Examples:
    403         //   max_retry=0: always use timeout
    404         //   max_retry=1, retried=0: latency
    405         //   max_retry=1, retried=1: timeout
    406         //   max_retry=2, retried=0: latency
    407         //   max_retry=2, retried=1: (latency + timeout) / 2
    408         //   max_retry=2, retried=2: timeout
    

    As shown above, a latency is derived from the retry count and related parameters and accumulated into the last TimeInfo; if this is the very first entry, the latency is inflated towards the timeout.
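
    A quick numeric instance of that mixing, mirroring the err_latency formula above; the default of FLAGS_punish_error_ratio is not shown in the excerpt, so 1.0 below is only a placeholder matching the comment's examples:

        #include <cstdint>
        #include <cstdio>

        int main() {
            const double punish_error_ratio = 1.0;   // placeholder for FLAGS_punish_error_ratio
            const int64_t latency_us = 10000;        // 10ms real latency of the failed call
            const int64_t timeout_us = 100 * 1000L;  // timeout_ms = 100

            // max_retry = 2, retried = 1  ->  ndone = 1, nleft = 1
            const int ndone = 1, nleft = 1;
            const int64_t err_latency =
                (nleft * (int64_t)(latency_us * punish_error_ratio) + ndone * timeout_us)
                / (ndone + nleft);
            printf("err_latency = %lld us\n", (long long)err_latency);  // (10000 + 100000) / 2 = 55000
            return 0;
        }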

    435     const int64_t top_time_us = _time_q.top()->end_time_us;
    436     const size_t n = _time_q.size();
    437     int64_t scaled_qps = DEFAULT_QPS * WEIGHT_SCALE;
    438     if (end_time_us > top_time_us) {
    439         // Only calculate scaled_qps when the queue is full or the elapse
    440         // between bottom and top is reasonably large(so that error of the
    441         // calculated QPS is probably smaller).
    442         if (n == _time_q.capacity() ||
    443             end_time_us >= top_time_us + 1000000L/*1s*/) {
    444             // will not overflow.
    445             scaled_qps = (n - 1) * 1000000L * WEIGHT_SCALE / (end_time_us - top_time_us);
    446             if (scaled_qps < WEIGHT_SCALE) {
    447                 scaled_qps = WEIGHT_SCALE;
    448             }
    449         }
    450         _avg_latency = (_time_q.bottom()->latency_sum -
    451                         _time_q.top()->latency_sum) / (n - 1);
    452     } else if (n == 1) {
    453         _avg_latency = _time_q.bottom()->latency_sum;
    454     } else {
    455         // end_time_us <= top_time_us && n > 1: the QPS is so high that
    456         // the time elapse between top and bottom is 0(possible in examples),
    457         // or time skews, we don't update the weight for safety.
    458         return 0;
    459     }
    460     _base_weight = scaled_qps / _avg_latency;
    461     return ResetWeight(index, end_time_us);
    462 }
    

    Finally the latency and _base_weight are recomputed. _time_q keeps the most recent 128 latency samples; the more requests that complete within a shorter window, the larger scaled_qps becomes. _avg_latency is the average over those samples, then _base_weight = scaled_qps / _avg_latency and _weight is re-adjusted through ResetWeight.
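
    A rough worked example of this recomputation; the actual WEIGHT_SCALE constant is not shown in the excerpts above, so the value below is only a placeholder for illustration:

        #include <cstdint>
        #include <cstdio>

        int main() {
            // Placeholder scale factor; the real WEIGHT_SCALE in brpc may differ.
            const int64_t kWeightScale = 1000;

            // Suppose the 128-entry window is full, spans 0.5s between top and bottom,
            // and the average latency over those samples is 5000us.
            const int64_t n = 128;
            const int64_t elapse_us = 500000;
            const int64_t avg_latency_us = 5000;

            // scaled_qps = (n - 1) * 1e6 * WEIGHT_SCALE / elapse, as in Update() above.
            const int64_t scaled_qps = (n - 1) * 1000000L * kWeightScale / elapse_us;
            const int64_t base_weight = scaled_qps / avg_latency_us;

            printf("scaled_qps=%lld base_weight=%lld\n",
                   (long long)scaled_qps, (long long)base_weight);  // 254000 and 50
            return 0;
        }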

    While the old foreground is being updated, the new foreground and the old background may briefly disagree, which is acceptable: "We do not pursue strict consistency, only eventual consistency, which lets us take fewer locks. This also means the total weight, the left-subtree weight sums and the per-node weights may not match exactly, and the lookup algorithm has to tolerate that."

    Below is the weight-related analysis and design rationale quoted from lalb.md. It is worth studying and also answers several questions raised while reading the source.

    base_weight

    QPS and latency are tracked with a circular queue whose default capacity is 128. Such a small statistics window works because the inflight delay corrects over-reaction in time, and 128 samples still carry a reasonable amount of statistical confidence. The downside of computing latency this way is that when a server's performance changes drastically, data has to accumulate for a while before the average latency reflects it. As in the example of the previous section, after servers swap their latencies the client needs to accumulate many seconds of data before the average latency visibly moves. We have not handled this yet, because servers in real production rarely jump in latency like the example; they mostly slow down gradually. With clusters of hundreds of machines, reacting a bit slowly and sending a bit less traffic to a few machines causes no real problem. If latency jumps do show up in a product line and the cluster is small, we will deal with it then.

    The weight is computed as base_weight = QPS * WEIGHT_SCALE / latency ^ p, where WEIGHT_SCALE is a scaling factor that allows storing the weight as an integer while keeping enough precision, similar to fixed-point arithmetic. p defaults to 2, which makes latency converge roughly p times as fast as p = 1; the option quadratic_latency=false sets p = 1.

    The weight computation enforces a minimum at every step, to prevent a node's weight from dropping so low that it never gets any traffic. Even nodes whose latency is far above the average should keep enough weight to be visited periodically; otherwise we would never find out when they become fast again.

    Except for nodes pending removal, a node's weight is never 0.
    This also creates a problem: even a very slow server (that has not dropped its connection) keeps a non-zero weight, so some requests are periodically sent to it and are bound to time out. When the QPS is low, the probing interval must be stretched to limit the impact. For example, to keep the impact on qps=1000 within 0.01% (1 in 10000), the faulty server's weight must be low enough that its probing interval exceeds 10 seconds, which slows down our discovery that the server has become fast again. Possible remedies:

    • Do nothing. The problem may not be as bad as it sounds: thanks to continuous resource monitoring, online services rarely become "very slow", and ordinary slowdowns do not make requests time out.
    • Save some requests previously sent to the slow server and use them for probing. This avoids wasting requests, but the implementation is heavily coupled and rather messy.
    • Force backup requests.
    • Select again.
    inflight delay

    We must track RPCs that have not finished yet; otherwise we would have to wait for a timeout or some other error, which can be slow (a timeout is usually several times the normal latency), and during that window we may have made many bad routing decisions. The simplest approach is to account for the elapsed time of unfinished RPCs:

    • When a server is selected, add its send time to a running sum and increment the count of unfinished calls.
    • On feedback, subtract the send time and decrement the count.
    • The framework guarantees that every selection is matched by exactly one feedback.

    Then "current time - (sum of send times / unfinished count)" is the average elapsed time of the unfinished RPCs, which we call the inflight delay. When the inflight delay exceeds the average latency, the node's weight is punished linearly: weight = base_weight * avg_latency / inflight_delay. When requests sent to a node fail to come back within the average latency, its weight drops quickly and corrects our behavior, which is far faster than waiting for timeouts. This does not account for the normal jitter of latency, though, so a variance is also needed; it can come from statistics or simply be linear in the average latency. Either way, once we have a variance bound, the weight is punished only when inflight_delay > avg_latency + max(bound * 3, MIN_BOUND). The 3 is the usual empirical value from the normal distribution.
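
    A minimal sketch of that bookkeeping and of the linear punishment; the jitter bound is omitted and the names below are made up, the real logic lives in AddInflight/ResetWeight shown earlier:

        #include <algorithm>
        #include <cstdint>

        struct InflightState {
            int64_t begin_time_sum_us = 0;  // sum of send times of unfinished RPCs
            int64_t begin_time_count  = 0;  // number of unfinished RPCs

            void OnSelect(int64_t begin_us)   { begin_time_sum_us += begin_us; ++begin_time_count; }
            void OnFeedback(int64_t begin_us) { begin_time_sum_us -= begin_us; --begin_time_count; }

            // weight = base_weight * avg_latency / inflight_delay once the inflight
            // delay exceeds the average latency (variance bound omitted for brevity).
            int64_t PunishedWeight(int64_t base_weight, int64_t avg_latency_us,
                                   int64_t now_us) const {
                if (begin_time_count <= 0 || avg_latency_us <= 0) return base_weight;
                const int64_t inflight_delay =
                    now_us - begin_time_sum_us / begin_time_count;
                if (inflight_delay <= avg_latency_us) return base_weight;
                return std::max<int64_t>(1, base_weight * avg_latency_us / inflight_delay);
            }
        };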

    ConsistentHashingLoadBalancer

    MurmurHash (multiply and rotate) is used as the hash function here. Its notable properties: it is fast, tens of times faster than cryptographic hashes, and similar strings such as "abc" and "abd" still scatter evenly over the hash ring.

     60 uint32_t MurmurHash32(const void* key, size_t len) {
     61     uint32_t hash;               
     62     butil::MurmurHash3_x86_32(key, (int)len, 0, &hash);
     63     return hash;
     64 }
    
     83 void MurmurHash3_x86_32 ( const void * key, int len,
     84                           uint32_t seed, void * out ) {
     85   const uint8_t * data = (const uint8_t*)key;
     86   const int nblocks = len / 4;
     87 
     88   uint32_t h1 = seed;
     90   const uint32_t c1 = 0xcc9e2d51;
     91   const uint32_t c2 = 0x1b873593;
     92 
     93   //----------
     94   // body
     95 
     96   const uint32_t * blocks = (const uint32_t *)(data + nblocks*4);
     97 
     98   for(int i = -nblocks; i; i++)
     99   {
    100     uint32_t k1 = blocks[i];
    101 
    102     k1 *= c1;
    103     k1 = rotl32(k1,15);
    104     k1 *= c2;
    105 
    106     h1 ^= k1;
    107     h1 = rotl32(h1,13);
    108     h1 = h1*5+0xe6546b64;
    109   }
    111   //----------
    112   // tail
    113 
    114   const uint8_t * tail = (const uint8_t*)(data + nblocks*4);
    116   uint32_t k1 = 0;
    117 
    118   switch(len & 3)
    119   {
    120   case 3: k1 ^= tail[2] << 16;
    121   case 2: k1 ^= tail[1] << 8;
    122   case 1: k1 ^= tail[0];
    123           k1 *= c1; k1 = rotl32(k1,15); k1 *= c2; h1 ^= k1;
    124   };
    125 
    126   //----------
    127   // finalization
    128 
    129   h1 ^= len;
    131   h1 = fmix32(h1);
    133   *(uint32_t*)out = h1;
    134 }
    
     53 inline uint32_t rotl32 ( uint32_t x, int8_t r )
     54 {
     55   return (x << r) | (x >> (32 - r));
     56 }
    
     49 MURMURHASH_FORCE_INLINE uint32_t fmix32 (uint32_t h) {
     50   h ^= h >> 16;
     51   h *= 0x85ebca6b;
     52   h ^= h >> 13;
     53   h *= 0xc2b2ae35;
     54   h ^= h >> 16;
     55   return h;
     56 }
    

    The above is the concrete hash function implementation.

     36 enum ConsistentHashingLoadBalancerType {  // hash type
     37     CONS_HASH_LB_MURMUR3 = 0,  // the one analyzed here
     38     CONS_HASH_LB_MD5 = 1,
     39     CONS_HASH_LB_KETAMA = 2,
     40 
     41     // Identify the last one.    
     42     CONS_HASH_LB_LAST = 3        
     43 };
    
     45 class ConsistentHashingLoadBalancer : public LoadBalancer {
     46 public:
     47     struct Node {
     48         uint32_t hash; 
     49         ServerId server_sock;
     50         butil::EndPoint server_addr;  // To make sorting stable among all clients
     51         bool operator<(const Node &rhs) const {
     52             if (hash < rhs.hash) { return true; }
     53             if (hash > rhs.hash) { return false; }
     54             return server_addr < rhs.server_addr;
     55         }
     56         bool operator<(const uint32_t code) const {
     57             return hash < code;
     58         }
     59     };
     70 private:
     79     size_t _num_replicas;
     80     ConsistentHashingLoadBalancerType _type;
     81     butil::DoublyBufferedData<std::vector<Node> > _db_hash_ring;
     82 };
    

    Factory classes for the several hash policies are registered earlier, so that code is not shown here; MurmurHash uses the default DefaultReplicaPolicy.

    220 bool ConsistentHashingLoadBalancer::AddServer(const ServerId& server) {
    221     std::vector<Node> add_nodes;
    222     add_nodes.reserve(_num_replicas);  // number of virtual nodes
    223     if (!GetReplicaPolicy(_type)->Build(server, _num_replicas, &add_nodes)) {
    224         return false;
    225     }
    226     std::sort(add_nodes.begin(), add_nodes.end());
    227     bool executed = false;
    228     const size_t ret = _db_hash_ring.ModifyWithForeground(
    229                         AddBatch, add_nodes, &executed);
    230     CHECK(ret == 0 || ret == _num_replicas) << ret;
    231     return ret != 0;
    232 }
    
     64 bool DefaultReplicaPolicy::Build(ServerId server,
     65                                  size_t num_replicas,
     66                                  std::vector<ConsistentHashingLoadBalancer::Node>* replicas) const {
     67     SocketUniquePtr ptr;
     68     if (Socket::AddressFailedAsWell(server.id, &ptr) == -1) {
     69         return false;
     70     }
     71     replicas->clear();
     72     for (size_t i = 0; i < num_replicas; ++i) {
     73         char host[32];  
     74         int len = snprintf(host, sizeof(host), "%s-%lu",
     75                            endpoint2str(ptr->remote_side()).c_str(), i);
     76         ConsistentHashingLoadBalancer::Node node;
     77         node.hash = _hash_func(host, len);
     78         node.server_sock = server;
     79         node.server_addr = ptr->remote_side();
     80         replicas->push_back(node);
     81     }
     82     return true;
     83 }
    

    When a node is added, the configured _hash_func first computes _num_replicas virtual nodes, the result is sorted, and AddBatch is then called to modify the background and the old foreground:

    152 size_t ConsistentHashingLoadBalancer::AddBatch(
    153         std::vector<Node> &bg, const std::vector<Node> &fg,
    154         const std::vector<Node> &servers, bool *executed) {
    155     if (*executed) {
    156         // Hack DBD
    157         return fg.size() - bg.size();
    158     }
    159     *executed = true;
    160     bg.resize(fg.size() + servers.size());
    161     bg.resize(std::set_union(fg.begin(), fg.end(),
    162                              servers.begin(), servers.end(), bg.begin())
    163               - bg.begin());  // merge with deduplication
    164     return bg.size() - fg.size();
    165 }
    

    Removing a node:

    257 bool ConsistentHashingLoadBalancer::RemoveServer(const ServerId& server) {
    258     bool executed = false;
    259     const size_t ret = _db_hash_ring.ModifyWithForeground(Remove, server, &executed);
    260     CHECK(ret == 0 || ret == _num_replicas);
    261     return ret != 0;
    262 }
    
    204 size_t ConsistentHashingLoadBalancer::Remove(
    205         std::vector<Node> &bg, const std::vector<Node> &fg,
    206         const ServerId& server, bool *executed) {
    207     if (*executed) {
    208         return bg.size() - fg.size();
    209     }
    210     *executed = true;
    211     bg.clear();
    212     for (size_t i = 0; i < fg.size(); ++i) {
    213         if (fg[i].server_sock != server) {
    214             bg.push_back(fg[i]);
    215         }
    216     }
    217     return fg.size() - bg.size();
    218 }
    

    The background data is simply cleared and rebuilt from the old foreground, skipping the node being removed.

    Selecting a node by hash code:

    290 int ConsistentHashingLoadBalancer::SelectServer(
    291     const SelectIn &in, SelectOut *out) {
    292     if (!in.has_request_code) {
    293         // the request must carry a hash code
    294         return EINVAL;
    295     }
    296     if (in.request_code > UINT_MAX) {//request_code must be 32-bit currently
    298         return EINVAL;
    299     }
    300     butil::DoublyBufferedData<std::vector<Node> >::ScopedPtr s;
    301     if (_db_hash_ring.Read(&s) != 0) {
    302         return ENOMEM;
    303     }
    304     if (s->empty()) {
    305         return ENODATA;
    306     }
    307     std::vector<Node>::const_iterator choice =
    308         std::lower_bound(s->begin(), s->end(), (uint32_t)in.request_code);
    309     if (choice == s->end()) {
    310         choice = s->begin();
    311     }
    312     for (size_t i = 0; i < s->size(); ++i) {
    313         if (((i + 1) == s->size() // always take last chance
    314              || !ExcludedServers::IsExcluded(in.excluded, choice->server_sock.id))
    315             && Socket::Address(choice->server_sock.id, out->ptr) == 0
    316             && (*out->ptr)->IsAvailable()) {
    317             return 0;
    318         } else {
    319             if (++choice == s->end()) {
    320                 choice = s->begin();
    321             }
    322         }
    323     }
    324     return EHOSTDOWN;
    325 }
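
    std::lower_bound locates the first virtual node whose hash is not less than request_code; if there is none, the search wraps around to the beginning of the ring, and the loop then walks clockwise until an available, non-excluded server is found. The request_code itself is supplied by the caller through the Controller. A minimal client-side sketch, assuming the MurmurHash32 helper quoted above is exposed via brpc/policy/hasher.h and that the channel was initialized with the "c_murmurhash" load balancer:

        #include <string>
        #include <brpc/controller.h>
        #include <brpc/policy/hasher.h>  // assumed header for the MurmurHash32 shown above

        // Pick the ring position for a business key before issuing the RPC.
        // `stub.SomeMethod` stands for any protobuf-generated stub call on a channel
        // that was initialized with the "c_murmurhash" load balancer.
        void CallWithKey(const std::string& key, brpc::Controller* cntl) {
            // SelectServer reads in.request_code, so the caller must set it.
            cntl->set_request_code(brpc::policy::MurmurHash32(key.data(), key.size()));
            // stub.SomeMethod(cntl, &request, &response, nullptr);
        }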
    

    References:
    client.md
    lalb.md
    load_balancing.md
    consistent_hashing.md
    MurmurHash3 study notes
    set_union
