参考 https://zhuanlan.zhihu.com/p/503733481
style: return value >= 0 表示 success
1. 三大特点
(1) 线程池 running 时, 无需记录任何 线程 id 或对象, 线程池 terminating 时, 线程池可以通过 一个等一个的方式 优雅地去结束所有线程
即 每个线程都是对等的
例: 发起 destroy 的人 若为 mainThread, 如何做到一个等一个退出
线程(池)的退出, 由 thrdpool_destroy() 设置 pool->terminate 开始
每个线程, 在 while(1) 里 会第一时间发现 terminate (!= NUll), 线程池要退出 了, 然后 break 出 while 循环
注意: 此时, 还 持有 mutex, 拿到 pool 上 唯一的 tid, 放到 自己的 临时变量, 据 tid 是否 == 0 决定是否 join preWorkerThread
把 自己的 tid == pthread_self() 放到 pool 上(以使 preThread join 他), 再解锁 mutex
=>
1] 第1个从 pool 上 拿 tid 的 人, 会发现是 0 值, 可直接结束
不用负责 等待 任何其他人, 但我 在完全结束前 需要 有人负责等待我的结束, 所以我会把我的 tid 放上去
2] 而如果发现自己从 pool里拿到的 tid 不是 0 值, 说明 我要负责 join 上一个人, 并把我的 tid 放上去,让下一个人负责我
3] 最后 那个人(workerThread), 是发现 pool->nthreads 为 0 的 人,
他通过 terminate(condition) 通知 发起 destroy 的人
4] 最后 thrdPoolDestoryerThread 退出
非常优雅的做法!!!
(2) a thrdPoolTask(消费者) 可以 调 another thrdPoolTask;
线程池 destorying 时, 线程池中的 task 也可以通过 调 thrdpool_schedule() 来 提交 nextTask
这很重要, 因为线程本身很可能不知道线程池的状态
即, 每个任务是对等的
another thrdPoolTask / the next thrdPoolTask 是 ( thrdPoolTask routine 中 调)
[1] thrdpool_schedule() --- 生产者
[2] __thrdpool_terminate() --- thrdPoolTerminateThread
[3] thrdpool_destroy() --- thrdPoolDestoryerThread
只要 taskList 管理得好, 消费者(consumer) 所执行的函数 也可以做 生产者(producer)
执行 routine 的线程, 都是 消费者
发起 schedule 的线程, 都是 生产者
(3) 线程任务 可销毁 线程池
即 每一种行为是对等的, 包括 destroy
2. 线程池中 任务入口:
[1] hook: 双向链表 的 node, 无 data 成员, 只有 pre/next 的 link 成员, 以 形成 hook list
[2] task 靠 关联的 hook 被 挂起来, 形成 task list
prototype & context 相同
struct __thrdpool_task_entry
{
struct list_head list; // task's hook(钩子): 将 task 挂起来, hookList - - - 固定 offset - - - > taskList
struct thrdpool_task task; // task
};
3. 线程池
struct __thrdpool
{
struct list_head task_queue; // 任务队列 关联的 hookList 的 headNode(invalid, there is no associated task)
size_t nthreads; // 线程数
size_t stacksize; // 线程栈 size
pthread_t tid; // pool 运行 时, pool 上记录的 tid 是 zero 值
pthread_mutex_t mutex;
pthread_cond_t cond;
pthread_key_t key;
pthread_cond_t *terminate;
};
没有一个多余, 每个成员都很到位
task_queue
1) init
thrdpool_create() 中
INIT_LIST_HEAD(&pool->task_queue);
hookList init: invalid 头结点(pool->task_queue) pre/next 都 指向自身
task_queue 无 associated task
=> task_queue 含义: hookList 的 invalid 头结点
2) 尾插 task 到 taskList: 变 link 域 (pre/next)
__thrdpool_schedule() 中
list_add_tail(&entry->list, &pool->task_queue);
尾插 task's hook 到 hookList <=> 尾插 task 到 taskList
尾插: 插到 hookList 的 headPre == tail 与 head 之间
3) 取出 firstTask -> delete it from tastList -> 先解锁 -> 再 执行 firstTask
取 firstValidHook(==) 的 ptr
-> 据 firstValidHook(struct mem) 的 ptr(mem ptr/address 更大)
与其 所在 taskEntry(struct) 地址 的 offset
求 其 taskEntry 的 addr
-> delete firstValidHook from hookList <=> delete firstTask from taskList
-> ...
__thrdpool_routine()
{
// ...
headNext = &pool->task_queue.next;
// ...
entry = list_entry(*headNext, struct __thrdpool_task_entry, list);
list_del(*headNext);
pthread_mutex_unlock(&pool->mutex);
task_routine = entry->task.routine;
task_context = entry->task.context;
free(entry);
task_routine(task_context);
}
4) 逐个 取出 (first)task -> 从 taskList 删除 -> 通过 usrFuncPtr 让 user 回调
thrdpool_destroy()
{
// ...
list_for_each_safe(pos, tmp, &pool->task_queue)
entry = list_entry(pos, struct __thrdpool_task_entry, list);
list_del(pos);
if (pending)
pending(&entry->task);
}
nthreads
= 0
thrdpool_create() 中
++
1] __thrdpool_create_threads() 中 pthread_create() 后
2] thrdpool_increase() 中 pthread_create() 后
--
1] __thrdpool_terminate() 中 in_pool == 1 时
2] __thrdpool_routine() 中 pool->terminate 被置为 cond 时
tid
线程 id, 线程池 同时只记录 1个 tid
线程池 运行时, 不会(没必要)记录 current 正在运行的 worker thread 的 tid => tid 是 空/无效值 0
线程池 退出时, tid 用来实现 `链式等待`
init: 0 值
memset(&pool->tid, 0, sizeof (pthread_t));
use
/*
1) extract old pool->tid: 把 线程池 上记录的 tid(上一个人的 tid) 拿下来,
我(current work thread) 来 负责 上一个人 (pre worker thread)
2) update new pool->tid: 把我自己的 tid 记录到 线程池 上, 下一个人来 负责 我
3) 每个人都减 1, 最后一个人(不算 发起 destory) 负责叫醒 发起 destroy 的 人 (main 线程)
4) unlock
5) 只有第1个人 拿到的 tid == 0, 不 join other worker thread (there is no other worker thread)
6) 只要不是 0 值, 我就要负责 等上一个人结束 才能退:
*/
__thrdpool_routine()
{
// ...
tid = pool->tid;
pool->tid = pthread_self();
if (--pool->nthreads == 0)
pthread_cond_signal(pool->terminate);
pthread_mutex_unlock(&pool->mutex); // ===
if (memcmp(&tid, &__zero_tid, sizeof (pthread_t)) != 0)
pthread_join(tid, NULL);
// ...
}
/* __thrdpool_terminate()
(1) 两种 case 下被调用
[1] __thrdpool_create_threads() 中 创建 n 个线程失败 时, 被调用
[2] thrdpool_destroy() 中 被调用
(2) 与 __thrdpool_routine() 中 比较 pool->tid 与 0, 然后 join preWorkerThread
[1] terminateThread/thrdPoolDestoryerThread 等 workerThreadN, 自己 不必被等待
[2] workerThread1 不必等待
[3] workerThread2-N 等待 preWorkerThread
*/
__thrdpool_terminate()
{
// ...
while (pool->nthreads > 0)
pthread_cond_wait(&term, &pool->mutex); // ===
// <=> pthread_cond_wait(pool->terminate, &pool->mutex);
pthread_mutex_unlock(&pool->mutex); // ===
if (memcmp(&pool->tid, &__zero_tid, sizeof (pthread_t)) != 0)
pthread_join(pool->tid, NULL);
}
=> pool->tid 含义:
workerThread1 workerThread2 ... workerThreadN thrdPoolDestoryerThread(main / workerThreadNPlus1)
terminate 线程池 时, 池中各线程 & thrdPoolDestoryerThread 逐个 链式等待(join) 前1个 结束
对 当前 workerThread1 & thrdPoolDestoryerThread 来说 preWorkerThread 的 identifer
对 workerThread1 来说, preWorkerThread 不存在 => pool->tid = 0 表示 无效值
对 等待 workerThreadN 的 thrdPoolDestoryerThread 来说, preWorkerThread 是 workerThreadN
Note
thrdpool_increase() 与 __thrdpool_create_threads() 中 pthread_create(&tid,...) 的 tid 只在 两个函数中用
cond
配合 mutex, 线程间同步
cond 用来 给 生产者和消费者 去操作 taskList 用的
Note: 都在 hold mutex 时进行
1) init
__thrdpool_init_locks(thrdpool_t *pool)
{
...
ret = pthread_cond_init(&pool->cond, NULL);
...
}
2) destory
__thrdpool_destroy_locks(thrdpool_t *pool)
{
...
pthread_cond_destroy(&pool->cond);
}
3) threadPool 运行时, worker thread waiting on cond
等待着
1] a task 被 schedule(调度) 进 threadPool, 或
2] 收到 结束线程池 的 broadcast(广播)
__thrdpool_routine(void *arg)
{
while (1)
{
pthread_mutex_lock(&pool->mutex); // ===
while (!pool->terminate && list_empty(&pool->task_queue))
pthread_cond_wait(&pool->cond, &pool->mutex);
...
}
}
4) 调度 1个 task 时, signal
void __thrdpool_schedule(const struct thrdpool_task *task, void *buf,
thrdpool_t *pool)
{
struct __thrdpool_task_entry *entry = (struct __thrdpool_task_entry *)buf;
// 1)
entry->task = *task;
pthread_mutex_lock(&pool->mutex); // ===
// 2) 尾插 task 到 taskList
list_add_tail(&entry->list, &pool->task_queue);
// 3) 叫醒 a waiting worker thread
pthread_cond_signal(&pool->cond);
pthread_mutex_unlock(&pool->mutex); // ===
}
5) 结束 线程池 时, broadcast
__thrdpool_terminate(int thread_in_pool, thrdpool_t *pool)
{
pthread_cond_t term = PTHREAD_COND_INITIALIZER;
pthread_mutex_lock(&pool->mutex); // ===
// (1) terminate 置 non-NULL
pool->terminate = &term;
// (2)
pthread_cond_broadcast(&pool->cond);
}
key
线程池 的 key
赋予 每个由 线程池创建的线程 用于 thread local
用于 区分 destoryer 线程 是否由 线程池创建 <=> 是否为 consumerThread/workerThread
1) key_create
thrdpool_create(size_t nthreads, size_t stacksize)
{
...
ret = pthread_key_create(&pool->key, NULL);
...
}
2) key_delete
thrdpool_create(size_t nthreads, size_t stacksize)
{
...
if (__thrdpool_create_threads(nthreads, pool) >= 0)
return pool;
// === below: __thrdpool_create_threads(nthreads, pool) fail 时
pthread_key_delete(pool->key);
...
}
thrdpool_destroy(void (*pending)(const struct thrdpool_task *),
thrdpool_t *pool)
{
...
pthread_key_delete(pool->key);
}
3) setspecific
// 由线程池创建的线程, 用 key 关联 thread local(TSD 线程特定数据): 均设为 current 线程池结构指针 pool
// => outside thread: pool->key 关联的不是 pool
__thrdpool_routine(void *arg)
{
...
pthread_setspecific(pool->key, pool);
}
4) getspecific
// 该函数 所在 线程(thrdPoolDestoryerThread) 是否 由线程池创建
inline int thrdpool_in_pool(thrdpool_t *pool)
{
return pthread_getspecific(pool->key) == pool;
}
void thrdpool_destroy(void (*pending)(const struct thrdpool_task *),
thrdpool_t *pool)
{
// destoryerThread 是否由 线程池创建 <=> 是否为 consumerThread/workerThread
int thread_in_pool = thrdpool_in_pool(pool);
...
}
terminate
2个用途
1] 线程池 terminate/destory 时的 标记位
2] 销毁线程池 的 人(thrdPoolDestoryerThread) 要等待的 condition
1) 线程池创建时, 置 NULL, 标志 线程池 处于 运行中(running)
thrdpool_create(size_t nthreads, size_t stacksize)
{
...
pool->terminate = NULL;
...
}
2) 线程池 结束时, 置 non-Null / &term(Conditon),
1] 标志 线程池处于 terminating 中
2] 同时 启用 terminate 作为 链式等待 workerThread 结束 的 CV/Condition 的 作用
__thrdpool_routine(void *arg)
{
while (1)
{
// (1) 从 任务队列 取出 1个任务, 没有就等待
pthread_mutex_lock(&pool->mutex); // ===
while (!pool->terminate && list_empty(&pool->task_queue))
pthread_cond_wait(&pool->cond, &pool->mutex); // ===
if (pool->terminate)
break;
}
...
}
}
__thrdpool_terminate(int thread_in_pool, thrdpool_t *pool)
{
pthread_cond_t term = PTHREAD_COND_INITIALIZER;
pthread_mutex_lock(&pool->mutex); // ===
// (1) 标志 线程池 terminate
pool->terminate = &term;
...
// (2) terminate 线程池 时, 池中各线程 & thrdPoolDestoryerThread 逐个 链式等待(join) 前1个 结束
while (pool->nthreads > 0)
pthread_cond_wait(&term, &pool->mutex); // ===
// <=> pthread_cond_wait(pool->terminate, &pool->mutex);
...
}
3) signal
__thrdpool_routine(void *arg)
{
...
if (--pool->nthreads == 0) // 3) 每个人都减 1, 最后一个人(不算 发起 destory) 负责叫醒 发起 destroy 的 人
pthread_cond_signal(pool->terminate);
...
}
4. 问题?
若 __thrdpool_terminate() 由线程池中 某线程 的 routine() 发起, 则 该线程 等完线程池中 thePreLastWorkerThread 完成,
自己却自行 detach, 那谁等 该线程呢?
因为 这种情形, thrdPoolTerminateThread 或 thrdPoolDestoryerThread 不再是 mainThread,而可能 mainThread 已经终止,
那 所有 subThread(线程池中各线程) 就 直接蒸发了
网友评论