美文网首页
并发编程

并发编程

作者: 漫游之光 | 来源:发表于2018-10-09 09:28 被阅读0次

    并发在下列情况中是很有用的:

    • 访问慢速I/O设备。
    • 与人交互。
    • 通过推迟工作以降低延迟。
    • 服务多个网络客户端。
    • 在多核机器上进行并行计算。

    现代操作系统提供了三种基本的构造并发程序的方法:

    • 进程。
    • I/O多路复用。
    • 线程。

    基于进程的并发编程

    构建并发程序最简单的方法就是进程,进程有独立的地址空间,不用担心一个虚拟地址覆盖另一个进程的虚拟地址,但会导致进程间通信比较麻烦。此外,基于进程的并发往往比较慢,因为进程控制和IPC的开销很高。下面是基于进程的echo服务器版本:

    #include<stdio.h>
    #include<string.h>
    #include<sys/socket.h>
    #include<arpa/inet.h>
    #include<unistd.h>
    #include<signal.h>
    #include<sys/types.h>
    #include<sys/wait.h>
    
    #define SERVER_PORT 9090
    #define BUFFER_SIZE 1024
    
    void sigchild_handler(int sig){
        while(waitpid(0,NULL,WNOHANG) > 0)
            ;
    }
    
    int main(int argc, char const *argv[])
    {
        struct sockaddr_in server,client;
        int sfd,cfd;
        socklen_t clen;
        int n;
        char buf[BUFFER_SIZE];
        pid_t pid;
    
        /* register the signal handler,child process have the same handler,
         * but only parent process receive the signal */
        struct sigaction newact;
        newact.sa_handler = sigchild_handler;
        sigemptyset(&newact.sa_mask);
        newact.sa_flags = 0;
        sigaction(SIGCHLD,&newact,NULL);    
    
        /* initial socket */
        sfd = socket(AF_INET,SOCK_STREAM,0);
        memset(&server,0,sizeof(server));
        server.sin_family = AF_INET;
        server.sin_addr.s_addr = htonl(INADDR_ANY);
        server.sin_port = htons(SERVER_PORT);
    
        bind(sfd,(struct sockaddr *)&server,sizeof(server));
        listen(sfd,20);
    
        while(1){
            clen = sizeof(client);
            cfd = accept(sfd,(struct sockaddr *)&client,&clen);
            pid = fork();
            if(pid > 0){ /* parent process */
                close(cfd);
            }else if( pid == 0 ){
                while( (n = read(cfd,buf,BUFFER_SIZE)) > 0){
                    write(STDOUT_FILENO,buf,n);
                    write(cfd,buf,n);
                }
                close(cfd);
                return 0; /* child process quit */
            }
        }
        close(sfd);
        return 0;
    }
    

    基于I/O多路复用的并发编程

    在我看来,I/O多路复用就类似于餐馆的服务员。服务员负责监听一些桌子,如果桌子有客人来了,需要给客人菜单;如果某张桌子的菜好了,需要给他们上菜;如果客人吃完了,需要收拾桌子;如果没有事件,那么服务员处于等待的状态,等待某张桌子中上述事件中的一个的发生。很明显,这是并发了,因为通常都是一个服务员,为多张桌子服务。I/O多路复用的思想和这个类似。

    I/O多路复用可以用作并发事件驱动程序的基础,在事件驱动程序中,某些事件导致流向前推进。一般的思路是将逻辑流模型化为状态机。基于I/O多路的事件驱动编程的优点是,它比基于线程和进程的设计给了程序员更多的对程序行为的控制,并且,它有明显的性能优势,现代高性能服务器,如Node.js,nginx都使用了这种编程方式。事件驱动设计的明显的缺点就是编码复杂。就如服务员这个比喻,服务员很多情况下无法把一件事做完再去做下一件事,例如如果客人点菜很慢,站在那里等是很浪费资源的。所以,基于这种模式的编程每次执行的代码应该是很短的。通常需要把一个事件分成若干部分,所以I/O多路复用的编码比较复杂。

    I/O多路复用的基本函数是select函数,它要求内核挂起进程,只有一个或多个I/O事件发生后,才将控制返回给应用程序。select及其相关函数的原型如下:

    /* 返回已准备好的描述符的非零的个数,若出错则返回-1 */
    int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
    /* 删除set中的fd */
    void FD_CLR(int fd, fd_set *set);
    /* 判断fd是否在set中 */
    int  FD_ISSET(int fd, fd_set *set);
    /* 将fd添加到set中 */
    void FD_SET(int fd, fd_set *set);
    /* 将set清零 */
    void FD_ZERO(fd_set *set);
    

    下面使用select函数对echo服务器进行改写。

    #include<stdio.h>
    #include<string.h>
    #include<sys/socket.h>
    #include<arpa/inet.h>
    #include<unistd.h>
    #include<sys/select.h>
    
    #define SERVER_PORT 9090
    #define BUFFER_SIZE 1024
    
    int main(int argc, char const *argv[])
    {
        struct sockaddr_in server,client;
        socklen_t clen;
        int sfd;
        char buf[BUFFER_SIZE];
        int n;
    
        /* initial socket */
        sfd = socket(AF_INET,SOCK_STREAM,0);
        memset(&server,0,sizeof(server));
        server.sin_family = AF_INET;
        server.sin_addr.s_addr = htonl(INADDR_ANY);
        server.sin_port = htons(SERVER_PORT);
        bind(sfd,(struct sockaddr *)&server,sizeof(server));
        listen(sfd,20);
    
        /* init fd_set */
        int cfd[FD_SETSIZE];
        int maxi = 0;
        int maxfd = sfd;
        int i;
        for(i = 0;i<FD_SETSIZE;i++){
            cfd[i] = -1;
        }
        fd_set allset,tempset;
        FD_ZERO(&allset);
        FD_SET(sfd,&allset);
        int nready;
    
        while(1){
            tempset = allset;
            nready = select(maxfd+1,&tempset,NULL,NULL,NULL);
            if( FD_ISSET(sfd,&tempset) ){
                 clen = sizeof(client);
                int ret = accept(sfd,(struct sockaddr *)&client,&clen);
                for(i = 0;i<FD_SETSIZE;i++){
                    if(cfd[i] == -1){
                        cfd[i] = ret;
                        break;
                    }
                }
                if(i != FD_SETSIZE){
                    if(ret > maxfd){ /* listen to all file descriptors */
                        maxfd = ret;
                    }
                    if( i > maxi){
                        maxi = i;
                    }
                    FD_SET(ret,&allset);
                }
                if(--nready == 0){
                    continue;
                }
            }
            for(i=0;i<=maxi;i++){
                if( FD_ISSET(cfd[i],&tempset) ){
                    if( (n = read(cfd[i],buf,BUFFER_SIZE)) >0 ){
                        write(STDOUT_FILENO,buf,n);
                        write(cfd[i],buf,n);
                    }else{
                        close(cfd[i]);
                        FD_CLR(cfd[i],&allset);
                        cfd[i] = -1;
                    }
                    if(--nready == 0){
                        break;
                    }
                }
            }
        }
    
        close(sfd);
        return 0;
    }
    

    基于线程的并发编程

    基于线程的逻辑流结合了基于进程和基于I/O多路复用的流的特性。同进程一样,线程由内核自动调度,并且内核通过一个整数ID来识别线程。同基于I/O多路复用的流一样,多个线程运行在单一进程的上下文中,因此共享这个进程虚拟地址空间的所有内容,包括它的代码、数据、堆、共享库和打开的文件。下面基于线程对echo服务器进行改写:

    #include<stdio.h>
    #include<string.h>
    #include<netinet/in.h>
    #include<arpa/inet.h>
    #include<unistd.h>
    #include<pthread.h>
    
    #define SERVER_PORT 9090
    #define BUFFER_SIZE 1024
    #define MAX_CLIENTS 1024
    
    struct socket_info{
        struct sockaddr_in client;
        int cfd;
    };
    
    struct socket_info clients[MAX_CLIENTS];
    
    void *echo(void *arg){
        struct socket_info *si = (struct socket_info *)arg;
        char buf[BUFFER_SIZE];
        int n;
        pthread_detach(pthread_self());
        while( (n = read(si->cfd,buf,BUFFER_SIZE)) >0){
            write(STDOUT_FILENO,buf,n);
            write(si->cfd,buf,n);
        }
        close(si->cfd);
        si->cfd = -1;
        return NULL;
    }
    
    int main(int argc, char const *argv[])
    {
        struct sockaddr_in server,client;
        socklen_t clen;
        int sfd,cfd;
    
        /* initial socket */
        sfd = socket(AF_INET,SOCK_STREAM,0);
        memset(&server,0,sizeof(server));
        server.sin_family = AF_INET;
        server.sin_addr.s_addr = htonl(INADDR_ANY);
        server.sin_port = htons(SERVER_PORT);
        bind(sfd,(struct sockaddr *)&server,sizeof(server));
        listen(sfd,20);
    
        int i;
        pthread_t tid;
        for(i=0;i<MAX_CLIENTS;i++){
            clients[i].cfd = -1;
        }
        while(1){
            clen = sizeof(client);
            cfd = accept(sfd,(struct sockaddr *)&client,&clen);
            for(i = 0;i<MAX_CLIENTS;i++){
                if(clients[i].cfd == -1){
                    clients[i].cfd = cfd;
                    clients[i].client = client;
                    pthread_create(&tid,NULL,echo,(void *)&clients[i]);
                    break;
                }
            }
        }
        
        close(sfd);
        return 0;
    }
    

    这里要注意和基于进程的echo服务器进行比较。在线程中每个buf需要在线程中的栈进行创建,而进程中是没有这个过程,因为进程是独立的虚拟地址空间。同样的还有client和cfd。但是,在I/O复用中,buf只有一个,这是因为每次把buf的内容发送出去后才进入下一次,中间不会被打断,而线程中可能被打断,所以需要多个buf。

    线程池

    虽然线程的创建成本低于进程的创建成本,但总的来说,还是比较大的。所以,为了减少线程的创建次数,就有了线程池。线程池的基本思想是,复用线程。也就是说,线程执行完任务后,不是销毁,而是等待下一个任务。这个类似于生产者消费者模型,消息队列和命令模式。线程池的实现的核心有一个任务队列,任务队列里面有执行函数的地址和执行函数的参数。此外,还需要有一个线程来管理其他线程,如线程不够用,需要创建线程;空闲的线程太多,需要删除一些线程。这个线程采取轮询的方式,定时检查。线程池的代码基本上都是参考了别人的,自己敲了一遍,稍微修改了一下。

    下面给出线程池的实现方式,采取把代码分段注释的方式,整个代码是完整的。由于线程池的代码有点多,所以就单独写了一个C文件。先看看头文件和定义的结构体和函数。

    #ifndef _THREADPOOL_H_
    #define _THREADPOOL_H_
    
    typedef struct threadpool_t threadpool_t;
    threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);
    int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg);
    int threadpool_destroy(threadpool_t *pool);
    
    #endif
    
    #include<stdio.h>
    #include<unistd.h>
    #include<stdlib.h>
    #include<string.h>
    #include<pthread.h>
    #include<signal.h>
    #include <errno.h>
    #include"threadpool.h"
    
    #define MAX_WAIT_TASK 2
    #define INTERVAL_TIME 1
    #define THREAD_STEP_NUMBER 1
    
    typedef struct{
        void *(*function)(void *); /* thread will call this function */
        void *arg; /* function args */
    }threadpool_task_t;
    
    struct threadpool_t{
        pthread_mutex_t lock;
        pthread_mutex_t thread_counter;
        pthread_cond_t queue_not_full;
        pthread_cond_t queue_not_empty;
    
        pthread_t *threads; /* thread array */
        pthread_t adjust_tid; /* manager thread */
        int min_thr_num; /* threads information */
        int max_thr_num;
        int live_thr_num;
        int busy_thr_num;
        int wait_exit_num;
    
        threadpool_task_t *task_queue; /* task queue */
        int queue_front; /* task queue imformation */
        int queue_rear;
        int queue_size;
        int queue_max_size;
    
        int shutdown; /* if shutdown is true,it will notify all threads exit,then thread pool exit */
    };
    
    void *threadpool_thread(void *threadpool);
    void *adjust_thread(void *threadpool);
    int threadpool_free(threadpool_t *threadpool);
    int is_thread_alive(pthread_t tid);
    

    这里我感觉体现的C语言中的封装思想,如果结构体只在本文件中使用,那么就不需要在头文件中定义,但是函数中又使用了这个类型,这时候我觉得这种处理方式不错,把typedef的声明写在头文件中。这个线程池的每个函数的代码都不难,难的是这样一种思想,我开始在想这个问题的时候,没有想到生产者消费者模型,也没有想到如何让线程休息和退出。这里是C语言中数组的典型表示,需要维护一个长度,然后知道一个指向数组第一个数的指针就可以了。然后是关于队列的表示,这里的表示也是类似的,使用数组,在加上队列的头尾和队列的长度。

    threadpool_t *threadpool_create(int min_thr_num,int max_thr_num,int queue_max_size){
        int i;
        threadpool_t *pool = NULL;
        /* using do while in order to avoid using goto */
        do{
            if( (pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL ){
                break;
            }
    
            /* initial lock */
            if(pthread_mutex_init(&(pool->lock),NULL) != 0
                ||pthread_mutex_init(&(pool->thread_counter),NULL) != 0
                ||pthread_cond_init(&(pool->queue_not_empty),NULL) !=0
                ||pthread_cond_init(&(pool->queue_not_full),NULL) !=0){
                    break;
                }
    
            /* inital threads */
            pool->threads = (pthread_t *)(malloc(sizeof(pthread_t)*max_thr_num));
            if(pool->threads == NULL){
                break;
            }
            memset(pool->threads,0,sizeof(pthread_t)*max_thr_num);
            pool->min_thr_num = min_thr_num;
            pool->max_thr_num = max_thr_num;
            pool->live_thr_num = min_thr_num;
            pool->busy_thr_num = 0;
            for(i=0;i<min_thr_num;i++){
                pthread_create(&(pool->threads[i]),NULL,threadpool_thread,(void *)pool);
            }
            pthread_create(&(pool->adjust_tid),NULL,adjust_thread,(void *)pool);
    
            /* inital queue task */
            pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size);
            if(pool->task_queue == NULL){
                break;
            }
            pool->queue_max_size = queue_max_size;
            pool->queue_front = 0;
            pool->queue_rear = 0;
            pool->queue_size = 0;
    
            pool->shutdown = 0;
    
            return pool;
        }while(0);
        printf("create thread pool fail.\n");
        threadpool_free(pool);
        return NULL;
    }
    

    创建线程池的代码没什么好说的,唯一的亮点是do while的用法,这种可以使得出错后跳转到错误处理,避免了使用goto语句。

    void *adjust_thread(void *threadpool){
        int i;
        threadpool_t *pool = (threadpool_t *)threadpool;
        while( !pool->shutdown ){
            sleep(INTERVAL_TIME);
            pthread_mutex_lock(&(pool->lock));
            int queue_size = pool->queue_size;
            int live_thr_num = pool->live_thr_num;
            pthread_mutex_unlock(&(pool->lock));
            pthread_mutex_lock(&(pool->thread_counter));
            int busy_thr_num = pool->busy_thr_num;
            pthread_mutex_unlock(&(pool->thread_counter));
            printf("live thrnum = %d,queue_size = %d\n",live_thr_num,queue_size);
    
            /* check whether need to add thread */
            if(queue_size > MAX_WAIT_TASK && live_thr_num < pool->max_thr_num){
                printf("add threads\n");
                pthread_mutex_lock(&(pool->lock));
                int add = 0;
                for(i=0;i<pool->max_thr_num && add<THREAD_STEP_NUMBER 
                    && pool->live_thr_num < pool->max_thr_num;i++){
                    if(pool->threads[i] == 0 || !is_thread_alive(pool->threads[i])){
                        if( pool->threads[i] != 0 ){
                            printf("reap threads when delete threads\n");
                            pthread_join(pool->threads[i],NULL);
                        }
                        pthread_create(&(pool->threads[i]),NULL,threadpool_thread,(void *)pool);
                        printf("%d\n",pool->live_thr_num);
                        pool->live_thr_num ++;
                        add++;
                    }
                }
                pthread_mutex_unlock(&(pool->lock));
            }
    
            /* check whether need to delete thread */
            if((busy_thr_num*2)< live_thr_num && live_thr_num > pool->min_thr_num){
                pthread_mutex_lock(&(pool->lock));
                int del = THREAD_STEP_NUMBER;
                if(pool->live_thr_num - THREAD_STEP_NUMBER < pool->min_thr_num){
                    del = pool->live_thr_num - pool->min_thr_num;
                }
                pool->wait_exit_num = del;
                pthread_mutex_unlock(&(pool->lock));
                printf("delete threads\n");
                for(i=0;i<THREAD_STEP_NUMBER;i++){
                    pthread_cond_signal(&(pool->queue_not_empty));
                }
            }
        }
        return NULL;
    }
    

    这个是管理者线程,负责动态创建和回收线程,使用轮询的方式。

    int threadpool_add(threadpool_t *pool,void *(*function)(void *arg),void *arg){
        pthread_mutex_lock(&(pool->lock));
    
        while( (pool->queue_size == pool->queue_max_size) && (!pool->shutdown) ){
            pthread_cond_wait(&(pool->queue_not_full),&(pool->lock));
        }
        if(pool->shutdown){
            pthread_mutex_unlock(&(pool->lock));
            return 0;
        }
    
        pool->task_queue[pool->queue_rear].function = function;
        pool->task_queue[pool->queue_rear].arg = arg;
        pool->queue_rear = (pool->queue_rear + 1)%pool->queue_max_size;
        pool->queue_size++;
    
        pthread_cond_signal(&(pool->queue_not_empty));
        pthread_mutex_unlock(&(pool->lock));
    }
    

    把任务加入到队列,这里需要注意同步的问题。

    int threadpool_free(threadpool_t *pool){
        if(pool == NULL){
            return -1;
        }
        if(pool->task_queue){
            free(pool->task_queue);
        }
        if(pool->threads){
            free(pool->threads);
        }
        pthread_mutex_lock(&(pool->lock));
        pthread_mutex_destroy(&(pool->lock));
        pthread_mutex_lock(&(pool->thread_counter));
        pthread_mutex_destroy(&(pool->thread_counter));
        pthread_cond_destroy(&(pool->queue_not_empty));
        pthread_cond_destroy(&(pool->queue_not_full));
        free(pool);
        return 0;
    }
    
    int threadpool_destroy(threadpool_t *pool){
        int i;
        if(pool == NULL){
            return -1;
        }
        pool->shutdown = 1;
        pthread_join(pool->adjust_tid,NULL);
    
        /* let thread quit,and then reap the thread */
        for(i=0;i<pool->live_thr_num;i++){
            pthread_cond_broadcast(&(pool->queue_not_empty));
        }
        for(i=0;i<pool->live_thr_num;i++){
            pthread_join(pool->threads[i],NULL);
        }
        threadpool_free(pool);
        return 0;
    }
    
    int is_thread_alive(pthread_t tid){
        int kill_rc = pthread_kill(tid,0);
        if(kill_rc == ESRCH){
            return 0;
        }
        return 1;
    }
    

    销毁线程,这个没有什么好说的。

    使用线程池对echo服务器进行改写,发现对性能的提高不大,因为客户端是阻塞的,所以,不能让一个任务等待一个线程的结束,因为不知道什么时候会结束。这里就不贴代码了,和线程的代码基本上一样的。

    相关文章

      网友评论

          本文标题:并发编程

          本文链接:https://www.haomeiwen.com/subject/nlplaftx.html