美文网首页
DPDK中的多线程模型

DPDK中的多线程模型

作者: fooboo | 来源:发表于2017-02-22 23:00 被阅读2007次

最近在做dpdk相关的开发和调优,有点小压力,但还是能成长的,打算用两篇博客来介绍下dpdk中的多线程模型,然后下一篇介绍它里面的一些优化方法和NUMA架构[高性能的原因]。
主要是截取相关的源代码进行分析,以及开发过程中可能会遇到的一些坑[一些点参考书《深入dpdk》]。

这里每个线程运行在一个核上,在启动时就绑定了,防止上下文切换等性能开销。从helloworld例子开始吧〜

比如启动参数为-c 0xff,在main入口处,会调用rte_eal_init初始化运行相关的参数,其中对启动线程相关的参数分析eal_parse_coremask,主要实现如下:

 274 static int
 275 eal_parse_coremask(const char *coremask)
 276 {
 277     struct rte_config *cfg = rte_eal_get_configuration();
 278     int i, j, idx = 0;
 279     unsigned count = 0;
 280     char c;
 281     int val;
 282 
 283     if (coremask == NULL)
 284         return -1;
 285     /* Remove all blank characters ahead and after .
 286      * Remove 0x/0X if exists.
 287      */
 288     while (isblank(*coremask))
 289         coremask++;
 290     if (coremask[0] == '0' && ((coremask[1] == 'x')
 291         || (coremask[1] == 'X')))
 292         coremask += 2;
 293     i = strlen(coremask);
 294     while ((i > 0) && isblank(coremask[i - 1]))
 295         i--;
 296     if (i == 0)
 297         return -1;
 299     for (i = i - 1; i >= 0 && idx < RTE_MAX_LCORE; i--) {
 300         c = coremask[i];
 301         if (isxdigit(c) == 0) {
 302             /* invalid characters */
 303             return -1;
 304         }
 305         val = xdigit2val(c);
 306         for (j = 0; j < BITS_PER_HEX && idx < RTE_MAX_LCORE; j++, idx++)
 307         {   
 308             if ((1 << j) & val) {
 309                 if (!lcore_config[idx].detected) {
 310                     RTE_LOG(ERR, EAL, "lcore %u "
 311                             "unavailable\n", idx);
 312                     return -1;
 313                 }
 314                 cfg->lcore_role[idx] = ROLE_RTE;
 315                 lcore_config[idx].core_index = count;
 316                 count++;
 317             } else {
 318                 cfg->lcore_role[idx] = ROLE_OFF;
 319                 lcore_config[idx].core_index = -1;
 320             }
 321         }
 322     }
 323     for (; i >= 0; i--)
 324         if (coremask[i] != '0')
 325             return -1;
 326     for (; idx < RTE_MAX_LCORE; idx++) {
 327         cfg->lcore_role[idx] = ROLE_OFF;
 328         lcore_config[idx].core_index = -1;
 329     }
 330     if (count == 0)
 331         return -1;
 332     /* Update the count of enabled logical cores of the EAL configuration */
 333     cfg->lcore_count = count;
 334     return 0;
 335 }

 79 struct rte_config {
 80     uint32_t master_lcore;       /**< Id of the master lcore */
 81     uint32_t lcore_count;        /**< Number of available logical cores. */
 82     enum rte_lcore_role_t lcore_role[RTE_MAX_LCORE]; /**< State of cores. */
 83 
 84     /** Primary or secondary configuration */
 85     enum rte_proc_type_t process_type;
 86 
 87     /**  
 88      * Pointer to memory configuration, which may be shared across multiple
 89      * DPDK instances
 90      */
 91     struct rte_mem_config *mem_config;
 92 } __attribute__((__packed__));

 63 struct lcore_config {
 64     unsigned detected;         /**< true if lcore was detected */
 65     pthread_t thread_id;       /**< pthread identifier */
 66     int pipe_master2slave[2];  /**< communication pipe with master */
 67     int pipe_slave2master[2];  /**< communication pipe with master */
 68     lcore_function_t * volatile f;         /**< function to call */
 69     void * volatile arg;       /**< argument of function */
 70     volatile int ret;          /**< return value of function */
 71     volatile enum rte_lcore_state_t state; /**< lcore state */
 72     unsigned socket_id;        /**< physical socket id for this lcore */
 73     unsigned core_id;          /**< core number on socket for this lcore */
 74     int core_index;            /**< relative index, starting from 0 */
 75     rte_cpuset_t cpuset;       /**< cpu set which the lcore affinity to */
 76 };

