Reference: https://zhuanlan.zhihu.com/p/503733481
// thrdpool.h
/*
Copyright (c) 2019 Sogou, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Author: Xie Han (xiehan@sogou-inc.com)
*/
#ifndef _THRDPOOL_H_
#define _THRDPOOL_H_
#include <stddef.h>
// Opaque handle: users of this header see only the type name; the layout of
// struct __thrdpool is not declared here.
typedef struct __thrdpool thrdpool_t;
// A unit of work handed to the thread pool.
struct thrdpool_task
{
void (*routine)(void *); // function a worker thread calls to run the task
void *context; // opaque argument passed to routine()
};
#ifdef __cplusplus
extern "C"
{
#endif
/*
* Thread pool originates from project Sogou C++ Workflow
* https://github.com/sogou/workflow
*
* A thread task can be scheduled by another task, which is very important,
* even if the pool is being destroyed. Because thread task is hard to know
* what's happening to the pool.
* The thread pool can also be destroyed by a thread task. This may sound
* strange, but it's very logical. Destroying thread pool in thread task
* does not end the task thread. It'll run till the end of task.
*/
// Create a pool and start `nthreads` worker threads. A nonzero `stacksize` is
// requested as the stack size of every worker; 0 leaves the thread attributes
// at their defaults. Returns the pool, or NULL on failure (when a pthread call
// fails, errno is set to the error code it returned).
thrdpool_t *thrdpool_create(size_t nthreads, size_t stacksize);
// Queue a copy of *task for execution and wake one waiting worker.
// Returns 0 on success, -1 if the queue entry cannot be allocated.
int thrdpool_schedule(const struct thrdpool_task *task, thrdpool_t *pool);
// Start one additional worker thread. Returns 0 on success, or -1 with errno
// set to the pthread error code.
int thrdpool_increase(thrdpool_t *pool);
// Nonzero when the calling thread is one of this pool's worker threads.
int thrdpool_in_pool(thrdpool_t *pool);
// Stop all workers and tear the pool down. Tasks still waiting in the queue are
// not run: each one is passed to `pending` (when it is non-NULL) and then freed.
void thrdpool_destroy(void (*pending)(const struct thrdpool_task *),
thrdpool_t *pool);
#ifdef __cplusplus
}
#endif
#endif
// thrdpool.c
/*
Copyright (c) 2019 Sogou, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Author: Xie Han (xiehan@sogou-inc.com)
*/
#include <errno.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include "list.h"
#include "thrdpool.h"
struct __thrdpool // thread pool state
{
struct list_head task_queue; // sentinel head of the pending-task list (the head itself carries no task)
size_t nthreads; // number of worker threads currently counted as alive
size_t stacksize; // stack size requested for workers; 0 = keep default thread attributes
pthread_t tid; // all-zero while the pool runs; during shutdown, the last worker that exited and still has to be joined
pthread_mutex_t mutex; // serializes access to the task queue and the shutdown hand-off
pthread_cond_t cond; // signalled when a task is queued, broadcast when shutdown starts
pthread_key_t key; // thread-specific slot in which every worker stores the pool pointer
pthread_cond_t *terminate; // NULL while running; during shutdown points at the condition variable the destroying thread waits on
};
// Heap-allocated queue node that carries one scheduled task.
struct __thrdpool_task_entry
{
struct list_head list; // link in the pool's task_queue
struct thrdpool_task task; // copy of the task supplied by the caller
};
static pthread_t __zero_tid; // static storage, hence zero-initialized; memcmp'ed against a tid to mean "no thread recorded"
// Worker thread entry point: repeatedly removes the first task from the queue
// and runs it until the pool is terminated. `arg` is the owning thrdpool_t.
// Always returns NULL.
static void *__thrdpool_routine(void *arg)
{
thrdpool_t *pool = (thrdpool_t *)arg;
struct list_head **headNext = &pool->task_queue.next;
struct __thrdpool_task_entry *entry;
void (*task_routine)(void *);
void *task_context;
pthread_t tid;
pthread_setspecific(pool->key, pool); // mark this thread as a worker of `pool` (read back by thrdpool_in_pool)
while (1)
{
pthread_mutex_lock(&pool->mutex); // (1) take one task from the queue, sleeping while there is none
while (!pool->terminate && list_empty(&pool->task_queue)) // keep waiting only while the pool is running AND the queue is empty
pthread_cond_wait(&pool->cond, &pool->mutex); // the mutex is re-acquired before this returns; after the shutdown broadcast that can only happen once the destroying thread releases it
if (pool->terminate) // (2) non-NULL means shutdown: it is both the stop flag and the condition variable the destroying thread waits on
break; // leave the loop with the mutex still held
entry = list_entry(*headNext, struct __thrdpool_task_entry, list); // (3) take the first queued entry and unlink it from the task list
list_del(*headNext);
pthread_mutex_unlock(&pool->mutex); // (4) release the lock before running user code
task_routine = entry->task.routine;
task_context = entry->task.context;
free(entry); // the task fields were copied out above, so the node is no longer needed
task_routine(task_context); // (5) run the task: routine(context)
if (pool->nthreads == 0) // (6) true when the task that just ran destroyed the pool from this thread: __thrdpool_terminate() returns only after nthreads has dropped to 0
{
/* The pool was destroyed by the task itself; thrdpool_destroy() does not free the pool for an in-pool caller, so it is released here. */
free(pool);
return NULL;
}
}
// (7) Shutdown path, mutex held: each exiting worker joins the worker that exited before it, so the pool never has to keep every thread id.
tid = pool->tid; // 1) take the previously recorded thread; this thread becomes responsible for joining it
pool->tid = pthread_self(); // 2) record ourselves so that the next exiting thread (or the destroying thread) joins us
if (--pool->nthreads == 0) // 3) every worker decrements the count; the last one wakes the thread waiting in __thrdpool_terminate()
pthread_cond_signal(pool->terminate);
pthread_mutex_unlock(&pool->mutex); // 4) unlock
if (memcmp(&tid, &__zero_tid, sizeof (pthread_t)) != 0) // 5) only the first worker to exit sees the all-zero tid
pthread_join(tid, NULL); // 6) otherwise wait for the previous worker to finish before exiting
return NULL; // 7) done
}
/*
 * Initialize the pool's mutex and condition variable.
 * Returns 0 on success. On failure returns -1 with errno set to the pthread
 * error code; a mutex that was already initialized is destroyed again.
 */
