美文网首页
Linux 线程池,执行任务队列(生产者消费者模型)

Linux 线程池,执行任务队列(生产者消费者模型)

作者: FakeCSer爱去网吧 | 来源:发表于2020-04-17 16:09 被阅读0次

    两个结构体

    • 线程池

      线程池结构体包含任务队列线程数组两个结构体,和一些锁,条件变量,长度等成员
    • 任务

      任务队列是一个链表结构,链表中每个任务是一个函数指针做任务句柄
      任务队列是线程和主程序所共享的共享资源,所以要考虑互斥的对任务队列中入队和出对进行操作。主程序向任务队列中加任务,线程从任务队列中取任务,考虑用条件变量来模拟操作系统中的生产者消费者模型,即pv操作

    四个函数

    • void pool_init(int max_thread_num)


      初始化线程池,对线程池各成员进行初始化。根据传入参数初始化n个线程;

    • int pool_add_worker(void(process)(void *arg),void *arg)


      此函数是main函数向任务队列中加任务,相当于pv操作的v操作。此时用到链表的知识,新建一个链表项,链表项中传入任务要完成的函数句柄,之后将链表项插入到队尾。注意互斥访问任务队列。
    • void *thread_routine(void *arg)
      函数太长截屏不下来。参照下面完整代码
      这是线程内部运行过程,即pthread_create时传入的函数句柄。此线程相当于消费者,从任务队列中取任务并执行。逻辑判断参照下面的补充部分。

    • int pool_destory()



      线程池销毁以及资源回收。shutdown 置1,;broadcast唤醒所有等待线程使其exit退出;join掉所有的线程;free回收为线程malloc的堆空间,线程数组,free一次就行;free掉当前任务队列中所有的任务,因为链表结构,每一个链表项都要free掉;销毁互斥锁和条件变量,最后free掉pool结构体。

    补充

    线程在运行时对当前任务队列长度当前线程池状态(shutdown)的判断是很重要的(见thread_routine代码)

    • 没有任务+线程池开 : wait挂起等待生产者(main)加任务,之后进入临界区
    • 没有任务+线程池关 : wait时被pool_destory函数将shutdown置1并且(broadcast)唤醒所有挂起等待的线程,此时线程再次循环发现shutdown为1会exit退出。
    • 有任务+线程池开 : 向下进入临界区


    个人收获

    • 复习生产者消费者模型
    • 复习链表用法
    • 复习内存操作:malloc的数组free一次,链表每个链表项要分别分配空间和回收,回收后指针及时赋空防止野指针

    完整代码

    #include <pthread.h>
    #include <iostream>
    using namespace std;
    #include <assert.h>
    #include <unistd.h>
    #include <sys/types.h>
    
    typedef struct worker
    {
        void *(*process) (void *arg);//任务句柄,函数指针
        void *arg;//函数参数
        struct worker * next;//链表指向下一个任务
    }CThread_worker;
    
    
    typedef struct threadpool
    {
        pthread_mutex_t queue_lock;
        pthread_cond_t queue_ready;
        
        CThread_worker *queue_head;//任务队列,链表结构
        
        pthread_t *threadid;//线程池中的线程,数组结构
        
        int max_thread_num;//线程池中线程数
        int cur_queue_size;//当前任务队列中任务数
        int shutdown;//线程池是否启用
    }CThread_pool;
    
    int pool_add_worker(void*(*process)(void *arg),void *arg);
    void *thread_routine(void *arg);
    
    static CThread_pool *pool = NULL;
    
    void pool_init(int max_thread_num)
    {
        pool = (CThread_pool*)malloc(sizeof(CThread_pool));
    
        pthread_mutex_init(&(pool->queue_lock),NULL);
        pthread_cond_init(&(pool->queue_ready),NULL);
    
        pool->queue_head = NULL;
        
        pool->max_thread_num = max_thread_num;
        pool->cur_queue_size = 0;
    
        pool->shutdown = 0;//running
        
        pool->threadid = (pthread_t*)malloc(max_thread_num*sizeof(pthread_t));
        int i = 0;
        for(i = 0;i<max_thread_num;i++)
        {
            pthread_create(&(pool->threadid[i]),NULL,thread_routine,NULL);
        }
    }
    
    int pool_add_worker(void*(*process)(void *arg),void *arg)
    {//chain add 1,v
        CThread_worker * newworker = (CThread_worker*)malloc(sizeof(CThread_worker));
        
        newworker->process = process;
        newworker->arg = arg;
        newworker->next = NULL;
    
        pthread_mutex_lock(&(pool->queue_lock));
    
        CThread_worker *member = pool->queue_head;
        if(member != NULL)
        {
            while(member->next!=NULL)
                member = member->next;
            member->next = newworker;
        }   
        else
        {
            pool->queue_head = newworker;
        }
        
        assert(pool->queue_head);
        
        pool->cur_queue_size++;
        pthread_mutex_unlock(&(pool->queue_lock));
        pthread_cond_signal(&(pool->queue_ready));//notice thread
        return 0;
    }
    
    int pool_destory()
    {//Recycling resources
        if(pool->shutdown)//avoid multy-shutdown
        {   return -1;}
        pool->shutdown = 1;
        
        pthread_cond_broadcast(&(pool->queue_ready));
        
        //join threads
        int i;
        for(i=0;i<pool->max_thread_num;i++)
        {
            pthread_join(pool->threadid[i],NULL);
        }   
        free(pool->threadid);//malloc
    
        //destory queue_worker
        CThread_worker *head = NULL;
        while(pool->queue_head!=NULL);
        {
            head = pool->queue_head;
            pool->queue_head = pool->queue_head->next;
            free(head);
        }
        
        //destory mutex& cond
        pthread_mutex_destroy(&(pool->queue_lock));
        pthread_cond_destroy(&(pool->queue_ready));
    
        free(pool);
        pool = NULL;
        
        return 0;
    }   
    
    void *thread_routine(void *arg)
    {//p
        cout << "thread" << pthread_self() << "is running!" << endl;
     
        while(1)
        {
            pthread_mutex_lock(&(pool->queue_lock));
            
            while(pool->cur_queue_size==0&&pool->shutdown==0)
            {
                cout << "thread" << pthread_self() << "is waiting" << endl;
                pthread_cond_wait(&(pool->queue_ready),&(pool->queue_lock));
            }
            
            if(pool->shutdown == 1)
            {
                pthread_mutex_unlock(&(pool->queue_lock));
                cout << "thread" << pthread_self() << "will eixt" << endl;
                pthread_exit(NULL);
            }
    //************Critical area
    
            cout << "thread" << pthread_self() << "is going to work" << endl;
            
            assert(pool->cur_queue_size!=0);
            assert(pool->queue_head!=NULL);
    
            //queue pop
            pool->cur_queue_size--;
            CThread_worker * worker = pool->queue_head;
            pool->queue_head = worker->next;
        
    //*************Critical area
    
    //put out source(worker) to operate
            pthread_mutex_unlock(&(pool->queue_lock));
        
            (*(worker->process))(worker->arg);
            free(worker);
            worker = NULL;
    
    
        }
        pthread_exit(NULL);
    }
    
    void *myprocess(void *arg)
    {
        cout << "thread" << pthread_self() << "is working on task " << *(int *)arg << endl;
        sleep(1);
        return NULL;
    }
    
    
    int main()
    {
    
        pool_init(3);
        
        int *workingnum = (int *)malloc(sizeof(int)*10);
        int i;
        for(i=0;i<10;i++)
        {
            workingnum[i] = i;
            pool_add_worker(myprocess,&workingnum[i]);
        }
    
        sleep(5);
        
        pool_destory();
        free(workingnum);
    
        return 0;
    }
    
    

    相关文章

      网友评论

          本文标题:Linux 线程池,执行任务队列(生产者消费者模型)

          本文链接:https://www.haomeiwen.com/subject/wvjevhtx.html