在之前分析rpc时,发现代码中使用到此结构,当时看到的时候感觉挺复杂的,而从名字上看还以为仅仅是个uint64_t的id:
// Handle type for a bthread id: a plain struct wrapping one 64-bit value.
189 typedef struct {
// The packed id value (see make_id for how it is composed).
190 uint64_t value;
191 } bthread_id_t;
实际分配是通过Id结构完成的,并由make_id生成最终的id:
// Takes an Id object from the resource pool, initializes it with `data` and
// the two error callbacks, and writes the resulting handle to *id.
// Returns 0 on success, or ENOMEM when get_resource() yields no object.
325 static int id_create_impl(
326 bthread_id_t* id, void* data,
327 int (*on_error)(bthread_id_t, void*, int),
328 int (*on_error2)(bthread_id_t, void*, int, const std::string&)) {
329 IdResourceId slot;
330 Id* const meta = get_resource(&slot);
331 if (meta) {
332 meta->data = data;
333 meta->on_error = on_error;
334 meta->on_error2 = on_error2;
335 CHECK(meta->pending_q.empty());
336 uint32_t* butex = meta->butex;
// Restart the version at 1 when it is 0, or when adding ID_MAX_RANGE + 2
// would wrap around the 32-bit counter.
337 if (0 == *butex || *butex + ID_MAX_RANGE + 2 < *butex) {
338 // Skip 0 so that bthread_id_t is never 0
339 // avoid overflow to make comparisons simpler.
340 *butex = 1;
341 }
// Initially exactly one version is valid: [first_ver, locked_ver) contains
// only *butex. join_butex starts at the same value.
342 *meta->join_butex = *butex;
343 meta->first_ver = *butex;
344 meta->locked_ver = *butex + 1;
345 *id = make_id(*butex, slot);
346 return 0;
347 }
348 return ENOMEM;
349 }
// Packs a pool slot and a version into one 64-bit id:
// slot.value goes into the upper 32 bits, version into the lower 32 bits.
156 inline bthread_id_t make_id(uint32_t version, IdResourceId slot) {
157 const bthread_id_t tmp =
158 { (((uint64_t)slot.value) << 32) | (uint64_t)version };
159 return tmp;
160 }
以上分配一个Id,用于同步rpc的各个环节。返回的id其实是用来索引一个Id资源对象的,它由资源池中的slot索引(高32位)和版本号(低32位)组成。
// Per-id state object that a bthread_id_t refers to through its slot part.
// The value stored in *butex is compared against the version markers below
// to tell whether the id is currently unlocked, locked, contended or about
// to be destroyed.
111 struct BAIDU_CACHELINE_ALIGNMENT Id {
112 // first_ver ~ locked_ver - 1: unlocked versions
113 // locked_ver: locked
114 // unlockable_ver: locked and about to be destroyed
115 // contended_ver: locked and contended
116 uint32_t first_ver;
117 uint32_t locked_ver;
// Guards the fields of this object; callers lock it around reads and writes
// of *butex and the version fields.
118 internal::FastPthreadMutex mutex;
// User pointer passed back to the error callbacks.
119 void* data;
// Error callbacks; on_error is preferred when set, otherwise on_error2
// (which additionally receives an error text) is used.
120 int (*on_error)(bthread_id_t, void*, int);
121 int (*on_error2)(bthread_id_t, void*, int, const std::string&);
// Location string recorded by whoever last took the lock.
122 const char *lock_location;
// Version/lock word that lockers wait on.
123 uint32_t* butex;
// Separate word that bthread_id_join waits on.
124 uint32_t* join_butex;
// Errors reported while the id was locked, to be handled at unlock time.
125 SmallQueue<PendingError, 2> pending_q;
126
127 Id() {
128 // Although value of the butex(as version part of bthread_id_t)
129 // does not matter, we set it to 0 to make program more deterministic.
130 butex = bthread::butex_create_checked<uint32_t>();
131 join_butex = bthread::butex_create_checked<uint32_t>();
132 *butex = 0;
133 *join_butex = 0;
134 }
// True when id_ver lies in the half-open range [first_ver, locked_ver).
141 inline bool has_version(uint32_t id_ver) const {
142 return id_ver >= first_ver && id_ver < locked_ver;
143 }
135 //more code...
150 };
has_version判断传入的版本号是否仍然合法(即落在[first_ver, locked_ver)内),因为一个rpc的响应回来时,可能上下文已经没了;版本范围会在bthread_id_cancel/bthread_id_unlock_and_destroy时重置。SmallQueue类是一个queue,这里N为2:当元素个数不超过N时使用内嵌数组_c[N],超出时改用堆上分配的std::deque(_full)。不贴相关代码,只列出成员:
96 int _begin;
97 int _size;
98 T _c[N];
99 std::deque<T>* _full;
回到上面,为什么id要这么生成,解决什么问题,这里引用相关说明:
“bthread_id是一个特殊的同步结构,它可以互斥RPC过程中的不同环节,也可以O(1)时间内找到RPC上下文(即Controller)。注意,这里我们谈论的是bthread_id_t,不是bthread_t(bthread的tid),这个名字起的确实不太好,容易混淆。
具体来说,bthread_id解决的问题有:
- 在发送RPC过程中response回来了,处理response的代码和发送代码产生竞争。
- 设置timer后很快触发了,超时处理代码和发送代码产生竞争。
- 重试产生的多个response同时回来产生的竞争。
- 通过correlation_id在O(1)时间内找到对应的RPC上下文,而无需建立从correlation_id到RPC上下文的全局哈希表。
- 取消RPC。
上文提到的bug在其他rpc框架中广泛存在,下面我们来看下brpc是如何通过bthread_id解决这些问题的。
bthread_id包括两部分,一个是用户可见的64位id,另一个是对应的不可见的bthread::Id结构体。用户接口都是操作id的。从id映射到结构体的方式和brpc中的其他结构类似:32位是内存池的偏移量,32位是version。前者O(1)时间定位,后者防止ABA问题。”
在发起rpc前,会通过call_id返回一个id:
398 const CallId correlation_id = cntl->call_id();
401 const int rc = bthread_id_lock_and_reset_range(
402 correlation_id, NULL, 2 + cntl->max_retry());
// Returns the call id stored in _correlation_id, creating it on first use.
// If a non-zero value is already stored it is returned as is. Otherwise a
// new id is created (with `this` as data and HandleSocketFailed as the error
// callback) and installed with a compare-exchange; if the exchange fails,
// the freshly created id is cancelled and the stored value is returned.
1224 CallId Controller::call_id() {
// The 64-bit field is accessed through an atomic view of its storage.
1225 butil::atomic<uint64_t>* target =
1226 (butil::atomic<uint64_t>*)&_correlation_id.value;
1227 uint64_t loaded = target->load(butil::memory_order_relaxed);
1228 if (loaded) {
1229 const CallId id = { loaded };
1230 return id;
1231 }
1232 // Optimistic locking.
1233 CallId cid = { 0 };
1234 // The range of this id will be reset in Channel::CallMethod
1235 CHECK_EQ(0, bthread_id_create2(&cid, this, HandleSocketFailed));
// NOTE(review): presumably, as with std::atomic, a failed exchange writes
// the currently stored value into `loaded` -- confirm for butil::atomic.
1236 if (!target->compare_exchange_strong(loaded, cid.value,
1237 butil::memory_order_relaxed)) {
1238 bthread_id_cancel(cid);
1239 cid.value = loaded;
1240 }
1241 return cid;
1242 }
// Creates an id whose error callback also receives an error text.
// Forwards to id_create_impl with a NULL on_error; when `on_error` is NULL,
// bthread::default_bthread_id_on_error2 is installed instead.
// Returns whatever id_create_impl returns.
695 int bthread_id_create2(
696 bthread_id_t* id, void* data,
697 int (*on_error)(bthread_id_t, void*, int, const std::string&)) {
698 return bthread::id_create_impl(
699 id, data, NULL,
700 (on_error ? on_error : bthread::default_bthread_id_on_error2));
701 }
这里比如max_retry为3,那么传给bthread_id_lock_and_reset_range的range就是2+3=5:
// Locks the id and optionally widens its valid version range to `range`.
// On success stores meta->data into *pdata (when pdata is non-NULL) and
// returns 0.
// Returns EINVAL when the slot cannot be addressed or the id's version is not
// (or no longer) in the valid range, EPERM when *butex equals
// unlockable_ver(), or errno when butex_wait fails with anything other than
// EWOULDBLOCK/EINTR.
// `location` is recorded in meta->lock_location when the lock is taken.
405 int bthread_id_lock_and_reset_range_verbose(
406 bthread_id_t id, void **pdata, int range, const char *location) {
407 bthread::Id* const meta = address_resource(bthread::get_slot(id));
408 if (!meta) {
409 return EINVAL;
410 }
411 const uint32_t id_ver = bthread::get_version(id);
412 uint32_t* butex = meta->butex;
413 bool ever_contended = false;
414 meta->mutex.lock();
// Re-check the version on every iteration: it may become invalid while this
// caller was waiting with the mutex released.
415 while (meta->has_version(id_ver)) {
// *butex == first_ver means nobody holds the lock.
416 if (*butex == meta->first_ver) {
417 // contended locker always wakes up the butex at unlock.
418 meta->lock_location = location;
419 if (range == 0) {
420 // fast path
421 } else if (range < 0 ||
422 range > bthread::ID_MAX_RANGE ||
423 range + meta->first_ver <= meta->locked_ver) {
// NOTE(review): source lines 424-428 are not shown in this excerpt; as
// shown, this branch leaves locked_ver unchanged.
429 } else {
430 meta->locked_ver = meta->first_ver + range;
431 }
// Mark as locked; keep the contended marker if this caller had to wait.
432 *butex = (ever_contended ? meta->contended_ver() : meta->locked_ver);
433 meta->mutex.unlock();
434 if (pdata) {
435 *pdata = meta->data;
436 }
437 return 0;
438 } else if (*butex != meta->unlockable_ver()) {
// Locked by someone else: flag contention, then wait on the butex with the
// mutex released.
439 *butex = meta->contended_ver();
440 uint32_t expected_ver = *butex;
441 meta->mutex.unlock();
442 ever_contended = true;
443 if (bthread::butex_wait(butex, expected_ver, NULL) < 0 &&
444 errno != EWOULDBLOCK && errno != EINTR) {
445 return errno;
446 }
447 meta->mutex.lock();
448 } else { // bthread_id_about_to_destroy was called.
449 meta->mutex.unlock();
450 return EPERM;
451 }
452 }
453 meta->mutex.unlock();
454 return EINVAL;
455 }
// Extracts the version part of an id: the lower 32 bits of id.value.
167 inline uint32_t get_version(bthread_id_t id) {
168 return (uint32_t)(id.value & 0xFFFFFFFFul);
169 }
当rpc的响应回来时会进行bthread_id_lock,如果此时id已被其他bthread加锁,就会进入else if (*butex != meta->unlockable_ver())分支,执行*butex = meta->contended_ver(),这样持锁方在unlock时就知道需要wake;接着调用butex_wait等待。
974 // Make versioned correlation_id.
975 // call_id : unversioned, mainly for ECANCELED and ERPCTIMEDOUT
976 // call_id + 1 : first try.
977 // call_id + 2 : retry 1
978 // ...
979 // call_id + N + 1 : retry N
980 // All ids except call_id are versioned. Say if we've sent retry 1 and
981 // a failed response of first try comes back, it will be ignored.
// Returns the id of the current attempt: the base correlation id value plus
// (_current_call.nretry + 1).
595 CallId current_id() const {
596 CallId id = { _correlation_id.value + _current_call.nretry + 1 };
597 return id;
598 }
以max_retry为3为例(range=2+3=5),此时并不会有竞争:first_ver=1不变,locked_ver由2变为first_ver+range=6,*butex由1(等于first_ver,即未加锁)变为6(等于locked_ver,即已加锁);如果有竞争则可能要butex_wait。
接着发起正常的rpc请求(非retry的请求),此时的id在原来的基础上+1,但get_slot获取到的偏移量还是同一个。这里假设这个请求超时,此时bthread_id_unlock,但并不需要wake:
// Unlocks a locked id.
// Returns EINVAL when the slot cannot be addressed or the version is not in
// the valid range, and EPERM when the id is not locked (*butex == first_ver).
// If an error is pending in pending_q, it is popped and the error callback is
// invoked instead of releasing the lock (*butex is left unchanged on that
// path); the callback's return value is returned.
// Otherwise *butex is reset to first_ver, waiters are woken if the lock was
// contended, and 0 is returned.
568 int bthread_id_unlock(bthread_id_t id) {
570 bthread::Id* const meta = address_resource(bthread::get_slot(id));
571 if (!meta) {
572 return EINVAL;
573 }
574 uint32_t* butex = meta->butex;
575 // Release fence makes sure all changes made before signal visible to
576 // woken-up waiters.
577 const uint32_t id_ver = bthread::get_version(id);
578 meta->mutex.lock();
579 if (!meta->has_version(id_ver)) {
580 meta->mutex.unlock();
582 return EINVAL;
583 }
584 if (*butex == meta->first_ver) {
585 meta->mutex.unlock();
587 return EPERM;
588 }
589 bthread::PendingError front;
590 if (meta->pending_q.pop(&front)) {
// Hand the still-held lock over to the pending error: record where it was
// reported and run the callback outside the mutex.
592 meta->lock_location = front.location;
593 meta->mutex.unlock();
594 if (meta->on_error) {
595 return meta->on_error(front.id, meta->data, front.error_code);
596 } else {
597 return meta->on_error2(front.id, meta->data, front.error_code,
598 front.error_text);
599 }
600 } else {
// Sample the contended state before overwriting *butex.
602 const bool contended = (*butex == meta->contended_ver());
603 *butex = meta->first_ver;
604 meta->mutex.unlock();
605 if (contended) {
606 // We may wake up already-reused id, but that's OK.
607 bthread::butex_wake(butex);
608 }
609 return 0;
610 }
611 }
bthread_id_unlock中,若pending_q里有待处理的错误,则pop一个出来并在不释放id锁的情况下执行错误回调;这里因pending_q为空,故执行*butex = meta->first_ver,最后*butex=1,如果之前有其他bthread在wait(即*butex等于contended_ver),则还会调用butex_wake唤醒。
若这个正常请求的响应回来,在baidu_std协议的ProcessRpcResponse中会执行bthread_id_lock(cid, (void**)&cntl):
// Locks the id without changing its version range: forwards to
// bthread_id_lock_and_reset_range_verbose with range == 0.
563 int bthread_id_lock_verbose(bthread_id_t id, void** pdata,
564 const char *location) {
566 return bthread_id_lock_and_reset_range_verbose(id, pdata, 0, location);
567 }
如果此时该请求已经超时,则会回调HandleTimeout,并调用bthread_id_error:
// Reports an error on the id without an error text: forwards to
// bthread_id_error2_verbose with an empty std::string.
458 int bthread_id_error_verbose(bthread_id_t id, int error_code,
459 const char *location) {
460 return bthread_id_error2_verbose(id, error_code, std::string(), location);
461 }
// Reports an error on the id.
// Returns EINVAL when the slot cannot be addressed or the version is not in
// the valid range.
// If the id is unlocked (*butex == first_ver), it is locked here and the
// error callback is invoked immediately; its return value is returned.
// If the id is already locked, the error is queued in pending_q and 0 is
// returned.
714 int bthread_id_error2_verbose(bthread_id_t id, int error_code,
715 const std::string& error_text,
716 const char *location) {
717 bthread::Id* const meta = address_resource(bthread::get_slot(id));
718 if (!meta) {
719 return EINVAL;
720 }
721 const uint32_t id_ver = bthread::get_version(id);
722 uint32_t* butex = meta->butex;
723 meta->mutex.lock();
724 if (!meta->has_version(id_ver)) {
725 meta->mutex.unlock();
726 return EINVAL;
727 }
728 if (*butex == meta->first_ver) {
// Take the lock on behalf of the callback, which runs outside the mutex.
729 *butex = meta->locked_ver;
730 meta->lock_location = location;
731 meta->mutex.unlock();
732 if (meta->on_error) {
733 return meta->on_error(id, meta->data, error_code);
734 } else {
735 return meta->on_error2(id, meta->data, error_code, error_text);
736 }
737 } else {
// Someone else holds the lock: remember the error for later.
738 bthread::PendingError e;
739 e.id = id;
740 e.error_code = error_code;
741 e.error_text = error_text;
742 e.location = location;
743 meta->pending_q.push(e);
744 meta->mutex.unlock();
745 return 0;
746 }
747 }
若因rpc上下文已经销毁等原因导致版本号不合法,直接返回EINVAL;否则若此时id没有被加锁(*butex == meta->first_ver),则加锁并执行错误处理函数;若已被加锁,则push一个PendingError,由持锁的bthread在unlock时取出执行。在执行某个rpc的一些代码时,是需要阻止其他bthread执行同一个rpc的代码段的。
当没有其他bthread持锁时,即*butex == meta->first_ver,加锁:*butex = (ever_contended ? meta->contended_ver() : meta->locked_ver)。这里要判断是否发生过contended,因为后面解锁时要靠const bool contended = (*butex == meta->contended_ver())来决定是否唤醒其他等待的bthread,例如下面这段把*butex置为unlockable_ver的代码:
482 const bool contended = (*butex == meta->contended_ver());
483 *butex = meta->unlockable_ver();
484 meta->mutex.unlock();
485 if (contended) {
486 // wake up all waiting lockers.
487 bthread::butex_wake_except(butex, 0);
488 }
剩下其他接口:
// Blocks until the version of `id` is no longer in the Id's valid range.
// Returns 0 once that is the case, EINVAL when the slot cannot be addressed,
// or errno when butex_wait fails with anything other than EWOULDBLOCK/EINTR.
516 int bthread_id_join(bthread_id_t id) {
517 const bthread::IdResourceId slot = bthread::get_slot(id);
518 bthread::Id* const meta = address_resource(slot);
519 if (!meta) {
520 // The id is not created yet, this join is definitely wrong.
521 return EINVAL;
522 }
523 const uint32_t id_ver = bthread::get_version(id);
524 uint32_t* join_butex = meta->join_butex;
525 while (1) {
// Read the validity flag and the value to wait on under the same mutex hold.
526 meta->mutex.lock();
527 const bool has_ver = meta->has_version(id_ver);
528 const uint32_t expected_ver = *join_butex;
529 meta->mutex.unlock();
530 if (!has_ver) {
531 break;
532 }
533 if (bthread::butex_wait(join_butex, expected_ver, NULL) < 0 &&
534 errno != EWOULDBLOCK && errno != EINTR) {
535 return errno;
536 }
537 }
538 return 0;
539 }
// Destroys an id that is currently unlocked and returns its slot to the pool.
// Returns EINVAL when the slot cannot be addressed or the version is not in
// the valid range, EPERM when the id is locked (*butex != first_ver), and 0
// on success.
492 int bthread_id_cancel(bthread_id_t id) {
493 bthread::Id* const meta = address_resource(bthread::get_slot(id));
494 if (!meta) {
495 return EINVAL;
496 }
497 uint32_t* butex = meta->butex;
498 const uint32_t id_ver = bthread::get_version(id);
499 meta->mutex.lock();
500 if (!meta->has_version(id_ver)) {
501 meta->mutex.unlock();
502 return EINVAL;
503 }
504 if (*butex != meta->first_ver) {
505 meta->mutex.unlock();
506 return EPERM;
507 }
// Collapse the valid range to empty (first_ver == locked_ver), so
// has_version() is false for every version from now on.
508 *butex = meta->end_ver();
509 meta->first_ver = *butex;
510 meta->locked_ver = *butex;
511 meta->mutex.unlock();
512 return_resource(bthread::get_slot(id));
513 return 0;
514 }
// Destroys a locked id: invalidates all of its versions, drops pending
// errors, wakes waiters on both butexes and returns the slot to the pool.
// Returns EINVAL when the slot cannot be addressed or the version is not in
// the valid range, EPERM when the id is not locked (*butex == first_ver),
// and 0 on success.
616 int bthread_id_unlock_and_destroy(bthread_id_t id) {
617 bthread::Id* const meta = address_resource(bthread::get_slot(id));
618 if (!meta) {
619 return EINVAL;
620 }
622 uint32_t* butex = meta->butex;
623 uint32_t* join_butex = meta->join_butex;
624 const uint32_t id_ver = bthread::get_version(id);
625 meta->mutex.lock();
626 if (!meta->has_version(id_ver)) {
627 meta->mutex.unlock();
629 return EINVAL;
630 }
631 if (*butex == meta->first_ver) {
632 meta->mutex.unlock();
634 return EPERM;
635 }
// Move both butexes and the (now empty) version range to end_ver(), so
// has_version() is false for every version from now on.
636 const uint32_t next_ver = meta->end_ver();
637 *butex = next_ver;
638 *join_butex = next_ver;
639 meta->first_ver = next_ver;
640 meta->locked_ver = next_ver;
641 meta->pending_q.clear();
642 meta->mutex.unlock();
643 // Notice that butex_wake* returns # of woken-up, not successful or not.
644 bthread::butex_wake_except(butex, 0);
645 bthread::butex_wake_all(join_butex);
646 return_resource(bthread::get_slot(id));
647 return 0;
648 }
网友评论