static int __thrdpool_init_locks(thrdpool_t *pool)
{
    int err = pthread_mutex_init(&pool->mutex, NULL);

    if (err != 0)
    {
        errno = err;
        return -1;
    }

    err = pthread_cond_init(&pool->cond, NULL);
    if (err != 0)
    {
        /* Roll back the mutex so the caller has nothing to clean up. */
        pthread_mutex_destroy(&pool->mutex);
        errno = err;
        return -1;
    }

    return 0;
}
// Destroy the pool's mutex and condition variable (counterpart of __thrdpool_init_locks).
static void __thrdpool_destroy_locks(thrdpool_t *pool)
{
pthread_mutex_destroy(&pool->mutex);
pthread_cond_destroy(&pool->cond);
}
// Stop every worker thread and wait until all of them have left the pool.
// thread_in_pool: nonzero when the caller is itself one of the pool's workers.
static void __thrdpool_terminate(int thread_in_pool, thrdpool_t *pool)
{
pthread_cond_t term = PTHREAD_COND_INITIALIZER; // private condition variable that the last exiting worker signals
pthread_mutex_lock(&pool->mutex);
pool->terminate = &term; // publish the stop flag; workers test it under the mutex
pthread_cond_broadcast(&pool->cond); // wake every worker sleeping on an empty queue
if (thread_in_pool) // the caller is a worker: it detaches itself and stops counting as a live worker
{
/* Thread pool terminated/destroyed in a pool thread is legal. */
pthread_detach(pthread_self());
pool->nthreads--;
}
while (pool->nthreads > 0) // waiting releases the mutex, which lets the woken workers re-acquire it and run their exit path
pthread_cond_wait(&term, &pool->mutex); // `term` is the same condition variable as pool->terminate
pthread_mutex_unlock(&pool->mutex);
if (memcmp(&pool->tid, &__zero_tid, sizeof (pthread_t)) != 0) // a worker recorded itself on exit: join the last one (it has joined its predecessor)
pthread_join(pool->tid, NULL);
}
/*
 * Start worker threads until the pool owns `nthreads` of them.
 * Returns 0 when all threads were created. If thread creation fails part-way,
 * the workers already started are terminated and -1 is returned with errno
 * set to the pthread error code. Also returns -1 (errno set) when the thread
 * attribute object cannot be initialized.
 */
