美文网首页
threadPool of sogou's workflow -

threadPool of sogou's workflow -

作者: my_passion | 来源:发表于2022-05-24 23:05 被阅读0次
    参考 https://zhuanlan.zhihu.com/p/503733481
    
    style: return value >= 0 表示 success
    
    1. 三大特点 
        
    (1) 线程池 running 时, 无需记录任何 线程 id 或对象, 线程池 terminating 时, 线程池可以通过 一个等一个的方式 优雅地去结束所有线程
        即 每个线程都是对等的
    
        例: 发起 destroy 的人 若为 mainThread, 如何做到一个等一个退出
    
            线程(池)的退出, 由 thrdpool_destroy() 设置 pool->terminate 开始
            
            每个线程, 在 while(1) 里 会第一时间发现 terminate (!= NUll), 线程池要退出 了, 然后 break 出 while 循环
            
                注意: 此时, 还 持有 mutex, 拿到 pool 上 唯一的 tid, 放到 自己的 临时变量, 据 tid 是否 == 0 决定是否 join preWorkerThread  
                
                    把 自己的 tid == pthread_self() 放到 pool 上(以使 preThread join 他), 再解锁 mutex   
                
            => 
            1] 第1个从 pool 上 拿 tid 的 人, 会发现是 0 值, 可直接结束
                不用负责 等待 任何其他人, 但我 在完全结束前 需要 有人负责等待我的结束, 所以我会把我的 tid 放上去
                
            2] 而如果发现自己从 pool里拿到的 tid 不是 0 值, 说明 我要负责 join 上一个人, 并把我的 tid 放上去,让下一个人负责我
            
            3] 最后 那个人(workerThread), 是发现 pool->nthreads 为 0 的 人, 
                他通过 terminate(condition) 通知 发起 destroy 的人
            
            4] 最后 thrdPoolDestoryerThread 退出
            
            非常优雅的做法!!!
    
    (2) a thrdPoolTask(消费者) 可以 调 another thrdPoolTask; 
        线程池 destorying 时, 线程池中的 task 也可以通过 调 thrdpool_schedule() 来 提交 nextTask
            这很重要, 因为线程本身很可能不知道线程池的状态
            
        即, 每个任务是对等的
        
            another thrdPoolTask / the next thrdPoolTask 是 ( thrdPoolTask routine 中 调)
                [1] thrdpool_schedule()     --- 生产者 
                [2] __thrdpool_terminate()  --- thrdPoolTerminateThread
                [3] thrdpool_destroy()      --- thrdPoolDestoryerThread
        
        只要 taskList 管理得好, 消费者(consumer) 所执行的函数 也可以做 生产者(producer)
        
        执行 routine 的线程, 都是 消费者
        发起 schedule 的线程, 都是 生产者
        
    (3) 线程任务 可销毁 线程池
    
        即 每一种行为是对等的, 包括 destroy
        
    2. 线程池中 任务入口: 
        [1] hook: 双向链表 的 node, 无 data 成员, 只有 pre/next 的 link 成员, 以 形成 hook list
        [2] task 靠 关联的 hook 被 挂起来, 形成 task list 
                prototype & context 相同
                
    struct __thrdpool_task_entry
    {
        struct list_head list;     // task's hook(钩子): 将 task 挂起来, hookList - - - 固定 offset - - - > taskList
        struct thrdpool_task task; // task 
    };
    
    3. 线程池
    struct __thrdpool 
    {
        struct list_head task_queue; // 任务队列 关联的 hookList 的 headNode(invalid, there is no associated task)   
        size_t nthreads;             // 线程数
        size_t stacksize;            // 线程栈 size 
        pthread_t tid;               // pool 运行 时, pool 上记录的 tid 是 zero 值 
        pthread_mutex_t mutex;        
        pthread_cond_t cond;
        pthread_key_t key;
        pthread_cond_t *terminate;
    };
    
    没有一个多余, 每个成员都很到位
    
    task_queue
        1) init
        
            thrdpool_create() 中 
                INIT_LIST_HEAD(&pool->task_queue);
                    hookList init: invalid 头结点(pool->task_queue) pre/next 都 指向自身
            task_queue 无 associated task
                => task_queue 含义: hookList 的 invalid 头结点 
        
        2) 尾插 task 到 taskList: 变 link 域 (pre/next)
        
            __thrdpool_schedule() 中 
                list_add_tail(&entry->list, &pool->task_queue); 
                    尾插 task's hook 到 hookList <=> 尾插 task 到 taskList 
                    尾插: 插到 hookList 的 headPre == tail 与 head 之间 
            
        3) 取出 firstTask -> delete it from tastList -> 先解锁 -> 再 执行 firstTask 
            
            取  firstValidHook(==) 的 ptr 
            -> 据 firstValidHook(struct mem) 的 ptr(mem ptr/address 更大) 
                与其 所在 taskEntry(struct) 地址 的 offset 
                    求 其 taskEntry 的 addr 
            -> delete firstValidHook from hookList <=> delete firstTask from taskList 
            -> ...
            
            __thrdpool_routine()
            {
                // ...
                headNext = &pool->task_queue.next;
                // ...
                entry = list_entry(*headNext, struct __thrdpool_task_entry, list);
                list_del(*headNext);
                pthread_mutex_unlock(&pool->mutex); 
                
                task_routine = entry->task.routine;
                task_context = entry->task.context;
                free(entry);
                task_routine(task_context);
            }
            
        4) 逐个 取出 (first)task -> 从 taskList 删除 -> 通过 usrFuncPtr 让 user 回调
        
            thrdpool_destroy()
            {
                // ...
                list_for_each_safe(pos, tmp, &pool->task_queue)
                    entry = list_entry(pos, struct __thrdpool_task_entry, list);
                    list_del(pos);
                    if (pending)
                        pending(&entry->task);
            }
        
    nthreads
        = 0
            thrdpool_create() 中 
        
        ++ 
            1] __thrdpool_create_threads() 中 pthread_create() 后 
            2] thrdpool_increase()         中 pthread_create() 后 
            
        --
            1] __thrdpool_terminate() 中 in_pool == 1 时 
            
            2] __thrdpool_routine() 中 pool->terminate 被置为 cond 时
    
    tid
    
        线程 id, 线程池 同时只记录 1个 tid
            线程池 运行时, 不会(没必要)记录 current 正在运行的 worker thread 的 tid => tid 是 空/无效值 0
            线程池 退出时, tid 用来实现 `链式等待`
        
        init: 0 值 
            memset(&pool->tid, 0, sizeof (pthread_t));
            
        use
            /* 
            1) extract old pool->tid: 把 线程池 上记录的 tid(上一个人的 tid) 拿下来, 
                    我(current work thread) 来 负责 上一个人 (pre worker thread)
            2) update new pool->tid: 把我自己的 tid 记录到 线程池 上, 下一个人来 负责 我
            3) 每个人都减 1, 最后一个人(不算 发起 destory) 负责叫醒 发起 destroy 的 人 (main 线程)
            4) unlock 
            5) 只有第1个人 拿到的 tid == 0, 不 join other worker thread (there is no other worker thread)
            6) 只要不是 0 值, 我就要负责 等上一个人结束 才能退:
            */
            __thrdpool_routine() 
            {
                // ...
                tid = pool->tid;            
                pool->tid = pthread_self(); 
                if (--pool->nthreads == 0)   
                    pthread_cond_signal(pool->terminate);
    
                pthread_mutex_unlock(&pool->mutex); // === 
                if (memcmp(&tid, &__zero_tid, sizeof (pthread_t)) != 0)
                    pthread_join(tid, NULL); 
                    
                // ...
            }       
            
            /* __thrdpool_terminate() 
             (1) 两种 case 下被调用 
                [1] __thrdpool_create_threads() 中 创建 n 个线程失败 时, 被调用 
                [2] thrdpool_destroy() 中 被调用
            
             (2) 与 __thrdpool_routine() 中 比较 pool->tid 与 0, 然后 join preWorkerThread
                    [1] terminateThread/thrdPoolDestoryerThread 等 workerThreadN, 自己 不必被等待
                    [2] workerThread1 不必等待
                    [3] workerThread2-N 等待 preWorkerThread
            */
            __thrdpool_terminate()
            {
                // ...
                while (pool->nthreads > 0)
                pthread_cond_wait(&term, &pool->mutex); // === 
                // <=> pthread_cond_wait(pool->terminate, &pool->mutex);
    
                pthread_mutex_unlock(&pool->mutex);         // ===
                if (memcmp(&pool->tid, &__zero_tid, sizeof (pthread_t)) != 0)
                    pthread_join(pool->tid, NULL); 
            }
            
        => pool->tid 含义: 
            
            workerThread1   workerThread2 ...   workerThreadN   thrdPoolDestoryerThread(main / workerThreadNPlus1)
            
            terminate 线程池 时, 池中各线程 & thrdPoolDestoryerThread 逐个 链式等待(join) 前1个 结束
            
            对 当前 workerThread1 & thrdPoolDestoryerThread 来说 preWorkerThread 的 identifer
                对 workerThread1 来说, preWorkerThread 不存在 => pool->tid = 0 表示 无效值 
                对 等待 workerThreadN 的 thrdPoolDestoryerThread 来说, preWorkerThread 是 workerThreadN
                
        Note
            thrdpool_increase() 与 __thrdpool_create_threads() 中 pthread_create(&tid,...) 的 tid 只在 两个函数中用 
    
    cond
        配合 mutex, 线程间同步
            cond 用来 给 生产者和消费者 去操作 taskList 用的
            
        Note: 都在 hold mutex 时进行
        
        1) init 
            __thrdpool_init_locks(thrdpool_t *pool)
            {
                ...
                ret = pthread_cond_init(&pool->cond, NULL);
                ...
            }
        
        2) destory 
            __thrdpool_destroy_locks(thrdpool_t *pool)
            {
                ...
                pthread_cond_destroy(&pool->cond);
            }
        
        3) threadPool 运行时, worker thread waiting on cond 
            等待着 
                1] a task 被 schedule(调度) 进 threadPool, 或 
                2] 收到 结束线程池 的 broadcast(广播)
        
            __thrdpool_routine(void *arg)
            {
                while (1)
                {
                    pthread_mutex_lock(&pool->mutex);   // ===
                    while (!pool->terminate && list_empty(&pool->task_queue))
                        pthread_cond_wait(&pool->cond, &pool->mutex);
                        
                    ...
                }
            }
            
        4) 调度 1个 task 时, signal 
        
            void __thrdpool_schedule(const struct thrdpool_task *task, void *buf,
                                     thrdpool_t *pool)
            {
                struct __thrdpool_task_entry *entry = (struct __thrdpool_task_entry *)buf;
                // 1)
                entry->task = *task;
                
                pthread_mutex_lock(&pool->mutex);   // ===
                
                // 2) 尾插 task 到 taskList 
                list_add_tail(&entry->list, &pool->task_queue); 
                
                // 3) 叫醒 a waiting worker thread
                pthread_cond_signal(&pool->cond);               
                
                pthread_mutex_unlock(&pool->mutex); // ===
            }
            
        5) 结束 线程池 时, broadcast
        
            __thrdpool_terminate(int thread_in_pool, thrdpool_t *pool)
            {
                pthread_cond_t term = PTHREAD_COND_INITIALIZER;
                pthread_mutex_lock(&pool->mutex);    // === 
                
                // (1) terminate 置 non-NULL
                pool->terminate = &term;
                
                // (2)
                pthread_cond_broadcast(&pool->cond);
            }
            
    key
        线程池 的 key
        赋予 每个由 线程池创建的线程 用于 thread local
        用于 区分 destoryer 线程 是否由 线程池创建 <=> 是否为 consumerThread/workerThread 
        
        1) key_create
        
            thrdpool_create(size_t nthreads, size_t stacksize)
            {
                ...
                ret = pthread_key_create(&pool->key, NULL);
                ...
            }
            
        2) key_delete   
        
            thrdpool_create(size_t nthreads, size_t stacksize)
            {
                ...
                if (__thrdpool_create_threads(nthreads, pool) >= 0)
                        return pool;
                    
                // === below: __thrdpool_create_threads(nthreads, pool) fail 时
                pthread_key_delete(pool->key);
                ...
            }
            
            thrdpool_destroy(void (*pending)(const struct thrdpool_task *),
                          thrdpool_t *pool)
            {
                ...
                pthread_key_delete(pool->key);
            }
            
        3) setspecific
        
            // 由线程池创建的线程, 用 key 关联 thread local(TSD 线程特定数据): 均设为 current 线程池结构指针 pool
            // => outside thread: pool->key 关联的不是 pool
            __thrdpool_routine(void *arg)
            {
                ...
                pthread_setspecific(pool->key, pool);
            }
        
        4) getspecific
        
            // 该函数 所在 线程(thrdPoolDestoryerThread) 是否 由线程池创建
            inline int thrdpool_in_pool(thrdpool_t *pool)
            {
                return pthread_getspecific(pool->key) == pool;
            }
            
            void thrdpool_destroy(void (*pending)(const struct thrdpool_task *),
                          thrdpool_t *pool)
            {
                // destoryerThread 是否由 线程池创建 <=> 是否为 consumerThread/workerThread
                int thread_in_pool = thrdpool_in_pool(pool);
                ...
            }
        
    terminate
        
        2个用途
        1] 线程池 terminate/destory 时的 标记位 
        2] 销毁线程池 的 人(thrdPoolDestoryerThread) 要等待的 condition
            
        1) 线程池创建时, 置 NULL, 标志 线程池 处于 运行中(running)
        
            thrdpool_create(size_t nthreads, size_t stacksize)
            {
                ...
                pool->terminate = NULL;
                ...
            }
    
        2) 线程池 结束时, 置 non-Null / &term(Conditon), 
            1] 标志 线程池处于 terminating 中
            2] 同时 启用 terminate 作为 链式等待 workerThread 结束 的 CV/Condition 的 作用
            
            __thrdpool_routine(void *arg)
            {
                while (1)
                {
                    // (1) 从 任务队列 取出 1个任务, 没有就等待
                    pthread_mutex_lock(&pool->mutex);   // ===
                    while (!pool->terminate && list_empty(&pool->task_queue)) 
                        pthread_cond_wait(&pool->cond, &pool->mutex); // ===
    
                    if (pool->terminate)
                        break;
                    }
                    ...
                }
            }
            
            __thrdpool_terminate(int thread_in_pool, thrdpool_t *pool)
            {
                pthread_cond_t term = PTHREAD_COND_INITIALIZER;
                pthread_mutex_lock(&pool->mutex);           // ===
                
                // (1) 标志 线程池 terminate
                pool->terminate = &term;
                ...
                
                // (2) terminate 线程池 时, 池中各线程 & thrdPoolDestoryerThread 逐个 链式等待(join) 前1个 结束 
                while (pool->nthreads > 0)
                    pthread_cond_wait(&term, &pool->mutex); // === 
                    // <=> pthread_cond_wait(pool->terminate, &pool->mutex);
                ...
            }
        
        3) signal 
        
            __thrdpool_routine(void *arg)
            {
                ...
                if (--pool->nthreads == 0)  // 3) 每个人都减 1, 最后一个人(不算 发起 destory) 负责叫醒 发起 destroy 的 人
                    pthread_cond_signal(pool->terminate);
                ...
            
            }
                
    4. 问题?
    
        若 __thrdpool_terminate() 由线程池中 某线程 的 routine() 发起, 则 该线程 等完线程池中 thePreLastWorkerThread 完成, 
            自己却自行 detach, 那谁等 该线程呢?
            因为 这种情形, thrdPoolTerminateThread 或 thrdPoolDestoryerThread 不再是 mainThread,而可能 mainThread 已经终止, 
            那 所有 subThread(线程池中各线程) 就 直接蒸发了
    
    

    相关文章

      网友评论

          本文标题:threadPool of sogou's workflow -

          本文链接:https://www.haomeiwen.com/subject/kpnlprtx.html