最近在做dpdk相关的开发和调优,有点小压力,但还是能成长的。打算用两篇博客来介绍:本篇介绍dpdk中的多线程模型,下一篇介绍它里面的一些优化方法和NUMA架构[高性能的原因]。
主要是截取相关的源代码进行分析,并说明开发过程中可能会遇到的一些坑[部分内容参考了《深入浅出DPDK》一书]。
这里每个线程运行在一个核上,在启动时就绑定了,防止上下文切换等性能开销。从helloworld例子开始吧〜
比如启动参数为-c 0xff,在main入口处,会调用rte_eal_init初始化运行相关的参数,其中解析核掩码(决定启动哪些线程)的函数是eal_parse_coremask,主要实现如下:
274 static int
275 eal_parse_coremask(const char *coremask)
276 {
277 struct rte_config *cfg = rte_eal_get_configuration();
278 int i, j, idx = 0;
279 unsigned count = 0;
280 char c;
281 int val;
282
283 if (coremask == NULL)
284 return -1;
285 /* Remove all blank characters ahead and after .
286 * Remove 0x/0X if exists.
287 */
288 while (isblank(*coremask))
289 coremask++;
290 if (coremask[0] == '0' && ((coremask[1] == 'x')
291 || (coremask[1] == 'X')))
292 coremask += 2;
293 i = strlen(coremask);
294 while ((i > 0) && isblank(coremask[i - 1]))
295 i--;
296 if (i == 0)
297 return -1;
299 for (i = i - 1; i >= 0 && idx < RTE_MAX_LCORE; i--) {
300 c = coremask[i];
301 if (isxdigit(c) == 0) {
302 /* invalid characters */
303 return -1;
304 }
305 val = xdigit2val(c);
306 for (j = 0; j < BITS_PER_HEX && idx < RTE_MAX_LCORE; j++, idx++)
307 {
308 if ((1 << j) & val) {
309 if (!lcore_config[idx].detected) {
310 RTE_LOG(ERR, EAL, "lcore %u "
311 "unavailable\n", idx);
312 return -1;
313 }
314 cfg->lcore_role[idx] = ROLE_RTE;
315 lcore_config[idx].core_index = count;
316 count++;
317 } else {
318 cfg->lcore_role[idx] = ROLE_OFF;
319 lcore_config[idx].core_index = -1;
320 }
321 }
322 }
323 for (; i >= 0; i--)
324 if (coremask[i] != '0')
325 return -1;
326 for (; idx < RTE_MAX_LCORE; idx++) {
327 cfg->lcore_role[idx] = ROLE_OFF;
328 lcore_config[idx].core_index = -1;
329 }
330 if (count == 0)
331 return -1;
332 /* Update the count of enabled logical cores of the EAL configuration */
333 cfg->lcore_count = count;
334 return 0;
335 }
79 struct rte_config {
80 uint32_t master_lcore; /**< Id of the master lcore */
81 uint32_t lcore_count; /**< Number of available logical cores. */
82 enum rte_lcore_role_t lcore_role[RTE_MAX_LCORE]; /**< State of cores. */
83
84 /** Primary or secondary configuration */
85 enum rte_proc_type_t process_type;
86
87 /**
88 * Pointer to memory configuration, which may be shared across multiple
89 * DPDK instances
90 */
91 struct rte_mem_config *mem_config;
92 } __attribute__((__packed__));
63 struct lcore_config {
64 unsigned detected; /**< true if lcore was detected */
65 pthread_t thread_id; /**< pthread identifier */
66 int pipe_master2slave[2]; /**< communication pipe with master */
67 int pipe_slave2master[2]; /**< communication pipe with master */
68 lcore_function_t * volatile f; /**< function to call */
69 void * volatile arg; /**< argument of function */
70 volatile int ret; /**< return value of function */
71 volatile enum rte_lcore_state_t state; /**< lcore state */
72 unsigned socket_id; /**< physical socket id for this lcore */
73 unsigned core_id; /**< core number on socket for this lcore */
74 int core_index; /**< relative index, starting from 0 */
75 rte_cpuset_t cpuset; /**< cpu set which the lcore affinity to */
76 };
行288~297是跳过启动参数-c 0xff中-c和0xff之间的空白符,跳过0x前缀,并通过缩短有效长度i来忽略ff右边的空白符(字符串本身不被修改),即执行完这些语句后,有效部分只剩下ff;
行299~322是对ff从右往左依次判断每个字符是否是十六进制数字,然后再转换成int型,比如f对应的int的二进制为000...1111;如果相应的位为1,则执行cfg->lcore_role[idx] = ROLE_RTE[前提是lcore_config[idx].detected为true,否则打印错误日志并return -1],如果相应的位为0,则执行cfg->lcore_role[idx] = ROLE_OFF;
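行323~334是对剩余部分的处理:因为RTE_MAX_LCORE在此源码中为32,行299的循环正常结束的条件要么是i < 0(掩码字符已全部处理完),要么是idx >= RTE_MAX_LCORE(已经处理满32位,即8个十六进制字符);对于后一种情况,行323~325要求左侧剩余的字符必须全为'0',否则return -1,所以输入的参数比如0x444ffffff是有问题的,再如0x00000000ffffff则合法;行326~329对未设置的核进行cfg->lcore_role[idx] = ROLE_OFF;行330~334检查至少启用了一个核,并更新cfg->lcore_count;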
struct lcore_config
是核的配置结构数据,而struct rte_config
是全局rte配置结构数据;
53 int
54 rte_eal_cpu_init(void)
55 {
56 /* pointer to global configuration */
57 struct rte_config *config = rte_eal_get_configuration();
58 unsigned lcore_id;
59 unsigned count = 0;
60
61 /*
62 * Parse the maximum set of logical cores, detect the subset of running
63 * ones and enable them by default.
64 */
65 for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
66 lcore_config[lcore_id].core_index = count;
67
68 /* init cpuset for per lcore config */
69 CPU_ZERO(&lcore_config[lcore_id].cpuset);
70
71 /* in 1:1 mapping, record related cpu detected state */
72 lcore_config[lcore_id].detected = eal_cpu_detected(lcore_id);
73 if (lcore_config[lcore_id].detected == 0) {
74 config->lcore_role[lcore_id] = ROLE_OFF;
75 lcore_config[lcore_id].core_index = -1;
76 continue;
77 }
78
79 /* By default, lcore 1:1 map to cpu id */
80 CPU_SET(lcore_id, &lcore_config[lcore_id].cpuset);
81
82 /* By default, each detected core is enabled */
83 config->lcore_role[lcore_id] = ROLE_RTE;
84 lcore_config[lcore_id].core_id = eal_cpu_core_id(lcore_id);
85 lcore_config[lcore_id].socket_id = eal_cpu_socket_id(lcore_id);
100 count++;
101 }
102 /* Set the count of enabled logical cores of the EAL configuration */
103 config->lcore_count = count;
108 //more code...
109 return 0;
110 }
rte_eal_cpu_init函数主要设置每个线程的lcore_config相关信息,其中CPU_ZERO和CPU_SET是操作cpu集合(cpuset)的接口,用来设置cpu亲和性,这里指的是硬亲和性;简单解释下这样做的好处:使得线程/进程在某个给定的CPU上尽量长时间地运行而不被迁移到其他处理器,迁移的频率小就意味着产生的负载小,提高了cpu cache的命中率,从而减少内存访问损耗,提高程序的速度;每个核心可以更专注地处理一件事情,资源被充分使用,减少了同步的损耗;
99 /* set affinity for current thread */
100 static int
101 eal_thread_set_affinity(void)
102 {
103 unsigned lcore_id = rte_lcore_id();
104
105 /* acquire system unique id */
106 rte_gettid();
107
108 /* update EAL thread core affinity */
109 return rte_thread_set_affinity(&lcore_config[lcore_id].cpuset);
110 }
112 void eal_thread_init_master(unsigned lcore_id)
113 {
114 /* set the lcore ID in per-lcore memory area */
115 RTE_PER_LCORE(_lcore_id) = lcore_id;
116
117 /* set CPU affinity */
118 if (eal_thread_set_affinity() < 0)
119 rte_panic("cannot set affinity\n");
120 }
上面是设置主线程的cpu亲和性,即把主线程绑定到核掩码所指定的第一个核上;
586 RTE_LCORE_FOREACH_SLAVE(i) {
587
588 /*
589 * create communication pipes between master thread
590 * and children
591 */
592 if (pipe(lcore_config[i].pipe_master2slave) < 0)
593 rte_panic("Cannot create pipe\n");
594 if (pipe(lcore_config[i].pipe_slave2master) < 0)
595 rte_panic("Cannot create pipe\n");
596
597 lcore_config[i].state = WAIT;
598
599 /* create a thread for each lcore */
600 ret = pthread_create(&lcore_config[i].thread_id, NULL,
601 eal_thread_loop, NULL);
602 if (ret != 0)
603 rte_panic("Cannot create thread\n");
604
605 /* Set thread_name for aid in debugging. */
606 snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
607 "lcore-slave-%d", i);
608 rte_thread_setname(lcore_config[i].thread_id, thread_name);
609 }
上面主要是创建每个工作线程,设置初始状态,初始化pipe[以前只知道用于进程间通信];
122 /* main loop of threads */
123 __attribute__((noreturn)) void *
124 eal_thread_loop(__attribute__((unused)) void *arg)
125 {
126 char c;
127 int n, ret;
128 unsigned lcore_id;
129 pthread_t thread_id;
130 int m2s, s2m;
131 char cpuset[RTE_CPU_AFFINITY_STR_LEN];
132
133 thread_id = pthread_self();
134
135 /* retrieve our lcore_id from the configuration structure */
136 RTE_LCORE_FOREACH_SLAVE(lcore_id) {
137 if (thread_id == lcore_config[lcore_id].thread_id)
138 break;
139 }
140 if (lcore_id == RTE_MAX_LCORE)
141 rte_panic("cannot retrieve lcore id\n");
142
143 m2s = lcore_config[lcore_id].pipe_master2slave[0];
144 s2m = lcore_config[lcore_id].pipe_slave2master[1];
145
146 /* set the lcore ID in per-lcore memory area */
147 RTE_PER_LCORE(_lcore_id) = lcore_id;
148
149 /* set CPU affinity */
150 if (eal_thread_set_affinity() < 0)
151 rte_panic("cannot set affinity\n");
152
153 ret = eal_thread_dump_affinity(cpuset, RTE_CPU_AFFINITY_STR_LEN);
154
155 RTE_LOG(DEBUG, EAL, "lcore %u is ready (tid=%p;cpuset=[%s%s])\n",
156 lcore_id, thread_id, cpuset, ret == 0 ? "" : "...");
157
158 /* read on our pipe to get commands */
159 while (1) {
160 void *fct_arg;
161
162 /* wait command */
163 do {
164 n = read(m2s, &c, 1);
165 } while (n < 0 && errno == EINTR);
166
167 if (n <= 0)
168 rte_panic("cannot read on configuration pipe\n");
169
170 lcore_config[lcore_id].state = RUNNING;
171
172 /* send ack */
173 n = 0;
174 while (n == 0 || (n < 0 && errno == EINTR))
175 n = write(s2m, &c, 1);
176 if (n < 0)
177 rte_panic("cannot write on configuration pipe\n");
178
179 if (lcore_config[lcore_id].f == NULL)
180 rte_panic("NULL function pointer\n");
181
182 /* call the function and store the return value */
183 fct_arg = lcore_config[lcore_id].arg;
184 ret = lcore_config[lcore_id].f(fct_arg);
185 lcore_config[lcore_id].ret = ret;
186 rte_wmb();
187 lcore_config[lcore_id].state = FINISHED;
188 }
189
190 /* never reached */
191 /* pthread_exit(NULL); */
192 /* return NULL; */
193 }
上面这段代码主要是从pipe读到命令,并写入pipe进行确认,然后执行线程工作函数(即行184,其参数在行183取得),最后保存返回值并更新状态;rte_wmb、rte_rmb的作用会在后面介绍;
例子中的main部分如下:
67 /* call lcore_hello() on every slave lcore */
68 RTE_LCORE_FOREACH_SLAVE(lcore_id) {
69 rte_eal_remote_launch(lcore_hello, NULL, lcore_id);
70 }
71
72 /* call it on master lcore too */
73 lcore_hello(NULL);
74
75 rte_eal_mp_wait_lcore();
110 void
111 rte_eal_mp_wait_lcore(void)
112 {
113 unsigned lcore_id;
114
115 RTE_LCORE_FOREACH_SLAVE(lcore_id) {
116 rte_eal_wait_lcore(lcore_id);
117 }
118 }
47 /*
48 * Wait until a lcore finished its job.
49 */
50 int
51 rte_eal_wait_lcore(unsigned slave_id)
52 {
53 if (lcore_config[slave_id].state == WAIT)
54 return 0;
55
56 while (lcore_config[slave_id].state != WAIT &&
57 lcore_config[slave_id].state != FINISHED);
58
59 rte_rmb();
60
61 /* we are in finished state, go to wait state */
62 lcore_config[slave_id].state = WAIT;
63 return lcore_config[slave_id].ret;
64 }
以上代码就是通过rte_eal_remote_launch为每个工作线程设置主函数lcore_hello,主线程自身也执行一次lcore_hello,然后调用rte_eal_mp_wait_lcore等待所有工作线程结束,整个线程模型还是中规中矩的。
参考:
http://www.cnblogs.com/LubinLew/p/cpu_affinity.html
https://www.ibm.com/developerworks/cn/linux/l-affinity.html
网友评论