static int __thrdpool_create_threads(size_t nthreads, thrdpool_t *pool)
{
    pthread_attr_t attr;
    pthread_t tid;
    int ret;

    ret = pthread_attr_init(&attr);
    if (ret != 0)
    {
        errno = ret;
        return -1;
    }

    /* A stack size of 0 means "keep the default attributes". */
    if (pool->stacksize)
        pthread_attr_setstacksize(&attr, pool->stacksize);

    while (pool->nthreads < nthreads)
    {
        ret = pthread_create(&tid, &attr, __thrdpool_routine, pool);
        if (ret != 0)
            break;

        pool->nthreads++;
    }

    pthread_attr_destroy(&attr);

    if (pool->nthreads != nthreads)
    {
        /* Partial failure: stop the workers that did start. */
        __thrdpool_terminate(0, pool);
        errno = ret;
        return -1;
    }

    return 0;
}
/*
 * Allocate a thread pool and start `nthreads` workers.
 * stacksize: stack size requested for each worker, 0 for the default.
 * Returns the new pool, or NULL on failure after releasing everything that
 * was set up so far. When a pthread call fails, errno holds its error code.
 */
thrdpool_t *thrdpool_create(size_t nthreads, size_t stacksize)
{
    thrdpool_t *pool;
    int ret;

    pool = (thrdpool_t *)malloc(sizeof (thrdpool_t));
    if (!pool)
        return NULL;

    if (__thrdpool_init_locks(pool) < 0)
        goto fail_free;

    ret = pthread_key_create(&pool->key, NULL);
    if (ret != 0)
    {
        errno = ret;
        goto fail_locks;
    }

    /* Empty queue: the sentinel head points at itself in both directions. */
    INIT_LIST_HEAD(&pool->task_queue);
    pool->stacksize = stacksize;
    pool->nthreads = 0;
    /* Zero tid and NULL terminate mean "running, nobody to join yet". */
    memset(&pool->tid, 0, sizeof (pthread_t) );
    pool->terminate = NULL;

    if (__thrdpool_create_threads(nthreads, pool) >= 0)
        return pool;

    pthread_key_delete(pool->key);
fail_locks:
    __thrdpool_destroy_locks(pool);
fail_free:
    free(pool);
    return NULL;
}
/*
 * Producer side of the queue: enqueue a copy of *task using caller-supplied
 * storage and wake one waiting worker.
 * buf must be writable memory large enough for a struct __thrdpool_task_entry.
 * It becomes the queue node and is later released with free(), so it has to
 * come from malloc().
 */
inline void __thrdpool_schedule(const struct thrdpool_task *task, void *buf,
                                thrdpool_t *pool);

void __thrdpool_schedule(const struct thrdpool_task *task, void *buf,
                         thrdpool_t *pool)
{
    struct __thrdpool_task_entry *node = (struct __thrdpool_task_entry *)buf;

    node->task.routine = task->routine;
    node->task.context = task->context;

    pthread_mutex_lock(&pool->mutex);
    /* Append at the tail so tasks leave the queue in FIFO order. */
    list_add_tail(&node->list, &pool->task_queue);
    pthread_cond_signal(&pool->cond);
    pthread_mutex_unlock(&pool->mutex);
}
/*
 * Hand a task to the pool. *task is copied, so the caller's struct may be
 * reused right after the call.
 * Returns 0 on success, -1 when the queue entry cannot be allocated.
 */