行288~297是跳过启动参数-c 0xff中-c和0xff之间的空白符,跳过0x,并跳过ff右边的空白符,即执行完此语句后只剩下ff;
行299~322是对ff从右往左依次判断是否是十六进制数字,然后再转换成int型,比如f对应的int的二进制为000...1111,如果相应的位为1则执行cfg->lcore_role[idx] = ROLE_RTE[在core_config[idx].detected为true的情况下],否则cfg->lcore_role[idx] = ROLE_OFF
行323~334分别对其它位置的参数检测,因为RTE_MAX_LCORE在此源码中为32,跳出行299的循环要么是return -1,要么是idx < RTE_MAX_LCORE,如果输入的参数比如0x444ffffff是有问题的,再如0x00000000ffffff则合法,并对未设置的核进行cfg->lcore_role[idx] = ROLE_OFF
struct lcore_config是核的配置结构数据,而struct rte_config是全局rte配置结构数据;

 53 int
 54 rte_eal_cpu_init(void)
 55 {
 56     /* pointer to global configuration */
 57     struct rte_config *config = rte_eal_get_configuration();
 58     unsigned lcore_id;
 59     unsigned count = 0;
 60 
 61     /*
 62      * Parse the maximum set of logical cores, detect the subset of running
 63      * ones and enable them by default.
 64      */
 65     for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
 66         lcore_config[lcore_id].core_index = count;
 67 
 68         /* init cpuset for per lcore config */
 69         CPU_ZERO(&lcore_config[lcore_id].cpuset);
 70 
 71         /* in 1:1 mapping, record related cpu detected state */
 72         lcore_config[lcore_id].detected = eal_cpu_detected(lcore_id);
 73         if (lcore_config[lcore_id].detected == 0) {
 74             config->lcore_role[lcore_id] = ROLE_OFF;
 75             lcore_config[lcore_id].core_index = -1;
 76             continue;
 77         }
 78 
 79         /* By default, lcore 1:1 map to cpu id */
 80         CPU_SET(lcore_id, &lcore_config[lcore_id].cpuset);
 81 
 82         /* By default, each detected core is enabled */
 83         config->lcore_role[lcore_id] = ROLE_RTE;
 84         lcore_config[lcore_id].core_id = eal_cpu_core_id(lcore_id);
 85         lcore_config[lcore_id].socket_id = eal_cpu_socket_id(lcore_id);
100         count++;
101     }
102     /* Set the count of enabled logical cores of the EAL configuration */
103     config->lcore_count = count;
108     //more code...
109     return 0;
110 }

rte_eal_cpu_init函数主要设置每个线程lcore_config相关信息,其中CPU_ZEROCPU_SET是设置相关cpu亲和性的接口,这里指的是硬亲和性;简单解释下这样做的好处:使得线/进程在某个给定的 CPU 上尽量长时间地运行而不被迁移到其他处理器,迁移的频率小就意味着产生的负载小,提高了cpu cache的命中率,从而减少内存访问损耗,提高程序的速度,每个核心可以更专注地处理一件事情,资源体系被充分使用,减少了同步的损耗;

99 /* set affinity for current thread */
100 static int
101 eal_thread_set_affinity(void)
102 {   
103     unsigned lcore_id = rte_lcore_id();
104 
105     /* acquire system unique id  */
106     rte_gettid();
107         
108     /* update EAL thread core affinity */
109     return rte_thread_set_affinity(&lcore_config[lcore_id].cpuset);
110 }
112 void eal_thread_init_master(unsigned lcore_id)
113 {       
114     /* set the lcore ID in per-lcore memory area */
115     RTE_PER_LCORE(_lcore_id) = lcore_id;
116         
117     /* set CPU affinity */
118     if (eal_thread_set_affinity() < 0)
119         rte_panic("cannot set affinity\n");
120 } 

上面是设置主线程的cpu亲和性,绑定指定核掩码的第一个;

586     RTE_LCORE_FOREACH_SLAVE(i) {
587 
588         /*
589          * create communication pipes between master thread
590          * and children
591          */
592         if (pipe(lcore_config[i].pipe_master2slave) < 0)
593             rte_panic("Cannot create pipe\n");
594         if (pipe(lcore_config[i].pipe_slave2master) < 0)
595             rte_panic("Cannot create pipe\n");
596 
597         lcore_config[i].state = WAIT;
598 
599         /* create a thread for each lcore */
600         ret = pthread_create(&lcore_config[i].thread_id, NULL,
601                      eal_thread_loop, NULL);
602         if (ret != 0)
603             rte_panic("Cannot create thread\n");
604 
605         /* Set thread_name for aid in debugging. */
606         snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
607                 "lcore-slave-%d", i);
608         rte_thread_setname(lcore_config[i].thread_id, thread_name);
609     }

