美文网首页
并发编程第一篇

并发编程第一篇

作者: 明翼 | 来源:发表于2020-03-22 16:47 被阅读0次

    这也是篇学习笔记,加上不少自己的理解。对于并发编程来说,想必做开发的同学都不会陌生。逻辑控制流在时间上是重叠的,就是并发。并发和并行是两个概念,并发多是指在一定的时间间隔内,看起来是同时运行,一般采用时间片,分时操作来完成不同进程的上下文切换,由于cpu运行速度很快,在我们看来整个程序是并行运行的,比如在单核cpu的机器上,也可以一边听歌,一边写文档。
    并行那是不管在时间间隔内,微观上,在同一个时刻,多个线程在不同的核心上同时执行。
    有些书上对这两个概念也定义的没那么准确,知道就行,也没有必要这么去扣字眼。本文所述的主要是并行,应用级的并行。

    一 并行的必要性和实现方法

    1.1 并行执行的必要性

    • 多处理器上可以实现并行计算,提升性能。
    • 访问慢速io设备时候,可以提升cpu利用率,比如我们在打印文件的时候,还可以同时进行其他操作,如果是顺序执行的,那么必然会造成cpu空转等待打印完成后才可以进行其他操作。
    • 人机交互 机器机在执行任务的时候,需要随时响应用户操作。
    • 减少程序执行时间,通过并行让程序的执行时间缩短
    • 服务多个网络客户端,一个服务器应用通过并行的方式对多个客户端提供服务,比如我们都同时打开一个web网站,这个web应用同时为我们多个客户端进行服务。

    1.2 并行实现方法

    并行实现除了我们常说的多线程,多进程,还有一个就是IO多路复用,我个人觉得IO多路复用只能算是整体的并行,微观上算不上真正的并行,因为它是一个进程,共享同一个地址空间。只是通过将逻辑流转成状态机,根据不同的状态执行不同的操作,这个在高性能服务器开发领域用的比较多。

    • 多进程,每个进程都有自己独立的虚拟内存空间,不同的进程之间不能直接访问对方的内存空间,只能通过多进程中的通讯来完成互相访问,这种多进程之间的通信技术叫IPC,包括共享内存,管道,套接字,文件等。
    • 多线程,同一个进程下的多线程,是共享同一个虚拟地址空间的,而执行的时候又是独立运行的,线程是cpu调度的单位,任何一个进程都至少有一个线程。

    二 并行举例

    用网络服务器来举例,还是比较好阐述应用的并行性。

    2.1 基于进程的并发编程

    网络编程嘛,那就是开发网络服务器服务多个客户端,对于基于进程的网络编程的办法很简单,每接受到一个连接来的时候,就可以开启一个新的进程为这个客户端服务器,当客户端关闭的时候进程销毁。
    这种方式的网络服务器,一般比较稳定,但是支持并发数量少,另外由于进程的创建和销毁都是比较重的操作,所以以此方式开发的网络服务器性能一般不够好。对于进程开销大的问题,也可以通过类似与线程池的办法,先预先创建多个进程为即将到来的客户端服务器,比如Apache的prefork模式; 还有的是如果是内部连接数肯定不多的情况下,可以采用这种模式。


    多进程

    这种模式下需要注意点是:

    1. 在fork进程后,新进程是对老进程的数据进行了拷贝,这里面的数据包括地址空间,打开的文件描述符,程序计数器,还有程序执行的代码。所以父子进程需要关闭各自不需要的套接字。对于监听的父进程来说,创建子进程后不需要关心连接的套接字,所以可以直接关闭;对于被创建的子进程来说,不需要关心的是服务的套接字,所以需要关闭监听的套接字。
    2. 子进程执行完毕后,子进程需要被回收,通过注册事件处理来完成父进程对子进程的资源回收:
    signal(SIGCHLD, sigchld_handler);  
    

    子进程执行结束后,会发送SIGCHLD 信号给父进程,默认是忽略不处理此信号,如果不回收,子进程就成为僵尸进程,销毁系统资源,多了就把系统搞挂了。

    
    #include "lib/common.h"
    
    #define MAX_LINE 4096
    
    char rot13_char(char c) {
        if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
            return c + 13;
        else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
            return c - 13;
        else
            return c;
    }
    
    void child_run(int fd) {
        char outbuf[MAX_LINE + 1];
        size_t outbuf_used = 0;
        ssize_t result;
    
        while (1) {
            char ch;
            result = recv(fd, &ch, 1, 0);
            if (result == 0) {
                break;
            } else if (result == -1) {
                perror("read");
                break;
            }
    
            if (outbuf_used < sizeof(outbuf)) {
                outbuf[outbuf_used++] = rot13_char(ch);
            }
    
            if (ch == '\n') {
                send(fd, outbuf, outbuf_used, 0);
                outbuf_used = 0;
                continue;
            }
        }
    }
    
    void sigchld_handler(int sig) {
       // 回收子进程资源
        while (waitpid(-1, 0, WNOHANG) > 0);
        return;
    }
    
    int main(int c, char **v) {
        int listener_fd = tcp_server_listen(SERV_PORT);
        signal(SIGCHLD, sigchld_handler);
        while (1) {
            struct sockaddr_storage ss;
            socklen_t slen = sizeof(ss);
            int fd = accept(listener_fd, (struct sockaddr *) &ss, &slen);
            if (fd < 0) {
                error(1, errno, "accept failed");
                exit(1);
            }
    
            if (fork() == 0) {
                // 子进程关闭服务套接字
                close(listener_fd);
                child_run(fd);
                exit(0);
            } else {
                 // 父进程 关闭连接的客户端套接字,因为已经复制给子进程处理了
                close(fd);
            }
        }
    
        return 0;
    }
    

    说明,代码来自极客时间《网络编程实战》

    2.2 基于多线程的并发模型

    进程的创建比较耗时,另外不同的进程之间通信要复杂的多,所以很多服务器采用多线程的模式。
    同一个进程的多线程,共享相同的地址空间,使得它们之间的通信更为方便,线程由内核自动调度,不同的线程有不同的上下文,不同线程交错运行的时候,需要发生上线问切换,每个线程都有自己的线程上下文,包括线程ID,栈,栈指针,程序计数器,通用的目标寄存器和条件码。
    多进程之间也发生切换,但是上下文更重,没有线程切换快,而且线程没有进程中父子进程的说法,所有的线程都是对等的。
    POSIX线程标准接口,比较常用的操作:

    #include <pthread.h>
    typedef void *(func)(void *);
    // 线程的创建
    int pthread_create(pthread t *tid, pthread attr t *attr, func *f, void *arg);
    // 获取自身的线程ID
    pthread t pthread_self(void);
    // 终止线程,thread_return为线程返回值
    int pthread_exit(void *thread_return);
    // 终止pid的线程
    int pthread_cancel(pthread t tid);
    // 调用后会阻塞等待,直到线程tid终止,回收线程资源
    int pthread_join(pthread t tid, void **thread return);
    // 分离线程,默认线程是joinable状态,调用后为detached
    int pthread_detach(pthread t tid);
    
    pthread_once_t once_control= PTHREAD_ONCE_INIT;
    
    int  pthread_once(pthread_once_t * once_control,void (*init_routine)(void));
    

    说明:

    1. 线程创建后,默认为joinable的,joinable的线程意味着,这个线程可以被其他线程回收资源和杀死,在其他线程回收之前,它的存器资源比如栈等是不会被释放的。当pthread_detach一个线程后,线程就是分离的,不能被其他线程回收和杀死,它的存器资源在终止时,被系统自动释放。
    2. pthread_once 此函数我以前很少用到,这个函数是为了多线程初始化使用,多个线程执行的时候如果用同一个once_control调用pthread_once,则init_routine只会被调用一次。在初始化多线程的时候有用。

    相对于多进程来说,多线程共享数据更方便,多线程的共享部分包括整个进程的虚拟存储区域,它是由只读文本,读写数据,堆,共享的代码和数据区域组成,线程间也共享所有打开的文件集合。
    所有的线程上下文的数据显然是无法共享的,比如不同线程的栈数据等。
    这种共享带来了方便,也带来了麻烦,那就是共享如果不做控制,容易造成覆盖和错乱,因为共享所以多个线程可以同时访问,如果不做控制一个线程设置的结果可能会被另外一个线程所覆盖。

    全局变量,本地的静态变量均是共享的,像函数内的局部变量,如果没有通过指针的方式传递给其他线程,则是非共享的,不同的线程中有不同的函数内的局部变量。

    简单的多线程服务器如下:

    
    #include "lib/common.h"
    
    extern void loop_echo(int);
    
    void thread_run(void *arg) {
        pthread_detach(pthread_self());
        int fd = (int) arg;
        loop_echo(fd);
    }
    
    int main(int c, char **v) {
        int listener_fd = tcp_server_listen(SERV_PORT);
        pthread_t tid;
        
        while (1) {
            struct sockaddr_storage ss;
            socklen_t slen = sizeof(ss);
            int fd = accept(listener_fd, (struct sockaddr *) &ss, &slen);
            if (fd < 0) {
                error(1, errno, "accept failed");
            } else {
                pthread_create(&tid, NULL, &thread_run, (void *) fd);
            }
        }
    
        return 0;
    }
    
    
    char rot13_char(char c) {
        if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
            return c + 13;
        else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
            return c - 13;
        else
            return c;
    }
    
    void loop_echo(int fd) {
        char outbuf[MAX_LINE + 1];
        size_t outbuf_used = 0;
        ssize_t result;
        while (1) {
            char ch;
            result = recv(fd, &ch, 1, 0);
    
            //断开连接或者出错
            if (result == 0) {
                break;
            } else if (result == -1) {
                error(1, errno, "read error");
                break;
            }
    
            if (outbuf_used < sizeof(outbuf)) {
                outbuf[outbuf_used++] = rot13_char(ch);
            }
    
            if (ch == '\n') {
                send(fd, outbuf, outbuf_used, 0);
                outbuf_used = 0;
                continue;
            }
        }
    }
    

    这个简单的服务器没什么可以说的,现实中更多是采用线程池结合队列的方式,整体的架构如下:


    多线程架构图

    这个模式也比较简单,就是客户端连接之后,主线程将socket描述符加入到socket缓存队列中,worker线程从socket缓存队列中取出套接字,然后提供服务。这是个典型的生产者消费者模式,由于主线程和工作线程之间有共享的队列,所以需要加锁;同时由于队列大小有限制,则需要通过信号量等方式控制,比如等队列空了之后,主线程才可以放,等队列中有数据了,worker线程才可以工作。

    2.3 基于IO的多路复用的并发编程

    我们只所以在前面中采用多线程或多进程方式来实现程序,是因为有多个套接字需要同时服务,而这些套接字操作是阻塞的,没办法进行进行同时处理。如果我们可以把这些套接字放在一起统一进行监控起来,任何套接字有请求我们都知道,根据请求时间的不同进行回调不同的函数进行处理,这就是IO多路复用,现在的高性能的服务器一般都是通过IO多路复用这种方式实现的。

    如果是非阻塞的IO,我们可以通过轮询的方式,遍历套接字结合,找出需要进行IO处理的套接字,进行处理,但是这种方式有个缺点就是如果套接字的数据量很多,那么遍历一次会很耗时,在遍历的过程中,如果有其他的客户端发起请求,则无法响应,而且为了快速响应请求,cpu必须在极短的时间间隔内做遍历,消耗大量无用的cpu资源。

    现在我们可以通过将套接字集合交给操作系统,系统来判断是否有套接字有IO事件的发生,发生了则返回或超时返回;比如select,poll,epoll等IO分发技术均可以实现这种多路IO复用。
    select 函数说明如下:

    #include <sys/time.h>
    #include <sys/types.h>
    #include <unistd.h>
    
    int select(int nfds, fd_set *readfds, fd_set *writefds,
               fd_set *exceptfds, struct timeval *timeout);
    //从集合中删除指定的fd描述符
    void FD_CLR(int fd, fd_set *set); 
    //判断指定的fd描述符是否存在于集合之中
    int  FD_ISSET(int fd, fd_set *set); 
    //将指定的fd添加到集合之中
    void FD_SET(int fd, fd_set *set);
    //初始化集合
    void FD_ZERO(fd_set *set); 
    

    select 操作的是称为fd_set的集合,逻辑上我们可以将集合看成一个有n位掩码的,哪一位设置为1,说明哪一位才是描述符集合中的一个元素。
    使用select函数,要求内核挂起进程,当一个或多个io事件发生的时候,才会返回发生事件的IO集合,然后我们通过用FD_ISSET来判断fd是否在集合中。

    参数说明:nfds 标识最大的描述符+1 ; readfds让内核检测这个集合上的可读事件;writefds是让内核检测的可写集合上的事件;exceptfds让内核检测异常的套接字的集合。

    
    int main(int argc, char **argv) {
        if (argc != 2) {
            error(1, 0, "usage: select01 <IPaddress>");
        }
        int socket_fd = tcp_client(argv[1], SERV_PORT);
    
        char recv_line[MAXLINE], send_line[MAXLINE];
        int n;
    
        fd_set readmask;
        fd_set allreads;
        FD_ZERO(&allreads);
        FD_SET(0, &allreads);
        FD_SET(socket_fd, &allreads);
    
        for (;;) {
            readmask = allreads;
            int rc = select(socket_fd + 1, &readmask, NULL, NULL, NULL);
    
            if (rc <= 0) {
                error(1, errno, "select failed");
            }
    
            if (FD_ISSET(socket_fd, &readmask)) {
                n = read(socket_fd, recv_line, MAXLINE);
                if (n < 0) {
                    error(1, errno, "read error");
                } else if (n == 0) {
                    error(1, 0, "server terminated \n");
                }
                recv_line[n] = 0;
                fputs(recv_line, stdout);
                fputs("\n", stdout);
            }
    
            if (FD_ISSET(STDIN_FILENO, &readmask)) {
                if (fgets(send_line, MAXLINE, stdin) != NULL) {
                    int i = strlen(send_line);
                    if (send_line[i - 1] == '\n') {
                        send_line[i - 1] = 0;
                    }
    
                    printf("now sending %s\n", send_line);
                    size_t rt = write(socket_fd, send_line, strlen(send_line));
                    if (rt < 0) {
                        error(1, errno, "write failed ");
                    }
                    printf("send bytes: %zu \n", rt);
                }
            }
        }
    
    }
    

    初始的时候fd_set 通过FD_ZERO 来进行初始化,结果如下图:


    初始化套接字

    接着将标准输入的套接字加入进去,如下图:


    标准输入加入到套接字集合中
    需要特别注意的是:
           readmask = allreads;
    

    因为select每次返回的时候,返回的是准备好的套接字集合,同时也会修改注册在select中的函数,所以每次都需要重新赋值。

    select 函数虽然可以实现多路复用,但是有不少缺点:

    • 套接字集合支持的总数量受限,在32位机器上,支持1024个套接字,64位上为2048个。
    • 套接字需要遍历FD_SETSIZE个套接字,来判断套接字是否准备好了,消耗性能。
    • 每次有事件发生后,都需要重新调用select函数,这就发生了数据的拷贝,将所有的套接字集合从用户空间拷贝到内核空间,性能比较低。

    暂时写到这里吧,下次可以接着写select的改进相关。

    相关文章

      网友评论

          本文标题:并发编程第一篇

          本文链接:https://www.haomeiwen.com/subject/umumyhtx.html