int thrdpool_schedule(const struct thrdpool_task *task, thrdpool_t *pool)
{
    void *storage = malloc(sizeof (struct __thrdpool_task_entry));

    if (storage == NULL)
        return -1;

    __thrdpool_schedule(task, storage, pool);
    return 0;
}
/*
 * Add one worker thread to the pool.
 * The thread is created and counted while holding the pool mutex.
 * Returns 0 on success, -1 with errno set to the pthread error code otherwise.
 */
int thrdpool_increase(thrdpool_t *pool)
{
    pthread_attr_t attr;
    pthread_t tid;
    int ret = pthread_attr_init(&attr);

    if (ret != 0)
    {
        errno = ret;
        return -1;
    }

    if (pool->stacksize)
        pthread_attr_setstacksize(&attr, pool->stacksize);

    pthread_mutex_lock(&pool->mutex);
    ret = pthread_create(&tid, &attr, __thrdpool_routine, pool);
    if (ret == 0)
        pool->nthreads++;
    pthread_mutex_unlock(&pool->mutex);

    pthread_attr_destroy(&attr);

    if (ret != 0)
    {
        errno = ret;
        return -1;
    }

    return 0;
}
/*
 * Tell whether the calling thread is a worker of `pool`: every worker stores
 * the pool pointer under pool->key when it starts.
 * Returns nonzero for a worker thread, 0 otherwise.
 */
inline int thrdpool_in_pool(thrdpool_t *pool)
{
    const void *owner = pthread_getspecific(pool->key);

    return owner == pool;
}
/*
 * Shut the pool down and release it.
 * pending: optional callback; it receives every task that was still queued
 *          and therefore never ran. May be NULL.
 * When called from one of the pool's own workers the pool memory is not freed
 * here; the worker loop frees it once the current task returns.
 */
void thrdpool_destroy(void (*pending)(const struct thrdpool_task *),
                      thrdpool_t *pool)
{
    int thread_in_pool = thrdpool_in_pool(pool);
    struct list_head *const head = &pool->task_queue;
    struct list_head *node, *next;
    struct __thrdpool_task_entry *leftover;

    /* Sets pool->terminate, wakes all workers and waits for them to exit. */
    __thrdpool_terminate(thread_in_pool, pool);

    /* Drain the tasks nobody picked up; `next` is saved before unlinking. */
    for (node = head->next; node != head; node = next)
    {
        next = node->next;
        leftover = list_entry(node, struct __thrdpool_task_entry, list);
        list_del(node);
        if (pending)
            pending(&leftover->task);
        free(leftover);
    }

    pthread_key_delete(pool->key);
    __thrdpool_destroy_locks(pool);

    if (thread_in_pool)
        return;

    free(pool);
}
// list.h
#ifndef _LINUX_LIST_H
#define _LINUX_LIST_H
/*
* Simple doubly linked list implementation.
*
* Some of the internal functions ("__xxx") are useful when
* manipulating whole lists rather than single entries, as
* sometimes we already know the next/prev entries and we can
* generate better code by using them directly rather than
* using the generic single-entry routines.
*/
/*
 * Link of a circular doubly linked list. Embed one in every struct that has
 * to be put on a list; a standalone instance serves as the list head.
 */
struct list_head {
    struct list_head *next, *prev;
};

/* Static initializer for an empty list head called `name`. */
#define LIST_HEAD_INIT(name) { &(name), &(name) }

/* Define an empty list head variable called `name`. */
#define LIST_HEAD(name) \
    struct list_head name = LIST_HEAD_INIT(name)

/**
 * INIT_LIST_HEAD - make a list head empty
 * @list: the head to initialize; both links end up pointing at itself.
 */
static inline void INIT_LIST_HEAD(struct list_head *list)
{
    list->next = list;
    list->prev = list;
}

/*
 * Insert a new entry between two known consecutive entries.
 *
 * This is only for internal list manipulation where we know
 * the prev/next entries already!
 */
