美文网首页
服务器的并发模式

服务器的并发模式

作者: ustclcl | 来源:发表于2019-04-20 23:43 被阅读0次
    1. fork进程
    2. IO复用
    3. 线程

    多进程

    主进程监听,在循环中接受连接请求,当连接建立后,fork一个子进程,在子进程中进行处理。

    • 主进程:listen -> while(1) -> accept -> fork -> close connfd

    • 子进程:close listenfd -> handle -> close connfd

    server:

    [root@linuxkit-00155ddc0103 inet]# ./echoserverfork 6667
    Parent: waitting...
    name[localhost] port[38276]connected! Child No.1 will handle it!
    Parent: waitting...
    server recived 6 bytes
    name[localhost] port[38280]connected! Child No.2 will handle it!
    Parent: waitting...
    server recived 7 bytes

    client1:

    [root@linuxkit-00155ddc0103 inet]# ./echoclient localhost 6667
    first
    first

    client2:

    [root@linuxkit-00155ddc0103 inet]# ./echoclient localhost 6667
    second
    second

    code

    #include "mynet.h"
    int open_listenfd(char *port);
    void echo(int connfd);
    void sigchld_handler(int sig)
    {
        while(waitpid(-1,0,WNOHANG) >0)
          ;
        return;
    }
    
    int main(int argc, char **argv)
    {
        int listenfd, connfd, cno=1;
        socklen_t clientlen;
        struct sockaddr_storage clientaddr;
        char client_hostname[MAXLINE], client_port[MAXLINE];
    
        if(argc!=2){
          fprintf(stderr,"usage: %s <port>\n",argv[0]);
          exit(0);
        }
        signal(SIGCHLD, sigchld_handler);
        listenfd = open_listenfd(argv[1]);
        while(1){
            printf("Parent: waitting...\n");
            clientlen = sizeof(struct sockaddr_storage);
            connfd = accept(listenfd, (SA*)&clientaddr,&clientlen);
            getnameinfo((SA*)&clientaddr,clientlen,client_hostname,MAXLINE,client_port,MAXLINE,0);
            printf("name[%s] port[%s]connected! Child No.%d will handle it!\n",
                        client_hostname,client_port,cno++);
            if(fork()==0) //child
            {
                close(listenfd);
                echo(connfd);
                close(connfd);
                exit(0);
            }
            close(connfd);
        }
        exit(0);
    }
    void echo(int connfd)
    {
        size_t n;
        char buf[MAXLINE];
    
        while((n=read(connfd,buf,MAXLINE))!=0){
            printf("server recived %d bytes\n",(int)n);
            write(connfd,buf,n);
        }
    }
    © 2019 GitHub, Inc.
    

    IO复用

    IO复用即使用select函数

    #include <sys/select.h>
    int select(int n, fd_set *fdset, NULL, NULL, NULL) //等待一组描述符准备好读
    FD_ZERO(fd_set *fdset);
    FD_CLR(int fd, fd_set *fdset);
    FD_SET(int fd, fd_set *fdset);
    FD_ISSET(int fd, fd_set *fdset);

    stdio与connfd的复用

    如果使用echoserver迭代版本的同时,要求能够响应服务器本身的stdin输入,则可以使用select。一个标准输入的fd一般是0,监听fd则是系统分配

    • 将stdin和llistenfd加入fdset
    • 使用select等待是否有fd准备好
    • 通过FD_ISSET查看哪个准备好并进行相应的处理
        
    #include "mynet.h"
    int open_listenfd(char *port);
    void echo(int connfd);
    void command(void);
    
    int main(int argc, char **argv)
    {
        int listenfd, connfd;
        socklen_t clientlen;
        struct sockaddr_storage clientaddr;
        char client_hostname[MAXLINE], client_port[MAXLINE];
        fd_set read_set, ready_set;
    
        if(argc!=2){
          fprintf(stderr,"usage: %s <port>\n",argv[0]);
          exit(0);
        }
        listenfd = open_listenfd(argv[1]);
        FD_ZERO(&read_set); //clear read_set
        FD_SET(STDIN_FILENO, &read_set);    //add stdin to read_set
        FD_SET(listenfd,&read_set); //add listenfd to read_set
    
        while(1){
            ready_set = read_set;
            select(listenfd+1,&ready_set,NULL,NULL,NULL);
            if(FD_ISSET(STDIN_FILENO,&ready_set))
            {
                printf("stdin is ready.\n");
                command();
            }
            if(FD_ISSET(listenfd,&ready_set)){
                printf("client connetted.\n");
                clientlen = sizeof(struct sockaddr_storage);
                connfd = accept(listenfd, (SA*)&clientaddr,&clientlen);
                getnameinfo((SA*)&clientaddr,clientlen,client_hostname,MAXLINE,client_port,MAXLINE,0);
                echo(connfd);
                close(connfd);
            }
        }
        exit(0);
    }
    void command(void){
        char buf[MAXLINE];
        if(!fgets(buf,MAXLINE,stdin)){
            exit(0);    //EOF
        }
        printf("%s",buf);
    }
    void echo(int connfd)
    {
        size_t n;
        char buf[MAXLINE];
    
        while((n=read(connfd,buf,MAXLINE))!=0){
            printf("server recived %d bytes\n",(int)n);
            write(connfd,buf,n);
        }
    }
    

    server behavior:

    [root@linuxkit-00155ddc0103 inet]# ./select 6667
    do something
    stdin is ready.
    do something
    hello
    stdin is ready.
    hello
    client connetted.
    server recived 12 bytes
    ccd //no response here becuse client is connecting
    stdin is ready. //client close now
    ccd
    f
    stdin is ready.
    f
    stdin is ready. //ctrl+D here

    client behavior:

    [root@linuxkit-00155ddc0103 inet]# ./echoclient localhost 6667
    i am client
    i am client

    基于I/O多路复用的并发事件驱动器

    与之前类似,维护一个结构体pool,存储readset,readyset和clientfd

    • server监听,初始化pool
    • 进入循环,初始化readyset,select阻塞,等待fd准备好
    • 若listenfd,添加client
    • 对所有准备好的clientfd回射服务

    server:

    [root@linuxkit-00155ddc0103 inet]# ./echoserverselect 6667
    localhost:38334 connected!
    Server recived 5 (5 total) bytes on fd 4
    Server recived 4 (9 total) bytes on fd 4
    Server recived 7 (16 total) bytes on fd 4
    localhost:38338 connected!
    Server recived 4 (20 total) bytes on fd 5
    Server recived 6 (26 total) bytes on fd 5
    Server recived 2 (28 total) bytes on fd 5
    Close connfd 4
    Server recived 7 (35 total) bytes on fd 5
    Close connfd 5

    client1:

    [root@linuxkit-00155ddc0103 inet]# ./echoclient localhost 6667
    1234
    1234
    123
    123
    123456
    123456
    [root@linuxkit-00155ddc0103 inet]#

    client2:

    [root@linuxkit-00155ddc0103 inet]# ./echoclient localhost 6667
    123
    123
    12345
    12345
    1
    1
    123456
    123456
    [root@linuxkit-00155ddc0103 inet]#

    code

    #include "mynet.h"
    typedef struct{
        int maxfd;
        fd_set read_set;
        fd_set ready_set;
        int nready;
        int maxi;
        int clientfd[FD_SETSIZE];
    }pool;
    void add_client(int connfd, pool *p);
    void check_clients(pool *p);
    void init_pool(int listenfd, pool *p);
    int open_listenfd(char *port);
    
    int byte_cnt = 0;
    
    int main(int argc, char **argv)
    {
        int listenfd, connfd;
        socklen_t clientlen;
        struct sockaddr_storage clientaddr;
        char client_hostname[MAXLINE], client_port[MAXLINE];
        static pool pool;
    
        if(argc!=2){
          fprintf(stderr,"usage: %s <port>\n",argv[0]);
          exit(0);
        }
        listenfd = open_listenfd(argv[1]);
        init_pool(listenfd,&pool);
    
        while(1){
            pool.ready_set = pool.read_set;
            pool.nready = select(pool.maxfd+1,&pool.ready_set,NULL,NULL,NULL);
            if(FD_ISSET(listenfd,&pool.ready_set))
            {
                clientlen = sizeof(struct sockaddr_storage);
                connfd = accept(listenfd, (SA*)&clientaddr,&clientlen);
                getnameinfo((SA*)&clientaddr,clientlen,client_hostname,MAXLINE,client_port,MAXLINE,0);
                printf("%s:%s connected!\n",client_hostname,client_port);
                add_client(connfd, &pool);
            }
            check_clients(&pool);   
        }
        exit(0);
    }
    
    void init_pool(int listenfd, pool *p){
        int i;
        p->maxi = -1;
        for(i=0;i<FD_SETSIZE;i++)
          p->clientfd[i] = -1;
    
        p->maxfd = listenfd;
        FD_ZERO(&p -> read_set);
        FD_SET(listenfd, &p->read_set);
    }
    
    void add_client(int connfd, pool *p){
        int i;
        p->nready--;
        for(i=0;i<FD_SETSIZE;i++){
            if(p->clientfd[i]<0){
                p->clientfd[i] = connfd;
                FD_SET(connfd,&p->read_set);
                if(connfd,&p->maxfd)
                  p->maxfd = connfd;
                if(i>p->maxi)
                  p->maxi = i;
                break;
            }
            if(i == FD_SETSIZE)
              printf("Add client error! Beyond max client numbers.\n");
        }
    }
    
    void check_clients(pool *p)
    {
        int i,connfd,n;
        char buf[MAXLINE];
        char echostr[MAXLINE+10];
    
        for(i=0;(i<=p->maxi)&&(p->nready>0);i++){
            connfd = p->clientfd[i];
            sprintf(buf,"\0");
        
            if((connfd>0)&&(FD_ISSET(connfd,&p->ready_set))){
                p->nready--;
                if((n=read(connfd,buf,MAXLINE))!=0){
                    byte_cnt+=n;
                    printf("Server recived %d (%d total) bytes on fd %d\n",n,byte_cnt,connfd);
                    write(connfd,buf,n);
                }
                else{
                    printf("Close connfd %d\n",connfd);
                    close(connfd);
                    FD_CLR(connfd,&p->read_set);
                    p->clientfd[i] = -1;
                }
            }
        }
    }
    
    

    多线程

    server:

    [root@linuxkit-00155ddc0103 inet]# ./echoserverthread 6667
    name[localhost] port[38362]connected! Thread id:140562122241792 will serve it!
    name[localhost] port[38366]connected! Thread id:140562113849088 will serve it!
    server recived 6 bytes from fd 4
    server recived 4 bytes from fd 4
    server recived 7 bytes from fd 5
    server recived 4 bytes from fd 5

    client1:

    [root@linuxkit-00155ddc0103 inet]# ./echoclient localhost 6667
    12345
    12345
    123
    123

    client2:

    [root@linuxkit-00155ddc0103 inet]# ./echoclient localhost 6667
    654321
    654321
    321
    321

    注意传递connfd时是新申请空间,并在thread里释放
    code

    #include "mynet.h"
    int open_listenfd(char *port);
    void *thread(void *vargp);
    void echo(int connfd);
    
    int main(int argc, char **argv)
    {
        int listenfd, *connfdp, cno=1;
        socklen_t clientlen;
        pthread_t tid;
        struct sockaddr_storage clientaddr;
        char client_hostname[MAXLINE], client_port[MAXLINE];
    
        if(argc!=2){
          fprintf(stderr,"usage: %s <port>\n",argv[0]);
          exit(0);
        }
        listenfd = open_listenfd(argv[1]);
        while(1){
            clientlen = sizeof(struct sockaddr_storage);
            connfdp = malloc(sizeof(int));
            *connfdp = accept(listenfd, (SA*)&clientaddr,&clientlen);
            getnameinfo((SA*)&clientaddr,clientlen,client_hostname,MAXLINE,client_port,MAXLINE,0);
            pthread_create(&tid,NULL,thread,connfdp);
            printf("name[%s] port[%s]connected! Thread id:%ld will serve it!\n",
                        client_hostname,client_port,(unsigned long long)tid);
        }
        exit(0);
    }
    
    void *thread(void* vargp){
        int connfd = *((int *)vargp);
        pthread_detach(pthread_self());
        free(vargp);
        echo(connfd);
        close(connfd);
        return NULL;
    }
    
    void echo(int connfd)
    {
        size_t n;
        char buf[MAXLINE];
    
        while((n=read(connfd,buf,MAXLINE))!=0){
            printf("server recived %d bytes from fd %d\n",(int)n,connfd);
            write(connfd,buf,n);
        }
    }
    

    基于线程的事件驱动程序

    也称作预线程化的方法

    • 主线程将连接符connfd放入池中
    • 工作线程将connfd取出并处理
    • 加入线程和取出线程都使用互斥量保护
    • echo程序中的cnt也使用互斥量保护

    server

    [root@linuxkit-00155ddc0103 inet]# ./echoserver_pre_thread 6667
    client[localhost:38376] connected! connfd 4!
    server recived 7 (7 total) bytes on fd 4
    client[localhost:38380] connected! connfd 5!
    server recived 9 (16 total) bytes on fd 5
    server recived 4 (20 total) bytes on fd 4

    client1

    [root@linuxkit-00155ddc0103 inet]# ./echoclient localhost 6667
    123456
    123456
    123
    123

    client2

    [root@linuxkit-00155ddc0103 inet]# ./echoclient localhost 6667
    87654321
    87654321

    sbuf.c fd池,插入,取出操作

    #include "sbuf.h"
    
    void sbuf_init(sbuf_t *sp, int n){
        sp->buf = calloc(n, sizeof(int));
        sp->n = n;
        sp->front = sp->rear = 0;
        sem_init(&sp->mutex,0,1);
        sem_init(&sp->slots,0,n);
        sem_init(&sp->items,0,0);
    }
    
    void snuf_deinit(sbuf_t *sp){
        free(sp->buf);
    }
    
    void sbuf_insert(sbuf_t *sp, int item){
        sem_wait(&sp->slots);
        sem_wait(&sp->mutex);
        sp->buf[(++sp->rear)%(sp->n)] = item;
        sem_post(&sp->mutex);
        sem_post(&sp->items);
    }
    
    int sbuf_remove(sbuf_t *sp){
        int item;
        sem_wait(&sp->items);
        sem_wait(&sp->mutex);
        item = sp->buf[(++sp->front)%(sp->n)];
        sem_post(&sp->mutex);
        sem_post(&sp->slots);
        return item;
    }
    

    sbuf结构

    #ifndef SBUF_H_
    #define SBUF_H_
    #include "mynet.h"
    #include <semaphore.h>
    
    typedef struct {
        int *buf;
        int n;
        int front;
        int rear;
        sem_t mutex;
        sem_t slots;
        sem_t items;
    }sbuf_t;
    
    void dbuf_init(sbuf_t *sp, int n);
    void sbuf_deinit(sbuf_t *p);
    void sbuf_insert(sbuf_t *sp, int item);
    int sbuf_remove(sbuf_t *sp);
    #endif
    

    echo_cnt.c

    #include "echo_cnt.h"
    
    static int byte_cnt;
    static sem_t mutex;
    
    static void init_echo_cnt(void){
        sem_init(&mutex,0,1);
        byte_cnt = 0;
    }
    
    void echo_cnt(int connfd){
        int n;
        char buf[MAXLINE];
        static pthread_once_t once = PTHREAD_ONCE_INIT;
    
        pthread_once(&once, init_echo_cnt);
        while((n=read(connfd,buf,MAXLINE))!=0){
            sem_wait(&mutex);
            byte_cnt += n;
            printf("server recived %d (%d total) bytes on fd %d\n",
                        n,byte_cnt,connfd);
            sem_post(&mutex);
            write(connfd,buf,n);
        }
    }
    

    主程序

    #include "mynet.h"
    #include "sbuf.h"
    #include "echo_cnt.h"
    #define NTHREADS 4
    #define SBUFSIZE 16
    
    int open_listenfd(char *port);
    void *thread(void *vargp);
    
    sbuf_t sbuf;
    
    int main(int argc, char **argv)
    {
        int listenfd, connfd, i;
        socklen_t clientlen;
        pthread_t tid;
        struct sockaddr_storage clientaddr;
        char client_hostname[MAXLINE], client_port[MAXLINE];
    
        if(argc!=2){
          fprintf(stderr,"usage: %s <port>\n",argv[0]);
          exit(0);
        }
        listenfd = open_listenfd(argv[1]);
        sbuf_init(&sbuf,SBUFSIZE);
        for(i = 0;i<NTHREADS;i++){
            pthread_create(&tid,NULL,thread,NULL);
        }
        while(1){
            clientlen = sizeof(struct sockaddr_storage);
            connfd = accept(listenfd, (SA*)&clientaddr,&clientlen);
            getnameinfo((SA*)&clientaddr,clientlen,client_hostname,MAXLINE,client_port,MAXLINE,0);
            sbuf_insert(&sbuf,connfd);
            printf("client[%s:%s] connected! connfd %d!\n",
                        client_hostname,client_port,(int)connfd);
        }
        exit(0);
    }
    
    void *thread(void* vargp){
        pthread_detach(pthread_self());
        while(1){
            int connfd = sbuf_remove(&sbuf);
            echo_cnt(connfd);
            close(connfd);
        }
        return NULL;
    }
    

    相关文章

      网友评论

          本文标题:服务器的并发模式

          本文链接:https://www.haomeiwen.com/subject/aghrgqtx.html