DPDK RCU Source Code Analysis

Author: fooboo | Published 2020-01-31 00:01

    This post looks at the RCU implementation in DPDK. The earlier versions I followed did not have one, and the cuckoo hash in 19.11 has also grown more complex than before, so I will not cover that here. The RCU code lives in the librte_rcu directory, and its APIs are lock-free and thread-safe: "This is implemented as a lock-free function. It is multi-thread safe."

    First, a quick primer on what RCU is, where it is used, and why this kind of implementation exists; plenty more material can be found by searching. Quoting an existing explanation:

    RCU (Read-Copy Update) is a data-synchronization mechanism that plays an important role in the current Linux kernel. RCU mainly targets linked lists, and its goal is to make read-side traversal efficient: readers traverse the list without taking any expensive lock, so many threads can read the list at the same time while one thread modifies it. RCU suits workloads that read data frequently but modify it rarely.

    A rough outline of the idea can be found in "LINUX中的RCU机制的分析" (an analysis of the RCU mechanism in Linux); the approach is broadly the same: an optimization over reader-writer locks for read-mostly workloads. For other scenarios there may be better designs that are harder to get wrong, because lock-free code is difficult to implement correctly, prone to bugs, and hard to debug.
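
    For contrast, the classic Linux-kernel read side looks like the sketch below (gp and do_something are placeholder names of mine). Kernel readers delimit their critical sections explicitly, while DPDK's QSBR readers, as we will see, instead report quiescent states from outside the critical section:

    rcu_read_lock();
    p = rcu_dereference(gp);    /* safe snapshot of the shared pointer */
    if (p)
        do_something(p->a);     /* writer defers freeing p until readers are done */
    rcu_read_unlock();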

    For how DPDK's RCU is used, see test/test_rcu_qsbr, which exercises the many-readers/one-writer case. Here are the header comment and the basic data structures:

      8 /**
      9  * @file
     10  * RTE Quiescent State Based Reclamation (QSBR)
     11  *
     12  * Quiescent State (QS) is any point in the thread execution
     13  * where the thread does not hold a reference to a data structure
     14  * in shared memory. While using lock-less data structures, the writer
     15  * can safely free memory once all the reader threads have entered
     16  * quiescent state.
     17  *
     18  * This library provides the ability for the readers to report quiescent
     19  * state and for the writers to identify when all the readers have
     20  * entered quiescent state.
     21  */
    
     72 /* Worker thread counter */
     73 struct rte_rcu_qsbr_cnt {
     74     uint64_t cnt;
     75     /**< Quiescent state counter. Value 0 indicates the thread is offline
     76      *   64b counter is used to avoid adding more code to address
     77      *   counter overflow. Changing this to 32b would require additional
     78      *   changes to various APIs.
     79      */
     82 } __rte_cache_aligned;
    
     88 /* RTE Quiescent State variable structure.
     89  * This structure has two elements that vary in size based on the
     90  * 'max_threads' parameter.
     91  * 1) Quiescent state counter array
     92  * 2) Register thread ID array
     93  */
     94 struct rte_rcu_qsbr {
     95     uint64_t token __rte_cache_aligned;
     96     /**< Counter to allow for multiple concurrent quiescent state queries */
     97     uint64_t acked_token;
     98     /**< Least token acked by all the threads in the last call to
     99      *   rte_rcu_qsbr_check API.
    100      */
    101 
    102     uint32_t num_elems __rte_cache_aligned;
    103     /**< Number of elements in the thread ID array */
    104     uint32_t num_threads;
    105     /**< Number of threads currently using this QS variable */
    106     uint32_t max_threads;
    107     /**< Maximum number of threads using this QS variable */
    108 
    109     struct rte_rcu_qsbr_cnt qsbr_cnt[0] __rte_cache_aligned;
    110     /**< Quiescent state counter array of 'max_threads' elements */
    111 
    112     /**< Registered thread IDs are stored in a bitmap array,
    113      *   after the quiescent state counter array.
    114      */
    115 } __rte_cache_aligned;
    

    Creation and initialization:

     51 int
     52 rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads)
     53 {
     54     size_t sz;
     55 
     56     if (v == NULL) {
     59         rte_errno = EINVAL;
     61         return 1;
     62     }
     63 
     64     sz = rte_rcu_qsbr_get_memsize(max_threads);
     65     if (sz == 1)
     66         return 1;
     67 
     68     /* Set all the threads to offline */
     69     memset(v, 0, sz);
     70     v->max_threads = max_threads;
     71     v->num_elems = RTE_ALIGN_MUL_CEIL(max_threads,
     72             __RTE_QSBR_THRID_ARRAY_ELM_SIZE) /
     73             __RTE_QSBR_THRID_ARRAY_ELM_SIZE;
     74     v->token = __RTE_QSBR_CNT_INIT;
     75     v->acked_token = __RTE_QSBR_CNT_INIT - 1;
     76 
     77     return 0;
     78 }
    
     25 size_t
     26 rte_rcu_qsbr_get_memsize(uint32_t max_threads)
     27 {   
     28     size_t sz;
     29     
     30     if (max_threads == 0) {
     34         rte_errno = EINVAL;     
     36         return 1;
     37     }
     38     
     39     sz = sizeof(struct rte_rcu_qsbr);
     40     
     41     /* Add the size of quiescent state counter array */
     42     sz += sizeof(struct rte_rcu_qsbr_cnt) * max_threads;
     43     
     44     /* Add the size of the registered thread ID bitmap array */
     45     sz += __RTE_QSBR_THRID_ARRAY_SIZE(max_threads);
     46     
     47     return sz;
     48 }
    

    The code above sizes and lays out the space for struct rte_rcu_qsbr itself, plus max_threads entries of struct rte_rcu_qsbr_cnt appended at its tail (one per thread), plus however many bytes the registered-thread ID bitmap needs. rte_rcu_qsbr_init zeroes the whole area (all threads offline) and sets the initial token to __RTE_QSBR_CNT_INIT (1), which acts like a version number for the shared data.
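
    A minimal sketch of how a caller might allocate and initialize a QS variable (the helper name and the use of rte_zmalloc are my own choices, not taken from the test code):

    #include <rte_rcu_qsbr.h>
    #include <rte_malloc.h>

    static struct rte_rcu_qsbr *
    create_qsbr(uint32_t max_threads)
    {
        size_t sz = rte_rcu_qsbr_get_memsize(max_threads);
        struct rte_rcu_qsbr *v;

        if (sz == 1)    /* get_memsize reports invalid max_threads as 1 */
            return NULL;

        /* The structure is cache-line aligned, so align the allocation too. */
        v = rte_zmalloc(NULL, sz, RTE_CACHE_LINE_SIZE);
        if (v == NULL || rte_rcu_qsbr_init(v, max_threads) != 0) {
            rte_free(v);    /* rte_free(NULL) is a no-op */
            return NULL;
        }
        return v;
    }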

    Next come thread registration and unregistration. The core of the implementation is locating, from the struct rte_rcu_qsbr base address, the uint64_t bitmap word that holds thread_id's bit:

     83 int
     84 rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id)
     85 {
     86     unsigned int i, id, success;
     87     uint64_t old_bmap, new_bmap;
     88 
    100     id = thread_id & __RTE_QSBR_THRID_MASK;
    101     i = thread_id >> __RTE_QSBR_THRID_INDEX_SHIFT;
    103     /* Make sure that the counter for registered threads does not
    104      * go out of sync. Hence, additional checks are required.
    105      */
    106     /* Check if the thread is already registered */
    107     old_bmap = __atomic_load_n(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
    108                     __ATOMIC_RELAXED);
    109     if (old_bmap & 1UL << id)
    110         return 0;
    111 
    112     do {
    113         new_bmap = old_bmap | (1UL << id);
    114         success = __atomic_compare_exchange(
    115                     __RTE_QSBR_THRID_ARRAY_ELM(v, i),
    116                     &old_bmap, &new_bmap, 0,
    117                     __ATOMIC_RELEASE, __ATOMIC_RELAXED);
    118 
    119         if (success)
    120             __atomic_fetch_add(&v->num_threads,
    121                         1, __ATOMIC_RELAXED);
    122         else if (old_bmap & (1UL << id))
    123             /* Someone else registered this thread.
    124              * Counter should not be incremented.
    125              */
    126             return 0;
    127     } while (success == 0);
    128 
    129     return 0;
    130 }
    
    #define __RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t *) \
        ((struct rte_rcu_qsbr_cnt *)(v + 1) + v->max_threads) + i)
    

    rte_rcu_qsbr_thread_unregister is much the same, except the update becomes new_bmap = old_bmap & ~(1UL << id); (and num_threads is decremented instead).
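
    To make the index arithmetic concrete: each uint64_t word of the bitmap tracks 64 thread IDs (in 19.11, __RTE_QSBR_THRID_INDEX_SHIFT is 6 and __RTE_QSBR_THRID_MASK is 0x3f). A small worked example:

    /* For thread_id = 70:
     *   i  = 70 >> 6   = 1   -> second uint64_t word of the bitmap
     *   id = 70 & 0x3f = 6   -> bit 6 within that word
     */
    unsigned int i  = thread_id >> __RTE_QSBR_THRID_INDEX_SHIFT;
    unsigned int id = thread_id & __RTE_QSBR_THRID_MASK;
    uint64_t bit = 1UL << id;   /* set on register, cleared on unregister */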

    The basic usage pattern is best shown with an example from the test code (other cases are not discussed here):

     634     do {
     635         rte_rcu_qsbr_thread_register(temp, lcore_id);
     636         rte_rcu_qsbr_thread_online(temp, lcore_id);
     637         for (i = 0; i < TOTAL_ENTRY; i++) {
     638             rte_rcu_qsbr_lock(temp, lcore_id);
     639             if (rte_hash_lookup_data(hash, keys+i,
     640                     (void **)&pdata) != -ENOENT) {
     641                 pdata[lcore_id] = 0;
     642                 while (pdata[lcore_id] < COUNTER_VALUE)
     643                     pdata[lcore_id]++;
     644             }
     645             rte_rcu_qsbr_unlock(temp, lcore_id);
     646         }
     647         /* Update quiescent state counter */
     648         rte_rcu_qsbr_quiescent(temp, lcore_id);
     649         rte_rcu_qsbr_thread_offline(temp, lcore_id);
     650         rte_rcu_qsbr_thread_unregister(temp, lcore_id);
     651     } while (!writer_done);
    
    206 /**
    207  * @warning
    216  * Any registered reader thread that wants to report its quiescent state must
    217  * call this API before calling rte_rcu_qsbr_quiescent. This can be called
    218  * during initialization or as part of the packet processing loop.
    219  *
    220  * The reader thread must call rte_rcu_qsbr_thread_offline API, before
    221  * calling any functions that block, to ensure that rte_rcu_qsbr_check
    222  * API does not wait indefinitely for the reader thread to update its QS.
    223  *
    224  * The reader thread must call rte_rcu_qsbr_thread_online API, after the blocking
    225  * function call returns, to ensure that rte_rcu_qsbr_check API
    226  * waits for the reader thread to update its quiescent state.
    233  */
    
    235 static __rte_always_inline void
    236 rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
    237 {
    238     uint64_t t;
    239 
    240     RTE_ASSERT(v != NULL && thread_id < v->max_threads);
    244 
    245     /* Copy the current value of token.
    246      * The fence at the end of the function will ensure that
    247      * the following will not move down after the load of any shared
    248      * data structure.
    249      */
    250     t = __atomic_load_n(&v->token, __ATOMIC_RELAXED);
    251 
    252     /* __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure
    253      * 'cnt' (64b) is accessed atomically.
    254      */
    255     __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
    256         t, __ATOMIC_RELAXED);
    257 
    258     /* The subsequent load of the data structure should not
    259      * move above the store. Hence a store-load barrier
    260      * is required.
    261      * If the load of the data structure moves above the store,
    262      * writer might not see that the reader is online, even though
    263      * the reader is referencing the shared data structure.
    264      */
    265 #ifdef RTE_ARCH_X86_64
    266     /* rte_smp_mb() for x86 is lighter */
    267     rte_smp_mb();
    268 #else
    269     __atomic_thread_fence(__ATOMIC_SEQ_CST);
    270 #endif
    271 }
    
    273 /**
    274  * @warning
    283  * This can be called during initialization or as part of the packet
    284  * processing loop.
    285  *
    286  * The reader thread must call rte_rcu_qsbr_thread_offline API, before
    287  * calling any functions that block, to ensure that rte_rcu_qsbr_check
    288  * API does not wait indefinitely for the reader thread to update its QS.
    289  *
    295  */
    297 static __rte_always_inline void
    298 rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id)
    299 {
    300     RTE_ASSERT(v != NULL && thread_id < v->max_threads);
    304 
    305     /* The reader can go offline only after the load of the
    306      * data structure is completed. i.e. any load of the
    307      * data structure can not move after this store.
    308      */
    309 
    310     __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
    311         __RTE_QSBR_CNT_THR_OFFLINE, __ATOMIC_RELEASE);
    312 }
    

    A registered reader thread, before operating on a piece of shared data, calls rte_rcu_qsbr_thread_online, which loads the shared token and stores it into v->qsbr_cnt[thread_id].cnt. C11 atomic builtins are used throughout; the comments are quoted above because they explain why each ordering is needed. rte_rcu_qsbr_thread_offline sets v->qsbr_cnt[thread_id].cnt back to 0 (__RTE_QSBR_CNT_THR_OFFLINE). rte_rcu_qsbr_lock/rte_rcu_qsbr_unlock only do anything in debug builds and are not analyzed here.
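
    As the header comments above require, a reader should go offline around any call that may block, so the writer's rte_rcu_qsbr_check does not wait on it indefinitely. A minimal sketch (epoll_wait stands in for an arbitrary blocking call; epfd, events and MAX_EVENTS are placeholders):

    /* About to block: go offline so rte_rcu_qsbr_check skips this reader. */
    rte_rcu_qsbr_thread_offline(v, thread_id);

    n = epoll_wait(epfd, events, MAX_EVENTS, -1);   /* any blocking call */

    /* Back on the data path: reload the token before touching shared data. */
    rte_rcu_qsbr_thread_online(v, thread_id);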

    442 static __rte_always_inline void
    443 rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int thread_id)
    444 {
    445     uint64_t t;
    446 
    447     RTE_ASSERT(v != NULL && thread_id < v->max_threads);
    451 
    452     /* Acquire the changes to the shared data structure released
    453      * by rte_rcu_qsbr_start.
    454      * Later loads of the shared data structure should not move
    455      * above this load. Hence, use load-acquire.
    456      */
    457     t = __atomic_load_n(&v->token, __ATOMIC_ACQUIRE);
    458 
    459     /* Check if there are updates available from the writer.
    460      * Inform the writer that updates are visible to this reader.
    461      * Prior loads of the shared data structure should not move
    462      * beyond this store. Hence use store-release.
    463      */
    464     if (t != __atomic_load_n(&v->qsbr_cnt[thread_id].cnt, __ATOMIC_RELAXED))
    465         __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
    466                      t, __ATOMIC_RELEASE);
    470 }
    

    What would happen without the check on line 464, i.e. if the reader thread, after finishing its accesses to the shared data, performed the release store unconditionally even when its cnt already equals the token? What would that cost? Worth pondering.

    When the writer thread updates the shared data (in this example a single writer performing multiple updates):

     788     /* Delete element from the shared data structure */
     789     pos[0] = rte_hash_del_key(h[0], keys + 0);
     790     if (pos[0] < 0) {
     792         goto error;
     793     }
     794     /* Start the quiescent state query process */
     795     token[0] = rte_rcu_qsbr_start(t[0]);
     796 
     797     /* Delete element from the shared data structure */
     798     pos[1] = rte_hash_del_key(h[0], keys + 3);
     799     if (pos[1] < 0) {
     801         goto error;
     802     }
     803     /* Start the quiescent state query process */
     804     token[1] = rte_rcu_qsbr_start(t[0]);
    
     815     /* Check the quiescent state status */
     816     rte_rcu_qsbr_check(t[0], token[0], true);
     817     for (i = 0; i < 4; i++) {
     818         c = hash_data[0][0][enabled_core_ids[i]];
     819         if (c != COUNTER_VALUE && c != 0) {
     822             goto error;
     823         }
     824     }
     825 
     826     if (rte_hash_free_key_with_position(h[0], pos[0]) < 0) {
     828         goto error;
     829     }
     830     rte_free(hash_data[0][0]);
     831     hash_data[0][0] = NULL;
    
     833     /* Check the quiescent state status */
     834     rte_rcu_qsbr_check(t[0], token[1], true);
     835     for (i = 0; i < 4; i++) {
     836         c = hash_data[0][3][enabled_core_ids[i]];
     837         if (c != COUNTER_VALUE && c != 0) {
     840             goto error;
     841         }
     842     }
     843 
     844     if (rte_hash_free_key_with_position(h[0], pos[1]) < 0) {
     846         goto error;
     847     }
     848     rte_free(hash_data[0][3]);
     849     hash_data[0][3] = NULL;
    
    409 static __rte_always_inline uint64_t
    410 rte_rcu_qsbr_start(struct rte_rcu_qsbr *v)
    411 {
    412     uint64_t t;
    413 
    414     RTE_ASSERT(v != NULL);
    415 
    416     /* Release the changes to the shared data structure.
    417      * This store release will ensure that changes to any data
    418      * structure are visible to the workers before the token
    419      * update is visible.
    420      */
    421     t = __atomic_add_fetch(&v->token, 1, __ATOMIC_RELEASE);
    422 
    423     return t;
    424 }
    

    Each update bumps the token. As the comment says, once a reader thread can see the new token, it is guaranteed to also see the changes to the shared data structure (store-release on the writer side, load-acquire on the reader side).
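
    Putting the writer side together, the general pattern is: logically remove the element from the shared structure, open a grace period with rte_rcu_qsbr_start, wait for the readers with rte_rcu_qsbr_check, and only then reclaim the memory. A minimal sketch (elem, remove_from_shared_structure and free_element are hypothetical):

    /* Writer: logically remove, wait for readers, then reclaim. */
    remove_from_shared_structure(elem);       /* hypothetical helper */

    uint64_t token = rte_rcu_qsbr_start(v);   /* bump the token */

    /* Block until every online registered reader has reported a
     * quiescent state at least as recent as 'token'.
     */
    rte_rcu_qsbr_check(v, token, true);

    free_element(elem);                       /* hypothetical helper; now safe */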

    598 /**
    599  * @warning
    608  * If this API is called with 'wait' set to true, the following
    609  * factors must be considered:
    610  *
    611  * 1) If the calling thread is also reporting the status on the
    612  * same QS variable, it must update the quiescent state status, before
    613  * calling this API.
    614  *
    615  * 2) In addition, while calling from multiple threads, only
    616  * one of those threads can be reporting the quiescent state status
    617  * on a given QS variable.
    631  */
    633 static __rte_always_inline int
    634 rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
    635 {
    636     RTE_ASSERT(v != NULL);
    637 
    638     /* Check if all the readers have already acknowledged this token */
    639     if (likely(t <= v->acked_token))
    640         return 1;
    641 
    642     if (likely(v->num_threads == v->max_threads))
    643         return __rte_rcu_qsbr_check_all(v, t, wait);
    644     else
    645         return __rte_rcu_qsbr_check_selective(v, t, wait);
    646 }
    

    If token <= acked_token, all readers have already acknowledged this token and the call returns immediately. Otherwise it dispatches to __rte_rcu_qsbr_check_all when every thread is registered, and to __rte_rcu_qsbr_check_selective otherwise. Here we assume not all threads are registered:

    472 /* Check the quiescent state counter for registered threads only, assuming
    473  * that not all threads have registered.
    474  */
    475 static __rte_always_inline int
    476 __rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
    477 {
    478     uint32_t i, j, id;
    479     uint64_t bmap;
    480     uint64_t c;
    481     uint64_t *reg_thread_id;
    482     uint64_t acked_token = __RTE_QSBR_CNT_MAX;
    483 
    484     for (i = 0, reg_thread_id = __RTE_QSBR_THRID_ARRAY_ELM(v, 0);
    485         i < v->num_elems;
    486         i++, reg_thread_id++) {
    487         /* Load the current registered thread bit map before
    488          * loading the reader thread quiescent state counters.
    489          */
    490         bmap = __atomic_load_n(reg_thread_id, __ATOMIC_ACQUIRE);
    491         id = i << __RTE_QSBR_THRID_INDEX_SHIFT;
    493         while (bmap) {
    494             j = __builtin_ctzl(bmap);
    498             c = __atomic_load_n(
    499                     &v->qsbr_cnt[id + j].cnt,
    500                     __ATOMIC_ACQUIRE);
    504 
    505             /* Counter is not checked for wrap-around condition
    506              * as it is a 64b counter.
    507              */
    508             if (unlikely(c !=
    509                 __RTE_QSBR_CNT_THR_OFFLINE && c < t)) {
    510                 /* This thread is not in quiescent state */
    511                 if (!wait)
    512                     return 0;
    513 
    514                 rte_pause();
    515                 /* This thread might have unregistered.
    516                  * Re-read the bitmap.
    517                  */
    518                 bmap = __atomic_load_n(reg_thread_id,
    519                         __ATOMIC_ACQUIRE);
    520 
    521                 continue;
    522             }
    523 
    524             /* This thread is in quiescent state. Use the counter
    525              * to find the least acknowledged token among all the
    526              * readers.
    527              */
    528             if (c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c)
    529                 acked_token = c;
    530 
    531             bmap &= ~(1UL << j);
    532         }
    533     }
    534 
    535     /* All readers are checked, update least acknowledged token.
    536      * There might be multiple writers trying to update this. There is
    537      * no need to update this very accurately using compare-and-swap.
    538      */
    539     if (acked_token != __RTE_QSBR_CNT_MAX)
    540         __atomic_store_n(&v->acked_token, acked_token,
    541             __ATOMIC_RELAXED);
    542 
    543     return 1;
    544 }
    

    The loop above walks the registered threads in the bitmap. If a thread's counter satisfies c != __RTE_QSBR_CNT_THR_OFFLINE && c < t, that reader is still active, so the checker pauses for a moment. On arm64, rte_pause() is asm volatile("yield" ::: "memory"): the yield instruction tells the hardware that this CPU is merely polling and is in no hurry, so it may cede resources if there is any contention. On x86 it is _mm_pause(). Roughly, the PAUSE instruction hints to the processor that the code sequence is a spin-wait loop, letting it avoid memory-order violations in most cases, which greatly improves performance; that is why PAUSE is recommended in spin-wait loops.

    While waiting, the thread may have unregistered, so the bitmap is reloaded. Across the whole pass, the smallest counter among the online readers is found and used to update acked_token. For example, with three online readers whose counters are 5, 7, and 7, acked_token becomes 5, so any later check for a token <= 5 returns immediately.

    Summary:
    In read-mostly workloads, taking a lock on every read or write hurts performance once there is contention. An earlier-style design might take a read lock, so readers never contend with one another, and on modification first copy the data (or build the new data) and then take a write lock to replace the old copy. But locks still have a cost, so this implementation relies on atomics and memory-barrier primitives plus a version number (the token): each reader thread, when it accesses the data, copies the data's current version number into its own counter; when the writer thread updates, it bumps the token and then has to wait for the readers to finish with the old data, which it does by scanning every registered reader's recorded version number until each one is either offline or has caught up to the new token; only then is it safe to reclaim the old data.