static inline void __list_add(struct list_head *node,
                              struct list_head *prev,
                              struct list_head *next)
{
    next->prev = node;
    node->next = next;
    node->prev = prev;
    prev->next = node;
}

/**
 * list_add - add a new entry
 * @node: new entry to be added
 * @head: list head to add it after
 *
 * Insert a new entry after the specified head.
 * This is good for implementing stacks.
 */
static inline void list_add(struct list_head *node, struct list_head *head)
{
    __list_add(node, head, head->next);
}

/**
 * list_add_tail - add a new entry
 * @node: new entry to be added
 * @head: list head to add it before
 *
 * Insert a new entry before the specified head.
 * This is useful for implementing queues.
 */
static inline void list_add_tail(struct list_head *node,
                                 struct list_head *head)
{
    __list_add(node, head->prev, head);
}

/*
 * Delete a list entry by making the prev/next entries
 * point to each other.
 *
 * This is only for internal list manipulation where we know
 * the prev/next entries already!
 */
static inline void __list_del(struct list_head *prev, struct list_head *next)
{
    next->prev = prev;
    prev->next = next;
}

/**
 * list_del - deletes entry from list.
 * @entry: the element to delete from the list.
 *
 * Note: the links of @entry are left untouched, so list_empty() on @entry
 * does not return true afterwards; the entry is in an undefined state.
 */
static inline void list_del(struct list_head *entry)
{
    __list_del(entry->prev, entry->next);
}

/**
 * list_empty - tests whether a list is empty
 * @head: the list to test.
 *
 * Returns nonzero when @head links only to itself.
 */
static inline int list_empty(const struct list_head *head)
{
    return head->next == head;
}

/**
 * list_entry - get the struct for this entry
 * @ptr:    the &struct list_head pointer.
 * @type:   the type of the struct this is embedded in.
 * @member: the name of the list_struct within the struct.
 */
#define list_entry(ptr, type, member) \
    ((type *)((char *)(ptr)-(unsigned long)(&((type *)0)->member)))

/**
 * list_for_each_safe - iterate over a list safe against removal of list entry
 * @pos:  the &struct list_head to use as a loop counter.
 * @n:    another &struct list_head to use as temporary storage
 * @head: the head for your list.
 *
 * The successor is saved in @n before the body runs, so the body may unlink
 * (and free) @pos.
 */
#define list_for_each_safe(pos, n, head) \
    for (pos = (head)->next, n = (pos)->next; (pos) != (head); \
         pos = n, n = (pos)->next)

/* ===== Less frequently used helpers ===== */

/**
 * list_move - delete from one list and add as another's head
 * @list: the entry to move
 * @head: the head that will precede our entry
 */
static inline void list_move(struct list_head *list, struct list_head *head)
{
    __list_del(list->prev, list->next);
    list_add(list, head);
}

/**
 * list_move_tail - delete from one list and add as another's tail
 * @list: the entry to move
 * @head: the head that will follow our entry
 */
static inline void list_move_tail(struct list_head *list,
                                  struct list_head *head)
{
    __list_del(list->prev, list->next);
    list_add_tail(list, head);
}

/*
 * Link every entry of @list right after @head. @list must not be empty and
 * its own head is left pointing at the moved entries.
 */
static inline void __list_splice(struct list_head *list,
                                 struct list_head *head)
{
    struct list_head *first = list->next;
    struct list_head *last = list->prev;
    struct list_head *at = head->next;

    first->prev = head;
    head->next = first;
    last->next = at;
    at->prev = last;
}

/**
 * list_splice - join two lists
 * @list: the new list to add.
 * @head: the place to add it in the first list.
 */
static inline void list_splice(struct list_head *list, struct list_head *head)
{
    if (!list_empty(list))
        __list_splice(list, head);
}

/**
 * list_splice_init - join two lists and reinitialise the emptied list.
 * @list: the new list to add.
 * @head: the place to add it in the first list.
 *
 * The list at @list is reinitialised
 */
