美文网首页服务器开发
一起来写web server 05 -- 多线程进阶版本

一起来写web server 05 -- 多线程进阶版本

作者: Yihulee | 来源:发表于2016-10-29 11:26 被阅读4次

    这个版本的web server比第4版稍微做了一点改进,那就是由主线程统一接收连接,然后连接的处理由子线程来完成.因此,这里就引入了条件变量以及同步互斥的问题.

    同步机制

    muduo库中有一个关于同步机制的封装,我这里就直接采用了.我这里来介绍一下这个封装吧.

    下面是Conditon这个类的代码:

    class Condition : noncopyable
    {
        private:
            MutexLock& mutex_; /* 之前的锁的一个引用 */
            pthread_cond_t pcond_; /* 系统定义的条件变量的类型 */
            ... ...
    }
    

    这个类的构造函数用于初始化同步变量:

    explicit Condition(MutexLock& mutex)
            : mutex_(mutex)
        {
            pthread_cond_init(&pcond_, NULL); /* 初始化同步变量 */
        }
    

    析构函数就销毁掉同步变量:

    ~Condition()
        {
            pthread_cond_destroy(&pcond_); /* 销毁条件变量 */
        }
    

    等待某个条件:

    void wait()
        {
            MutexLock::UnassignGuard ug(mutex_);
            pthread_cond_wait(&pcond_, mutex_.getPthreadMutex()); /* 等待Mutex */
        }
    

    通知单个线程:

    void notify()
        {
            pthread_cond_signal(&pcond_); /* 唤醒一个线程 */
        }
    

    条件变量只有一种正确的使用方式,几乎不可能用错,对于wait端:

    1. 必须与mutex一起使用,该布尔表达式的读写需受此mutex保护.
    2. mutex已经上锁的时候才能调用wait().
    3. 把判断布尔条件和wait()放到while循环中.
      写成代码是这个样子的:
    MutexLock mutex;
    Condition cond(mutex);
    std::deque<int> queue;
    
    int dequeue() {
        MutexLockGuard lock(mutex); /* 加锁 */
        while (queue.empty()) {
            cond.wait(); 
        }
        assert(!queue.empty());
        int top = queue.front();
        queue.pop_front();
        return top;
    }
    

    对于sinal/broadcast端:

    1. 不一定要在mutex已经上锁的情况下调用signal(理论上).
    2. signal之前一般要修改布尔表达式.
    3. 修改布尔表达式通常要用mutex保护.
    4. 注意区分signalbroadcast:"broadcast"通常用于表明状态变化,而signal表示资源可用.
      写成代码是:
    void enqueue(int x) 
    {
        MutexLockGuard lock(mutex); // 加锁
        queue.push_back(x);
        cond.signal(); // 可以移出临界区之外
    }
    

    以上引自linux多线程服务端编程.

    我来谈一下我的理解:

    cond中之所以需要mutex,是因为在执行到

    while (condition) {
     cond.wait();
    }
    

    时,需要将cond中持有的mutex解锁.一旦接收到signal,它需要重新抢夺这个mutex,抢到了,才能从wait函数中返回.

    为什么cond.wait()要放入while循环中呢?一方面是因为spurious wakeup,之所以会有这个东西,是速度的考量,一般来说,即使没有spurious wakeup,你也要这么写代码,举个栗子.

    在生产者消费者模型之中,消费者1获得锁,发现queue为空,wait,消费者2获得锁,发现queue为空,wait,生产者3获得锁,将生产的产品放入queue,调用signal,并且释放了mutex,t1,t2被唤醒,可以预见的是,这两者只会有一个获得锁,消费完这个产品,然后另一个获得锁,发现为空,还是得继续等待,这就是while的由来,当然,至于signal为什么会唤醒多个线程,man手册上就是这么说的.

    我们的代码

    ```cpp
    /*-
    * 线程池的加强版本.主要是主线程统一接收连接,其余都是工作者线程,这里的布局非常类似于一个生产者.
    * 多个消费者.
    */
    
    #define MAXNCLI 100
    
    MutexLock mutex; /* 全局的锁 */
    Condition cond(mutex); /* 全局的条件变量 */
    int clifd[MAXNCLI], iget, iput;
    
    int main(int argc, char *argv[])
    {
        int listenfd = Open_listenfd(8080); /* 8080号端口监听 */
        signal(SIGPIPE, SIG_IGN);
        pthread_t tids[10];
        void* thread_main(void *);
    
        for (int i = 0; i < 10; ++i) {
            int *arg = (int *)Malloc(sizeof(int));
            *arg = i;
            Pthread_create(&tids[i], NULL, thread_main, (void *)arg);
        }
        struct sockaddr cliaddr; /* 用于存储对方的ip信息 */
        socklen_t clilen;
        for (; ; ) {
            int connfd = Accept(listenfd, &cliaddr, &clilen);
            {
                MutexLockGuard lock(mutex); /* 加锁 */
                clifd[iput] = connfd; /* 涉及到对共享变量的修改,要加锁 */
                if (++iput == MAXNCLI) iput = 0;
                if (iput == iget) unix_error("clifd is not big enough!\n");
            }
            cond.notify(); /* 通知一个线程有数据啦! */
        }
        return 0;
    }
    

    线程的代码是这样的:

    void*
    thread_main(void *arg)
    {
        int connfd;
        printf("thread %d starting\n", *(int *)arg);
        Free(arg);
        for ( ; ;) {
            {
                MutexLockGuard lock(mutex); /* 加锁 */
                while (iget == iput) { /* 没有新的连接到来 */
                    /*-
                    * 代码必须用while循环来等待条件变量,原因是spurious wakeup
                    */
                    cond.wait(); /* 这一步会原子地unlock mutex并进入等待,wait执行完毕会自动重新加锁 */
                }
                connfd = clifd[iget]; /* 获得连接套接字 */
                if (++iget == MAXNCLI) iget = 0;
            }
            doit(connfd);
            close(connfd);
        }
    }
    

    总结

    这个版本在原来的版本上增加了同步互斥操作,在某种程度上增加了难度.

    具体代码还是看这里吧!:https://github.com/lishuhuakai/Spweb

    相关文章

      网友评论

        本文标题:一起来写web server 05 -- 多线程进阶版本

        本文链接:https://www.haomeiwen.com/subject/oeqyuttx.html