I previously analyzed some of these open-source implementations in DPDK; the version then was 16.07.2, while the latest is now 19.11. The code has changed considerably and quite a bit is new, but I won't enumerate it all here; interested readers can browse the source themselves. This post analyzes what has changed in the lock-free ring queue, then the stack implementation, which is fairly simple. The lock-free ring in early DPDK was covered in the earlier analysis.
Several data members of struct prod and struct cons in the early struct rte_ring were redundant; they could simply be hoisted out as members of struct rte_ring itself, leaving a shared head/tail structure:
/* structure to hold a pair of head/tail values and other metadata */
struct rte_ring_headtail {
	volatile uint32_t head;  /**< Prod/consumer head. */
	volatile uint32_t tail;  /**< Prod/consumer tail. */
	uint32_t single;         /**< True if single prod/cons */
};
struct rte_ring {
	/* ... name and memzone fields elided ... */
	int flags;          /**< Flags supplied at creation. */
	/* ... */
	uint32_t size;      /**< Size of ring. */
	uint32_t mask;      /**< Mask (size-1) of ring. */
	uint32_t capacity;  /**< Usable size of ring */

	char pad0 __rte_cache_aligned; /**< empty cache line */

	/** Ring producer status. */
	struct rte_ring_headtail prod __rte_cache_aligned;
	char pad1 __rte_cache_aligned; /**< empty cache line */

	/** Ring consumer status. */
	struct rte_ring_headtail cons __rte_cache_aligned;
	char pad2 __rte_cache_aligned; /**< empty cache line */
};
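For comparison, here is roughly what the 16.07-era producer side looked like (sketched from memory, so field order may differ slightly); note that size, mask, and the single-producer/consumer flag were duplicated inside both prod and cons:

/* 16.07-era layout (approximate sketch, not the current source) */
struct prod {
	uint32_t watermark;      /**< Max items before EDQUOT (prod only). */
	uint32_t sp_enqueue;     /**< True if single producer. */
	uint32_t size;           /**< Duplicated in cons as well. */
	uint32_t mask;           /**< Duplicated in cons as well. */
	volatile uint32_t head;
	volatile uint32_t tail;
} prod __rte_cache_aligned;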
At creation time, the RING_F_EXACT_SZ flag decides whether the requested size is adjusted. In the worst case some storage is wasted; as the API documentation puts it, "Worst case, if a power-of-2 size is requested, half the ring space will be wasted." So it pays to read the code and the API notes carefully before use:
	if (flags & RING_F_EXACT_SZ) {
		r->size = rte_align32pow2(count + 1);
		r->mask = r->size - 1;
		r->capacity = count; /* usable capacity is only count */
	} else {
		if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK)) {
			return -EINVAL;
		}
		r->size = count;
		r->mask = count - 1;
		r->capacity = r->mask;
	}
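As a quick illustration (the ring names and counts here are made up): requesting exactly 1000 usable slots with RING_F_EXACT_SZ rounds the internal size up to a power of two while keeping capacity at 1000, whereas without the flag the count itself must be a power of two and one slot is sacrificed:

#include <rte_lcore.h>
#include <rte_ring.h>

static void ring_size_demo(void)
{
	/* size = rte_align32pow2(1000 + 1) = 1024, mask = 1023,
	 * capacity = 1000: 24 allocated slots are unusable. */
	struct rte_ring *r1 = rte_ring_create("ring_exact", 1000,
					      rte_socket_id(),
					      RING_F_EXACT_SZ);

	/* Without the flag, count must itself be a power of two and
	 * one slot is reserved: size = 1024, capacity = 1023. */
	struct rte_ring *r2 = rte_ring_create("ring_pow2", 1024,
					      rte_socket_id(), 0);

	(void)r1;
	(void)r2;
}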
rte_ring_get_memsize computes the footprint for the non-exact case:

/* return the size of memory occupied by a ring */
ssize_t
rte_ring_get_memsize(unsigned count)
{
	ssize_t sz;

	/* count must be a power of 2 */
	if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK)) {
		return -EINVAL;
	}

	/* the object slots live at the end of struct rte_ring */
	sz = sizeof(struct rte_ring) + count * sizeof(void *);
	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
	return sz;
}
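A worked example (hypothetical numbers, 64-bit pointers, 64-byte cache lines):

/* rte_ring_get_memsize(1024)
 *   = RTE_ALIGN(sizeof(struct rte_ring) + 1024 * 8, 64)
 *   = RTE_ALIGN(sizeof(struct rte_ring) + 8192, 64)
 *
 * The object slots are not a separate allocation: they occupy the
 * flexible region at the end of struct rte_ring.
 */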
Returning the number of elements in the ring:
static inline unsigned
rte_ring_count(const struct rte_ring *r)
{
	uint32_t prod_tail = r->prod.tail;
	uint32_t cons_tail = r->cons.tail;
	uint32_t count = (prod_tail - cons_tail) & r->mask;
	return (count > r->capacity) ? r->capacity : count;
}
Since the operands are unsigned, count is always >= 0 no matter whether prod_tail is ahead of or behind cons_tail, and the masking keeps it below size. With RING_F_EXACT_SZ, however, capacity is smaller than the mask, so the masked difference can momentarily exceed capacity (for instance when the two tails are read inconsistently under concurrency), in which case capacity is returned directly.
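A worked example with hypothetical values, size = 128, mask = 127, capacity = 100 (an RING_F_EXACT_SZ ring):

/* Normal case: prod_tail = 130, cons_tail = 30
 *   (130 - 30) & 127 = 100            -> returned as-is
 *
 * Inconsistent read under concurrency: a stale prod_tail = 37 is
 * paired with a fresh cons_tail = 40:
 *   (37 - 40) & 127 = (uint32_t)-3 & 127 = 125 > capacity
 *                                      -> clamped to 100
 */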
The enqueue and dequeue operations follow much the same idea as the original implementation, except that reading the head/tail offsets now involves an extra barrier primitive whose effect differs across platforms (it is a no-op on x86). Compare the two variants of __rte_ring_move_prod_head to get a feel for it. First the legacy version (rte_ring_generic.h):
	do {
		/* Reset n to the initial burst count */
		n = max;

		*old_head = r->prod.head;

		/* add rmb barrier to avoid load/load reorder in weak
		 * memory model. It is noop on x86
		 */
		rte_smp_rmb();

		/*
		 * The subtraction is done between two unsigned 32bits value
		 * (the result is always modulo 32 bits even if we have
		 * *old_head > cons_tail). So 'free_entries' is always between 0
		 * and capacity (which is < size).
		 */
		*free_entries = (capacity + r->cons.tail - *old_head);
And the C11 memory-model variant (rte_ring_c11_mem.h) of the same section:

	*old_head = __atomic_load_n(&r->prod.head, __ATOMIC_RELAXED);
	do {
		/* Reset n to the initial burst count */
		n = max;

		/* Ensure the head is read before tail */
		__atomic_thread_fence(__ATOMIC_ACQUIRE);

		/* load-acquire synchronize with store-release of ht->tail
		 * in update_tail.
		 */
		cons_tail = __atomic_load_n(&r->cons.tail,
					__ATOMIC_ACQUIRE);

		*free_entries = (capacity + cons_tail - *old_head);
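The load-acquire of cons.tail pairs with a store-release on the other side. A condensed sketch of that counterpart, update_tail from rte_ring_c11_mem.h (abbreviated from memory, an enqueue/dequeue parameter is omitted):

static __rte_always_inline void
update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
	    uint32_t new_val, uint32_t single)
{
	/* With multiple producers/consumers, wait for earlier threads
	 * to publish their tail updates first. */
	if (!single)
		while (unlikely(ht->tail != old_val))
			rte_pause();

	/* store-release: the slot writes become visible before the new
	 * tail value; pairs with the load-acquire shown above. */
	__atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE);
}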
stack
DPDK provides two stack implementations, one lock-based and one lock-free:
/* The RTE stack structure contains the LIFO structure itself, plus metadata
 * such as its name and memzone pointer.
 */
struct rte_stack {
	/* ... name and memzone fields elided ... */
	uint32_t capacity; /**< Usable size of the stack. */
	uint32_t flags;    /**< Flags supplied at creation. */

	union {
		struct rte_stack_lf stack_lf;   /**< Lock-free LIFO structure. */
		struct rte_stack_std stack_std; /**< LIFO structure. */
	};
} __rte_cache_aligned;
The lock-based one is straightforward:
struct rte_stack_std {
	rte_spinlock_t lock; /**< LIFO lock */
	uint32_t len;        /**< LIFO len */
	void *objs[];        /**< LIFO pointer table */
};
The lock-free implementation is more involved and is organized as linked lists:
struct rte_stack_lf_elem {
	void *data;                     /**< Data pointer */
	struct rte_stack_lf_elem *next; /**< Next pointer */
};

struct rte_stack_lf_head {
	struct rte_stack_lf_elem *top; /**< Stack top */
	uint64_t cnt; /**< Modification counter for avoiding ABA problem */
};

struct rte_stack_lf_list {
	/** List head */
	struct rte_stack_lf_head head __rte_aligned(16);
	/** List len */
	uint64_t len;
};

/* Structure containing two lock-free LIFO lists: the stack itself and a list
 * of free linked-list elements.
 */
struct rte_stack_lf {
	/** LIFO list of elements */
	struct rte_stack_lf_list used __rte_cache_aligned;
	/** LIFO list of free elements */
	struct rte_stack_lf_list free __rte_cache_aligned;
	/** LIFO elements */
	struct rte_stack_lf_elem elems[] __rte_cache_aligned;
};
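To make the two-list design concrete, here is a state sketch for a hypothetical four-element stack; init threads every elems[i] onto the free list, and each push moves elements from free to used:

/* State sketch (hypothetical 4-element stack):
 *
 * After rte_stack_lf_init(s, 4):
 *   free: elems[3] -> elems[2] -> elems[1] -> elems[0] -> NULL  (len = 4)
 *   used: (empty)                                               (len = 0)
 *
 * After pushing the pointers {a, b}:
 *   free: elems[1] -> elems[0] -> NULL                          (len = 2)
 *   used: elems[3]{data=b} -> elems[2]{data=a} -> NULL          (len = 2)
 *
 * A pop of one element now returns b, preserving LIFO order.
 */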
Since the lock-based code is simple, here it is in full:
static __rte_always_inline unsigned int
__rte_stack_std_push(struct rte_stack *s, void * const *obj_table,
		     unsigned int n)
{
	struct rte_stack_std *stack = &s->stack_std;
	unsigned int index;
	void **cache_objs;

	rte_spinlock_lock(&stack->lock);
	cache_objs = &stack->objs[stack->len];

	/* Is there sufficient space in the stack? */
	if ((stack->len + n) > s->capacity) {
		rte_spinlock_unlock(&stack->lock);
		return 0;
	}

	/* Add elements back into the cache */
	for (index = 0; index < n; ++index, obj_table++)
		cache_objs[index] = *obj_table;

	stack->len += n;

	rte_spinlock_unlock(&stack->lock);
	return n;
}
static __rte_always_inline unsigned int
__rte_stack_std_pop(struct rte_stack *s, void **obj_table, unsigned int n)
{
	struct rte_stack_std *stack = &s->stack_std;
	unsigned int index, len;
	void **cache_objs;

	rte_spinlock_lock(&stack->lock);

	if (unlikely(n > stack->len)) {
		rte_spinlock_unlock(&stack->lock);
		return 0;
	}

	cache_objs = stack->objs;

	for (index = 0, len = stack->len - 1; index < n;
	     ++index, len--, obj_table++)
		*obj_table = cache_objs[len];

	stack->len -= n;
	rte_spinlock_unlock(&stack->lock);

	return n;
}
On push, take the spinlock, check whether there is enough free space, copy the elements in, bump len, and unlock. On pop, take the spinlock, check whether n elements are available, copy them out in LIFO order, decrease len, and unlock. Note that n is all-or-nothing: if you ask to pop 5 elements but only 3 are present, the call pops nothing and returns 0 rather than popping the 3.
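A small usage sketch of the public wrappers (the stack name and sizes are made up; the standard spinlock variant is selected with flags = 0, the lock-free one with RTE_STACK_F_LF):

#include <rte_lcore.h>
#include <rte_stack.h>

static void stack_demo(void)
{
	struct rte_stack *s = rte_stack_create("demo_stack", 64,
					       rte_socket_id(), 0);
	void *objs[3] = { (void *)1, (void *)2, (void *)3 };
	void *out[5];
	unsigned int n;

	n = rte_stack_push(s, objs, 3);   /* n == 3 */

	/* All-or-nothing: only 3 elements are present, so asking for
	 * 5 pops nothing and returns 0. */
	n = rte_stack_pop(s, out, 5);     /* n == 0 */
	n = rte_stack_pop(s, out, 3);     /* n == 3, out = {3, 2, 1} */
	(void)n;
}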
The lock-free stack is more complex. First, what push has to do:
__rte_experimental
static __rte_always_inline unsigned int
__rte_stack_lf_push(struct rte_stack *s,
		    void * const *obj_table,
		    unsigned int n)
{
	struct rte_stack_lf_elem *tmp, *first, *last = NULL;
	unsigned int i;

	if (unlikely(n == 0))
		return 0;

	/* Pop n free elements */
	first = __rte_stack_lf_pop_elems(&s->stack_lf.free, n, NULL, &last);
	if (unlikely(first == NULL))
		return 0;

	/* Construct the list elements */
	for (tmp = first, i = 0; i < n; i++, tmp = tmp->next)
		tmp->data = obj_table[n - i - 1];

	/* Push them to the used list */
	__rte_stack_lf_push_elems(&s->stack_lf.used, first, last, n);

	return n;
}
static __rte_always_inline struct rte_stack_lf_elem *
__rte_stack_lf_pop_elems(struct rte_stack_lf_list *list,
			 unsigned int num,
			 void **obj_table,
			 struct rte_stack_lf_elem **last)
{
	struct rte_stack_lf_head old_head;
	int success;

	/* Reserve num elements, if available */
	while (1) {
		uint64_t len = rte_atomic64_read((rte_atomic64_t *)&list->len);

		/* Does the list contain enough elements? */
		if (unlikely(len < num))
			return NULL;

		if (rte_atomic64_cmpset((volatile uint64_t *)&list->len,
					len, len - num))
			break;
	}

	old_head = list->head;

	/* Pop num elements */
	do {
		struct rte_stack_lf_head new_head;
		struct rte_stack_lf_elem *tmp;
		unsigned int i;

		/* An acquire fence (or stronger) is needed for weak memory
		 * models to ensure the LF LIFO element reads are properly
		 * ordered with respect to the head pointer read.
		 */
		rte_smp_mb();

		rte_prefetch0(old_head.top);

		tmp = old_head.top;

		/* Traverse the list to find the new head. A next pointer will
		 * either point to another element or NULL; if a thread
		 * encounters a pointer that has already been popped, the CAS
		 * will fail.
		 */
		for (i = 0; i < num && tmp != NULL; i++) {
			rte_prefetch0(tmp->next);
			if (obj_table)
				obj_table[i] = tmp->data;
			if (last)
				*last = tmp;
			tmp = tmp->next;
		}

		/* If NULL was encountered, the list was modified while
		 * traversing it. Retry.
		 */
		if (i != num)
			continue;

		new_head.top = tmp;
		new_head.cnt = old_head.cnt + 1;

		/* old_head is updated on failure */
		success = rte_atomic128_cmp_exchange(
				(rte_int128_t *)&list->head,
				(rte_int128_t *)&old_head,
				(rte_int128_t *)&new_head,
				1, __ATOMIC_RELEASE,
				__ATOMIC_RELAXED);
	} while (success == 0);

	return old_head.top;
}
static __rte_always_inline void
__rte_stack_lf_push_elems(struct rte_stack_lf_list *list,
			  struct rte_stack_lf_elem *first,
			  struct rte_stack_lf_elem *last,
			  unsigned int num)
{
	struct rte_stack_lf_head old_head;
	int success;

	old_head = list->head;

	do {
		struct rte_stack_lf_head new_head;

		/* An acquire fence (or stronger) is needed for weak memory
		 * models to establish a synchronized-with relationship between
		 * the list->head load and store-release operations (as part of
		 * the rte_atomic128_cmp_exchange()).
		 */
		rte_smp_mb();

		/* Swing the top pointer to the first element in the list and
		 * make the last element point to the old top.
		 */
		new_head.top = first;
		new_head.cnt = old_head.cnt + 1;

		last->next = old_head.top;

		/* old_head is updated on failure */
		success = rte_atomic128_cmp_exchange(
				(rte_int128_t *)&list->head,
				(rte_int128_t *)&old_head,
				(rte_int128_t *)&new_head,
				1, __ATOMIC_RELEASE,
				__ATOMIC_RELAXED);
	} while (success == 0);

	rte_atomic64_add((rte_atomic64_t *)&list->len, num);
}
The push path above is somewhat involved. First it grabs n free elements from the free list: it atomically reads len and checks whether at least n are available, returning NULL immediately if not; otherwise it CASes len down by n, looping until the set succeeds. Assuming success, it walks the list from the head to collect the n elements, prefetching each into the cache hierarchy along the way, and builds a new head with cnt incremented by one, which, as the comment explains, guards against the ABA problem; a 128-bit CAS then installs the new free-list head (old_head is refreshed on failure and the walk retries). Next the data pointers are filled in with tmp->data = obj_table[n - i - 1], so the element at the top of the chain holds the last object pushed. Finally the chain is published to the used list: the new head points at first, the old top is hooked onto last->next, a CAS installs the new head, and len is increased.
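A hypothetical interleaving shows what the cnt field protects against:

/* ABA scenario, without the counter:
 *
 *   T1: reads head = {top = A, cnt = 5}, walks A -> B, plans
 *       new_head = {top = B, ...}, then is preempted.
 *   T2: pops A, pops B, then pushes A back. top is A again, but
 *       A->next no longer points to B (and B may be in use elsewhere).
 *   T1: a CAS comparing only the pointer would succeed and install B,
 *       a popped element, as the new top, corrupting the list.
 *
 * With the counter, every successful CAS increments cnt, so the head
 * is now {A, 8}; T1's compare against {A, 5} fails and it retries
 * with a fresh snapshot.
 */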
__rte_experimental
static __rte_always_inline unsigned int
__rte_stack_lf_pop(struct rte_stack *s, void **obj_table, unsigned int n)
{
	struct rte_stack_lf_elem *first, *last = NULL;

	if (unlikely(n == 0))
		return 0;

	/* Pop n used elements */
	first = __rte_stack_lf_pop_elems(&s->stack_lf.used,
					 n, obj_table, &last);
	if (unlikely(first == NULL))
		return 0;

	/* Push the list elements to the free list */
	__rte_stack_lf_push_elems(&s->stack_lf.free, first, last, n);

	return n;
}
Popping from the stack is the reverse process using the same helpers: pop n elements from the used list (copying their data pointers into obj_table) and push the now-empty elements back onto the free list.
static __rte_always_inline unsigned int
__rte_stack_lf_count(struct rte_stack *s)
{
	/* stack_lf_push() and stack_lf_pop() do not update the list's contents
	 * and stack_lf->len atomically, which can cause the list to appear
	 * shorter than it actually is if this function is called while other
	 * threads are modifying the list.
	 *
	 * However, given the inherently approximate nature of the get_count
	 * callback -- even if the list and its size were updated atomically,
	 * the size could change between when get_count executes and when the
	 * value is returned to the caller -- this is acceptable.
	 *
	 * The stack_lf->len updates are placed such that the list may appear to
	 * have fewer elements than it does, but will never appear to have more
	 * elements. If the mempool is near-empty to the point that this is a
	 * concern, the user should consider increasing the mempool size.
	 */
	return (unsigned int)rte_atomic64_read((rte_atomic64_t *)
			&s->stack_lf.used.len);
}
Because push and pop do not update the list contents and len as one atomic unit, many operation orderings are possible when multiple threads push, pop, or mix push with pop, including while __rte_stack_lf_count runs. Take one pushing thread and one popping thread as an example (concurrent pushes only, or pops only, are the simpler cases): a thread calling __rte_stack_lf_count at the same time may see a len that lags the list itself, since the len update is not synchronized with the head CAS. How much this matters depends on where __rte_stack_lf_count is called; interested readers can work through the individual interleavings.
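A sketch of why the approximation only ever under-reports (the interleaving points are hypothetical, but the ordering of the len updates is taken directly from the code above):

/* push:  [CAS used.head]   ...window...   [used.len += n]
 * pop:   [used.len -= n]   ...window...   [CAS used.head]
 *
 * A concurrent __rte_stack_lf_count() landing in either window reads
 * a len that has not yet been incremented (push) or has already been
 * decremented (pop), so the stack can momentarily look shorter than
 * it is, but never longer.
 */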
The remaining pieces are creation and initialization:
void
rte_stack_lf_init(struct rte_stack *s, unsigned int count)
{
	struct rte_stack_lf_elem *elems = s->stack_lf.elems;
	unsigned int i;

	for (i = 0; i < count; i++)
		__rte_stack_lf_push_elems(&s->stack_lf.free,
					  &elems[i], &elems[i], 1);
}

ssize_t
rte_stack_lf_get_memsize(unsigned int count)
{
	ssize_t sz = sizeof(struct rte_stack);

	sz += RTE_CACHE_LINE_ROUNDUP(count * sizeof(struct rte_stack_lf_elem));

	/* Add padding to avoid false sharing conflicts caused by
	 * next-line hardware prefetchers.
	 */
	sz += 2 * RTE_CACHE_LINE_SIZE;

	return sz;
}
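For example (hypothetical count; with 64-bit pointers each rte_stack_lf_elem is 16 bytes, and cache lines are 64 bytes):

/* rte_stack_lf_get_memsize(1000)
 *   = sizeof(struct rte_stack)
 *   + RTE_CACHE_LINE_ROUNDUP(1000 * 16)  -> 16000 (already a
 *                                           multiple of 64)
 *   + 2 * RTE_CACHE_LINE_SIZE            -> 128 bytes of guard
 *                                           padding for prefetchers
 */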
Since I'm basically staying home over the New Year holiday because of the complicated virus situation, I may have more time to study the things I've been wanting to learn. I hope things return to normal soon.