美文网首页
brpc之bthread_id源码分析

brpc之bthread_id源码分析

作者: fooboo | 来源:发表于2019-09-28 21:34 被阅读0次

    在之前分析rpc时,发现代码中用到了此结构,当时看的时候感觉挺复杂的;从名字上看还以为仅仅是个uint64_t的id:

    // User-visible handle type: a struct wrapping a single 64-bit value.
    189 typedef struct {
    190     uint64_t value;
    191 } bthread_id_t;
    

    实际分配是通过Id结构,并make_id生成的:

    // Creates a new id backed by an Id object obtained from get_resource().
    // Stores `data` and both error callbacks in the Id, sets the valid version
    // range to the single version held in *butex, and writes the packed handle
    // (version + slot) to *id.
    // Returns 0 on success, ENOMEM when get_resource() yields no object.
    325 static int id_create_impl(
    326     bthread_id_t* id, void* data,
    327     int (*on_error)(bthread_id_t, void*, int),
    328     int (*on_error2)(bthread_id_t, void*, int, const std::string&)) {
    329     IdResourceId slot;
    330     Id* const meta = get_resource(&slot);
    331     if (meta) {
    332         meta->data = data;
    333         meta->on_error = on_error;
    334         meta->on_error2 = on_error2;
    335         CHECK(meta->pending_q.empty());
    336         uint32_t* butex = meta->butex;
                // Restart at 1 when the stored version is 0, or when adding
                // ID_MAX_RANGE + 2 to it would wrap around uint32_t.
    337         if (0 == *butex || *butex + ID_MAX_RANGE + 2 < *butex) {
    338             // Skip 0 so that bthread_id_t is never 0
    339             // avoid overflow to make comparisons simpler.
    340             *butex = 1;
    341         }
    342         *meta->join_butex = *butex;
                // Valid versions are [first_ver, locked_ver), i.e. exactly one
                // version right after creation.
    343         meta->first_ver = *butex;
    344         meta->locked_ver = *butex + 1;
    345         *id = make_id(*butex, slot);
    346         return 0;
    347     }
    348     return ENOMEM;
    349 }
    
    // Packs a handle: slot.value goes into the upper 32 bits and `version`
    // into the lower 32 bits of bthread_id_t::value.
    156 inline bthread_id_t make_id(uint32_t version, IdResourceId slot) {
    157     const bthread_id_t tmp =
    158         { (((uint64_t)slot.value) << 32) | (uint64_t)version };
    159     return tmp;
    160 }
    

    以上分配一个Id用于同步rpc各个环节。返回的bthread_id_t本身只是一个索引:高32位是Id对象在资源池中的slot,低32位是版本号。

    // Per-id state object that a bthread_id_t refers to through its slot.
    // NOTE(review): this listing is abridged ("//more code..."); helpers used
    // elsewhere such as contended_ver(), unlockable_ver() and end_ver() are not
    // shown here.
    111 struct BAIDU_CACHELINE_ALIGNMENT Id {
    112     // first_ver ~ locked_ver - 1: unlocked versions
    113     // locked_ver: locked
    114     // unlockable_ver: locked and about to be destroyed
    115     // contended_ver: locked and contended
    116     uint32_t first_ver;
    117     uint32_t locked_ver;
            // Guards the fields of this object; callers lock it around every
            // read/modify of *butex and the version range.
    118     internal::FastPthreadMutex mutex;
            // Opaque user pointer handed back to the error callbacks and lockers.
    119     void* data;
            // Error callbacks; on_error is tried first, on_error2 otherwise.
    120     int (*on_error)(bthread_id_t, void*, int);
    121     int (*on_error2)(bthread_id_t, void*, int, const std::string&);
    122     const char *lock_location;
            // *butex holds the current lock state encoded as a version value.
    123     uint32_t* butex;
            // Separate butex that bthread_id_join waits on.
    124     uint32_t* join_butex;
            // Errors reported while the id was locked, replayed at unlock.
    125     SmallQueue<PendingError, 2> pending_q; 
    126         
    127     Id() {
    128         // Although value of the butex(as version part of bthread_id_t)
    129         // does not matter, we set it to 0 to make program more deterministic.
    130         butex = bthread::butex_create_checked<uint32_t>();
    131         join_butex = bthread::butex_create_checked<uint32_t>();
    132         *butex = 0;
    133         *join_butex = 0;
    134     }
            // True when id_ver lies in the half-open range [first_ver, locked_ver).
    141     inline bool has_version(uint32_t id_ver) const {
    142         return id_ver >= first_ver && id_ver < locked_ver;
    143     }
    135     //more code...
    150 };
    

    has_version判断给定的版本号是否合法:一个rpc的响应回来时,对应的上下文可能已经没了,因为版本范围会在bthread_id_cancel/bthread_id_unlock_and_destroy时被重置。SmallQueue类是一个queue,当元素个数不超过N(这里是2)时使用内置数组,多出来的元素放到堆上分配的std::deque中,不贴完整代码:

     96     int _begin;
     97     int _size;
     98     T _c[N];
     99     std::deque<T>* _full;
    

    回到上面,为什么id要这么生成,解决什么问题,这里引用相关说明:
    “bthread_id是一个特殊的同步结构,它可以互斥RPC过程中的不同环节,也可以O(1)时间内找到RPC上下文(即Controller)。注意,这里我们谈论的是bthread_id_t,不是bthread_t(bthread的tid),这个名字起的确实不太好,容易混淆。

    具体来说,bthread_id解决的问题有:

    • 在发送RPC过程中response回来了,处理response的代码和发送代码产生竞争。
    • 设置timer后很快触发了,超时处理代码和发送代码产生竞争。
    • 重试产生的多个response同时回来产生的竞争。
    • 通过correlation_id在O(1)时间内找到对应的RPC上下文,而无需建立从correlation_id到RPC上下文的全局哈希表。
    • 取消RPC。

    上文提到的bug在其他rpc框架中广泛存在,下面我们来看下brpc是如何通过bthread_id解决这些问题的。

    bthread_id包括两部分,一个是用户可见的64位id,另一个是对应的不可见的bthread::Id结构体。用户接口都是操作id的。从id映射到结构体的方式和brpc中的其他结构类似:32位是内存池的偏移量,32位是version。前者O(1)时间定位,后者防止ABA问题。”

    在发起rpc前,会通过call_id返回一个id:

    398     const CallId correlation_id = cntl->call_id();
    401     const int rc = bthread_id_lock_and_reset_range(
    402                     correlation_id, NULL, 2 + cntl->max_retry());
    
    // Returns this controller's correlation id, creating it on first use.
    // _correlation_id.value is accessed through an atomic view: a non-zero
    // value is returned as-is; otherwise a new id is created and installed with
    // compare-and-swap. If the swap fails, the freshly created id is cancelled
    // and the value left in `loaded` is returned instead.
    1224 CallId Controller::call_id() {
    1225     butil::atomic<uint64_t>* target =
    1226         (butil::atomic<uint64_t>*)&_correlation_id.value;
    1227     uint64_t loaded = target->load(butil::memory_order_relaxed);
    1228     if (loaded) {
    1229         const CallId id = { loaded };
    1230         return id;           
    1231     }
    1232     // Optimistic locking.                                                   
    1233     CallId cid = { 0 };
    1234     // The range of this id will be reset in Channel::CallMethod
    1235     CHECK_EQ(0, bthread_id_create2(&cid, this, HandleSocketFailed));
             // NOTE(review): presumably, as with std::atomic, a failed
             // compare_exchange_strong stores the current value into `loaded`
             // - confirm for butil::atomic.
    1236     if (!target->compare_exchange_strong(loaded, cid.value,
    1237                                          butil::memory_order_relaxed)) {
    1238         bthread_id_cancel(cid);
    1239         cid.value = loaded;
    1240     }   
    1241     return cid;
    1242 }
    
    // Creates an id whose error callback also receives an error text.
    // Forwards to id_create_impl with the 3-argument callback set to NULL and
    // falls back to default_bthread_id_on_error2 when `on_error` is NULL.
    // Returns whatever id_create_impl returns.
    695 int bthread_id_create2(
    696     bthread_id_t* id, void* data,
    697     int (*on_error)(bthread_id_t, void*, int, const std::string&)) {
    698     return bthread::id_create_impl(
    699         id, data, NULL,
    700         (on_error ? on_error : bthread::default_bthread_id_on_error2));
    701 }
    

    这里比如max_retry为3,那么传给bthread_id_lock_and_reset_range的range就是2 + 3 = 5,其实现如下:

    // Locks the id and, when `range` is non-zero, resets its version range.
    // On success stores meta->data into *pdata (when pdata is non-NULL) and
    // returns 0. Returns EINVAL when the slot cannot be addressed or id_ver is
    // not a valid version, EPERM when *butex equals unlockable_ver(), and errno
    // when butex_wait fails with anything other than EWOULDBLOCK/EINTR.
    // NOTE(review): upstream lines 424-428 (the body of the rejected-range
    // branch) are elided from this listing.
    405 int bthread_id_lock_and_reset_range_verbose(
    406     bthread_id_t id, void **pdata, int range, const char *location) {
    407     bthread::Id* const meta = address_resource(bthread::get_slot(id));
    408     if (!meta) {
    409         return EINVAL;
    410     }
    411     const uint32_t id_ver = bthread::get_version(id);
    412     uint32_t* butex = meta->butex;
    413     bool ever_contended = false;
    414     meta->mutex.lock();
            // Keep trying for as long as id_ver stays a valid version.
    415     while (meta->has_version(id_ver)) {
                // *butex == first_ver is the unlocked state: take the lock.
    416         if (*butex == meta->first_ver) {
    417             // contended locker always wakes up the butex at unlock.
    418             meta->lock_location = location;
    419             if (range == 0) {
    420                 // fast path
    421             } else if (range < 0 ||
    422                        range > bthread::ID_MAX_RANGE ||
    423                        range + meta->first_ver <= meta->locked_ver) {
    429             } else {
                        // Accepted range: valid versions become
                        // [first_ver, first_ver + range).
    430                 meta->locked_ver = meta->first_ver + range;
    431             }
                    // Encode in the butex whether this locker ever had to wait.
    432             *butex = (ever_contended ? meta->contended_ver() : meta->locked_ver);
    433             meta->mutex.unlock();
    434             if (pdata) {
    435                 *pdata = meta->data;
    436             }
    437             return 0;
    438         } else if (*butex != meta->unlockable_ver()) {
                    // Locked by someone else: mark the lock contended, drop the
                    // mutex and wait on the butex (presumably until its value
                    // differs from expected_ver), then re-check under the mutex.
    439             *butex = meta->contended_ver();
    440             uint32_t expected_ver = *butex;
    441             meta->mutex.unlock();
    442             ever_contended = true;
    443             if (bthread::butex_wait(butex, expected_ver, NULL) < 0 &&
    444                 errno != EWOULDBLOCK && errno != EINTR) {
    445                 return errno;
    446             }
    447             meta->mutex.lock();
    448         } else { // bthread_id_about_to_destroy was called.
    449             meta->mutex.unlock();
    450             return EPERM;
    451         }
    452     }
    453     meta->mutex.unlock();
    454     return EINVAL;
    455 }
    
    // Extracts the version part of a handle: the lower 32 bits of id.value.
    167 inline uint32_t get_version(bthread_id_t id) {
    168     return (uint32_t)(id.value & 0xFFFFFFFFul);
    169 }
    

    当rpc的响应回来时会进行bthread_id_lock;如果此时锁已被其他bthread持有,且*butex != meta->unlockable_ver(),就会执行*butex = meta->contended_ver()把锁标记为有竞争(这样持锁方在unlock时才会进行wake),接着调用butex_wait等待。

     974     // Make versioned correlation_id.
     975     // call_id         : unversioned, mainly for ECANCELED and ERPCTIMEDOUT
     976     // call_id + 1     : first try.
     977     // call_id + 2     : retry 1
     978     // ...
     979     // call_id + N + 1 : retry N
     980     // All ids except call_id are versioned. Say if we've sent retry 1 and
     981     // a failed response of first try comes back, it will be ignored.
    // Returns the versioned id of the current try:
    // _correlation_id.value + _current_call.nretry + 1.
    595     CallId current_id() const {
    596         CallId id = { _correlation_id.value + _current_call.nretry + 1 };
    597         return id;
    598     }
    

    以max_retry为3为例,此时并不会有竞争:first_ver=1保持不变,locked_ver由2变为6(first_ver + range,range = 2 + 3 = 5),*butex由1(first_ver,未加锁)变为6(locked_ver,已加锁);如果有竞争则可能要butex_wait。

    接着发起正常的rpc请求(非retry的请求),此时的id在原来的基础上+1,但get_slot获取到的位移量还是同一个,这里假设这个请求超时,此时bthread_id_unlock,但并不需要wake:

    // Unlocks the id, or runs one queued error handler on behalf of the caller.
    // Returns EINVAL when the slot cannot be addressed or id_ver is not a valid
    // version, and EPERM when the id is not locked (*butex == first_ver).
    // If pending_q holds an error, it is popped and its handler's return value
    // is returned; *butex is left unchanged on that path. Otherwise *butex is
    // restored to first_ver, waiters are woken if the lock was contended, and 0
    // is returned.
    568 int bthread_id_unlock(bthread_id_t id) {
    570     bthread::Id* const meta = address_resource(bthread::get_slot(id));
    571     if (!meta) {
    572         return EINVAL;
    573     }
    574     uint32_t* butex = meta->butex;
    575     // Release fence makes sure all changes made before signal visible to
    576     // woken-up waiters.
    577     const uint32_t id_ver = bthread::get_version(id);
    578     meta->mutex.lock();
    579     if (!meta->has_version(id_ver)) {
    580         meta->mutex.unlock();
    582         return EINVAL;
    583     }
    584     if (*butex == meta->first_ver) {
    585         meta->mutex.unlock();
    587         return EPERM;
    588     }
    589     bthread::PendingError front;
    590     if (meta->pending_q.pop(&front)) {
                // A queued error exists: hand it to the callback without
                // touching *butex.
    592         meta->lock_location = front.location;
    593         meta->mutex.unlock();
    594         if (meta->on_error) {
    595             return meta->on_error(front.id, meta->data, front.error_code);
    596         } else {
    597             return meta->on_error2(front.id, meta->data, front.error_code,
    598                                    front.error_text);
    599         }
    600     } else {
                // Nothing queued: remember whether anyone waited, then release.
    602         const bool contended = (*butex == meta->contended_ver());
    603         *butex = meta->first_ver;
    604         meta->mutex.unlock();
    605         if (contended) {
    606             // We may wake up already-reused id, but that's OK.
    607             bthread::butex_wake(butex);
    608         }
    609         return 0;
    610     }
    611 }
    

    bthread_id_unlock中,若pending_q里有待处理的错误,则pop一个出来并在不释放锁的情况下执行对应的错误回调;这里因pending_q为空,故执行*butex = meta->first_ver,最后*butex=1,如果之前有其他bthread在wait(contended),则还会唤醒它们。

    若这个正常的请求回来的时候,在baidu_std协议中ProcessRpcResponse执行bthread_id_lock(cid, (void**)&cntl)

    // Locks the id without changing its version range: forwards to
    // bthread_id_lock_and_reset_range_verbose with range 0 and returns its
    // result.
    563 int bthread_id_lock_verbose(bthread_id_t id, void** pdata,
    564                             const char *location) {
    566     return bthread_id_lock_and_reset_range_verbose(id, pdata, 0, location);
    567 }
    

    如果此时发送请求的已经超时,此时回调HandleTimeout,并:

    // Reports an error on the id with an empty error text: forwards to
    // bthread_id_error2_verbose and returns its result.
    458 int bthread_id_error_verbose(bthread_id_t id, int error_code, 
    459                              const char *location) {
    460     return bthread_id_error2_verbose(id, error_code, std::string(), location);
    461 }
    
    // Reports an error on the id.
    // Returns EINVAL when the slot cannot be addressed or id_ver is not a valid
    // version. If the id is unlocked (*butex == first_ver) it is locked here
    // and the error callback runs immediately; its return value is returned.
    // Otherwise the error is queued in pending_q and 0 is returned.
    714 int bthread_id_error2_verbose(bthread_id_t id, int error_code,
    715                               const std::string& error_text,
    716                               const char *location) {
    717     bthread::Id* const meta = address_resource(bthread::get_slot(id));
    718     if (!meta) {             
    719         return EINVAL;
    720     }
    721     const uint32_t id_ver = bthread::get_version(id);
    722     uint32_t* butex = meta->butex;
    723     meta->mutex.lock();
    724     if (!meta->has_version(id_ver)) {
    725         meta->mutex.unlock();
    726         return EINVAL;
    727     }
    728     if (*butex == meta->first_ver) {
                // Unlocked: take the lock, then call the handler outside the
                // mutex.
    729         *butex = meta->locked_ver;
    730         meta->lock_location = location;
    731         meta->mutex.unlock();
    732         if (meta->on_error) {
    733             return meta->on_error(id, meta->data, error_code);
    734         } else {
    735             return meta->on_error2(id, meta->data, error_code, error_text);
    736         }
    737     } else {
                // Already locked: record the error so it can be popped from
                // pending_q later.
    738         bthread::PendingError e;
    739         e.id = id;
    740         e.error_code = error_code;
    741         e.error_text = error_text;
    742         e.location = location;
    743         meta->pending_q.push(e);
    744         meta->mutex.unlock();
    745         return 0;
    746     }
    747 }
    

    若rpc上下文已经销毁等原因(版本号不合法),直接返回EINVAL;否则,若当前没有其他bthread持有该id的锁,则加锁并执行错误处理函数,否则push一个PendingError,由持锁的bthread在unlock时执行。在执行某个rpc的一些代码时,需要阻止其他bthread执行同一个rpc的代码段。

    当没有其他rpc互斥时,即*butex == meta->first_ver,加锁*butex = (ever_contended ? meta->contended_ver() : meta->locked_ver)。这里要记录是否有contended,因为在后面解锁(bthread_id_unlock)或标记即将销毁时,都要通过const bool contended = (*butex == meta->contended_ver())判断是否需要唤醒其他等待的bthread,例如下面这段把*butex置为unlockable_ver()的代码:

    482     const bool contended = (*butex == meta->contended_ver());
    483     *butex = meta->unlockable_ver();
    484     meta->mutex.unlock();
    485     if (contended) {
    486         // wake up all waiting lockers.
    487         bthread::butex_wake_except(butex, 0);
    488     }
    

    剩下其他接口:

    // Waits until id_ver is no longer a valid version of the id.
    // Returns EINVAL when the slot cannot be addressed, errno when butex_wait
    // fails with anything other than EWOULDBLOCK/EINTR, and 0 once
    // has_version(id_ver) is false.
    516 int bthread_id_join(bthread_id_t id) {
    517     const bthread::IdResourceId slot = bthread::get_slot(id);
    518     bthread::Id* const meta = address_resource(slot);
    519     if (!meta) {
    520         // The id is not created yet, this join is definitely wrong.
    521         return EINVAL;
    522     }
    523     const uint32_t id_ver = bthread::get_version(id);
    524     uint32_t* join_butex = meta->join_butex;
    525     while (1) {
                // Sample validity and the join_butex value under the same mutex
                // hold, so the value waited on matches the state just checked.
    526         meta->mutex.lock();
    527         const bool has_ver = meta->has_version(id_ver);
    528         const uint32_t expected_ver = *join_butex;
    529         meta->mutex.unlock();
    530         if (!has_ver) {
    531             break;           
    532         }
    533         if (bthread::butex_wait(join_butex, expected_ver, NULL) < 0 &&
    534             errno != EWOULDBLOCK && errno != EINTR) {
    535             return errno;
    536         }
    537     }
    538     return 0;
    539 }
    
    // Destroys an id that is currently unlocked.
    // Returns EINVAL when the slot cannot be addressed or id_ver is not a valid
    // version, and EPERM when *butex != first_ver (the id is not in the
    // unlocked state). On success the version range is collapsed to the empty
    // range starting at end_ver(), the slot is returned to the pool, and 0 is
    // returned.
    492 int bthread_id_cancel(bthread_id_t id) {
    493     bthread::Id* const meta = address_resource(bthread::get_slot(id));
    494     if (!meta) {
    495         return EINVAL;
    496     }
    497     uint32_t* butex = meta->butex; 
    498     const uint32_t id_ver = bthread::get_version(id);
    499     meta->mutex.lock();
    500     if (!meta->has_version(id_ver)) {
    501         meta->mutex.unlock();
    502         return EINVAL;
    503     }
    504     if (*butex != meta->first_ver) {
    505         meta->mutex.unlock();
    506         return EPERM;
    507     }       
            // first_ver == locked_ver makes has_version() false for every
            // version.
    508     *butex = meta->end_ver();
    509     meta->first_ver = *butex;
    510     meta->locked_ver = *butex;
    511     meta->mutex.unlock();
    512     return_resource(bthread::get_slot(id));
    513     return 0;
    514 }
    
    // Destroys an id that is currently locked.
    // Returns EINVAL when the slot cannot be addressed or id_ver is not a valid
    // version, and EPERM when the id is not locked (*butex == first_ver).
    // On success both butexes and the version range move to end_ver(), queued
    // errors are dropped, waiters on both butexes are woken, the slot is
    // returned to the pool, and 0 is returned.
    616 int bthread_id_unlock_and_destroy(bthread_id_t id) {
    617     bthread::Id* const meta = address_resource(bthread::get_slot(id));
    618     if (!meta) { 
    619         return EINVAL;
    620     }   
    622     uint32_t* butex = meta->butex;
    623     uint32_t* join_butex = meta->join_butex;
    624     const uint32_t id_ver = bthread::get_version(id);
    625     meta->mutex.lock();
    626     if (!meta->has_version(id_ver)) {
    627         meta->mutex.unlock();
    629         return EINVAL;
    630     }   
    631     if (*butex == meta->first_ver) {
    632         meta->mutex.unlock();
    634         return EPERM; 
    635     }
            // first_ver == locked_ver makes has_version() false for every
            // version.
    636     const uint32_t next_ver = meta->end_ver();
    637     *butex = next_ver;
    638     *join_butex = next_ver;
    639     meta->first_ver = next_ver;
    640     meta->locked_ver = next_ver;
    641     meta->pending_q.clear();
    642     meta->mutex.unlock();
    643     // Notice that butex_wake* returns # of woken-up, not successful or not.
    644     bthread::butex_wake_except(butex, 0);
    645     bthread::butex_wake_all(join_butex);
    646     return_resource(bthread::get_slot(id));
    647     return 0;
    648 }
    

    参考
    bthread_id.md

    相关文章

      网友评论

          本文标题:brpc之bthread_id源码分析

          本文链接:https://www.haomeiwen.com/subject/cjtyuctx.html