static inline void list_splice_init(struct list_head *list,
                                    struct list_head *head)
{
    if (!list_empty(list)) {
        __list_splice(list, head);
        INIT_LIST_HEAD(list);
    }
}

/**
 * list_for_each - iterate over a list
 * @pos:  the &struct list_head to use as a loop counter.
 * @head: the head for your list.
 */
#define list_for_each(pos, head) \
    for (pos = (head)->next; (pos) != (head); pos = (pos)->next)

/**
 * list_for_each_prev - iterate over a list backwards
 * @pos:  the &struct list_head to use as a loop counter.
 * @head: the head for your list.
 */
#define list_for_each_prev(pos, head) \
    for (pos = (head)->prev; (pos) != (head); pos = (pos)->prev)

/**
 * list_for_each_entry - iterate over list of given type
 * @pos:    the type * to use as a loop counter.
 * @head:   the head for your list.
 * @member: the name of the list_struct within the struct.
 *
 * Uses the __typeof__ spelling because GCC and Clang do not recognize the
 * plain `typeof` keyword in strict ISO modes such as -std=c99 / -std=c11.
 */
#define list_for_each_entry(pos, head, member) \
    for (pos = list_entry((head)->next, __typeof__ (*(pos)), member); \
         &(pos)->member != (head); \
         pos = list_entry((pos)->member.next, __typeof__ (*(pos)), member))
/**
* Single-linked list. Added by Xie Han <xiehan@sogou-inc.com>.
*/
/* Link of a singly linked list; embed one in every struct to be listed. */
struct slist_node {
    struct slist_node *next;
};

/*
 * List head: `first` is a dummy node whose next pointer is the first real
 * node, `last` points at the final node (or at `first` when the list is empty).
 */
struct slist_head {
    struct slist_node first, *last;
};

/* Static initializer for an empty list head called `name`. */
#define SLIST_HEAD_INIT(name) { { (struct slist_node *)0 }, &(name).first }

/* Define an empty list head variable called `name`. */
#define SLIST_HEAD(name) \
    struct slist_head name = SLIST_HEAD_INIT(name)

/* Make `list` empty. */
static inline void INIT_SLIST_HEAD(struct slist_head *list)
{
    list->first.next = (struct slist_node *)0;
    list->last = &list->first;
}

/* Insert `node` as the first element of `list`. */
static inline void slist_add_head(struct slist_node *node,
                                  struct slist_head *list)
{
    node->next = list->first.next;
    list->first.next = node;
    if (!node->next)
        list->last = node; /* list was empty: node is also the tail */
}

/* Append `node` as the last element of `list`. */
static inline void slist_add_tail(struct slist_node *node,
                                  struct slist_head *list)
{
    node->next = (struct slist_node *)0;
    list->last->next = node;
    list->last = node;
}

/* Insert `node` right after `prev`, which must already be on `list`. */
static inline void slist_add_after(struct slist_node *node,
                                   struct slist_node *prev,
                                   struct slist_head *list)
{
    node->next = prev->next;
    prev->next = node;
    if (!node->next)
        list->last = node; /* inserted behind the old tail */
}

/* Remove the first element; `list` must not be empty. */
static inline void slist_del_head(struct slist_head *list)
{
    list->first.next = list->first.next->next;
    if (!list->first.next)
        list->last = &list->first;
}

/* Remove the element that follows `prev`; such an element must exist. */
static inline void slist_del_after(struct slist_node *prev,
                                   struct slist_head *list)
{
    prev->next = prev->next->next;
    if (!prev->next)
        list->last = prev; /* the tail was removed */
}

/* Nonzero when `list` has no elements. */
static inline int slist_empty(struct slist_head *list)
{
    return !list->first.next;
}

/*
 * Link all nodes of the non-empty `list` after node `at` of `head`.
 * `list` itself is not reset by this helper.
 */
static inline void __slist_splice(struct slist_head *list,
                                  struct slist_node *at,
                                  struct slist_head *head)
{
    list->last->next = at->next;
    at->next = list->first.next;
    if (!list->last->next)
        head->last = list->last; /* spliced nodes now form the tail */
}

