美文网首页
DPDK stack源码分析

DPDK stack源码分析

作者: fooboo | 来源:发表于2020-01-26 22:39 被阅读0次

    在之前分析过dpdk中的相关一些开源实现,然后当时的版本是16.07.2,现在最新的是19.11,代码方面还是有很大的区别,新增不少内容,不过这里不列举,有兴趣的读者可以自己看下。这里把有变化的无锁环形队列分析下,接着分析下stack实现,比较简单。这儿是早期的DPDK中无锁环形队列实现

    因为早期的struct rte_ring中的struct prod和struct cons有几个数据成员是冗余的,完全可以提取出来作为struct rte_ring的成员:

     64 /* structure to hold a pair of head/tail values and other metadata */
     65 struct rte_ring_headtail {
     66     volatile uint32_t head;  /**< Producer/consumer head. */
     67     volatile uint32_t tail;  /**< Producer/consumer tail. */
     68     uint32_t single;         /**< True if single producer/consumer */
     69 };
    
     81 struct rte_ring {
     88     int flags;               /**< Flags supplied at creation. */
     91     uint32_t size;           /**< Size of ring. */
     92     uint32_t mask;           /**< Mask (size-1) of ring. */
     93     uint32_t capacity;       /**< Usable size of ring; with RING_F_EXACT_SZ this can be less than size-1. */
     94 
     95     char pad0 __rte_cache_aligned; /**< empty cache line */
     96 
     97     /** Ring producer status. */
     98     struct rte_ring_headtail prod __rte_cache_aligned;
     99     char pad1 __rte_cache_aligned; /**< empty cache line */
    100 
    101     /** Ring consumer status. */
    102     struct rte_ring_headtail cons __rte_cache_aligned;
    103     char pad2 __rte_cache_aligned; /**< empty cache line */
    104 };
    

    这里在创建的时候根据RING_F_EXACT_SZ来决定是否要调整在使用时的大小,在最坏情况会浪费一些存储空间“Worst case, if a power-of-2 size is requested, half the ring space will be wasted.”,所以在使用的过程中,还是需要认真分析下代码和api的说明:

     96     if (flags & RING_F_EXACT_SZ) {
     97         r->size = rte_align32pow2(count + 1);
     98         r->mask = r->size - 1;
     99         r->capacity = count;//实际只有count
    100     } else {
    101         if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK)) {
    105             return -EINVAL;
    106         }
    107         r->size = count;
    108         r->mask = count - 1;
    109         r->capacity = r->mask;
    110     }
    
     47 /* return the size of memory occupied by a ring */
     48 ssize_t
     49 rte_ring_get_memsize(unsigned count)
     50 {   
     51     ssize_t sz;
     52     
     53     /* count must be a power of 2 */ 
     54     if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK )) {
     58         return -EINVAL;
     59     }
     60     
     61     sz = sizeof(struct rte_ring) + count * sizeof(void *); /* object table is stored in-line at the end of struct rte_ring */
     62     sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
     63     return sz;
     64 }
    

    返回ring中元素的个数:

    696 static inline unsigned
    697 rte_ring_count(const struct rte_ring *r)
    698 {   
    699     uint32_t prod_tail = r->prod.tail;
    700     uint32_t cons_tail = r->cons.tail;
    701     uint32_t count = (prod_tail - cons_tail) & r->mask;
    702     return (count > r->capacity) ? r->capacity : count; /* with RING_F_EXACT_SZ, capacity < mask, so clamp */
    703 }
    

    因为都是无符号数,所以count一定是>=0的,不管prod_tail是在cons_tail后面,还是在前面,算出来的元素个数肯定是<= capacity的,但是因为有RING_F_EXACT_SZ的情况下,会直接返回capacity。

    其他入队和出队等一些操作和原来的实现思想差不多,只不过这里在获取偏移量的时候多了一个屏障原语,它只在弱内存模型平台上有实际作用(如代码注释所说,在x86上是noop),可以对比新老两个版本感受一下,比如在__rte_ring_move_prod_head的时候:

     65     do {
     66         /* Reset n to the initial burst count */
     67         n = max;
     69         *old_head = r->prod.head;
     70 
     71         /* add rmb barrier to avoid load/load reorder in weak
     72          * memory model. It is noop on x86
     73          */
     74         rte_smp_rmb();
     75 
     76         /*
     77          *  The subtraction is done between two unsigned 32bits value
     78          * (the result is always modulo 32 bits even if we have
     79          * *old_head > cons_tail). So 'free_entries' is always between 0
     80          * and capacity (which is < size).
     81          */
     82         *free_entries = (capacity + r->cons.tail - *old_head);
    
     64     *old_head = __atomic_load_n(&r->prod.head, __ATOMIC_RELAXED);
     65     do {
     66         /* Reset n to the initial burst count */
     67         n = max;
     69         /* Ensure the head is read before tail */
     70         __atomic_thread_fence(__ATOMIC_ACQUIRE);
     71 
     72         /* load-acquire synchronize with store-release of ht->tail
     73          * in update_tail.
     74          */
     75         cons_tail = __atomic_load_n(&r->cons.tail,
     76                     __ATOMIC_ACQUIRE);
     77 
     83         *free_entries = (capacity + cons_tail - *old_head);
    

    stack

    dpdk中的stack有两种,一种是加锁实现的一种是无锁:

     74 /* The RTE stack structure contains the LIFO structure itself, plus metadata
     75  * such as its name and memzone pointer.
     76  */
     77 struct rte_stack {
     82     uint32_t capacity; /**< Usable size of the stack. */
     83     uint32_t flags; /**< Flags supplied at creation. */
     85     union { /* active member chosen at creation: lock-free vs. locking */
     86         struct rte_stack_lf stack_lf; /**< Lock-free LIFO structure. */
     87         struct rte_stack_std stack_std; /**< LIFO structure. */
     88     };
     89 } __rte_cache_aligned;
    

    其中加锁实现的比较简单:

     68 struct rte_stack_std {
     69     rte_spinlock_t lock; /**< LIFO lock */
     70     uint32_t len; /**< Current number of objects on the stack */
     71     void *objs[]; /**< Object pointers (flexible array member) */
     72 };
    

    无锁的实现较复杂些,用list来组织:

     36 struct rte_stack_lf_elem {
     37     void *data;         /**< Data pointer */
     38     struct rte_stack_lf_elem *next; /**< Next pointer */
     39 };
     40 
     41 struct rte_stack_lf_head {
     42     struct rte_stack_lf_elem *top; /**< Stack top */
     43     uint64_t cnt; /**< Modification counter for avoiding ABA problem */
     44 };
     45 
     46 struct rte_stack_lf_list {
     47     /** List head; 16-byte aligned so the 128-bit cmp-exchange can target it */
     48     struct rte_stack_lf_head head __rte_aligned(16);
     49     /** List len */
     50     uint64_t len;
     51 };
     52 
     53 /* Structure containing two lock-free LIFO lists: the stack itself and a list
     54  * of free linked-list elements.
     55  */
     56 struct rte_stack_lf {
     57     /** LIFO list of elements */
     58     struct rte_stack_lf_list used __rte_cache_aligned;
     59     /** LIFO list of free elements */
     60     struct rte_stack_lf_list free __rte_cache_aligned;
     61     /** Element storage (flexible array member) */
     62     struct rte_stack_lf_elem elems[] __rte_cache_aligned;
     63 };
    

    这里直接贴上加锁的实现,因为代码比较简单:

     23 static __rte_always_inline unsigned int
     24 __rte_stack_std_push(struct rte_stack *s, void * const *obj_table,
     25              unsigned int n)
     26 {
     27     struct rte_stack_std *stack = &s->stack_std;
     28     unsigned int index;
     29     void **cache_objs;
     30     
     31     rte_spinlock_lock(&stack->lock);
     32     cache_objs = &stack->objs[stack->len]; /* first free slot */
     33         
     34     /* Is there sufficient space in the stack? */
     35     if ((stack->len + n) > s->capacity) {
     36         rte_spinlock_unlock(&stack->lock);
     37         return 0; /* all-or-nothing: no partial push */
     38     }
     39 
     40     /* Add elements back into the cache */
     41     for (index = 0; index < n; ++index, obj_table++)
     42         cache_objs[index] = *obj_table;
     43 
     44     stack->len += n;
     45 
     46     rte_spinlock_unlock(&stack->lock);
     47     return n;
     48 }
    
     63 static __rte_always_inline unsigned int
     64 __rte_stack_std_pop(struct rte_stack *s, void **obj_table, unsigned int n)
     65 {
     66     struct rte_stack_std *stack = &s->stack_std;
     67     unsigned int index, len;
     68     void **cache_objs;
     69 
     70     rte_spinlock_lock(&stack->lock);
     71 
     72     if (unlikely(n > stack->len)) {
     73         rte_spinlock_unlock(&stack->lock);
     74         return 0; /* all-or-nothing: no partial pop */
     75     }
     76         
     77     cache_objs = stack->objs;
     78         
     79     for (index = 0, len = stack->len - 1; index < n;
     80             ++index, len--, obj_table++)
     81         *obj_table = cache_objs[len]; /* top of stack first */
     82 
     83     stack->len -= n;
     84     rte_spinlock_unlock(&stack->lock);
     85 
     86     return n;
     87 }
    

    push元素的时候,直接加spinlock并算出还有多少的空间,最后设置大小并解锁;pop元素,加spinlock并算出能否pop指定数量的元素,最后pop指定元素并减少大小并解锁;因为这里是要指定的n,如果要pop 5个但只有3个,这里并不pop 3个而是直接返回0个。

    无锁的stack实现较复杂,先来看push元素的时候要做些什么:

     30 __rte_experimental
     31 static __rte_always_inline unsigned int
     32 __rte_stack_lf_push(struct rte_stack *s,
     33             void * const *obj_table,
     34             unsigned int n)
     35 {
     36     struct rte_stack_lf_elem *tmp, *first, *last = NULL;
     37     unsigned int i;
     38 
     39     if (unlikely(n == 0))
     40         return 0;
     41 
     42     /* Pop n free elements */
     43     first = __rte_stack_lf_pop_elems(&s->stack_lf.free, n, NULL, &last);
     44     if (unlikely(first == NULL))
     45         return 0;
     46 
     47     /* Construct the list elements */
     48     for (tmp = first, i = 0; i < n; i++, tmp = tmp->next)
     49         tmp->data = obj_table[n - i - 1]; /* reverse order: the list head holds the last-pushed object */
     50 
     51     /* Push them to the used list */
     52     __rte_stack_lf_push_elems(&s->stack_lf.used, first, last, n);
     53 
     54     return n;
     55 }
    
     74 static __rte_always_inline struct rte_stack_lf_elem *
     75 __rte_stack_lf_pop_elems(struct rte_stack_lf_list *list,
     76              unsigned int num,
     77              void **obj_table,
     78              struct rte_stack_lf_elem **last)
     79 {   
     80     struct rte_stack_lf_head old_head;
     81     int success;
     82 
     83     /* Reserve num elements, if available */
     84     while (1) {
     85         uint64_t len = rte_atomic64_read((rte_atomic64_t *)&list->len);
     86     
     87         /* Does the list contain enough elements? */
     88         if (unlikely(len < num))
     89             return NULL;
     90 
     91         if (rte_atomic64_cmpset((volatile uint64_t *)&list->len,
     92                     len, len - num))
     93             break;
     94     }
     95 
     96     old_head = list->head; /* may be stale; validated by the 128-bit CAS below */
     98     /* Pop num elements */
     99     do {
    100         struct rte_stack_lf_head new_head;
    101         struct rte_stack_lf_elem *tmp;
    102         unsigned int i;
    103 
    104         /* An acquire fence (or stronger) is needed for weak memory
    105          * models to ensure the LF LIFO element reads are properly
    106          * ordered with respect to the head pointer read.
    107          */
    108         rte_smp_mb();
    109 
    110         rte_prefetch0(old_head.top);
    111 
    112         tmp = old_head.top;
    113 
    114         /* Traverse the list to find the new head. A next pointer will
    115          * either point to another element or NULL; if a thread
    116          * encounters a pointer that has already been popped, the CAS
    117          * will fail.
    118          */
    119         for (i = 0; i < num && tmp != NULL; i++) {
    120             rte_prefetch0(tmp->next);
    121             if (obj_table)
    122                 obj_table[i] = tmp->data;
    123             if (last)
    124                 *last = tmp;
    125             tmp = tmp->next;
    126         }
    127 
    128         /* If NULL was encountered, the list was modified while
    129          * traversing it. Retry.
    130          */
    131         if (i != num)
    132             continue;
    133 
    134         new_head.top = tmp;
    135         new_head.cnt = old_head.cnt + 1;
    136 
    137         /* old_head is updated on failure */
    138         success = rte_atomic128_cmp_exchange(
    139                 (rte_int128_t *)&list->head,
    140                 (rte_int128_t *)&old_head,
    141                 (rte_int128_t *)&new_head,
    142                 1, __ATOMIC_RELEASE,
    143                 __ATOMIC_RELAXED);
    144     } while (success == 0);
    145 
    146     return old_head.top; /* head of the popped chain */
    147 }
    
     33 static __rte_always_inline void
     34 __rte_stack_lf_push_elems(struct rte_stack_lf_list *list,
     35               struct rte_stack_lf_elem *first,
     36               struct rte_stack_lf_elem *last,
     37               unsigned int num)
     38 {
     39     struct rte_stack_lf_head old_head;
     40     int success;
     41 
     42     old_head = list->head;
     43 
     44     do {
     45         struct rte_stack_lf_head new_head;
     46 
     47         /* An acquire fence (or stronger) is needed for weak memory
     48          * models to establish a synchronized-with relationship between
     49          * the list->head load and store-release operations (as part of
     50          * the rte_atomic128_cmp_exchange()).
     51          */
     52         rte_smp_mb();
     53 
     54         /* Swing the top pointer to the first element in the list and
     55          * make the last element point to the old top.
     56          */
     57         new_head.top = first;
     58         new_head.cnt = old_head.cnt + 1;
     59 
     60         last->next = old_head.top;
     62         /* old_head is updated on failure */
     63         success = rte_atomic128_cmp_exchange(
     64                 (rte_int128_t *)&list->head,
     65                 (rte_int128_t *)&old_head,
     66                 (rte_int128_t *)&new_head,
     67                 1, __ATOMIC_RELEASE,
     68                 __ATOMIC_RELAXED);
     69     } while (success == 0);
     70 
     71     rte_atomic64_add((rte_atomic64_t *)&list->len, num); /* len lags the CAS: readers may see fewer elements, never more */
     72 }
    

    以上push元素时,过程有些复杂:先从free中pop出可存放指定数量元素的空间——先原子判断len是否不小于n,不够则直接返回NULL,否则用cas把len减去n,失败则重新读len再判断,直到成功;接着遍历取出rte_stack_lf_elem,期间预取到cache各层级,并对cnt加1(如注释说明,用于防止ABA问题),最后用128位cas设置free新的head。然后把元素依次设置tmp->data = obj_table[n - i - 1]。最后更新到used:设置新的head,把老的top挂到last->next,用cas更新head;注意len并不随cas一起更新,而是在cas成功之后单独原子累加。

     69 __rte_experimental
     70 static __rte_always_inline unsigned int
     71 __rte_stack_lf_pop(struct rte_stack *s, void **obj_table, unsigned int n)
     72 {
     73     struct rte_stack_lf_elem *first, *last = NULL;
     74 
     75     if (unlikely(n == 0))
     76         return 0;
     77 
     78     /* Pop n used elements */
     79     first = __rte_stack_lf_pop_elems(&s->stack_lf.used,
     80                      n, obj_table, &last);
     81     if (unlikely(first == NULL))
     82         return 0; /* fewer than n elements available: pop nothing */
     83 
     84     /* Push the list elements to the free list */
     85     __rte_stack_lf_push_elems(&s->stack_lf.free, first, last, n);
     86 
     87     return n;
     88 }
    

    从stack进行pop元素和push是相反的过程:同样先从used中pop出n个元素并填入obj_table,再把空出来的节点push回free,复用的是同一组底层函数,只是used和free两个list的角色互换。

     11 static __rte_always_inline unsigned int
     12 __rte_stack_lf_count(struct rte_stack *s)
     13 {   
     14     /* stack_lf_push() and stack_lf_pop() do not update the list's contents
     15      * and stack_lf->len atomically, which can cause the list to appear
     16      * shorter than it actually is if this function is called while other
     17      * threads are modifying the list.
     18      *
     19      * However, given the inherently approximate nature of the get_count
     20      * callback -- even if the list and its size were updated atomically,
     21      * the size could change between when get_count executes and when the
     22      * value is returned to the caller -- this is acceptable.
     23      *
     24      * The stack_lf->len updates are placed such that the list may appear to
     25      * have fewer elements than it does, but will never appear to have more
     26      * elements. If the mempool is near-empty to the point that this is a
     27      * concern, the user should consider increasing the mempool size.
     28      */
     29     return (unsigned int)rte_atomic64_read((rte_atomic64_t *)
     30             &s->stack_lf.used.len); /* approximate by design; see comment above */
     31 }
    

    由于多线程可能同时push、同时pop,或push/pop并发进行,这个结构会出现各种操作交错,调用__rte_stack_lf_count时也一样。当有线程正在push或pop而另一线程调用__rte_stack_lf_count时,由于list内容和len并非原子地一起更新,读到的长度可能偏小(如上面注释所述,只会偏小不会偏大);这是否可接受取决于__rte_stack_lf_count的具体调用场合,有兴趣可自行分析各种情况。

    其他的是创建和初始化:

      7 void
      8 rte_stack_lf_init(struct rte_stack *s, unsigned int count)
      9 {
     10     struct rte_stack_lf_elem *elems = s->stack_lf.elems;
     11     unsigned int i;
     12     
     13     for (i = 0; i < count; i++) /* seed the free list one element at a time */
     14         __rte_stack_lf_push_elems(&s->stack_lf.free,
     15                       &elems[i], &elems[i], 1);
     16 }
     17         
     18 ssize_t 
     19 rte_stack_lf_get_memsize(unsigned int count)
     20 {
     21     ssize_t sz = sizeof(struct rte_stack); /* elems[] flexible array is not included in sizeof */
     22     
     23     sz += RTE_CACHE_LINE_ROUNDUP(count * sizeof(struct rte_stack_lf_elem));
     24     
     25     /* Add padding to avoid false sharing conflicts caused by
     26      * next-line hardware prefetchers.
     27      */ 
     28     sz += 2 * RTE_CACHE_LINE_SIZE;
     29 
     30     return sz;
     31 }
    

    由于过年在家因为病毒的复杂原因基本不出去,可能有更多的时间学些想学的,希望早点恢复正常。

    相关文章

      网友评论

          本文标题:DPDK stack源码分析

          本文链接:https://www.haomeiwen.com/subject/coaezctx.html