并发在下列情况中是很有用的:
- 访问慢速I/O设备。
- 与人交互。
- 通过推迟工作以降低延迟。
- 服务多个网络客户端。
- 在多核机器上进行并行计算。
现代操作系统提供了三种基本的构造并发程序的方法:
- 进程。
- I/O多路复用。
- 线程。
基于进程的并发编程
构建并发程序最简单的方法就是进程,进程有独立的地址空间,不用担心一个虚拟地址覆盖另一个进程的虚拟地址,但会导致进程间通信比较麻烦。此外,基于进程的并发往往比较慢,因为进程控制和IPC的开销很高。下面是基于进程的echo服务器版本:
#include<stdio.h>
#include<string.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<unistd.h>
#include<signal.h>
#include<sys/types.h>
#include<sys/wait.h>
#define SERVER_PORT 9090
#define BUFFER_SIZE 1024
void sigchild_handler(int sig){
while(waitpid(0,NULL,WNOHANG) > 0)
;
}
int main(int argc, char const *argv[])
{
struct sockaddr_in server,client;
int sfd,cfd;
socklen_t clen;
int n;
char buf[BUFFER_SIZE];
pid_t pid;
/* register the signal handler,child process have the same handler,
* but only parent process receive the signal */
struct sigaction newact;
newact.sa_handler = sigchild_handler;
sigemptyset(&newact.sa_mask);
newact.sa_flags = 0;
sigaction(SIGCHLD,&newact,NULL);
/* initial socket */
sfd = socket(AF_INET,SOCK_STREAM,0);
memset(&server,0,sizeof(server));
server.sin_family = AF_INET;
server.sin_addr.s_addr = htonl(INADDR_ANY);
server.sin_port = htons(SERVER_PORT);
bind(sfd,(struct sockaddr *)&server,sizeof(server));
listen(sfd,20);
while(1){
clen = sizeof(client);
cfd = accept(sfd,(struct sockaddr *)&client,&clen);
pid = fork();
if(pid > 0){ /* parent process */
close(cfd);
}else if( pid == 0 ){
while( (n = read(cfd,buf,BUFFER_SIZE)) > 0){
write(STDOUT_FILENO,buf,n);
write(cfd,buf,n);
}
close(cfd);
return 0; /* child process quit */
}
}
close(sfd);
return 0;
}
基于I/O多路复用的并发编程
在我看来,I/O多路复用就类似于餐馆的服务员。服务员负责监听一些桌子,如果桌子有客人来了,需要给客人菜单;如果某张桌子的菜好了,需要给他们上菜;如果客人吃完了,需要收拾桌子;如果没有事件,那么服务员处于等待的状态,等待某张桌子中上述事件中的一个的发生。很明显,这是并发了,因为通常都是一个服务员,为多张桌子服务。I/O多路复用的思想和这个类似。
I/O多路复用可以用作并发事件驱动程序的基础,在事件驱动程序中,某些事件导致流向前推进。一般的思路是将逻辑流模型化为状态机。基于I/O多路的事件驱动编程的优点是,它比基于线程和进程的设计给了程序员更多的对程序行为的控制,并且,它有明显的性能优势,现代高性能服务器,如Node.js,nginx都使用了这种编程方式。事件驱动设计的明显的缺点就是编码复杂。就如服务员这个比喻,服务员很多情况下无法把一件事做完再去做下一件事,例如如果客人点菜很慢,站在那里等是很浪费资源的。所以,基于这种模式的编程每次执行的代码应该是很短的。通常需要把一个事件分成若干部分,所以I/O多路复用的编码比较复杂。
I/O多路复用的基本函数是select函数,它要求内核挂起进程,只有一个或多个I/O事件发生后,才将控制返回给应用程序。select及其相关函数的原型如下:
/* 返回已准备好的描述符的非零的个数,若出错则返回-1 */
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
/* 删除set中的fd */
void FD_CLR(int fd, fd_set *set);
/* 判断fd是否在set中 */
int FD_ISSET(int fd, fd_set *set);
/* 将fd添加到set中 */
void FD_SET(int fd, fd_set *set);
/* 将set清零 */
void FD_ZERO(fd_set *set);
下面使用select函数对echo服务器进行改写。
#include<stdio.h>
#include<string.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<unistd.h>
#include<sys/select.h>
#define SERVER_PORT 9090
#define BUFFER_SIZE 1024
int main(int argc, char const *argv[])
{
struct sockaddr_in server,client;
socklen_t clen;
int sfd;
char buf[BUFFER_SIZE];
int n;
/* initial socket */
sfd = socket(AF_INET,SOCK_STREAM,0);
memset(&server,0,sizeof(server));
server.sin_family = AF_INET;
server.sin_addr.s_addr = htonl(INADDR_ANY);
server.sin_port = htons(SERVER_PORT);
bind(sfd,(struct sockaddr *)&server,sizeof(server));
listen(sfd,20);
/* init fd_set */
int cfd[FD_SETSIZE];
int maxi = 0;
int maxfd = sfd;
int i;
for(i = 0;i<FD_SETSIZE;i++){
cfd[i] = -1;
}
fd_set allset,tempset;
FD_ZERO(&allset);
FD_SET(sfd,&allset);
int nready;
while(1){
tempset = allset;
nready = select(maxfd+1,&tempset,NULL,NULL,NULL);
if( FD_ISSET(sfd,&tempset) ){
clen = sizeof(client);
int ret = accept(sfd,(struct sockaddr *)&client,&clen);
for(i = 0;i<FD_SETSIZE;i++){
if(cfd[i] == -1){
cfd[i] = ret;
break;
}
}
if(i != FD_SETSIZE){
if(ret > maxfd){ /* listen to all file descriptors */
maxfd = ret;
}
if( i > maxi){
maxi = i;
}
FD_SET(ret,&allset);
}
if(--nready == 0){
continue;
}
}
for(i=0;i<=maxi;i++){
if( FD_ISSET(cfd[i],&tempset) ){
if( (n = read(cfd[i],buf,BUFFER_SIZE)) >0 ){
write(STDOUT_FILENO,buf,n);
write(cfd[i],buf,n);
}else{
close(cfd[i]);
FD_CLR(cfd[i],&allset);
cfd[i] = -1;
}
if(--nready == 0){
break;
}
}
}
}
close(sfd);
return 0;
}
基于线程的并发编程
基于线程的逻辑流结合了基于进程和基于I/O多路复用的流的特性。同进程一样,线程由内核自动调度,并且内核通过一个整数ID来识别线程。同基于I/O多路复用的流一样,多个线程运行在单一进程的上下文中,因此共享这个进程虚拟地址空间的所有内容,包括它的代码、数据、堆、共享库和打开的文件。下面基于线程对echo服务器进行改写:
#include<stdio.h>
#include<string.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<unistd.h>
#include<pthread.h>
#define SERVER_PORT 9090
#define BUFFER_SIZE 1024
#define MAX_CLIENTS 1024
struct socket_info{
struct sockaddr_in client;
int cfd;
};
struct socket_info clients[MAX_CLIENTS];
void *echo(void *arg){
struct socket_info *si = (struct socket_info *)arg;
char buf[BUFFER_SIZE];
int n;
pthread_detach(pthread_self());
while( (n = read(si->cfd,buf,BUFFER_SIZE)) >0){
write(STDOUT_FILENO,buf,n);
write(si->cfd,buf,n);
}
close(si->cfd);
si->cfd = -1;
return NULL;
}
int main(int argc, char const *argv[])
{
struct sockaddr_in server,client;
socklen_t clen;
int sfd,cfd;
/* initial socket */
sfd = socket(AF_INET,SOCK_STREAM,0);
memset(&server,0,sizeof(server));
server.sin_family = AF_INET;
server.sin_addr.s_addr = htonl(INADDR_ANY);
server.sin_port = htons(SERVER_PORT);
bind(sfd,(struct sockaddr *)&server,sizeof(server));
listen(sfd,20);
int i;
pthread_t tid;
for(i=0;i<MAX_CLIENTS;i++){
clients[i].cfd = -1;
}
while(1){
clen = sizeof(client);
cfd = accept(sfd,(struct sockaddr *)&client,&clen);
for(i = 0;i<MAX_CLIENTS;i++){
if(clients[i].cfd == -1){
clients[i].cfd = cfd;
clients[i].client = client;
pthread_create(&tid,NULL,echo,(void *)&clients[i]);
break;
}
}
}
close(sfd);
return 0;
}
这里要注意和基于进程的echo服务器进行比较。在线程中每个buf需要在线程中的栈进行创建,而进程中是没有这个过程,因为进程是独立的虚拟地址空间。同样的还有client和cfd。但是,在I/O复用中,buf只有一个,这是因为每次把buf的内容发送出去后才进入下一次,中间不会被打断,而线程中可能被打断,所以需要多个buf。
线程池
虽然线程的创建成本低于进程的创建成本,但总的来说,还是比较大的。所以,为了减少线程的创建次数,就有了线程池。线程池的基本思想是,复用线程。也就是说,线程执行完任务后,不是销毁,而是等待下一个任务。这个类似于生产者消费者模型,消息队列和命令模式。线程池的实现的核心有一个任务队列,任务队列里面有执行函数的地址和执行函数的参数。此外,还需要有一个线程来管理其他线程,如线程不够用,需要创建线程;空闲的线程太多,需要删除一些线程。这个线程采取轮询的方式,定时检查。线程池的代码基本上都是参考了别人的,自己敲了一遍,稍微修改了一下。
下面给出线程池的实现方式,采取把代码分段注释的方式,整个代码是完整的。由于线程池的代码有点多,所以就单独写了一个C文件。先看看头文件和定义的结构体和函数。
#ifndef _THREADPOOL_H_
#define _THREADPOOL_H_
typedef struct threadpool_t threadpool_t;
threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);
int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg);
int threadpool_destroy(threadpool_t *pool);
#endif
#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>
#include<string.h>
#include<pthread.h>
#include<signal.h>
#include <errno.h>
#include"threadpool.h"
#define MAX_WAIT_TASK 2
#define INTERVAL_TIME 1
#define THREAD_STEP_NUMBER 1
typedef struct{
void *(*function)(void *); /* thread will call this function */
void *arg; /* function args */
}threadpool_task_t;
struct threadpool_t{
pthread_mutex_t lock;
pthread_mutex_t thread_counter;
pthread_cond_t queue_not_full;
pthread_cond_t queue_not_empty;
pthread_t *threads; /* thread array */
pthread_t adjust_tid; /* manager thread */
int min_thr_num; /* threads information */
int max_thr_num;
int live_thr_num;
int busy_thr_num;
int wait_exit_num;
threadpool_task_t *task_queue; /* task queue */
int queue_front; /* task queue imformation */
int queue_rear;
int queue_size;
int queue_max_size;
int shutdown; /* if shutdown is true,it will notify all threads exit,then thread pool exit */
};
void *threadpool_thread(void *threadpool);
void *adjust_thread(void *threadpool);
int threadpool_free(threadpool_t *threadpool);
int is_thread_alive(pthread_t tid);
这里我感觉体现的C语言中的封装思想,如果结构体只在本文件中使用,那么就不需要在头文件中定义,但是函数中又使用了这个类型,这时候我觉得这种处理方式不错,把typedef的声明写在头文件中。这个线程池的每个函数的代码都不难,难的是这样一种思想,我开始在想这个问题的时候,没有想到生产者消费者模型,也没有想到如何让线程休息和退出。这里是C语言中数组的典型表示,需要维护一个长度,然后知道一个指向数组第一个数的指针就可以了。然后是关于队列的表示,这里的表示也是类似的,使用数组,在加上队列的头尾和队列的长度。
threadpool_t *threadpool_create(int min_thr_num,int max_thr_num,int queue_max_size){
int i;
threadpool_t *pool = NULL;
/* using do while in order to avoid using goto */
do{
if( (pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL ){
break;
}
/* initial lock */
if(pthread_mutex_init(&(pool->lock),NULL) != 0
||pthread_mutex_init(&(pool->thread_counter),NULL) != 0
||pthread_cond_init(&(pool->queue_not_empty),NULL) !=0
||pthread_cond_init(&(pool->queue_not_full),NULL) !=0){
break;
}
/* inital threads */
pool->threads = (pthread_t *)(malloc(sizeof(pthread_t)*max_thr_num));
if(pool->threads == NULL){
break;
}
memset(pool->threads,0,sizeof(pthread_t)*max_thr_num);
pool->min_thr_num = min_thr_num;
pool->max_thr_num = max_thr_num;
pool->live_thr_num = min_thr_num;
pool->busy_thr_num = 0;
for(i=0;i<min_thr_num;i++){
pthread_create(&(pool->threads[i]),NULL,threadpool_thread,(void *)pool);
}
pthread_create(&(pool->adjust_tid),NULL,adjust_thread,(void *)pool);
/* inital queue task */
pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size);
if(pool->task_queue == NULL){
break;
}
pool->queue_max_size = queue_max_size;
pool->queue_front = 0;
pool->queue_rear = 0;
pool->queue_size = 0;
pool->shutdown = 0;
return pool;
}while(0);
printf("create thread pool fail.\n");
threadpool_free(pool);
return NULL;
}
创建线程池的代码没什么好说的,唯一的亮点是do while的用法,这种可以使得出错后跳转到错误处理,避免了使用goto语句。
void *adjust_thread(void *threadpool){
int i;
threadpool_t *pool = (threadpool_t *)threadpool;
while( !pool->shutdown ){
sleep(INTERVAL_TIME);
pthread_mutex_lock(&(pool->lock));
int queue_size = pool->queue_size;
int live_thr_num = pool->live_thr_num;
pthread_mutex_unlock(&(pool->lock));
pthread_mutex_lock(&(pool->thread_counter));
int busy_thr_num = pool->busy_thr_num;
pthread_mutex_unlock(&(pool->thread_counter));
printf("live thrnum = %d,queue_size = %d\n",live_thr_num,queue_size);
/* check whether need to add thread */
if(queue_size > MAX_WAIT_TASK && live_thr_num < pool->max_thr_num){
printf("add threads\n");
pthread_mutex_lock(&(pool->lock));
int add = 0;
for(i=0;i<pool->max_thr_num && add<THREAD_STEP_NUMBER
&& pool->live_thr_num < pool->max_thr_num;i++){
if(pool->threads[i] == 0 || !is_thread_alive(pool->threads[i])){
if( pool->threads[i] != 0 ){
printf("reap threads when delete threads\n");
pthread_join(pool->threads[i],NULL);
}
pthread_create(&(pool->threads[i]),NULL,threadpool_thread,(void *)pool);
printf("%d\n",pool->live_thr_num);
pool->live_thr_num ++;
add++;
}
}
pthread_mutex_unlock(&(pool->lock));
}
/* check whether need to delete thread */
if((busy_thr_num*2)< live_thr_num && live_thr_num > pool->min_thr_num){
pthread_mutex_lock(&(pool->lock));
int del = THREAD_STEP_NUMBER;
if(pool->live_thr_num - THREAD_STEP_NUMBER < pool->min_thr_num){
del = pool->live_thr_num - pool->min_thr_num;
}
pool->wait_exit_num = del;
pthread_mutex_unlock(&(pool->lock));
printf("delete threads\n");
for(i=0;i<THREAD_STEP_NUMBER;i++){
pthread_cond_signal(&(pool->queue_not_empty));
}
}
}
return NULL;
}
这个是管理者线程,负责动态创建和回收线程,使用轮询的方式。
int threadpool_add(threadpool_t *pool,void *(*function)(void *arg),void *arg){
pthread_mutex_lock(&(pool->lock));
while( (pool->queue_size == pool->queue_max_size) && (!pool->shutdown) ){
pthread_cond_wait(&(pool->queue_not_full),&(pool->lock));
}
if(pool->shutdown){
pthread_mutex_unlock(&(pool->lock));
return 0;
}
pool->task_queue[pool->queue_rear].function = function;
pool->task_queue[pool->queue_rear].arg = arg;
pool->queue_rear = (pool->queue_rear + 1)%pool->queue_max_size;
pool->queue_size++;
pthread_cond_signal(&(pool->queue_not_empty));
pthread_mutex_unlock(&(pool->lock));
}
把任务加入到队列,这里需要注意同步的问题。
int threadpool_free(threadpool_t *pool){
if(pool == NULL){
return -1;
}
if(pool->task_queue){
free(pool->task_queue);
}
if(pool->threads){
free(pool->threads);
}
pthread_mutex_lock(&(pool->lock));
pthread_mutex_destroy(&(pool->lock));
pthread_mutex_lock(&(pool->thread_counter));
pthread_mutex_destroy(&(pool->thread_counter));
pthread_cond_destroy(&(pool->queue_not_empty));
pthread_cond_destroy(&(pool->queue_not_full));
free(pool);
return 0;
}
int threadpool_destroy(threadpool_t *pool){
int i;
if(pool == NULL){
return -1;
}
pool->shutdown = 1;
pthread_join(pool->adjust_tid,NULL);
/* let thread quit,and then reap the thread */
for(i=0;i<pool->live_thr_num;i++){
pthread_cond_broadcast(&(pool->queue_not_empty));
}
for(i=0;i<pool->live_thr_num;i++){
pthread_join(pool->threads[i],NULL);
}
threadpool_free(pool);
return 0;
}
int is_thread_alive(pthread_t tid){
int kill_rc = pthread_kill(tid,0);
if(kill_rc == ESRCH){
return 0;
}
return 1;
}
销毁线程,这个没有什么好说的。
使用线程池对echo服务器进行改写,发现对性能的提高不大,因为客户端是阻塞的,所以,不能让一个任务等待一个线程的结束,因为不知道什么时候会结束。这里就不贴代码了,和线程的代码基本上一样的。
网友评论