上面主要是创建每个工作线程,设置初始状态,初始化pipe[以前只知道用于进程间通信];

122 /* main loop of threads */
123 __attribute__((noreturn)) void *
124 eal_thread_loop(__attribute__((unused)) void *arg)
125 {
126     char c;
127     int n, ret;
128     unsigned lcore_id;
129     pthread_t thread_id;
130     int m2s, s2m;
131     char cpuset[RTE_CPU_AFFINITY_STR_LEN];
132 
133     thread_id = pthread_self();
134 
135     /* retrieve our lcore_id from the configuration structure */
136     RTE_LCORE_FOREACH_SLAVE(lcore_id) {
137         if (thread_id == lcore_config[lcore_id].thread_id)
138             break;
139     }
140     if (lcore_id == RTE_MAX_LCORE)
141         rte_panic("cannot retrieve lcore id\n");
142 
143     m2s = lcore_config[lcore_id].pipe_master2slave[0];
144     s2m = lcore_config[lcore_id].pipe_slave2master[1];
145 
146     /* set the lcore ID in per-lcore memory area */
147     RTE_PER_LCORE(_lcore_id) = lcore_id;
148 
149     /* set CPU affinity */
150     if (eal_thread_set_affinity() < 0)
151         rte_panic("cannot set affinity\n");
152 
153     ret = eal_thread_dump_affinity(cpuset, RTE_CPU_AFFINITY_STR_LEN);
154 
155     RTE_LOG(DEBUG, EAL, "lcore %u is ready (tid=%p;cpuset=[%s%s])\n",
156         lcore_id, thread_id, cpuset, ret == 0 ? "" : "...");
157 
158     /* read on our pipe to get commands */
159     while (1) {
160         void *fct_arg;
161 
162         /* wait command */
163         do {
164             n = read(m2s, &c, 1);
165         } while (n < 0 && errno == EINTR);
166 
167         if (n <= 0)
168             rte_panic("cannot read on configuration pipe\n");
169 
170         lcore_config[lcore_id].state = RUNNING;
171 
172         /* send ack */
173         n = 0;
174         while (n == 0 || (n < 0 && errno == EINTR))
175             n = write(s2m, &c, 1);
176         if (n < 0)
177             rte_panic("cannot write on configuration pipe\n");
178 
179         if (lcore_config[lcore_id].f == NULL)
180             rte_panic("NULL function pointer\n");
181 
182         /* call the function and store the return value */
183         fct_arg = lcore_config[lcore_id].arg;
184         ret = lcore_config[lcore_id].f(fct_arg);
185         lcore_config[lcore_id].ret = ret;
186         rte_wmb();
187         lcore_config[lcore_id].state = FINISHED;
188     }
189 
190     /* never reached */
191     /* pthread_exit(NULL); */
192     /* return NULL; */
193 }

上面这段代码主要是从pipe读到命令,并写入pipe确认,然后执行线程工作函数;即行184,参数是行183,然后保存结果并更新状态;rte_wmb,rte_rmb作用会在后面介绍;
例子中的main部分如下:

 67 /* call lcore_hello() on every slave lcore */
 68     RTE_LCORE_FOREACH_SLAVE(lcore_id) {
 69         rte_eal_remote_launch(lcore_hello, NULL, lcore_id);
 70     }
 71     
 72     /* call it on master lcore too */
 73     lcore_hello(NULL);
 74 
 75     rte_eal_mp_wait_lcore();
110 void
111 rte_eal_mp_wait_lcore(void)
112 {
113     unsigned lcore_id;
114 
115     RTE_LCORE_FOREACH_SLAVE(lcore_id) {
116         rte_eal_wait_lcore(lcore_id);
117     }
118 }
 47 /*
 48  * Wait until a lcore finished its job.
 49  */
 50 int
 51 rte_eal_wait_lcore(unsigned slave_id)
 52 {
 53     if (lcore_config[slave_id].state == WAIT)
 54         return 0;
 55 
 56     while (lcore_config[slave_id].state != WAIT &&
 57            lcore_config[slave_id].state != FINISHED);
 58 
 59     rte_rmb();
 60 
 61     /* we are in finished state, go to wait state */
 62     lcore_config[slave_id].state = WAIT;
 63     return lcore_config[slave_id].ret;
 64 }

以上代码就是设置工作线程的主函数lcore_hello,并等待结束,整个线程模型还是中规中矩的。

参考:
http://www.cnblogs.com/LubinLew/p/cpu_affinity.html
https://www.ibm.com/developerworks/cn/linux/l-affinity.html

相关文章

网友评论

      本文标题:DPDK中的多线程模型

      本文链接:https://www.haomeiwen.com/subject/xwqjittx.html