这也是篇学习笔记,加上不少自己的理解。对于并发编程来说,想必做开发的同学都不会陌生。逻辑控制流在时间上是重叠的,就是并发。并发和并行是两个概念,并发多是指在一定的时间间隔内,看起来是同时运行,一般采用时间片,分时操作来完成不同进程的上下文切换,由于cpu运行速度很快,在我们看来整个程序是并行运行的,比如在单核cpu的机器上,也可以一边听歌,一边写文档。
并行那是不管在时间间隔内,微观上,在同一个时刻,多个线程在不同的核心上同时执行。
有些书上对这两个概念也定义的没那么准确,知道就行,也没有必要这么去扣字眼。本文所述的主要是并行,应用级的并行。
一 并行的必要性和实现方法
1.1 并行执行的必要性
- 多处理器上可以实现并行计算,提升性能。
- 访问慢速io设备时候,可以提升cpu利用率,比如我们在打印文件的时候,还可以同时进行其他操作,如果是顺序执行的,那么必然会造成cpu空转等待打印完成后才可以进行其他操作。
- 人机交互 机器机在执行任务的时候,需要随时响应用户操作。
- 减少程序执行时间,通过并行让程序的执行时间缩短
- 服务多个网络客户端,一个服务器应用通过并行的方式对多个客户端提供服务,比如我们都同时打开一个web网站,这个web应用同时为我们多个客户端进行服务。
1.2 并行实现方法
并行实现除了我们常说的多线程,多进程,还有一个就是IO多路复用,我个人觉得IO多路复用只能算是整体的并行,微观上算不上真正的并行,因为它是一个进程,共享同一个地址空间。只是通过将逻辑流转成状态机,根据不同的状态执行不同的操作,这个在高性能服务器开发领域用的比较多。
- 多进程,每个进程都有自己独立的虚拟内存空间,不同的进程之间不能直接访问对方的内存空间,只能通过多进程中的通讯来完成互相访问,这种多进程之间的通信技术叫IPC,包括共享内存,管道,套接字,文件等。
- 多线程,同一个进程下的多线程,是共享同一个虚拟地址空间的,而执行的时候又是独立运行的,线程是cpu调度的单位,任何一个进程都至少有一个线程。
二 并行举例
用网络服务器来举例,还是比较好阐述应用的并行性。
2.1 基于进程的并发编程
网络编程嘛,那就是开发网络服务器服务多个客户端,对于基于进程的网络编程的办法很简单,每接受到一个连接来的时候,就可以开启一个新的进程为这个客户端服务器,当客户端关闭的时候进程销毁。
这种方式的网络服务器,一般比较稳定,但是支持并发数量少,另外由于进程的创建和销毁都是比较重的操作,所以以此方式开发的网络服务器性能一般不够好。对于进程开销大的问题,也可以通过类似与线程池的办法,先预先创建多个进程为即将到来的客户端服务器,比如Apache的prefork模式; 还有的是如果是内部连接数肯定不多的情况下,可以采用这种模式。
多进程
这种模式下需要注意点是:
- 在fork进程后,新进程是对老进程的数据进行了拷贝,这里面的数据包括地址空间,打开的文件描述符,程序计数器,还有程序执行的代码。所以父子进程需要关闭各自不需要的套接字。对于监听的父进程来说,创建子进程后不需要关心连接的套接字,所以可以直接关闭;对于被创建的子进程来说,不需要关心的是服务的套接字,所以需要关闭监听的套接字。
- 子进程执行完毕后,子进程需要被回收,通过注册事件处理来完成父进程对子进程的资源回收:
signal(SIGCHLD, sigchld_handler);
子进程执行结束后,会发送SIGCHLD 信号给父进程,默认是忽略不处理此信号,如果不回收,子进程就成为僵尸进程,销毁系统资源,多了就把系统搞挂了。
#include "lib/common.h"
#define MAX_LINE 4096
char rot13_char(char c) {
if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
return c + 13;
else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
return c - 13;
else
return c;
}
void child_run(int fd) {
char outbuf[MAX_LINE + 1];
size_t outbuf_used = 0;
ssize_t result;
while (1) {
char ch;
result = recv(fd, &ch, 1, 0);
if (result == 0) {
break;
} else if (result == -1) {
perror("read");
break;
}
if (outbuf_used < sizeof(outbuf)) {
outbuf[outbuf_used++] = rot13_char(ch);
}
if (ch == '\n') {
send(fd, outbuf, outbuf_used, 0);
outbuf_used = 0;
continue;
}
}
}
void sigchld_handler(int sig) {
// 回收子进程资源
while (waitpid(-1, 0, WNOHANG) > 0);
return;
}
int main(int c, char **v) {
int listener_fd = tcp_server_listen(SERV_PORT);
signal(SIGCHLD, sigchld_handler);
while (1) {
struct sockaddr_storage ss;
socklen_t slen = sizeof(ss);
int fd = accept(listener_fd, (struct sockaddr *) &ss, &slen);
if (fd < 0) {
error(1, errno, "accept failed");
exit(1);
}
if (fork() == 0) {
// 子进程关闭服务套接字
close(listener_fd);
child_run(fd);
exit(0);
} else {
// 父进程 关闭连接的客户端套接字,因为已经复制给子进程处理了
close(fd);
}
}
return 0;
}
说明,代码来自极客时间《网络编程实战》
2.2 基于多线程的并发模型
进程的创建比较耗时,另外不同的进程之间通信要复杂的多,所以很多服务器采用多线程的模式。
同一个进程的多线程,共享相同的地址空间,使得它们之间的通信更为方便,线程由内核自动调度,不同的线程有不同的上下文,不同线程交错运行的时候,需要发生上线问切换,每个线程都有自己的线程上下文,包括线程ID,栈,栈指针,程序计数器,通用的目标寄存器和条件码。
多进程之间也发生切换,但是上下文更重,没有线程切换快,而且线程没有进程中父子进程的说法,所有的线程都是对等的。
POSIX线程标准接口,比较常用的操作:
#include <pthread.h>
typedef void *(func)(void *);
// 线程的创建
int pthread_create(pthread t *tid, pthread attr t *attr, func *f, void *arg);
// 获取自身的线程ID
pthread t pthread_self(void);
// 终止线程,thread_return为线程返回值
int pthread_exit(void *thread_return);
// 终止pid的线程
int pthread_cancel(pthread t tid);
// 调用后会阻塞等待,直到线程tid终止,回收线程资源
int pthread_join(pthread t tid, void **thread return);
// 分离线程,默认线程是joinable状态,调用后为detached
int pthread_detach(pthread t tid);
pthread_once_t once_control= PTHREAD_ONCE_INIT;
int pthread_once(pthread_once_t * once_control,void (*init_routine)(void));
说明:
- 线程创建后,默认为joinable的,joinable的线程意味着,这个线程可以被其他线程回收资源和杀死,在其他线程回收之前,它的存器资源比如栈等是不会被释放的。当
pthread_detach
一个线程后,线程就是分离的,不能被其他线程回收和杀死,它的存器资源在终止时,被系统自动释放。 -
pthread_once
此函数我以前很少用到,这个函数是为了多线程初始化使用,多个线程执行的时候如果用同一个once_control
调用pthread_once
,则init_routine
只会被调用一次。在初始化多线程的时候有用。
相对于多进程来说,多线程共享数据更方便,多线程的共享部分包括整个进程的虚拟存储区域,它是由只读文本,读写数据,堆,共享的代码和数据区域组成,线程间也共享所有打开的文件集合。
所有的线程上下文的数据显然是无法共享的,比如不同线程的栈数据等。
这种共享带来了方便,也带来了麻烦,那就是共享如果不做控制,容易造成覆盖和错乱,因为共享所以多个线程可以同时访问,如果不做控制一个线程设置的结果可能会被另外一个线程所覆盖。
全局变量,本地的静态变量均是共享的,像函数内的局部变量,如果没有通过指针的方式传递给其他线程,则是非共享的,不同的线程中有不同的函数内的局部变量。
简单的多线程服务器如下:
#include "lib/common.h"
extern void loop_echo(int);
void thread_run(void *arg) {
pthread_detach(pthread_self());
int fd = (int) arg;
loop_echo(fd);
}
int main(int c, char **v) {
int listener_fd = tcp_server_listen(SERV_PORT);
pthread_t tid;
while (1) {
struct sockaddr_storage ss;
socklen_t slen = sizeof(ss);
int fd = accept(listener_fd, (struct sockaddr *) &ss, &slen);
if (fd < 0) {
error(1, errno, "accept failed");
} else {
pthread_create(&tid, NULL, &thread_run, (void *) fd);
}
}
return 0;
}
char rot13_char(char c) {
if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
return c + 13;
else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
return c - 13;
else
return c;
}
void loop_echo(int fd) {
char outbuf[MAX_LINE + 1];
size_t outbuf_used = 0;
ssize_t result;
while (1) {
char ch;
result = recv(fd, &ch, 1, 0);
//断开连接或者出错
if (result == 0) {
break;
} else if (result == -1) {
error(1, errno, "read error");
break;
}
if (outbuf_used < sizeof(outbuf)) {
outbuf[outbuf_used++] = rot13_char(ch);
}
if (ch == '\n') {
send(fd, outbuf, outbuf_used, 0);
outbuf_used = 0;
continue;
}
}
}
这个简单的服务器没什么可以说的,现实中更多是采用线程池结合队列的方式,整体的架构如下:
多线程架构图
这个模式也比较简单,就是客户端连接之后,主线程将socket描述符加入到socket缓存队列中,worker线程从socket缓存队列中取出套接字,然后提供服务。这是个典型的生产者消费者模式,由于主线程和工作线程之间有共享的队列,所以需要加锁;同时由于队列大小有限制,则需要通过信号量等方式控制,比如等队列空了之后,主线程才可以放,等队列中有数据了,worker线程才可以工作。
2.3 基于IO的多路复用的并发编程
我们只所以在前面中采用多线程或多进程方式来实现程序,是因为有多个套接字需要同时服务,而这些套接字操作是阻塞的,没办法进行进行同时处理。如果我们可以把这些套接字放在一起统一进行监控起来,任何套接字有请求我们都知道,根据请求时间的不同进行回调不同的函数进行处理,这就是IO多路复用,现在的高性能的服务器一般都是通过IO多路复用这种方式实现的。
如果是非阻塞的IO,我们可以通过轮询的方式,遍历套接字结合,找出需要进行IO处理的套接字,进行处理,但是这种方式有个缺点就是如果套接字的数据量很多,那么遍历一次会很耗时,在遍历的过程中,如果有其他的客户端发起请求,则无法响应,而且为了快速响应请求,cpu必须在极短的时间间隔内做遍历,消耗大量无用的cpu资源。
现在我们可以通过将套接字集合交给操作系统,系统来判断是否有套接字有IO事件的发生,发生了则返回或超时返回;比如select,poll,epoll等IO分发技术均可以实现这种多路IO复用。
select 函数说明如下:
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
//从集合中删除指定的fd描述符
void FD_CLR(int fd, fd_set *set);
//判断指定的fd描述符是否存在于集合之中
int FD_ISSET(int fd, fd_set *set);
//将指定的fd添加到集合之中
void FD_SET(int fd, fd_set *set);
//初始化集合
void FD_ZERO(fd_set *set);
select
操作的是称为fd_set的集合,逻辑上我们可以将集合看成一个有n位掩码的,哪一位设置为1,说明哪一位才是描述符集合中的一个元素。
使用select函数,要求内核挂起进程,当一个或多个io事件发生的时候,才会返回发生事件的IO集合,然后我们通过用FD_ISSET来判断fd是否在集合中。
参数说明:nfds 标识最大的描述符+1 ; readfds让内核检测这个集合上的可读事件;writefds是让内核检测的可写集合上的事件;exceptfds让内核检测异常的套接字的集合。
int main(int argc, char **argv) {
if (argc != 2) {
error(1, 0, "usage: select01 <IPaddress>");
}
int socket_fd = tcp_client(argv[1], SERV_PORT);
char recv_line[MAXLINE], send_line[MAXLINE];
int n;
fd_set readmask;
fd_set allreads;
FD_ZERO(&allreads);
FD_SET(0, &allreads);
FD_SET(socket_fd, &allreads);
for (;;) {
readmask = allreads;
int rc = select(socket_fd + 1, &readmask, NULL, NULL, NULL);
if (rc <= 0) {
error(1, errno, "select failed");
}
if (FD_ISSET(socket_fd, &readmask)) {
n = read(socket_fd, recv_line, MAXLINE);
if (n < 0) {
error(1, errno, "read error");
} else if (n == 0) {
error(1, 0, "server terminated \n");
}
recv_line[n] = 0;
fputs(recv_line, stdout);
fputs("\n", stdout);
}
if (FD_ISSET(STDIN_FILENO, &readmask)) {
if (fgets(send_line, MAXLINE, stdin) != NULL) {
int i = strlen(send_line);
if (send_line[i - 1] == '\n') {
send_line[i - 1] = 0;
}
printf("now sending %s\n", send_line);
size_t rt = write(socket_fd, send_line, strlen(send_line));
if (rt < 0) {
error(1, errno, "write failed ");
}
printf("send bytes: %zu \n", rt);
}
}
}
}
初始的时候fd_set 通过FD_ZERO 来进行初始化,结果如下图:
初始化套接字
接着将标准输入的套接字加入进去,如下图:
标准输入加入到套接字集合中
需要特别注意的是:
readmask = allreads;
因为select每次返回的时候,返回的是准备好的套接字集合,同时也会修改注册在select中的函数,所以每次都需要重新赋值。
select 函数虽然可以实现多路复用,但是有不少缺点:
- 套接字集合支持的总数量受限,在32位机器上,支持1024个套接字,64位上为2048个。
- 套接字需要遍历FD_SETSIZE个套接字,来判断套接字是否准备好了,消耗性能。
- 每次有事件发生后,都需要重新调用select函数,这就发生了数据的拷贝,将所有的套接字集合从用户空间拷贝到内核空间,性能比较低。
暂时写到这里吧,下次可以接着写select的改进相关。
网友评论