DPDK rcu源码分析

DPDK rcu源码分析

作者: fooboo | 来源:发表于2020-01-31 00:01 被阅读0次

这篇分析下dpdk中rcu的实现,早期关注的版本中并未有此实现,另外我发现19.11版本中的cuckoo hash相比之前复杂些了,这里暂时不再分析。rcu相关的实现代码在librte_rcu目录中,其中的api都是无锁线程安全的“This is implemented as a lock-free function. It is multi-thread safe.”


RCU(Read-Copy Update)是数据同步的一种方式,在当前的Linux内核中发挥着重要的作用。RCU主要针对的数据对象是链表,目的是提高遍历读取数据的效率,为了达到目的使用RCU机制读取数据的时候不对链表进行耗时的加锁操作。这样在同一时间可以有多个线程同时读取该链表,并且允许一个线程对链表进行修改。RCU适用于需要频繁的读取数据,而相应修改数据并不多的情景。


dpdk rcu的使用方式可以参考test/test_rcu_qsbr,这里以多读一写的场景为例,先贴下注释及基本数据结构:

  8 /**
  9  * @file
 10  * RTE Quiescent State Based Reclamation (QSBR)
 11  *
 12  * Quiescent State (QS) is any point in the thread execution
 13  * where the thread does not hold a reference to a data structure
 14  * in shared memory. While using lock-less data structures, the writer
 15  * can safely free memory once all the reader threads have entered
 16  * quiescent state.
 17  *
 18  * This library provides the ability for the readers to report quiescent
 19  * state and for the writers to identify when all the readers have
 20  * entered quiescent state.
 21  */

 72 /* Worker thread counter */
 73 struct rte_rcu_qsbr_cnt {
 74     uint64_t cnt;
 75     /**< Quiescent state counter. Value 0 indicates the thread is offline
 76      *   64b counter is used to avoid adding more code to address
 77      *   counter overflow. Changing this to 32b would require additional
 78      *   changes to various APIs.
 79      */
 82 } __rte_cache_aligned;

 88 /* RTE Quiescent State variable structure.
 89  * This structure has two elements that vary in size based on the
 90  * 'max_threads' parameter.
 91  * 1) Quiescent state counter array
 92  * 2) Register thread ID array
 93  */
 94 struct rte_rcu_qsbr {
 95     uint64_t token __rte_cache_aligned;
 96     /**< Counter to allow for multiple concurrent quiescent state queries */
 97     uint64_t acked_token;
 98     /**< Least token acked by all the threads in the last call to
 99      *   rte_rcu_qsbr_check API.
100      */
102     uint32_t num_elems __rte_cache_aligned;
103     /**< Number of elements in the thread ID array */
104     uint32_t num_threads;
105     /**< Number of threads currently using this QS variable */
106     uint32_t max_threads;
107     /**< Maximum number of threads using this QS variable */
109     struct rte_rcu_qsbr_cnt qsbr_cnt[0] __rte_cache_aligned;
110     /**< Quiescent state counter array of 'max_threads' elements */
112     /**< Registered thread IDs are stored in a bitmap array,
113      *   after the quiescent state counter array.
114      */
115 } __rte_cache_aligned;


 51 int
 52 rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads)
 53 {
 54     size_t sz;
 56     if (v == NULL) {
 59         rte_errno = EINVAL;
 61         return 1;
 62     }
 64     sz = rte_rcu_qsbr_get_memsize(max_threads);
 65     if (sz == 1)
 66         return 1;
 68     /* Set all the threads to offline */
 69     memset(v, 0, sz);
 70     v->max_threads = max_threads;
 71     v->num_elems = RTE_ALIGN_MUL_CEIL(max_threads,
 72             __RTE_QSBR_THRID_ARRAY_ELM_SIZE) /
 73             __RTE_QSBR_THRID_ARRAY_ELM_SIZE;
 74     v->token = __RTE_QSBR_CNT_INIT;
 75     v->acked_token = __RTE_QSBR_CNT_INIT - 1;
 77     return 0;
 78 }

 25 size_t
 26 rte_rcu_qsbr_get_memsize(uint32_t max_threads)
 27 {   
 28     size_t sz;
 30     if (max_threads == 0) {
 34         rte_errno = EINVAL;     
 36         return 1;
 37     }
 39     sz = sizeof(struct rte_rcu_qsbr);
 41     /* Add the size of quiescent state counter array */
 42     sz += sizeof(struct rte_rcu_qsbr_cnt) * max_threads;
 44     /* Add the size of the registered thread ID bitmap array */
 45     sz += __RTE_QSBR_THRID_ARRAY_SIZE(max_threads);
 47     return sz;
 48 }

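以上rte_rcu_qsbr_get_memsize只负责计算所需的内存大小(内存本身由调用者分配后传入rte_rcu_qsbr_init):包括struct rte_rcu_qsbr本身的空间、其尾部max_threads个struct rte_rcu_qsbr_cnt的空间(每个线程占一个),以及记录已注册线程的bitmap所需的字节数。rte_rcu_qsbr_init则把这块内存清零(所有线程置为offline)并初始化各字段。初始token为__RTE_QSBR_CNT_INIT(即1),类似数据版本号。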

后面是线程的注册和取消,主要实现是根据struct rte_rcu_qsbr结构首地址,获取thread_id对应的bitmap空间所在的uint64_t:

 83 int
 84 rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id)
 85 {
 86     unsigned int i, id, success;
 87     uint64_t old_bmap, new_bmap;
100     id = thread_id & __RTE_QSBR_THRID_MASK;
101     i = thread_id >> __RTE_QSBR_THRID_INDEX_SHIFT;
103     /* Make sure that the counter for registered threads does not
104      * go out of sync. Hence, additional checks are required.
105      */
106     /* Check if the thread is already registered */
107     old_bmap = __atomic_load_n(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
108                     __ATOMIC_RELAXED);
109     if (old_bmap & 1UL << id)
110         return 0;
112     do {
113         new_bmap = old_bmap | (1UL << id);
114         success = __atomic_compare_exchange(
115                     __RTE_QSBR_THRID_ARRAY_ELM(v, i),
116                     &old_bmap, &new_bmap, 0,
117                     __ATOMIC_RELEASE, __ATOMIC_RELAXED);
119         if (success)
120             __atomic_fetch_add(&v->num_threads,
121                         1, __ATOMIC_RELAXED);
122         else if (old_bmap & (1UL << id))
123             /* Someone else registered this thread.
124              * Counter should not be incremented.
125              */
126             return 0;
127     } while (success == 0);
129     return 0;
130 }

#define __RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t *) \
    ((struct rte_rcu_qsbr_cnt *)(v + 1) + v->max_threads) + i)

rte_rcu_qsbr_thread_unregister跟上面差不多,这里换成new_bmap = old_bmap & ~(1UL << id);


 634     do {
 635         rte_rcu_qsbr_thread_register(temp, lcore_id);
 636         rte_rcu_qsbr_thread_online(temp, lcore_id);
 637         for (i = 0; i < TOTAL_ENTRY; i++) {
 638             rte_rcu_qsbr_lock(temp, lcore_id);
 639             if (rte_hash_lookup_data(hash, keys+i,
 640                     (void **)&pdata) != -ENOENT) {
 641                 pdata[lcore_id] = 0;
 642                 while (pdata[lcore_id] < COUNTER_VALUE)
 643                     pdata[lcore_id]++;
 644             }
 645             rte_rcu_qsbr_unlock(temp, lcore_id);
 646         }
 647         /* Update quiescent state counter */
 648         rte_rcu_qsbr_quiescent(temp, lcore_id);
 649         rte_rcu_qsbr_thread_offline(temp, lcore_id);
 650         rte_rcu_qsbr_thread_unregister(temp, lcore_id);
 651     } while (!writer_done);
206 /**
207  * @warning
216  * Any registered reader thread that wants to report its quiescent state must
217  * call this API before calling rte_rcu_qsbr_quiescent. This can be called
218  * during initialization or as part of the packet processing loop.
219  *
220  * The reader thread must call rte_rcu_qsbr_thread_offline API, before
221  * calling any functions that block, to ensure that rte_rcu_qsbr_check
222  * API does not wait indefinitely for the reader thread to update its QS.
223  *
224  * The reader thread must call rte_rcu_thread_online API, after the blocking
225  * function call returns, to ensure that rte_rcu_qsbr_check API
226  * waits for the reader thread to update its quiescent state.
233  */

235 static __rte_always_inline void
236 rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
237 {
238     uint64_t t;
240     RTE_ASSERT(v != NULL && thread_id < v->max_threads);
245     /* Copy the current value of token.
246      * The fence at the end of the function will ensure that
247      * the following will not move down after the load of any shared
248      * data structure.
249      */
250     t = __atomic_load_n(&v->token, __ATOMIC_RELAXED);
252     /* __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure
253      * 'cnt' (64b) is accessed atomically.
254      */
255     __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
256         t, __ATOMIC_RELAXED);
258     /* The subsequent load of the data structure should not
259      * move above the store. Hence a store-load barrier
260      * is required.
261      * If the load of the data structure moves above the store,
262      * writer might not see that the reader is online, even though
263      * the reader is referencing the shared data structure.
264      */
265 #ifdef RTE_ARCH_X86_64
266     /* rte_smp_mb() for x86 is lighter */
267     rte_smp_mb();
268 #else
269     __atomic_thread_fence(__ATOMIC_SEQ_CST);
270 #endif
271 }

273 /**
274  * @warning
283  * This can be called during initialization or as part of the packet
284  * processing loop.
285  *
286  * The reader thread must call rte_rcu_qsbr_thread_offline API, before
287  * calling any functions that block, to ensure that rte_rcu_qsbr_check
288  * API does not wait indefinitely for the reader thread to update its QS.
289  *
295  */
297 static __rte_always_inline void
298 rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id)
299 {
300     RTE_ASSERT(v != NULL && thread_id < v->max_threads);
305     /* The reader can go offline only after the load of the
306      * data structure is completed. i.e. any load of the
307      * data strcture can not move after this store.
308      */
310     __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
311         __RTE_QSBR_CNT_THR_OFFLINE, __ATOMIC_RELEASE);
312 }

以上registered reader thread在操作某块共享数据时,会调用rte_rcu_qsbr_thread_online,此时会读取共享数据的token,并设置到v->qsbr_cnt[thread_id].cnt中,这里使用C11中的原子指令,注释贴上是方便理解为何要这么使用。rte_rcu_qsbr_thread_offline设置v->qsbr_cnt[thread_id].cnt为0。rte_rcu_qsbr_lock/rte_rcu_qsbr_unlock是debug相关,不分析。

442 static __rte_always_inline void
443 rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int thread_id)
444 {
445     uint64_t t;
447     RTE_ASSERT(v != NULL && thread_id < v->max_threads);
452     /* Acquire the changes to the shared data structure released
453      * by rte_rcu_qsbr_start.
454      * Later loads of the shared data structure should not move
455      * above this load. Hence, use load-acquire.
456      */
457     t = __atomic_load_n(&v->token, __ATOMIC_ACQUIRE);
459     /* Check if there are updates available from the writer.
460      * Inform the writer that updates are visible to this reader.
461      * Prior loads of the shared data structure should not move
462      * beyond this store. Hence use store-release.
463      */
464     if (t != __atomic_load_n(&v->qsbr_cnt[thread_id].cnt, __ATOMIC_RELAXED))
465         __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
466                      t, __ATOMIC_RELEASE);
470 }

这里如果没有464行的判断会如何?即reader thread访问完共享数据后,不比较当前的cnt与token是否相等就直接把token写入cnt,会导致什么问题?可以多思索下。

当writer thread要更新共享数据时,比如这里只有一个writer thread,更新多次:

 788     /* Delete element from the shared data structure */
 789     pos[0] = rte_hash_del_key(h[0], keys + 0);
 790     if (pos[0] < 0) {
 792         goto error;
 793     }
 794     /* Start the quiescent state query process */
 795     token[0] = rte_rcu_qsbr_start(t[0]);
 797     /* Delete element from the shared data structure */
 798     pos[1] = rte_hash_del_key(h[0], keys + 3);
 799     if (pos[1] < 0) {
 801         goto error;
 802     }
 803     /* Start the quiescent state query process */
 804     token[1] = rte_rcu_qsbr_start(t[0]);

 815     /* Check the quiescent state status */
 816     rte_rcu_qsbr_check(t[0], token[0], true);
 817     for (i = 0; i < 4; i++) {
 818         c = hash_data[0][0][enabled_core_ids[i]];
 819         if (c != COUNTER_VALUE && c != 0) {
 822             goto error;
 823         }
 824     }
 826     if (rte_hash_free_key_with_position(h[0], pos[0]) < 0) {
 828         goto error;
 829     }
 830     rte_free(hash_data[0][0]);
 831     hash_data[0][0] = NULL;

 833     /* Check the quiescent state status */
 834     rte_rcu_qsbr_check(t[0], token[1], true);
 835     for (i = 0; i < 4; i++) {
 836         c = hash_data[0][3][enabled_core_ids[i]];
 837         if (c != COUNTER_VALUE && c != 0) {
 840             goto error;
 841         }
 842     }
 844     if (rte_hash_free_key_with_position(h[0], pos[1]) < 0) {
 846         goto error;
 847     }
 848     rte_free(hash_data[0][3]);
 849     hash_data[0][3] = NULL;
409 static __rte_always_inline uint64_t
410 rte_rcu_qsbr_start(struct rte_rcu_qsbr *v)
411 {
412     uint64_t t;
414     RTE_ASSERT(v != NULL);
416     /* Release the changes to the shared data structure.
417      * This store release will ensure that changes to any data
418      * structure are visible to the workers before the token
419      * update is visible.
420      */
421     t = __atomic_add_fetch(&v->token, 1, __ATOMIC_RELEASE);
423     return t;
424 }

有数据更新时,自增token。如注释,当reader thread看到最新token时,能看见shared data structure的更新。

598 /**
599  * @warning
608  * If this API is called with 'wait' set to true, the following
609  * factors must be considered:
610  *
611  * 1) If the calling thread is also reporting the status on the
612  * same QS variable, it must update the quiescent state status, before
613  * calling this API.
614  *
615  * 2) In addition, while calling from multiple threads, only
616  * one of those threads can be reporting the quiescent state status
617  * on a given QS variable.
631  */
633 static __rte_always_inline int
634 rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
635 {
636     RTE_ASSERT(v != NULL);
638     /* Check if all the readers have already acknowledged this token */
639     if (likely(t <= v->acked_token))
640         return 1;
642     if (likely(v->num_threads == v->max_threads))
643         return __rte_rcu_qsbr_check_all(v, t, wait);
644     else
645         return __rte_rcu_qsbr_check_selective(v, t, wait);
646 }

如果token <= acked_token则直接返回1,表示所有reader都已确认过该token;否则根据是否所有线程都已注册选择不同的检查函数。这里假设并没有所有线程都register,即走__rte_rcu_qsbr_check_selective分支:

472 /* Check the quiescent state counter for registered threads only, assuming
473  * that not all threads have registered.
474  */
475 static __rte_always_inline int
476 __rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
477 {
478     uint32_t i, j, id;
479     uint64_t bmap;
480     uint64_t c;
481     uint64_t *reg_thread_id;
482     uint64_t acked_token = __RTE_QSBR_CNT_MAX;
484     for (i = 0, reg_thread_id = __RTE_QSBR_THRID_ARRAY_ELM(v, 0);
485         i < v->num_elems;
486         i++, reg_thread_id++) {
487         /* Load the current registered thread bit map before
488          * loading the reader thread quiescent state counters.
489          */
490         bmap = __atomic_load_n(reg_thread_id, __ATOMIC_ACQUIRE);
491         id = i << __RTE_QSBR_THRID_INDEX_SHIFT;
493         while (bmap) {
494             j = __builtin_ctzl(bmap);
498             c = __atomic_load_n(
499                     &v->qsbr_cnt[id + j].cnt,
500                     __ATOMIC_ACQUIRE);
505             /* Counter is not checked for wrap-around condition
506              * as it is a 64b counter.
507              */
508             if (unlikely(c !=
509                 __RTE_QSBR_CNT_THR_OFFLINE && c < t)) {
510                 /* This thread is not in quiescent state */
511                 if (!wait)
512                     return 0;
514                 rte_pause();
515                 /* This thread might have unregistered.
516                  * Re-read the bitmap.
517                  */
518                 bmap = __atomic_load_n(reg_thread_id,
519                         __ATOMIC_ACQUIRE);
521                 continue;
522             }
524             /* This thread is in quiescent state. Use the counter
525              * to find the least acknowledged token among all the
526              * readers.
527              */
528             if (c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c)
529                 acked_token = c;
531             bmap &= ~(1UL << j);
532         }
533     }
535     /* All readers are checked, update least acknowledged token.
536      * There might be multiple writers trying to update this. There is
537      * no need to update this very accurately using compare-and-swap.
538      */
539     if (acked_token != __RTE_QSBR_CNT_MAX)
540         __atomic_store_n(&v->acked_token, acked_token,
541             __ATOMIC_RELAXED);
543     return 1;
544 }

以上会遍历bitmap中已注册的thread,如果某个线程的c != __RTE_QSBR_CNT_THR_OFFLINE && c < t,则表示该reader thread处于online状态且尚未确认到token t(可能仍在引用旧数据):若wait为false则直接返回0;否则pause一会,重新读取bitmap(该线程可能已经unregister)后继续检查。pause在arm64中是 asm volatile("yield" ::: "memory")[yield指令用来告知硬件系统,本cpu上执行的指令是polling操作,没有那么急迫,如果有任何的资源冲突,本cpu可以让出控制权。],x86是_mm_pause。大致是:PAUSE指令给处理器提了个醒:这段代码序列是个循环等待。处理器利用这个提示可以避免在大多数情况下的内存顺序违规,这将大幅提升性能。因为这个原因,所以推荐在循环等待中使用PAUSE指令。


对于读多写少的情况,若每次读或写都加锁,在有竞争的情况下会有性能影响。早期实现可能是加读锁,这样读之间无竞争,修改时先拷贝一份或者准备新的数据,然后加写锁去修改老数据。但还是觉得锁有性能开销,这里使用原子操作和内存屏障相关的原语,使用版本号去控制:每个reader thread获取数据的时候,使用数据的版本号更新自己的版本号;这样writer thread更新时,需要等reader thread先读完数据,即遍历检查所有reader thread已有的版本号,若都处于offline状态或版本号已是最新,就可以更新数据,然后再更新数据版本号。


  • DPDK rcu源码分析

    这篇分析下dpdk中rcu的实现,早期关注的版本中并未有此实现,另外我发现19.11版本中的cuckoo hash...

  • DPDK igb_uio驱动分析

    本文整理下之前的学习笔记,基于DPDK17.11版本源码分析。主要分析一下igb_uio驱动源码。 总线-设备-驱...

  • DPDK 收发包流程

    本文整理下之前的学习笔记,基于DPDK17.11版本源码,主要分析一下收发包流程。 使用DPDK的APP收发报文流...

  • DPDK stack源码分析


  • DPDK探测设备并初始化


  • DPDK 中断处理流程


  • DPDK 无锁ring

    本文整理下之前的学习笔记,基于DPDK17.11版本源码,主要分析无锁队列ring的实现。 rte_ring_ta...

  • DPDK Slab Allocator源码分析(上)


  • DPDK 内存管理

    本文整理下之前的学习笔记,基于DPDK17.11版本源码分析。主要分析一下内存管理部分代码。 概述 先看一下下面的...

  • DPDK编程指南(翻译)( 三十)

    30. 源码组织 本节介绍DPDK框架中的源码组织结构。 30.1. Makefiles 和 Config 注意:...


      本文标题:DPDK rcu源码分析
