美文网首页
简单线程池实现

简单线程池实现

作者: besmallw | 来源:发表于2020-05-29 15:20 被阅读0次

    今天在公司摸鱼的时候,想起之前没有看的线程池,然后赶紧把它捡起来。
    完整代码放在我的github

    介绍

    线程池:有一堆已经创建好的线程,初始化时处于空闲态,当有新的任务进来的时候,分配一个线程去处理任务。然任务处理完之后,线程放回到线程池中。当线程池中的线程都在忙碌,此时新的任务将放到任务队列中等待空闲线程。

    为什么要有线程池:不能一直开关线程,这样会给系统产生大量的消耗。线程是可重用的资源,不需要每次使用时都进行初始化,因为可以采用有限的线程处理无限的任务。

    线程池实现

    1.封装条件变量

    为什么要使用条件变量:死锁
    死锁:
    int n = 0;
    消费者A:等待n > 0
    生产者B:更改n, 使得n > 0

    1.当消费者A进入临界区时,其他线程不能进入临界区,意味着生产者B没有机会去修改n,n的值一直为0,不满足消费者A继续执行的条件(n > 0),消费者A只能一直等待。
    2.消费者进程A拿到互斥锁 --> 进入临界区 --> 发现共享资源 n 不满足继续执行的条件(n > 0) --> 等待 n > 0
    3.消费者进程A占有互斥锁 --> 生产者进程B无法进入临界区 --> 无法修改 n 的值 --> 生产者B等待消费者A释放互斥锁
    解决死锁的方案就是采用条件变量。

    下面有详细的注释

    #ifndef _CONDITION_H_
    #define _CONDITION_H_
    
    // 线程池:有一堆已经创建好的线程,初始状态是都处于空闲状态,当有新的任务进来,从线程池中取出一个空闲的线程处理任务,然后当任务处理完之后,该线程会放回到线程池中
    // 当线程池中的线程都在处理任务,没有空闲的线程。若有新的任务产生,只能等待线程池中有归还的线程
    
    // 为什么要有线程池:不能一直开关线程,这样会给系统产生大量的消耗。线程是可重用的资源,不需要每次使用时都进行初始化,因为可以采用有限的线程处理无限的任务
    
    #include <pthread.h>
    
    // 封装一个互斥量和条件变量作为状态
    typedef struct condition {
        // 互斥量 锁住变量
        pthread_mutex_t pmutex; // 在操作一个共享变量时,先用锁把变量锁住,然后再操作,操作完了之后再释放掉锁
        
        /*
        死锁:
        int n = 0;
        消费者A:等待n > 0
        生产者B:更改n, 使得n > 0
        当消费者A进入临界区时,其他线程不能进入临界区,意味着生产者B没有机会去修改n,n的值一直为0,不满足消费者A继续执行的条件(n > 0),消费者A只能一直等待。
        消费者进程A拿到互斥锁 --> 进入临界区 --> 发现共享资源 n 不满足继续执行的条件(n > 0) --> 等待 n > 0
        消费者进程A占有互斥锁 --> 生产者进程B无法进入临界区 --> 无法修改 n 的值 --> 生产者B等待消费者A释放互斥锁
        解决死锁的方案就是采用条件变量。
        */
        pthread_cond_t pcond;
    } condition_t;
    
    // 对状态的操作函数
    int condition_init(condition_t *cond);
    int condition_lock(condition_t *cond);
    int condition_unlock(condition_t *cond);
    int condition_wait(condition_t *cond);
    int condition_timedwait(condition_t *cond, const struct timespec *abstime);
    int condition_signal(condition_t* cond);
    int condition_broadcast(condition_t *cond);
    int condition_destroy(condition_t *cond);
    
    #endif
    
    #include "condition.h"
    
    //初始化
    int condition_init(condition_t *cond) {
        int status;
        if((status = pthread_mutex_init(&cond->pmutex, NULL))) //初始化互斥锁
            return status;
        
        if((status = pthread_cond_init(&cond->pcond, NULL))) //初始化条件变量
            return status;
        
        return 0;
    }
    
    //加锁
    int condition_lock(condition_t *cond) {
        return pthread_mutex_lock(&cond->pmutex); //拿到互斥锁
    }
    
    //解锁
    int condition_unlock(condition_t *cond) { //释放互斥锁
        return pthread_mutex_unlock(&cond->pmutex);
    }
    
    //等待
    int condition_wait(condition_t *cond) { //等待在条件变量上
        return pthread_cond_wait(&cond->pcond, &cond->pmutex);
    }
    
    //固定时间等待
    int condition_timedwait(condition_t *cond, const struct timespec *abstime) {
        return pthread_cond_timedwait(&cond->pcond, &cond->pmutex, abstime);
    }
    
    //唤醒一个睡眠线程
    int condition_signal(condition_t* cond) {
        return pthread_cond_signal(&cond->pcond); //通知等待在条件变量上的消费者
    }
    
    //唤醒所有睡眠线程
    int condition_broadcast(condition_t *cond) {
        return pthread_cond_broadcast(&cond->pcond);
    }
    
    //释放
    int condition_destroy(condition_t *cond) {
        int status;
        if ((status = pthread_mutex_destroy(&cond->pmutex))) //销毁互斥锁
            return status;
        
        if ((status = pthread_cond_destroy(&cond->pcond))) //销毁条件变量
            return status;
            
        return 0;
    }
    

    2.任务、线程池的封装

    任务:
    run 执行的任务
    arg 任务参数
    next 值向下一个任务
    线程池:
    ready 操作时要锁住线程池
    first 任务队列的第一个任务
    last 任务队列的最后一个任务
    counter 线程池中已有的线程数
    idle 线程池中空闲线程数
    max_threads 线程池中最大线程数
    quit 是否退出的标志

    #ifndef _THREAD_POOL_H_
    #define _THREAD_POOL_H_
    
    // 线程池头文件
    #include "condition.h"
    
    // 封装线程池中的对象需要执行的任务对象
    typedef struct task {
        void *(*run)(void *);     // 需要执行的任务
        void *arg;                // 参数
        struct task *next;        // 任务队列中下一个任务
    } task_t;
    
    // 线程池结构体
    typedef struct threadpool {
        condition_t ready; // 状态量
        task_t *first;     // 任务队列中第一个任务
        task_t *last;      // 任务队列中最后一个任务
        int counter;       // 线程池中已有线程数
        int idle;          // 线程池中空闲线程数
        int max_threads;   // 线程池最大线程数
        int quit;          // 是否退出的标志
    } threadpool_t;
    
    // 线程池初始化
    void threadpool_init(threadpool_t *pool, int threads);
    
    // 往线程池中加入任务
    void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg);
    
    // 摧毁线程池
    void threadpool_destroy(threadpool_t *pool);
    
    #endif
    

    1.线程池初始化

    // @param1 线程池 @param2 最大线程数
    void threadpool_init(threadpool_t *pool, int threads) {
        condition_init(&pool->ready); // type condition_t
        pool->first = NULL;           // 任务队列中第一个任务
        pool->last = NULL;            // 任务队列中最后一个任务
        pool->counter = 0;            // 线程池中已有线程数(最大值为pool->max_threads)
        pool->idle = 0;               // 线程池中空闲线程数
        pool->max_threads = threads;  // 线程池最大线程数
        pool->quit = 0;               // 是否退出的标志
    }
    

    2.增加一个任务到线程池

    // 增加一个任务到线程池
    // @param2 任务  @param3 任务参数
    void threadpool_add_task(threadpool_t *pool, void *(*run)(void *), void *arg) {
        // 产生一个新的任务
        task_t *newtask = (task_t *)malloc(sizeof(task_t));
        newtask->run = run;
        newtask->arg = arg;
        newtask->next = NULL;          // 新加的任务放在队列尾端
    
        condition_lock(&pool->ready);  // 线程池的状态被多个线程共享,操作前需要加锁
    
        if (pool->first == NULL) {     // 第一个任务加入
            pool->first = newtask;
        } else {
            pool->last->next = newtask;
        }
        pool->last = newtask;          // 队列尾指向新加入的线程
    
        // 线程池中有线程空闲,唤醒
        if (pool->idle > 0) {
            condition_signal(&pool->ready);
        } else if(pool->counter < pool->max_threads) { // 当前线程池中线程个数没有达到设定的最大值,创建一个新的线性
            pthread_t tid;
            pthread_create(&tid, NULL, thread_routine, pool); // 创建一个线程
            pool->counter++;
        }
        //结束,访问
        condition_unlock(&pool->ready);
    }
    
    3.重点:创建的线程

    每次操作线程池之前必须加锁!!
    while (pool->first == NULL && !pool->quit)这个是用来判断超时的
    然后在任务队列中取出第一个任务,执行任务处理函数。(由于执行任务可能耗时较多,这里需要释放锁)

    // 创建的线程执行
    void *thread_routine(void *arg) {
        struct timespec abstime;
        int timeout;
        printf("thread %d is starting\n", (int)pthread_self());
        threadpool_t *pool = (threadpool_t *)arg;
        while (1) {
            timeout = 0;
            condition_lock(&pool->ready);                                 // 访问线程池之前需要加锁
            pool->idle++; //空闲
            while (pool->first == NULL && !pool->quit) {                  // 等待队列有任务到来 或者 收到线程池销毁通知,否则线程阻塞等待
                printf("thread %d is waiting\n", (int)pthread_self());
                clock_gettime(CLOCK_REALTIME, &abstime);                  // 获取从当前时间,并加上等待时间,设置进程的超时睡眠时间
                abstime.tv_sec += 2;                                      // 2秒超时
                int status = condition_timedwait(&pool->ready, &abstime); // 该函数会解锁,允许其他线程访问,当被唤醒时,加锁
                if (status == ETIMEDOUT) {
                    printf("thread %d wait timed out\n", (int)pthread_self());
                    timeout = 1;
                    break;
                }
            }
    
            pool->idle--;
            if (pool->first != NULL) {
                task_t *t = pool->first;
                pool->first = t->next;          // 取出等待队列最前的任务,移除任务,并执行任务
                condition_unlock(&pool->ready); // 由于任务执行需要消耗时间,先解锁让其他线程访问线程池
                t->run(t->arg);                 // 执行任务
                free(t);                        // 执行完任务释放内存
                condition_lock(&pool->ready);   // 重新加锁
            }
    
            // 退出线程池 执行threadpool_destroy才会到这里
            if (pool->quit && pool->first == NULL) {
                pool->counter--;                // 当前工作的线程数-1
                if (pool->counter == 0) {       // 若线程池中没有线程,通知等待线程(主线程)全部任务已经完成
                    condition_signal(&pool->ready);
                }
                condition_unlock(&pool->ready);
                break;
            }
    
            // 超时,跳出销毁线程 只有当condition_timedwait超时时,才会到这里
            if (timeout == 1) {
                pool->counter--;                // 当前工作的线程数-1
                condition_unlock(&pool->ready);
                break;
            }
    
            condition_unlock(&pool->ready);
        }
        
        printf("thread %d is exiting\n", (int)pthread_self());
        return NULL;
    }
    

    4.线程池销毁

    // 线程池销毁
    void threadpool_destroy(threadpool_t *pool) {
        if (pool->quit) return;        // 如果已经调用销毁,直接返回
    
        condition_lock(&pool->ready);  // 加锁
        pool->quit = 1;                // 设置销毁标记为1
        if (pool->counter > 0) {       // 线程池中工作线程个数大于0
            if (pool->idle > 0) {      // 对于等待的线程(空闲),发送信号唤醒
                condition_broadcast(&pool->ready);
            }
            while (pool->counter) {    // 正在执行任务的线程,等待他们结束任务
                condition_wait(&pool->ready);
            }
        }
        condition_unlock(&pool->ready);
        condition_destroy(&pool->ready);
    }
    

    代码转载于https://www.cnblogs.com/yangang92/p/5485868.html


    2020.5.29 15:17 深圳

    相关文章

      网友评论

          本文标题:简单线程池实现

          本文链接:https://www.haomeiwen.com/subject/umbumhtx.html