While analyzing the bthread-related implementation, I noticed that a LoadBalancerWithNaming class is used. This part is fairly self-contained, so load balancing is analyzed first and the naming service is left for a later post. The brpc/policy/ directory contains the concrete balancing policies; here only RoundRobinLoadBalancer, LocalityAwareLoadBalancer and ConsistentHashingLoadBalancer are covered, all of which inherit from LoadBalancer.
LoadBalancer
35 // Select a server from a set of servers (in form of ServerId).
36 class LoadBalancer : public NonConstDescribable, public Destroyable {
78 virtual bool AddServer(const ServerId& server) = 0;
82 virtual bool RemoveServer(const ServerId& server) = 0;
96 virtual int SelectServer(const SelectIn& in, SelectOut* out) = 0;
97 //more code...
107 protected:
108 virtual ~LoadBalancer() { }
109 };
RoundRobinLoadBalancer
32 // This LoadBalancer selects server evenly. Selected numbers of servers(added
33 // at the same time) are very close.
34 class RoundRobinLoadBalancer : public LoadBalancer {
35 public:
36 bool AddServer(const ServerId& id);
37 bool RemoveServer(const ServerId& id);
40 int SelectServer(const SelectIn& in, SelectOut* out);
41 RoundRobinLoadBalancer* New(const butil::StringPiece&) const;
45 private:
46 struct Servers {
47 std::vector<ServerId> server_list;
48 std::map<ServerId, size_t> server_map;
49 };
50 struct TLS {
51 TLS() : stride(0), offset(0) { }
52 uint32_t stride;
53 uint32_t offset;
54 };
55 //more code...
61 butil::DoublyBufferedData<Servers, TLS> _db_servers;
63 };
81 bool RoundRobinLoadBalancer::AddServer(const ServerId& id) {
82 return _db_servers.Modify(Add, id);
83 }
84
85 bool RoundRobinLoadBalancer::RemoveServer(const ServerId& id) {
86 return _db_servers.Modify(Remove, id);
87 }
37 bool RoundRobinLoadBalancer::Add(Servers& bg, const ServerId& id) {
38 if (bg.server_list.capacity() < 128) {
39 bg.server_list.reserve(128);
40 }
41 std::map<ServerId, size_t>::iterator it = bg.server_map.find(id);
42 if (it != bg.server_map.end()) {
43 return false;
44 }
45 bg.server_map[id] = bg.server_list.size();
46 bg.server_list.push_back(id);
47 return true;
48 }
49
50 bool RoundRobinLoadBalancer::Remove(Servers& bg, const ServerId& id) {
51 std::map<ServerId, size_t>::iterator it = bg.server_map.find(id);
52 if (it != bg.server_map.end()) {
53 const size_t index = it->second;
54 bg.server_list[index] = bg.server_list.back();
55 bg.server_map[bg.server_list[index]] = index;
56 bg.server_list.pop_back();
57 bg.server_map.erase(it);
58 return true;
59 }
60 return false;
61 }
The member functions above all update through DoublyBufferedData, so the newest data is written into the background buffer; the principle of DoublyBufferedData was analyzed earlier in 多线程偏底层的性能优化思考.
Note that although the foreground data is read and the background data is written, the granularity differs: for the concrete Weight type, reads also modify some of its member variables. So broadly speaking, the Weight objects shared between foreground and background can be modified from either side, while the remaining structures are independent and do not affect each other, for example:
148 butil::atomic<int64_t> _total;
149 butil::DoublyBufferedData<Servers> _db_servers;
150 std::deque<int64_t> _left_weights;
151 ServerId2SocketIdMapper _id_mapper;
116 class Servers {
117 public:
118 std::vector<ServerInfo> weight_tree;
119 butil::FlatMap<SocketId, size_t> server_map;
120 //more code...
128 };
The core job is still to pick an available server; what differs from the other policies is this:
107 int RoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
108 butil::DoublyBufferedData<Servers, TLS>::ScopedPtr s;
109 if (_db_servers.Read(&s) != 0) {
110 return ENOMEM;
111 }
112 const size_t n = s->server_list.size();
113 if (n == 0) {
114 return ENODATA;
115 }
116 //more code...
121 TLS tls = s.tls();
122 if (tls.stride == 0) {
123 tls.stride = GenRandomStride();
124 tls.offset = 0;
125 }
126
127 for (size_t i = 0; i < n; ++i) {
128 tls.offset = (tls.offset + tls.stride) % n;
129 const SocketId id = s->server_list[tls.offset].id;
130 if (((i + 1) == n // always take last chance
131 || !ExcludedServers::IsExcluded(in.excluded, id))
132 && Socket::Address(id, out->ptr) == 0
133 && (*out->ptr)->IsAvailable()) {
134 s.tls() = tls;
135 return 0;
136 }
137 }
138 //more code...
141 s.tls() = tls;
142 return EHOSTDOWN;
143 }
The implementation above reads the foreground data through a ScopedPtr: BeginRead takes a lock so that a concurrent background update cannot produce an inconsistent read, and EndRead is called when the ScopedPtr is destructed. The per-thread TLS records where the previous selection left off so the current call knows where to start. Since this is round robin, each call advances to the next position (by a per-thread stride) and wraps from the end back to the beginning, so every server gets an equal chance; the first available server found is returned. The health-check related logic is skipped here.
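As a side note, here is a small standalone sketch (my own illustration, not brpc code; I am not certain how GenRandomStride chooses its value) of why the per-thread stride keeps the distribution even: as long as the stride is coprime with the number of servers, offset = (offset + stride) % n visits every index exactly once per n calls.

#include <cstdio>

static unsigned Gcd(unsigned a, unsigned b) { return b == 0 ? a : Gcd(b, a % b); }

int main() {
    const unsigned n = 5;                  // pretend server_list.size() == 5
    unsigned stride = 3;                   // pretend GenRandomStride() returned 3
    while (Gcd(stride, n) != 1) ++stride;  // a stride sharing a factor with n would skip servers
    unsigned offset = 0;
    for (unsigned i = 0; i < 2 * n; ++i) { // two full rounds over the list
        offset = (offset + stride) % n;
        std::printf("call %u picks server_list[%u]\n", i, offset);
    }
    return 0;
}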
LocalityAwareLoadBalancer
LocalityAwareLoadBalancer prefers the downstream nodes with the lowest latency, until their latency rises above that of other machines; in other words it uses "the downstream node's throughput divided by its latency" as the routing weight. The algorithm's implementation carries extensive comments, which are stripped here; if interested, go straight to the source.
53 bool LocalityAwareLoadBalancer::Add(Servers& bg, const Servers& fg,
54 SocketId id,
55 LocalityAwareLoadBalancer* lb) {
56 if (bg.weight_tree.capacity() < INITIAL_WEIGHT_TREE_SIZE) {
57 bg.weight_tree.reserve(INITIAL_WEIGHT_TREE_SIZE);
58 }
59 if (bg.server_map.seek(id) != NULL) {
60 // The id duplicates.
61 return false;
62 }
63 const size_t* pindex = fg.server_map.seek(id);
64 if (pindex == NULL) {
69 const size_t index = bg.weight_tree.size();
74 int64_t initial_weight = WEIGHT_SCALE;
75 if (!bg.weight_tree.empty()) {
76 initial_weight = lb->_total.load(butil::memory_order_relaxed)
77                 / bg.weight_tree.size();// average of the current weights
78 }
82         bg.server_map[id] = index;// record the new node
83
87         ServerInfo info = { id, lb->PushLeft(), new Weight(initial_weight) };// build the weight entry
88 bg.weight_tree.push_back(info);
89
93 const int64_t diff = info.weight->volatile_value();
94 if (diff) {
95             bg.UpdateParentWeights(diff, index);// update the weight binary tree
96             lb->_total.fetch_add(diff, butil::memory_order_relaxed);// accumulate into _total
97 }
98 } else {
101 bg.server_map[id] = bg.weight_tree.size();
102 bg.weight_tree.push_back(fg.weight_tree[*pindex]);
103 }
104 return true;
105 }
110 struct ServerInfo {
111 SocketId server_id;
112 butil::atomic<int64_t>* left;
113 Weight* weight;
114 };
227 bool LocalityAwareLoadBalancer::AddServer(const ServerId& id) {
228     if (_id_mapper.AddServer(id)) {// first time this node is added
230 return _db_servers.ModifyWithForeground(Add, id.id, this);
231 } else {
232 return true;
233 }
234 }
The code above adds a new server node and updates the background node and weight information. As the linked doc (lalb.md) explains: "LALB's lookup routes traffic by weight. The O(N) way is: take the sum of all weights, total; draw a random number R in [0, total-1]; then walk the weights one by one until the running sum is not greater than R while adding the next weight makes it greater than R. This works and is easy to understand, but performance is already poor once N reaches a few hundred. The main factor is cache coherence: LALB is a feedback-based algorithm, information is fed back into it whenever an RPC finishes, so the traversed data structures are constantly being modified. That means the foreground O(N) read has to refresh every cache line; when N reaches several hundred, a single lookup can cost on the order of a hundred microseconds, let alone larger N. Since LALB is (to be) brpc's default routing algorithm, that overhead is unacceptable.
Another approach is a complete binary tree. Each node records the weight sum of its left subtree, so a lookup finishes in O(logN). With N = 1024 we jump through memory at most 10 times and the whole lookup stays under a microsecond, which is acceptable."
Back to the source of adding a node: ModifyWithForeground updates the background data first:
411 template <typename T, typename TLS>
412 template <typename Fn, typename Arg1, typename Arg2>
413 size_t DoublyBufferedData<T, TLS>::ModifyWithForeground(
414 Fn& fn, const Arg1& arg1, const Arg2& arg2) {
415 WithFG2<Fn, Arg1, Arg2> c(fn, _data, arg1, arg2);
416 return Modify(c);
417 }
131 template <typename Fn, typename Arg1, typename Arg2>
132 struct WithFG2 {
133 WithFG2(Fn& fn, T* data, const Arg1& arg1, const Arg2& arg2)
134 : _fn(fn), _data(data), _arg1(arg1), _arg2(arg2) {}
135 size_t operator()(T& bg) {
136 return _fn(bg, (const T&)_data[&bg == _data], _arg1, _arg2);
137 }
138 private:
139 Fn& _fn;
140 T* _data;
141 const Arg1& _arg1;
142 const Arg2& _arg2;
143 };
353 int bg_index = !_index.load(butil::memory_order_relaxed);
356     const size_t ret = fn(_data[bg_index]);// update the background buffer
365     _index.store(bg_index, butil::memory_order_release);// publish the new foreground index
366     bg_index = !bg_index;// now points at the old foreground
368     //block writer... // until no reader thread is still reading the old foreground
377     const size_t ret2 = fn(_data[bg_index]);// update the old foreground
For a newly added node the foreground data is initially empty. On the first update the background buffer is index 1 and the foreground index 0, so pindex == NULL; bg.weight_tree.push_back(info) is then called, the newest slot of the vector is used as the node's index (bg.server_map[id] = index), and the average weight of the current nodes is used as initial_weight.
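For instance (numbers invented for illustration): if three nodes are already present and _total is 3 * WEIGHT_SCALE, a fourth node joins with initial_weight = 3 * WEIGHT_SCALE / 3 = WEIGHT_SCALE, so a newcomer starts near the current average rather than at an extreme.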
110 struct ServerInfo {
111 SocketId server_id;
112 butil::atomic<int64_t>* left;
113 Weight* weight;
114 };
141 // Add a entry to _left_weights.
142 butil::atomic<int64_t>* PushLeft() {
143 _left_weights.push_back(0);
144 return (butil::atomic<int64_t>*)&_left_weights.back();
145 }
Finally the weight is accumulated and the left-subtree sums of the complete binary tree are updated:
154 inline void LocalityAwareLoadBalancer::Servers::UpdateParentWeights(
155 int64_t diff, size_t index) const {
156 while (index != 0) {
157 const size_t parent_index = (index - 1) >> 1;
158 if ((parent_index << 1) + 1 == index) { // left child
159 weight_tree[parent_index].left->fetch_add(
160 diff, butil::memory_order_relaxed);
161 }
162 index = parent_index;
163 }
164 }
Each node records the weight sum of its left subtree. A simple example: given vector<int> value = {1,4,5,7,8,15,20}, the nodes 7/8/15/20 (indices 3 to 6) are leaves, so their left sums are 0; left[1] = 7, left[2] = 15, and left[0] = 4 + 7 + 8 = 19.
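To make the left-sum bookkeeping and the later SelectServer descent concrete, here is a small standalone sketch (my own illustration, not the brpc code) that builds the left sums for this exact example and performs the O(logN) weighted pick:

#include <cstdint>
#include <cstdio>
#include <vector>

// Descend the complete binary tree: weight[i] is node i's own weight and
// left[i] the weight sum of its left subtree; 0 <= dice < total weight.
static size_t PickByWeight(const std::vector<int64_t>& weight,
                           const std::vector<int64_t>& left, int64_t dice) {
    size_t index = 0;
    while (true) {
        if (dice < left[index]) {
            index = index * 2 + 1;                      // falls in the left subtree
        } else if (dice < left[index] + weight[index]) {
            return index;                               // falls on this node
        } else {
            dice -= left[index] + weight[index];
            index = index * 2 + 2;                      // falls in the right subtree
        }
    }
}

int main() {
    const std::vector<int64_t> weight = {1, 4, 5, 7, 8, 15, 20};
    std::vector<int64_t> subtree(weight.size()), left(weight.size(), 0);
    for (size_t i = weight.size(); i-- > 0; ) {         // build the sums bottom-up
        subtree[i] = weight[i];
        const size_t l = 2 * i + 1, r = 2 * i + 2;
        if (l < weight.size()) { subtree[i] += subtree[l]; left[i] = subtree[l]; }
        if (r < weight.size()) { subtree[i] += subtree[r]; }
    }
    // left == {19, 7, 15, 0, 0, 0, 0}, total == subtree[0] == 60
    for (int64_t dice = 0; dice < subtree[0]; dice += 10) {
        const size_t idx = PickByWeight(weight, left, dice);
        std::printf("dice=%2lld -> index %zu (weight %lld)\n",
                    (long long)dice, idx, (long long)weight[idx]);
    }
    return 0;
}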
Later, when the old foreground is updated, the buffer being modified is index 0 (the background is now index 0 and the foreground index 1), so pindex != NULL and the entry is copied straight from the new foreground into the old foreground. Note that both buffers now share the same Weight object, which has to be handled carefully when removing.
Next is removing a node; this part of the implementation is fairly involved:
236 bool LocalityAwareLoadBalancer::RemoveServer(const ServerId& id) {
237 if (_id_mapper.RemoveServer(id)) {
239 return _db_servers.Modify(Remove, id.id, this);
240 } else {
241 return true;
242 }
243 }
107 bool LocalityAwareLoadBalancer::Remove(
108 Servers& bg, SocketId id, LocalityAwareLoadBalancer* lb) {
109 size_t* pindex = bg.server_map.seek(id);
110 if (NULL == pindex) {
111 // The id does not exist.
112 return false;
113 }
114 // Save the index and remove mapping from id to the index.
115 const size_t index = *pindex;
116 bg.server_map.erase(id);
117
118 Weight* w = bg.weight_tree[index].weight;
122 const int64_t rm_weight = w->Disable();
123 if (index + 1 == bg.weight_tree.size()) {
124 // last node. Removing is eaiser.
125 bg.weight_tree.pop_back();
126 if (rm_weight) {
132 int64_t diff = -rm_weight;
133 bg.UpdateParentWeights(diff, index);
134 lb->_total.fetch_add(diff, butil::memory_order_relaxed);
135 } else {
136 // the second buffer. clean left stuff.
137 delete w;
138 lb->PopLeft();
139 }
140 } else {
141 // Move last node to position `index' to fill the space.
142 bg.weight_tree[index].server_id = bg.weight_tree.back().server_id;
143 bg.weight_tree[index].weight = bg.weight_tree.back().weight;
144 bg.server_map[bg.weight_tree[index].server_id] = index;
145 bg.weight_tree.pop_back();
146
147 Weight* w2 = bg.weight_tree[index].weight; // previously back()
148 if (rm_weight) {
159 const int64_t add_weight = w2->MarkOld(bg.weight_tree.size());
160
164 const int64_t diff = add_weight - rm_weight;
165 if (diff) {
166 bg.UpdateParentWeights(diff, index);
167 lb->_total.fetch_add(diff, butil::memory_order_relaxed);
168 }
171 } else {
175 const std::pair<int64_t, int64_t> p = w2->ClearOld();
176 // Add the diff to parent nodes of node `index'
177 const int64_t diff = p.second;
178 if (diff) {
179 bg.UpdateParentWeights(diff, index);
180 }
181 // Remove weight from parent nodes of last node.
182 int64_t old_weight = - p.first - p.second;
183 if (old_weight) {
184 bg.UpdateParentWeights(old_weight, bg.weight_tree.size());
185 }
186 lb->_total.fetch_add(- p.first, butil::memory_order_relaxed);
187 // Clear resources.
188 delete w;
189 lb->PopLeft();
190 }
191 }
192 return true;
193 }
The background data is updated first: the Weight* w at the corresponding index is fetched and Disable()d, so later weight/latency updates from returning RPCs are simply skipped. If it is the last node in the tree it is removed directly with bg.weight_tree.pop_back(). When its weight is non-zero (the first removal pass, with the foreground still in use), the node must also exist in the foreground since it exists in the background, so w cannot be deleted yet; the weight tree is adjusted first:
126 if (rm_weight) {
132 int64_t diff = -rm_weight;
133 bg.UpdateParentWeights(diff, index);
134 lb->_total.fetch_add(diff, butil::memory_order_relaxed);
135 } else {
137 delete w;
138 lb->PopLeft();
139 }
Once the background is done, the old foreground is updated and the same logic runs again, but this time rm_weight is 0 (Disable already zeroed the weight), so the Weight can be released safely and directly. For removing the last node, adjusting the weight tree is simple: just subtract the node's weight along the path from its position up to the root.
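Continuing the {1,4,5,7,8,15,20} example above: removing the last node (index 6, weight 20) calls UpdateParentWeights(-20, 6); since index 6 is a right child at every level, no left sum changes and only _total drops by 20. Had the path passed through a left child, the corresponding left sums would have been decreased as well.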
When the node to remove is not the last one, things get a bit more complicated, though not hard. For the background update, the implementation copies the last node into position index, pops the last node, and adjusts the weight tree by (add_weight - rm_weight). The removed position is recorded so it can be handled specially when the foreground is updated later, and weight updates that happen in between still have to be accounted for:
563 int64_t LocalityAwareLoadBalancer::Weight::MarkOld(size_t index) {
564 BAIDU_SCOPED_LOCK(_mutex);
565 const int64_t saved = _weight;
566     _old_weight = saved;// the weight at removal time
567     _old_diff_sum = 0;// reset; accumulates changes made after this point
568     _old_index = index;// the removed position (the old back())
569 return saved;
570 }
555 int64_t LocalityAwareLoadBalancer::Weight::Disable() {
556 BAIDU_SCOPED_LOCK(_mutex);
557 const int64_t saved = _weight;
558 _base_weight = -1;
559 _weight = 0;
560 return saved;
561 }
When rm_weight is non-zero we are updating the background data and the foreground is still in use, so the Weight cannot be deleted; instead w2 (the weight of the node that used to be at back()) records its old weight, index and so on via MarkOld, as in the code above. As the source comment explains:
150 // We need to remove the weight of last node from its parent
151 // nodes and add the weight to parent nodes of node `index'.
152 // However this process is not atomic. The foreground buffer still
153 // sees w2 as last node and it may change the weight during the
154 // process. To solve this problem, we atomically reset the weight
155 // and remember the preivous index (back()) in _old_index. Later
156 // change to weight will add the diff to _old_diff_sum if _old_index
157 // matches the index which SelectServer is from. In this way we
158 // know the weight diff from foreground before we laterly modify it.
From the foreground's point of view the structure is still a valid weight tree whose sums are consistent. Because the background is removing a node that is not the last one, modifying the w2 data shared with the foreground directly would break those sums and make SelectServer pick the wrong node; the foreground still sees an intact w2 as its last node. So the background only adds add_weight - rm_weight on top of the parents of `index`, and the step that would break the invariant, removing the last node's weight from its own parents, is deferred until the old foreground is updated.
161 // Add the weight diff to parent nodes of node `index'. Notice
162 // that we don't touch parent nodes of last node here because
163 // foreground is still sending traffic to last node.
When the old foreground is updated, Disable has already been executed, so rm_weight comes back as 0:
572 std::pair<int64_t, int64_t> LocalityAwareLoadBalancer::Weight::ClearOld() {
573 BAIDU_SCOPED_LOCK(_mutex);
574 const int64_t old_weight = _old_weight;
575 const int64_t diff = _old_diff_sum;
576 _old_diff_sum = 0;
577 _old_index = (size_t)-1;
578 _old_weight = 0;
579 return std::make_pair(old_weight, diff);
580 }
Next w2->ClearOld() is called and the weight tree is updated. The reason for this design is that other code paths, such as SelectServer, modify the Weight's internals concurrently. Because the earlier background update deliberately did not touch the weights of the last node's parents (to avoid breaking the foreground's tree), the work that was deferred has to be completed here:
173 // Reset _old_* fields and get the weight change by SelectServer()
174 // after MarkOld().
175 const std::pair<int64_t, int64_t> p = w2->ClearOld();
176 // Add the diff to parent nodes of node `index'
177 const int64_t diff = p.second;
178 if (diff) {
179 bg.UpdateParentWeights(diff, index);
180 }
181 // Remove weight from parent nodes of last node.
182 int64_t old_weight = - p.first - p.second;
183 if (old_weight) {
184 bg.UpdateParentWeights(old_weight, bg.weight_tree.size());
185 }
186 lb->_total.fetch_add(- p.first, butil::memory_order_relaxed);
187 // Clear resources.
188 delete w;
189 lb->PopLeft();
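A hypothetical walkthrough of this two-phase bookkeeping (all numbers invented): suppose the removed node at index had rm_weight = 30 and the last node moved into its place had weight 50. MarkOld records _old_weight = 50, and the parents of index in the background get diff = 50 - 30 = 20 added. If SelectServer then changes that shared weight by -5 while the old foreground is still live, ResetWeight sees _old_index match and accumulates _old_diff_sum = -5. When the old foreground is processed, ClearOld returns (50, -5): the parents of index get the extra -5, the parents of the old last slot get -(50 + (-5)) = -45 removed, and _total drops by 50. Overall _total changes by 20 - 5 - 50 = -35, which is exactly the removed weight of 30 plus the -5 adjustment. Next comes SelectServer itself: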
266 int LocalityAwareLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
267 butil::DoublyBufferedData<Servers>::ScopedPtr s;
268 if (_db_servers.Read(&s) != 0) {
269 return ENOMEM;
270 }
271 const size_t n = s->weight_tree.size();
272 if (n == 0) {
273 return ENODATA;
274 }
275 size_t ntry = 0;
276 size_t nloop = 0;
277 int64_t total = _total.load(butil::memory_order_relaxed);
278 int64_t dice = butil::fast_rand_less_than(total);
279 size_t index = 0;
280 int64_t self = 0;
281 while (total > 0) {
285         if (++nloop > 10000) {// guard against an infinite loop
287 return EHOSTDOWN;
288 }
293 const ServerInfo & info = s->weight_tree[index];
294 const int64_t left = info.left->load(butil::memory_order_relaxed);
295         if (dice < left) {// go into the left subtree
296 index = index * 2 + 1;
297 if (index < n) {
298 continue;
299 }
300         } else if (dice >= left + (self = info.weight->volatile_value())) {// go into the right subtree
301 dice -= left + self;
302 index = index * 2 + 2;
303 if (index < n) {
304 continue;
305 }
306 } else if (Socket::Address(info.server_id, out->ptr) == 0
307 && (*out->ptr)->IsAvailable()) {
308 //more code...
328 } else if (in.changable_weights) {
329 //more code...
345 }
346 total = _total.load(butil::memory_order_relaxed);
347 dice = butil::fast_rand_less_than(total);
348 index = 0;
349 }
350 return EHOSTDOWN;
351 }
SelectServer uses the randomly drawn dice to decide which node it lands on. Given the weight_tree structure explained above: if dice < left the target is in the left subtree; if dice >= left + (self = info.weight->volatile_value()) it is in the right subtree, and index and dice are updated accordingly; otherwise (left <= dice < left + self) the current node is the one. The next two branches handle a found node. In the normal case, where the node is available:
306 if (Socket::Address(info.server_id, out->ptr) == 0 && (*out->ptr)->IsAvailable()) {
308 if ((ntry + 1) == n || !ExcludedServers::IsExcluded(in.excluded, info.server_id)) {
If the weights are changeable (does that mean the node's weight is lowered after sending a request to it, and updated again later? This is answered further below), then:
306 } else if (Socket::Address(info.server_id, out->ptr) == 0
307 && (*out->ptr)->IsAvailable()) {
308 if ((ntry + 1) == n // Instead of fail with EHOSTDOWN, we prefer
309 // choosing the server again.
310 || !ExcludedServers::IsExcluded(in.excluded, info.server_id)) {
311 if (!in.changable_weights) {
312 return 0;
313 }
314 const Weight::AddInflightResult r =
315 info.weight->AddInflight(in, index, dice - left);
316 if (r.weight_diff) {
317 s->UpdateParentWeights(r.weight_diff, index);
318 _total.fetch_add(r.weight_diff, butil::memory_order_relaxed);
319 }
320 if (r.chosen) {
321 out->need_feedback = true;
322 return 0;
323 }
324 }
325 if (++ntry >= n) {
326 break;
327 }
190 inline LocalityAwareLoadBalancer::Weight::AddInflightResult
191 LocalityAwareLoadBalancer::Weight::AddInflight(
192 const SelectIn& in, size_t index, int64_t dice) {
193 BAIDU_SCOPED_LOCK(_mutex);
194 if (Disabled()) {
195 AddInflightResult r = { false, 0 };
196 return r;
197 }
198 const int64_t diff = ResetWeight(index, in.begin_time_us);
199 if (_weight < dice) {
200 // inflight delay makes the weight too small to choose.
201 AddInflightResult r = { false, diff };
202 return r;
203 }
204 _begin_time_sum += in.begin_time_us;
205 ++_begin_time_count;
206     AddInflightResult r = { true, diff };// feedback is needed when the rpc finishes
207 return r;
208 }
166 inline int64_t LocalityAwareLoadBalancer::Weight::ResetWeight(
167 size_t index, int64_t now_us) {
168 int64_t new_weight = _base_weight;
169 if (_begin_time_count > 0) {
170 const int64_t inflight_delay =
171 now_us - _begin_time_sum / _begin_time_count;
172 const int64_t punish_latency =
173 (int64_t)(_avg_latency * FLAGS_punish_inflight_ratio);
174 if (inflight_delay >= punish_latency && _avg_latency > 0) {
175 new_weight = new_weight * punish_latency / inflight_delay;
176 }
177 }
178 if (new_weight < FLAGS_min_weight) {
179 new_weight = FLAGS_min_weight;
180 }
181 const int64_t old_weight = _weight;
182 _weight = new_weight;
183 const int64_t diff = new_weight - old_weight;
184 if (_old_index == index && diff != 0) {
185         _old_diff_sum += diff;// accumulate the change made after MarkOld
186 }
187 return diff;
188 }
If the node is being removed (Disable was executed during removal, so Disabled() returns true), the selection simply moves on to the next attempt; otherwise the weight is refreshed through ResetWeight. As discussed at the end of this post, a node that has just started, for example, must not keep receiving all of the traffic, and RPC requests that never get a response also require the weight to be updated once they time out; this is analyzed in more detail later.
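A hypothetical ResetWeight example (the value of FLAGS_punish_inflight_ratio is assumed here to be 1.5): two RPCs are inflight, sent at 100us and 200us; at now_us = 1000us the inflight delay is 1000 - (100 + 200) / 2 = 850us. With _avg_latency = 300us, punish_latency = 450us, so inflight_delay >= punish_latency and new_weight = _base_weight * 450 / 850, a bit more than half of the base weight, long before any timeout fires.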
Continuing with SelectServer: if a node is chosen but turns out to be unavailable, an attempt is made to lower its _base_weight down to a certain value:
328 } else if (in.changable_weights) {
329 const int64_t diff =
330 info.weight->MarkFailed(index, total / n);
331 if (diff) {
332 s->UpdateParentWeights(diff, index);
333 _total.fetch_add(diff, butil::memory_order_relaxed);
334 }
335 if (dice >= left + self + diff) {
336 dice -= left + self + diff;
337 index = index * 2 + 2;
338 if (index < n) {
339 continue;
340 }
341 }
342 if (++ntry >= n) {
343 break;
344 }
345 }
210 inline int64_t LocalityAwareLoadBalancer::Weight::MarkFailed(
211 size_t index, int64_t avg_weight) {
212 BAIDU_SCOPED_LOCK(_mutex);
213 if (_base_weight <= avg_weight) {
214 return 0;
215 }
216 _base_weight = avg_weight;
217 return ResetWeight(index, 0);
218 }
In the controller implementation, whenever an RPC completes (whether it failed with a server-returned error or a timeout), if feedback to the LB module is needed:
784 if (need_feedback) {
785 const LoadBalancer::CallInfo info =
786 { begin_time_us, peer_id, error_code, c };
787 c->_lb->Feedback(info);
788 }
353 void LocalityAwareLoadBalancer::Feedback(const CallInfo& info) {
354 butil::DoublyBufferedData<Servers>::ScopedPtr s;
355 if (_db_servers.Read(&s) != 0) {
356 return;
357 }
358 const size_t* pindex = s->server_map.seek(info.server_id);
359     if (NULL == pindex) {// the corresponding server node has been removed
360 return;
361 }
362 const size_t index = *pindex;
363 Weight* w = s->weight_tree[index].weight;
364     const int64_t diff = w->Update(info, index);// try to update
365     if (diff != 0) {
366         s->UpdateParentWeights(diff, index);// update the weight tree
367 _total.fetch_add(diff, butil::memory_order_relaxed);
368 }
369 }
371 int64_t LocalityAwareLoadBalancer::Weight::Update(
372 const CallInfo& ci, size_t index) {
373 const int64_t end_time_us = butil::gettimeofday_us();
374 const int64_t latency = end_time_us - ci.begin_time_us;
375 BAIDU_SCOPED_LOCK(_mutex);
376 if (Disabled()) {
377 // The weight was disabled and will be removed soon, do nothing
378 // and the diff is 0.
379 return 0;
380 }
381
382 _begin_time_sum -= ci.begin_time_us;
383 --_begin_time_count;
384
385 if (latency <= 0) {
386 // time skews, ignore the sample.
387 return 0;
388 }
389 if (ci.error_code == 0) {
390 // Add a new entry
391 TimeInfo tm_info = { latency, end_time_us };
392 if (!_time_q.empty()) {
393 tm_info.latency_sum += _time_q.bottom()->latency_sum;
394 }
395 _time_q.elim_push(tm_info);
When the response comes back, nothing is done if the node is about to be removed (Disabled). For a successful request, tm_info.latency_sum += _time_q.bottom()->latency_sum accumulates the latency into a running sum used later to compute the average latency.
For requests that failed for other reasons:
396 } else {
410 int ndone = 1;
411 int nleft = 0;
412 if (ci.controller->max_retry() > 0) {
413 ndone = ci.controller->retried_count();
414 nleft = ci.controller->max_retry() - ndone;
415 }
416 const int64_t err_latency =
417 (nleft * (int64_t)(latency * FLAGS_punish_error_ratio)
418 + ndone * ci.controller->timeout_ms() * 1000L) / (ndone + nleft);
419
420 if (!_time_q.empty()) {
421 TimeInfo* ti = _time_q.bottom();
422 ti->latency_sum += err_latency;
423 ti->end_time_us = end_time_us;
424 } else {
425 // If the first response is error, enlarge the latency as timedout
426 // since we know nothing about the normal latency yet.
427 const TimeInfo tm_info = {
428 std::max(err_latency, ci.controller->timeout_ms() * 1000L),
429 end_time_us
430 };
431 _time_q.push(tm_info);
432 }
The relevant comment from the source:
397 // Accumulate into the last entry so that errors always decrease
398 // the overall QPS and latency.
399 // Note that the latency used is linearly mixed from the real latency
400 // (of an errorous call) and the timeout, so that errors that are more
401 // unlikely to be solved by later retries are punished more.
402 // Examples:
403 // max_retry=0: always use timeout
404 // max_retry=1, retried=0: latency
405 // max_retry=1, retried=1: timeout
406 // max_retry=2, retried=0: latency
407 // max_retry=2, retried=1: (latency + timeout) / 2
408 // max_retry=2, retried=2: timeout
As above, a latency is computed from the retry count and other parameters and accumulated into the last TimeInfo; if this is the very first entry, the latency is enlarged toward the timeout.
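Plugging the comment's case max_retry=2, retried=1 into the formula: ndone = 1 and nleft = 1, so err_latency = (latency * FLAGS_punish_error_ratio + timeout_ms * 1000) / 2, which is roughly (latency + timeout) / 2 when the punish ratio is close to 1, matching the list above.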
435 const int64_t top_time_us = _time_q.top()->end_time_us;
436 const size_t n = _time_q.size();
437 int64_t scaled_qps = DEFAULT_QPS * WEIGHT_SCALE;
438 if (end_time_us > top_time_us) {
439 // Only calculate scaled_qps when the queue is full or the elapse
440 // between bottom and top is reasonably large(so that error of the
441 // calculated QPS is probably smaller).
442 if (n == _time_q.capacity() ||
443 end_time_us >= top_time_us + 1000000L/*1s*/) {
444 // will not overflow.
445 scaled_qps = (n - 1) * 1000000L * WEIGHT_SCALE / (end_time_us - top_time_us);
446 if (scaled_qps < WEIGHT_SCALE) {
447 scaled_qps = WEIGHT_SCALE;
448 }
449 }
450 _avg_latency = (_time_q.bottom()->latency_sum -
451 _time_q.top()->latency_sum) / (n - 1);
452 } else if (n == 1) {
453 _avg_latency = _time_q.bottom()->latency_sum;
454 } else {
455 // end_time_us <= top_time_us && n > 1: the QPS is so high that
456 // the time elapse between top and bottom is 0(possible in examples),
457 // or time skews, we don't update the weight for safety.
458 return 0;
459 }
460 _base_weight = scaled_qps / _avg_latency;
461 return ResetWeight(index, end_time_us);
462 }
Finally the latency and _base_weight are recomputed. _time_q stores the most recent 128 latency samples; the more requests that complete within a shorter window, the larger scaled_qps becomes. _avg_latency is the average over this window, and _base_weight = scaled_qps / _avg_latency, after which _weight is readjusted via ResetWeight.
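A quick worked example with invented numbers: if the window holds n = 128 samples and end_time_us - top_time_us = 500000 (0.5s), then scaled_qps = 127 * 1000000 * WEIGHT_SCALE / 500000 = 254 * WEIGHT_SCALE, i.e. roughly 254 completed calls per second; with _avg_latency = 2000us, _base_weight = 254 * WEIGHT_SCALE / 2000, so the weight is proportional to QPS / latency, as lalb.md describes.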
When the old foreground is being updated, the new foreground and the old background may briefly disagree, which is acceptable. "We do not pursue strict consistency, only eventual consistency, which lets us take fewer locks. It also means the total weight, the left-subtree sums and the node weights may not match exactly, and the lookup algorithm has to tolerate that."
Below is the weight-related analysis and design rationale from lalb.md; it is well worth studying and also answers several questions raised while reading the source.
base_weight
QPS and latency are tracked with a circular queue, 128 entries by default. Such a small window works because inflight delay corrects over-reaction in time, and 128 samples already carry reasonable statistical confidence. The downside of computing latency this way is that if a server's performance changes drastically, it takes a while of accumulation before the average latency reflects it; as in the example in the previous section (of lalb.md), after a server flips its latency the client needs to accumulate data for many seconds before the average changes. We have not addressed this for now, because real production servers rarely jump in latency like the example does; they mostly slow down gradually. When a cluster has hundreds of machines, reacting a bit slowly and sending a bit less traffic to individual machines causes no real problem. If a product line does see latency jumps and the cluster is small, we will deal with it then.
The weight is computed as base_weight = QPS * WEIGHT_SCALE / latency ^ p. WEIGHT_SCALE is a scaling factor, so weights can be stored as integers while retaining enough precision, similar to fixed-point numbers. p defaults to 2; latency then converges roughly p times as fast as with p = 1, and the option quadratic_latency=false sets p = 1.
Every stage of the weight computation has a lower bound, to prevent a node's weight from dropping so low that it never gets a chance to be visited. Even nodes whose latency is far above the average should retain enough weight to be probed periodically; otherwise we would never find out when they become fast again.
Apart from nodes pending removal, no node's weight ever drops to 0.
This also creates a problem: even a very slow server (that has not disconnected) keeps a non-zero weight, so some requests are periodically sent to it and are doomed to time out. When QPS is low, the probing interval must be stretched to limit the impact; for example, to keep the impact on a qps=1000 service within 0.01%, the faulty server's weight must be pushed down until its probing interval exceeds 10 seconds, which slows down our discovery that the server has become fast again. Possible remedies:
- Do nothing. The problem may be less serious than it sounds: with continuous resource monitoring, online services rarely become "very slow", and ordinary slowdowns do not cause requests to time out.
- Save some of the requests previously sent to the slow server and use them for probing. This avoids wasting requests, but introduces a lot of coupling and is cumbersome to implement.
- Force backup requests.
- Select again.
inflight delay
We must track RPCs that have not finished yet; otherwise we would have to wait for timeouts or other errors, which can be slow (a timeout is usually several times the normal latency), and in the meantime we may have made many wrong routing decisions. The simplest way is to keep statistics on the unfinished RPCs:
- When a server is selected, add the send time to a running sum and increment the unfinished count.
- When feedback arrives, subtract the send time and decrement the count.
- The framework guarantees that every selection is matched by exactly one feedback.
Then "current time - (sum of send times / unfinished count)" is the average elapsed time of the unfinished RPCs, which we call the inflight delay. When the inflight delay exceeds the average latency, we punish the node's weight linearly: weight = base_weight * avg_latency / inflight_delay. When requests sent to a node fail to return within the average latency, its weight drops quickly, correcting our behaviour much faster than waiting for timeouts. This does not yet account for normal latency jitter, so we also need a variance, which can come from statistics or simply be taken linear in the average latency. Either way, with a variance bound the weight is only punished when inflight_delay > avg_latency + max(bound * 3, MIN_BOUND); 3 is the empirical value from the normal distribution.
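Here is a minimal standalone sketch of the punishment rule just described (my own simplification, not the brpc code; the function name, parameters and MIN_BOUND value are assumptions):

#include <algorithm>
#include <cstdint>

// Returns the punished weight given the tracked inflight state.
int64_t PunishedWeight(int64_t base_weight, int64_t avg_latency,
                       int64_t latency_bound,   // variance-derived bound
                       int64_t now_us,
                       int64_t begin_time_sum, int64_t inflight_count) {
    if (inflight_count <= 0 || avg_latency <= 0) {
        return base_weight;
    }
    // Average elapsed time of the RPCs that have not finished yet.
    const int64_t inflight_delay = now_us - begin_time_sum / inflight_count;
    const int64_t MIN_BOUND = 1000;  // assumed floor, in microseconds
    if (inflight_delay <= avg_latency + std::max(latency_bound * 3, MIN_BOUND)) {
        return base_weight;          // within normal jitter, no punishment
    }
    // Linear punishment: weight = base_weight * avg_latency / inflight_delay.
    return base_weight * avg_latency / inflight_delay;
}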
ConsistentHashingLoadBalancer
MurmurHash (multiply and rotate) is used as the hash function here. Its advantages: it is fast, tens of times faster than cryptographic hash algorithms, and similar strings such as "abc" and "abd" still scatter evenly over the hash ring.
60 uint32_t MurmurHash32(const void* key, size_t len) {
61 uint32_t hash;
62 butil::MurmurHash3_x86_32(key, (int)len, 0, &hash);
63 return hash;
64 }
83 void MurmurHash3_x86_32 ( const void * key, int len,
84 uint32_t seed, void * out ) {
85 const uint8_t * data = (const uint8_t*)key;
86 const int nblocks = len / 4;
87
88 uint32_t h1 = seed;
90 const uint32_t c1 = 0xcc9e2d51;
91 const uint32_t c2 = 0x1b873593;
92
93 //----------
94 // body
95
96 const uint32_t * blocks = (const uint32_t *)(data + nblocks*4);
97
98 for(int i = -nblocks; i; i++)
99 {
100 uint32_t k1 = blocks[i];
101
102 k1 *= c1;
103 k1 = rotl32(k1,15);
104 k1 *= c2;
105
106 h1 ^= k1;
107 h1 = rotl32(h1,13);
108 h1 = h1*5+0xe6546b64;
109 }
111 //----------
112 // tail
113
114 const uint8_t * tail = (const uint8_t*)(data + nblocks*4);
116 uint32_t k1 = 0;
117
118 switch(len & 3)
119 {
120 case 3: k1 ^= tail[2] << 16;
121 case 2: k1 ^= tail[1] << 8;
122 case 1: k1 ^= tail[0];
123 k1 *= c1; k1 = rotl32(k1,15); k1 *= c2; h1 ^= k1;
124 };
125
126 //----------
127 // finalization
128
129 h1 ^= len;
131 h1 = fmix32(h1);
133 *(uint32_t*)out = h1;
134 }
53 inline uint32_t rotl32 ( uint32_t x, int8_t r )
54 {
55 return (x << r) | (x >> (32 - r));
56 }
49 MURMURHASH_FORCE_INLINE uint32_t fmix32 (uint32_t h) {
50 h ^= h >> 16;
51 h *= 0x85ebca6b;
52 h ^= h >> 13;
53 h *= 0xc2b2ae35;
54 h ^= h >> 16;
55 return h;
56 }
The above is the concrete hash function implementation.
36 enum ConsistentHashingLoadBalancerType {// supported hash types
37     CONS_HASH_LB_MURMUR3 = 0,// used as the example here
38 CONS_HASH_LB_MD5 = 1,
39 CONS_HASH_LB_KETAMA = 2,
40
41 // Identify the last one.
42 CONS_HASH_LB_LAST = 3
43 };
45 class ConsistentHashingLoadBalancer : public LoadBalancer {
46 public:
47 struct Node {
48 uint32_t hash;
49 ServerId server_sock;
50 butil::EndPoint server_addr; // To make sorting stable among all clients
51 bool operator<(const Node &rhs) const {
52 if (hash < rhs.hash) { return true; }
53 if (hash > rhs.hash) { return false; }
54 return server_addr < rhs.server_addr;
55 }
56 bool operator<(const uint32_t code) const {
57 return hash < code;
58 }
59 };
70 private:
79 size_t _num_replicas;
80 ConsistentHashingLoadBalancerType _type;
81 butil::DoublyBufferedData<std::vector<Node> > _db_hash_ring;
82 };
Factory classes for these policies are initialized beforehand, so that setup is not shown here; murmur hashing uses the default DefaultReplicaPolicy.
220 bool ConsistentHashingLoadBalancer::AddServer(const ServerId& server) {
221 std::vector<Node> add_nodes;
222     add_nodes.reserve(_num_replicas);// number of virtual nodes (replicas)
223 if (!GetReplicaPolicy(_type)->Build(server, _num_replicas, &add_nodes)) {
224 return false;
225 }
226 std::sort(add_nodes.begin(), add_nodes.end());
227 bool executed = false;
228 const size_t ret = _db_hash_ring.ModifyWithForeground(
229 AddBatch, add_nodes, &executed);
230 CHECK(ret == 0 || ret == _num_replicas) << ret;
231 return ret != 0;
232 }
64 bool DefaultReplicaPolicy::Build(ServerId server,
65 size_t num_replicas,
66 std::vector<ConsistentHashingLoadBalancer::Node>* replicas) const {
67 SocketUniquePtr ptr;
68 if (Socket::AddressFailedAsWell(server.id, &ptr) == -1) {
69 return false;
70 }
71 replicas->clear();
72 for (size_t i = 0; i < num_replicas; ++i) {
73 char host[32];
74 int len = snprintf(host, sizeof(host), "%s-%lu",
75 endpoint2str(ptr->remote_side()).c_str(), i);
76 ConsistentHashingLoadBalancer::Node node;
77 node.hash = _hash_func(host, len);
78 node.server_sock = server;
79 node.server_addr = ptr->remote_side();
80 replicas->push_back(node);
81 }
82 return true;
83 }
When a server is added, the corresponding _hash_func first computes _num_replicas virtual nodes and the result is sorted; then AddBatch is called to modify the background and the old foreground data:
152 size_t ConsistentHashingLoadBalancer::AddBatch(
153 std::vector<Node> &bg, const std::vector<Node> &fg,
154 const std::vector<Node> &servers, bool *executed) {
155 if (*executed) {
156 // Hack DBD
157 return fg.size() - bg.size();
158 }
159 *executed = true;
160 bg.resize(fg.size() + servers.size());
161 bg.resize(std::set_union(fg.begin(), fg.end(),
162 servers.begin(), servers.end(), bg.begin())
163                          - bg.begin());// merged union, removing duplicates
164 return bg.size() - fg.size();
165 }
Removing a server:
257 bool ConsistentHashingLoadBalancer::RemoveServer(const ServerId& server) {
258 bool executed = false;
259 const size_t ret = _db_hash_ring.ModifyWithForeground(Remove, server, &executed);
260 CHECK(ret == 0 || ret == _num_replicas);
261 return ret != 0;
262 }
204 size_t ConsistentHashingLoadBalancer::Remove(
205 std::vector<Node> &bg, const std::vector<Node> &fg,
206 const ServerId& server, bool *executed) {
207 if (*executed) {
208 return bg.size() - fg.size();
209 }
210 *executed = true;
211 bg.clear();
212 for (size_t i = 0; i < fg.size(); ++i) {
213 if (fg[i].server_sock != server) {
214 bg.push_back(fg[i]);
215 }
216 }
217 return fg.size() - bg.size();
218 }
The background data is simply cleared and rebuilt from the old foreground, skipping the nodes that belong to the server being removed.
Selecting a node by hash value:
290 int ConsistentHashingLoadBalancer::SelectServer(
291 const SelectIn &in, SelectOut *out) {
292 if (!in.has_request_code) {
293         // the request must carry a hash code (request_code)
294 return EINVAL;
295 }
296 if (in.request_code > UINT_MAX) {//request_code must be 32-bit currently
298 return EINVAL;
299 }
300 butil::DoublyBufferedData<std::vector<Node> >::ScopedPtr s;
301 if (_db_hash_ring.Read(&s) != 0) {
302 return ENOMEM;
303 }
304 if (s->empty()) {
305 return ENODATA;
306 }
307 std::vector<Node>::const_iterator choice =
308 std::lower_bound(s->begin(), s->end(), (uint32_t)in.request_code);
309 if (choice == s->end()) {
310 choice = s->begin();
311 }
312 for (size_t i = 0; i < s->size(); ++i) {
313 if (((i + 1) == s->size() // always take last chance
314 || !ExcludedServers::IsExcluded(in.excluded, choice->server_sock.id))
315 && Socket::Address(choice->server_sock.id, out->ptr) == 0
316 && (*out->ptr)->IsAvailable()) {
317 return 0;
318 } else {
319 if (++choice == s->end()) {
320 choice = s->begin();
321 }
322 }
323 }
324 return EHOSTDOWN;
325 }
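To tie the pieces together, here is a small standalone sketch (my own, with invented hashes and names) of the ring lookup that SelectServer performs: std::lower_bound finds the first virtual node whose hash is not less than the request code, wrapping back to the beginning when the code is past the largest hash.

#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <vector>

struct RingNode {
    uint32_t hash;      // position of the virtual node on the 32-bit ring
    int      server;    // which real server this virtual node belongs to
    bool operator<(uint32_t code) const { return hash < code; }
};

int main() {
    // Sorted virtual nodes (hash values invented for illustration).
    std::vector<RingNode> ring = {
        {100u, 0}, {2000u, 1}, {5000u, 0}, {90000u, 2}, {4000000000u, 1},
    };
    const uint32_t request_codes[] = {0u, 1500u, 6000u, 4100000000u};
    for (uint32_t code : request_codes) {
        auto choice = std::lower_bound(ring.begin(), ring.end(), code);
        if (choice == ring.end()) {
            choice = ring.begin();   // wrap around, same as the code above
        }
        std::printf("request_code %u -> server %d (vnode hash %u)\n",
                    code, choice->server, choice->hash);
    }
    return 0;
}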
References:
client.md
lalb.md
load_balancing.md
consistent_hashing.md
murmurhash3 学习笔记
set_union