/* Splice `list` into `head` after `at`; does nothing for an empty `list`. */
static inline void slist_splice(struct slist_head *list,
                                struct slist_node *at,
                                struct slist_head *head)
{
    if (!slist_empty(list))
        __slist_splice(list, at, head);
}

/* Like slist_splice(), and additionally reinitialises `list` as empty. */
static inline void slist_splice_init(struct slist_head *list,
                                     struct slist_node *at,
                                     struct slist_head *head)
{
    if (!slist_empty(list)) {
        __slist_splice(list, at, head);
        INIT_SLIST_HEAD(list);
    }
}

/*
 * slist_entry - get the struct that embeds the node `ptr` as `member`.
 * `ptr` must point at a real node (not NULL).
 */
#define slist_entry(ptr, type, member) \
    ((type *)((char *)(ptr)-(unsigned long)(&((type *)0)->member)))

/*
 * slist_entry_or_null - like slist_entry(), but maps a NULL node to a NULL
 * entry instead of doing pointer arithmetic on NULL. `ptr` is evaluated twice.
 */
#define slist_entry_or_null(ptr, type, member) \
    ((ptr) ? slist_entry(ptr, type, member) : (type *)0)

/* Iterate over the nodes of `head`; `pos` is a struct slist_node pointer. */
#define slist_for_each(pos, head) \
    for (pos = (head)->first.next; pos; pos = (pos)->next)

/*
 * Iterate while allowing the body to remove `pos` with
 * slist_del_after(prev, head): `prev` advances only when `pos` is still
 * linked behind it.
 */
#define slist_for_each_safe(pos, prev, head) \
    for (prev = &(head)->first, pos = (prev)->next; pos; \
         prev = (prev)->next == (pos) ? (pos) : (prev), pos = (prev)->next)

/*
 * Iterate over the structs that embed the nodes of `head` as `member`.
 * The loop ends when the node pointer is NULL; `pos` is NULL afterwards.
 * The end test is done on the node before it is converted to an entry,
 * because comparing the address of a member with NULL may be folded away by
 * the optimizer. __typeof__ is used because plain `typeof` is not recognized
 * by GCC/Clang in strict ISO modes such as -std=c11.
 */
#define slist_for_each_entry(pos, head, member) \
    for (pos = slist_entry_or_null((head)->first.next, \
                                   __typeof__ (*(pos)), member); \
         pos; \
         pos = slist_entry_or_null((pos)->member.next, \
                                   __typeof__ (*(pos)), member))
#endif
// test.cpp
// Task body run by a pool worker: prints the task number carried in `context`.
// `context` is not a real pointer; main() packs an integer into it.
void my_routine(void *context)
{
    printf("task-%llu start.\n", reinterpret_cast<unsigned long long>(context));
}
// Callback given to thrdpool_destroy(): called once for every task that was
// still queued when the pool was destroyed and therefore never ran.
void my_pending(const struct thrdpool_task *task)
{
    printf("pending task-%llu.\n", reinterpret_cast<unsigned long long>(task->context));
}
// Demo: start a 3-thread pool, schedule 5 tasks, wait for Enter, then destroy
// the pool. Tasks that have not run by then are reported through my_pending().
int main()
{
    // Stack size 0 keeps the platform default. The previous value of 1024
    // bytes is below the minimum thread stack size on common platforms.
    thrdpool_t *thrd_pool = thrdpool_create(3, 0);
    if (!thrd_pool)
    {
        perror("thrdpool_create");
        return 1;
    }

    struct thrdpool_task task;
    task.routine = &my_routine;
    for (unsigned long long i = 0; i < 5; i++)
    {
        // The task number travels inside the context pointer itself.
        task.context = reinterpret_cast<void *>(i);
        if (thrdpool_schedule(&task, thrd_pool) < 0)
            perror("thrdpool_schedule");
    }

    getchar(); // block the main thread until Enter is pressed
    thrdpool_destroy(&my_pending, thrd_pool);
    return 0;
}
网友评论