This post analyzes the RCU implementation in DPDK. The earlier versions I tracked did not have it, and I also noticed that the cuckoo hash in 19.11 has become more complex than before, so I will not revisit that here. The RCU code lives in the librte_rcu directory, and its APIs are lock-free and thread safe: “This is implemented as a lock-free function. It is multi-thread safe.”
First, a quick recap of what RCU is, where it is used, and why this kind of implementation exists; plenty of further material can be found elsewhere. Quoting an existing explanation:
RCU (Read-Copy Update) is a data synchronization mechanism that plays an important role in the current Linux kernel. RCU mainly targets linked lists, and its goal is to make traversing and reading the data efficient: readers using RCU do not take an expensive lock on the list, so multiple threads can read the list at the same time while one thread modifies it. RCU suits scenarios where data is read frequently but modified comparatively rarely.
The rough idea is described in the article "LINUX中的RCU机制的分析" (an analysis of the RCU mechanism in Linux); the approach is much the same: an optimization over read-write locks, applied when reads dominate writes. For other scenarios there may be better and less error-prone designs, because lock-free code is hard to implement correctly, easy to get wrong, and hard to debug.
For how to use DPDK RCU, see test/test_rcu_qsbr, which exercises a multiple-readers, single-writer setup. Here are the file comment and the basic data structures:
8 /**
9 * @file
10 * RTE Quiescent State Based Reclamation (QSBR)
11 *
12 * Quiescent State (QS) is any point in the thread execution
13 * where the thread does not hold a reference to a data structure
14 * in shared memory. While using lock-less data structures, the writer
15 * can safely free memory once all the reader threads have entered
16 * quiescent state.
17 *
18 * This library provides the ability for the readers to report quiescent
19 * state and for the writers to identify when all the readers have
20 * entered quiescent state.
21 */
72 /* Worker thread counter */
73 struct rte_rcu_qsbr_cnt {
74 uint64_t cnt;
75 /**< Quiescent state counter. Value 0 indicates the thread is offline
76 * 64b counter is used to avoid adding more code to address
77 * counter overflow. Changing this to 32b would require additional
78 * changes to various APIs.
79 */
82 } __rte_cache_aligned;
88 /* RTE Quiescent State variable structure.
89 * This structure has two elements that vary in size based on the
90 * 'max_threads' parameter.
91 * 1) Quiescent state counter array
92 * 2) Register thread ID array
93 */
94 struct rte_rcu_qsbr {
95 uint64_t token __rte_cache_aligned;
96 /**< Counter to allow for multiple concurrent quiescent state queries */
97 uint64_t acked_token;
98 /**< Least token acked by all the threads in the last call to
99 * rte_rcu_qsbr_check API.
100 */
101
102 uint32_t num_elems __rte_cache_aligned;
103 /**< Number of elements in the thread ID array */
104 uint32_t num_threads;
105 /**< Number of threads currently using this QS variable */
106 uint32_t max_threads;
107 /**< Maximum number of threads using this QS variable */
108
109 struct rte_rcu_qsbr_cnt qsbr_cnt[0] __rte_cache_aligned;
110 /**< Quiescent state counter array of 'max_threads' elements */
111
112 /**< Registered thread IDs are stored in a bitmap array,
113 * after the quiescent state counter array.
114 */
115 } __rte_cache_aligned;
Creation and initialization:
51 int
52 rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads)
53 {
54 size_t sz;
55
56 if (v == NULL) {
59 rte_errno = EINVAL;
61 return 1;
62 }
63
64 sz = rte_rcu_qsbr_get_memsize(max_threads);
65 if (sz == 1)
66 return 1;
67
68 /* Set all the threads to offline */
69 memset(v, 0, sz);
70 v->max_threads = max_threads;
71 v->num_elems = RTE_ALIGN_MUL_CEIL(max_threads,
72 __RTE_QSBR_THRID_ARRAY_ELM_SIZE) /
73 __RTE_QSBR_THRID_ARRAY_ELM_SIZE;
74 v->token = __RTE_QSBR_CNT_INIT;
75 v->acked_token = __RTE_QSBR_CNT_INIT - 1;
76
77 return 0;
78 }
25 size_t
26 rte_rcu_qsbr_get_memsize(uint32_t max_threads)
27 {
28 size_t sz;
29
30 if (max_threads == 0) {
34 rte_errno = EINVAL;
36 return 1;
37 }
38
39 sz = sizeof(struct rte_rcu_qsbr);
40
41 /* Add the size of quiescent state counter array */
42 sz += sizeof(struct rte_rcu_qsbr_cnt) * max_threads;
43
44 /* Add the size of the registered thread ID bitmap array */
45 sz += __RTE_QSBR_THRID_ARRAY_SIZE(max_threads);
46
47 return sz;
48 }
The above sizes and initializes the memory: struct rte_rcu_qsbr itself, followed by max_threads struct rte_rcu_qsbr_cnt entries at its tail (one per thread), followed by however many bytes the registered-thread-ID bitmap needs. The initial token is 1 and acts like a version number for the data.
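As a concrete illustration, here is a minimal sketch of allocating and initializing a QS variable for up to max_threads reader threads (error handling trimmed; rte_zmalloc is one way to get zeroed, cache-line-aligned memory):
#include <rte_rcu_qsbr.h>
#include <rte_malloc.h>

static struct rte_rcu_qsbr *
create_qsbr(uint32_t max_threads)
{
    /* header + max_threads counters + thread-ID bitmap */
    size_t sz = rte_rcu_qsbr_get_memsize(max_threads);
    struct rte_rcu_qsbr *v = rte_zmalloc(NULL, sz, RTE_CACHE_LINE_SIZE);

    if (v == NULL || rte_rcu_qsbr_init(v, max_threads) != 0)
        return NULL; /* rte_free(v) on init failure omitted for brevity */
    return v;
}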
Next come thread registration and unregistration. The core of the implementation is to locate, starting from the base address of struct rte_rcu_qsbr, the uint64_t word of the bitmap that holds thread_id's bit:
83 int
84 rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id)
85 {
86 unsigned int i, id, success;
87 uint64_t old_bmap, new_bmap;
88
100 id = thread_id & __RTE_QSBR_THRID_MASK;
101 i = thread_id >> __RTE_QSBR_THRID_INDEX_SHIFT;
103 /* Make sure that the counter for registered threads does not
104 * go out of sync. Hence, additional checks are required.
105 */
106 /* Check if the thread is already registered */
107 old_bmap = __atomic_load_n(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
108 __ATOMIC_RELAXED);
109 if (old_bmap & 1UL << id)
110 return 0;
111
112 do {
113 new_bmap = old_bmap | (1UL << id);
114 success = __atomic_compare_exchange(
115 __RTE_QSBR_THRID_ARRAY_ELM(v, i),
116 &old_bmap, &new_bmap, 0,
117 __ATOMIC_RELEASE, __ATOMIC_RELAXED);
118
119 if (success)
120 __atomic_fetch_add(&v->num_threads,
121 1, __ATOMIC_RELAXED);
122 else if (old_bmap & (1UL << id))
123 /* Someone else registered this thread.
124 * Counter should not be incremented.
125 */
126 return 0;
127 } while (success == 0);
128
129 return 0;
130 }
#define __RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t *) \
((struct rte_rcu_qsbr_cnt *)(v + 1) + v->max_threads) + i)
rte_rcu_qsbr_thread_unregister is almost the same as the above, except the bit is cleared: new_bmap = old_bmap & ~(1UL << id);. The index arithmetic is illustrated below.
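To make the indexing concrete, a hedged illustration of the bit arithmetic, assuming the 64-bit bitmap words the library uses (__RTE_QSBR_THRID_INDEX_SHIFT == 6, __RTE_QSBR_THRID_MASK == 0x3f; v is the QS variable): thread_id 70 lands in word 1, bit 6.
unsigned int thread_id = 70;         /* example value */
unsigned int i  = thread_id >> 6;    /* word index:      70 / 64 = 1 */
unsigned int id = thread_id & 0x3f;  /* bit inside word: 70 % 64 = 6 */

uint64_t old_bmap = __atomic_load_n(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
                                    __ATOMIC_RELAXED);
uint64_t reg_bmap   = old_bmap | (1UL << id);   /* register:   set bit   */
uint64_t unreg_bmap = old_bmap & ~(1UL << id);  /* unregister: clear bit */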
The basic reader-side usage is shown by this excerpt from the test (other variations are not discussed here):
634 do {
635 rte_rcu_qsbr_thread_register(temp, lcore_id);
636 rte_rcu_qsbr_thread_online(temp, lcore_id);
637 for (i = 0; i < TOTAL_ENTRY; i++) {
638 rte_rcu_qsbr_lock(temp, lcore_id);
639 if (rte_hash_lookup_data(hash, keys+i,
640 (void **)&pdata) != -ENOENT) {
641 pdata[lcore_id] = 0;
642 while (pdata[lcore_id] < COUNTER_VALUE)
643 pdata[lcore_id]++;
644 }
645 rte_rcu_qsbr_unlock(temp, lcore_id);
646 }
647 /* Update quiescent state counter */
648 rte_rcu_qsbr_quiescent(temp, lcore_id);
649 rte_rcu_qsbr_thread_offline(temp, lcore_id);
650 rte_rcu_qsbr_thread_unregister(temp, lcore_id);
651 } while (!writer_done);
206 /**
207 * @warning
216 * Any registered reader thread that wants to report its quiescent state must
217 * call this API before calling rte_rcu_qsbr_quiescent. This can be called
218 * during initialization or as part of the packet processing loop.
219 *
220 * The reader thread must call rte_rcu_qsbr_thread_offline API, before
221 * calling any functions that block, to ensure that rte_rcu_qsbr_check
222 * API does not wait indefinitely for the reader thread to update its QS.
223 *
224 * The reader thread must call rte_rcu_thread_online API, after the blocking
225 * function call returns, to ensure that rte_rcu_qsbr_check API
226 * waits for the reader thread to update its quiescent state.
233 */
235 static __rte_always_inline void
236 rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
237 {
238 uint64_t t;
239
240 RTE_ASSERT(v != NULL && thread_id < v->max_threads);
244
245 /* Copy the current value of token.
246 * The fence at the end of the function will ensure that
247 * the following will not move down after the load of any shared
248 * data structure.
249 */
250 t = __atomic_load_n(&v->token, __ATOMIC_RELAXED);
251
252 /* __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure
253 * 'cnt' (64b) is accessed atomically.
254 */
255 __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
256 t, __ATOMIC_RELAXED);
257
258 /* The subsequent load of the data structure should not
259 * move above the store. Hence a store-load barrier
260 * is required.
261 * If the load of the data structure moves above the store,
262 * writer might not see that the reader is online, even though
263 * the reader is referencing the shared data structure.
264 */
265 #ifdef RTE_ARCH_X86_64
266 /* rte_smp_mb() for x86 is lighter */
267 rte_smp_mb();
268 #else
269 __atomic_thread_fence(__ATOMIC_SEQ_CST);
270 #endif
271 }
273 /**
274 * @warning
283 * This can be called during initialization or as part of the packet
284 * processing loop.
285 *
286 * The reader thread must call rte_rcu_qsbr_thread_offline API, before
287 * calling any functions that block, to ensure that rte_rcu_qsbr_check
288 * API does not wait indefinitely for the reader thread to update its QS.
289 *
295 */
297 static __rte_always_inline void
298 rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id)
299 {
300 RTE_ASSERT(v != NULL && thread_id < v->max_threads);
304
305 /* The reader can go offline only after the load of the
306 * data structure is completed. i.e. any load of the
307 * data structure can not move after this store.
308 */
309
310 __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
311 __RTE_QSBR_CNT_THR_OFFLINE, __ATOMIC_RELEASE);
312 }
When a registered reader thread is about to access a piece of shared data, it calls rte_rcu_qsbr_thread_online, which loads the current token and stores it into v->qsbr_cnt[thread_id].cnt using C11 atomic built-ins; the source comments are quoted above to make clear why each ordering is chosen. rte_rcu_qsbr_thread_offline stores 0 (__RTE_QSBR_CNT_THR_OFFLINE) into v->qsbr_cnt[thread_id].cnt. rte_rcu_qsbr_lock/rte_rcu_qsbr_unlock are debug-only helpers and are not analyzed here.
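The offline/online pair matters around blocking calls, as the header comments above warn; a minimal sketch (some_blocking_call is a hypothetical placeholder):
/* Stop participating in grace periods before blocking, so that
 * rte_rcu_qsbr_check does not wait indefinitely on this thread. */
rte_rcu_qsbr_thread_offline(v, thread_id);

some_blocking_call(); /* hypothetical: epoll_wait, sleep, ... */

/* Re-read the token before touching the shared data again. */
rte_rcu_qsbr_thread_online(v, thread_id);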
442 static __rte_always_inline void
443 rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int thread_id)
444 {
445 uint64_t t;
446
447 RTE_ASSERT(v != NULL && thread_id < v->max_threads);
451
452 /* Acquire the changes to the shared data structure released
453 * by rte_rcu_qsbr_start.
454 * Later loads of the shared data structure should not move
455 * above this load. Hence, use load-acquire.
456 */
457 t = __atomic_load_n(&v->token, __ATOMIC_ACQUIRE);
458
459 /* Check if there are updates available from the writer.
460 * Inform the writer that updates are visible to this reader.
461 * Prior loads of the shared data structure should not move
462 * beyond this store. Hence use store-release.
463 */
464 if (t != __atomic_load_n(&v->qsbr_cnt[thread_id].cnt, __ATOMIC_RELAXED))
465 __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
466 t, __ATOMIC_RELEASE);
470 }
Consider: what would go wrong without the check at line 464, i.e. if after finishing with the shared data the reader stored the token into its cnt unconditionally? What problem could that cause? Worth thinking through.
When the writer thread wants to update the shared data (here there is a single writer, updating more than once):
788 /* Delete element from the shared data structure */
789 pos[0] = rte_hash_del_key(h[0], keys + 0);
790 if (pos[0] < 0) {
792 goto error;
793 }
794 /* Start the quiescent state query process */
795 token[0] = rte_rcu_qsbr_start(t[0]);
796
797 /* Delete element from the shared data structure */
798 pos[1] = rte_hash_del_key(h[0], keys + 3);
799 if (pos[1] < 0) {
801 goto error;
802 }
803 /* Start the quiescent state query process */
804 token[1] = rte_rcu_qsbr_start(t[0]);
815 /* Check the quiescent state status */
816 rte_rcu_qsbr_check(t[0], token[0], true);
817 for (i = 0; i < 4; i++) {
818 c = hash_data[0][0][enabled_core_ids[i]];
819 if (c != COUNTER_VALUE && c != 0) {
822 goto error;
823 }
824 }
825
826 if (rte_hash_free_key_with_position(h[0], pos[0]) < 0) {
828 goto error;
829 }
830 rte_free(hash_data[0][0]);
831 hash_data[0][0] = NULL;
833 /* Check the quiescent state status */
834 rte_rcu_qsbr_check(t[0], token[1], true);
835 for (i = 0; i < 4; i++) {
836 c = hash_data[0][3][enabled_core_ids[i]];
837 if (c != COUNTER_VALUE && c != 0) {
840 goto error;
841 }
842 }
843
844 if (rte_hash_free_key_with_position(h[0], pos[1]) < 0) {
846 goto error;
847 }
848 rte_free(hash_data[0][3]);
849 hash_data[0][3] = NULL;
409 static __rte_always_inline uint64_t
410 rte_rcu_qsbr_start(struct rte_rcu_qsbr *v)
411 {
412 uint64_t t;
413
414 RTE_ASSERT(v != NULL);
415
416 /* Release the changes to the shared data structure.
417 * This store release will ensure that changes to any data
418 * structure are visible to the workers before the token
419 * update is visible.
420 */
421 t = __atomic_add_fetch(&v->token, 1, __ATOMIC_RELEASE);
422
423 return t;
424 }
Every data update increments the token. As the comment says, once a reader thread sees the latest token it is also guaranteed to see the updates to the shared data structure. The reclamation sequence is condensed below.
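Condensed, the writer-side sequence from the test above looks like this (names mirror the test; the delete step is whatever removes the element from the reader-visible structure):
/* 1. Unlink the element; readers may still hold references to it. */
pos = rte_hash_del_key(h, key);

/* 2. Bump the token: readers that see the new token see the delete too. */
token = rte_rcu_qsbr_start(v);

/* 3. Block until every registered reader is offline or past this token. */
rte_rcu_qsbr_check(v, token, true);

/* 4. No reader can still hold a reference: safe to reclaim. */
rte_hash_free_key_with_position(h, pos);
rte_free(pdata);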
598 /**
599 * @warning
608 * If this API is called with 'wait' set to true, the following
609 * factors must be considered:
610 *
611 * 1) If the calling thread is also reporting the status on the
612 * same QS variable, it must update the quiescent state status, before
613 * calling this API.
614 *
615 * 2) In addition, while calling from multiple threads, only
616 * one of those threads can be reporting the quiescent state status
617 * on a given QS variable.
631 */
633 static __rte_always_inline int
634 rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
635 {
636 RTE_ASSERT(v != NULL);
637
638 /* Check if all the readers have already acknowledged this token */
639 if (likely(t <= v->acked_token))
640 return 1;
641
642 if (likely(v->num_threads == v->max_threads))
643 return __rte_rcu_qsbr_check_all(v, t, wait);
644 else
645 return __rte_rcu_qsbr_check_selective(v, t, wait);
646 }
If t <= acked_token, the call returns immediately. Here we assume not all threads have registered, so the selective variant is taken:
472 /* Check the quiescent state counter for registered threads only, assuming
473 * that not all threads have registered.
474 */
475 static __rte_always_inline int
476 __rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
477 {
478 uint32_t i, j, id;
479 uint64_t bmap;
480 uint64_t c;
481 uint64_t *reg_thread_id;
482 uint64_t acked_token = __RTE_QSBR_CNT_MAX;
483
484 for (i = 0, reg_thread_id = __RTE_QSBR_THRID_ARRAY_ELM(v, 0);
485 i < v->num_elems;
486 i++, reg_thread_id++) {
487 /* Load the current registered thread bit map before
488 * loading the reader thread quiescent state counters.
489 */
490 bmap = __atomic_load_n(reg_thread_id, __ATOMIC_ACQUIRE);
491 id = i << __RTE_QSBR_THRID_INDEX_SHIFT;
493 while (bmap) {
494 j = __builtin_ctzl(bmap);
498 c = __atomic_load_n(
499 &v->qsbr_cnt[id + j].cnt,
500 __ATOMIC_ACQUIRE);
504
505 /* Counter is not checked for wrap-around condition
506 * as it is a 64b counter.
507 */
508 if (unlikely(c !=
509 __RTE_QSBR_CNT_THR_OFFLINE && c < t)) {
510 /* This thread is not in quiescent state */
511 if (!wait)
512 return 0;
513
514 rte_pause();
515 /* This thread might have unregistered.
516 * Re-read the bitmap.
517 */
518 bmap = __atomic_load_n(reg_thread_id,
519 __ATOMIC_ACQUIRE);
520
521 continue;
522 }
523
524 /* This thread is in quiescent state. Use the counter
525 * to find the least acknowledged token among all the
526 * readers.
527 */
528 if (c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c)
529 acked_token = c;
530
531 bmap &= ~(1UL << j);
532 }
533 }
534
535 /* All readers are checked, update least acknowledged token.
536 * There might be multiple writers trying to update this. There is
537 * no need to update this very accurately using compare-and-swap.
538 */
539 if (acked_token != __RTE_QSBR_CNT_MAX)
540 __atomic_store_n(&v->acked_token, acked_token,
541 __ATOMIC_RELAXED);
542
543 return 1;
544 }
The loop above walks the threads registered in the bitmap. If a thread's counter satisfies c != __RTE_QSBR_CNT_THR_OFFLINE && c < t, that reader is still active inside the old grace period, so the caller pauses briefly via rte_pause(). On arm64 this is asm volatile("yield" ::: "memory"): the yield instruction tells the hardware that the instructions on this CPU are just polling and are not urgent, so the CPU can cede resources if there is any contention. On x86 it is _mm_pause(): the PAUSE instruction hints to the processor that the code sequence is a spin-wait loop, which lets it avoid the memory-order violation usually incurred when leaving such a loop and thus improves performance considerably, so PAUSE is recommended in spin-wait loops.
While waiting, the thread in question may have unregistered, so the bitmap is reloaded. The whole pass finds the smallest acknowledged token among the online threads and then updates acked_token. A sketch of the per-architecture rte_pause implementations follows.
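For reference, a sketch of the per-architecture rte_pause implementations as described above (simplified from the DPDK headers; treat the exact form as approximate):
#ifdef RTE_ARCH_ARM64
static inline void rte_pause(void)
{
    asm volatile("yield" ::: "memory"); /* hint: polling, not urgent */
}
#else /* x86 */
#include <emmintrin.h>
static inline void rte_pause(void)
{
    _mm_pause(); /* spin-wait hint to the processor */
}
#endif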
Summary:
In read-mostly workloads, taking a lock on every read or write costs performance under contention. An earlier-style solution is a read-write lock: readers do not contend with each other, and a writer first copies the data (or prepares the new version) and then takes the write lock to apply the change. But even that locking has a cost, so this implementation uses atomic and memory-barrier primitives together with a version number (the token). Each reader thread, when it starts using the data, copies the current version into its own per-thread counter. When the writer publishes an update, it increments the version and then has to wait until all readers are done with the old data: it scans every registered reader's counter, checking that each is either offline or has acknowledged the latest version, and only then reclaims the old data.
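Putting it together, a minimal end-to-end sketch of the pattern on a shared pointer (ptr, make_new_version and use are hypothetical; the rte_rcu_qsbr_* calls are the real API):
/* Reader, per lcore; tid was registered beforehand */
rte_rcu_qsbr_thread_online(v, tid);
struct conf *c = __atomic_load_n(&ptr, __ATOMIC_ACQUIRE);
use(c);                          /* hypothetical read-side work */
rte_rcu_qsbr_quiescent(v, tid);  /* report: done with this version */

/* Writer */
struct conf *old = ptr;
struct conf *new_ver = make_new_version(old);      /* copy + modify */
__atomic_store_n(&ptr, new_ver, __ATOMIC_RELEASE); /* publish */
uint64_t token = rte_rcu_qsbr_start(v);  /* open a new grace period */
rte_rcu_qsbr_check(v, token, true);      /* wait for all readers */
free(old);                               /* old copy is unreachable */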