美文网首页
muduo源码分析之TcpServer模块

muduo源码分析之TcpServer模块

作者: shicoder | 来源:发表于2022-04-24 20:52 被阅读0次

    这次我们开始muduo源代码的实际编写,首先我们知道muduoLT模式,Reactor模式,下图为Reactor模式的流程图[来源1]

    image-20220220154310731

    然后我们来看下muduo的整体架构[来源1]

    muduo

    首先muduo有一个主反应堆mainReactor以及几个子反应堆subReactor,其中子反应堆的个数由用户使用setThreadNum函数设置,mainReactor中主要有一个Acceptor,当用户建立新的连接的时候,Acceptor会将connfd和对应的事件打包为一个channel然后采用轮询的算法,指定将该channel给所选择的subReactor,以后该subReactor就负责该channel的所有工作。

    TcpServer类

    我们按照从上到下的思路进行讲解,以下内容我们按照一个简单的EchoServer的实现思路来讲解,我们知道当我们自己实现一个Server的时候,会在构造函数中实例化一个TcpServer

    EchoServer(EventLoop *loop,
               const InetAddress &addr, 
               const std::string &name)
        : server_(loop, addr, name)
            , loop_(loop)
        {
            // 注册回调函数
            server_.setConnectionCallback(
                std::bind(&EchoServer::onConnection, this, std::placeholders::_1)
            );
    
            server_.setMessageCallback(
                std::bind(&EchoServer::onMessage, this,
                          std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)
            );
    
            // 设置合适的loop线程数量 loopthread 不包括baseloop
            server_.setThreadNum(3);
        }
    

    于是我们去看下TcpServer的构造函数是在干什么

    TcpServer::TcpServer(EventLoop *loop,
                    const InetAddress &listenAddr,
                    const std::string &nameArg,
                    Option option)
                    : loop_(CheckLoopNotNull(loop))
                    , ipPort_(listenAddr.toIpPort())
                    , name_(nameArg)
                    , acceptor_(new Acceptor(loop, listenAddr, option == kReusePort))
                    , threadPool_(new EventLoopThreadPool(loop, name_))
                    , connectionCallback_()
                    , messageCallback_()
                    , nextConnId_(1)
                    , started_(0)
    {
        // 当有新用户连接时候,会执行该回调函数
        acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this, 
            std::placeholders::_1, std::placeholders::_2));
    }
    

    我们只需要关注acceptor_(new Acceptor(loop, listenAddr, option == kReusePort))threadPool_(new EventLoopThreadPool(loop, name_))
    首先很明确的一点,构造了一个Acceptor,我们首先要知道Acceptor主要就是连接新用户并打包为一个Channel,所以我们就应该知道Acceptor按道理应该实现socketbindlistenaccept这四个函数。

    Acceptor::Acceptor(EventLoop *loop, const InetAddress &listenAddr, bool reuseport)
        : loop_(loop), acceptSocket_(createNonblocking()) // socket
          ,
          acceptChannel_(loop, acceptSocket_.fd()), listenning_(false)
    {
        acceptSocket_.setReuseAddr(true);
        acceptSocket_.setReusePort(true);
        acceptSocket_.bindAddress(listenAddr); // 绑定套接字
        // 有新用户的连接,执行一个回调(打包为channel)
        acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
    }
    

    其中Acceptor中有个acceptSocket_,其实就是我们平时所用的listenfd,构造函数中实现了socketbind,而其余的两个函数的使用在其余代码

    // 开启服务器监听
    void TcpServer::start()
    {
        // 防止一个TcpServer被start多次
        if (started_++ == 0) 
        {
            threadPool_->start(threadInitCallback_); // 启动底层的loop线程池,这里会按照设定了threadnum设置pool的数量
            loop_->runInLoop(std::bind(&Acceptor::listen, acceptor_.get()));
        }
    }
    

    我们知道,当我们设置了threadnum之后,就会有一个mainloop,那么这个loop_就是那个mainloop,其中可以看见这个loop_就只做一个事情Acceptor::listen

    void Acceptor::listen()
    {
        listenning_ = true;
        acceptSocket_.listen();         // listen
        acceptChannel_.enableReading(); // acceptChannel_ => Poller
    }
    
    

    这里就实现了listen函数,还有最后一个函数accept,我们慢慢向下分析,从代码可以知道acceptChannel_.enableReading()之后就会使得这个listenfd所在的channel对读事件感兴趣,那什么时候会有读事件呢,就是当用户建立新连接的时候,那么我们应该想一下,那当感兴趣的事件发生之后,listenfd应该干什么呢,应该执行一个回调函数呀。注意Acceptor构造函数中有这样一行代码acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));这就是那个回调,我们去看下handleRead在干嘛。

    // listenfd有事件发生了,就是有新用户连接了
    void Acceptor::handleRead()
    {
        InetAddress peerAddr;
        int connfd = acceptSocket_.accept(&peerAddr);
        if (connfd >= 0)
        {
            // 若用户实现定义了,则执行,否则说明用户对新到来的连接没有需要执行的,所以直接关闭
            if (newConnectionCallback_)
            {
                newConnectionCallback_(connfd, peerAddr); // 轮询找到subLoop,唤醒,分发当前的新客户端的Channel
            }
            else
            {
                ::close(connfd);
            }
        }
        ...
    }
    

    这里是不是就实现了accept函数,至此当用户建立一个新的连接时候,Acceptor就会得到一个connfd和其对应的peerAddr返回给mainloop,这时候我们在注意到TcpServer构造函数中有这样一行代码acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this,std::placeholders::_1, std::placeholders::_2));我们给acceptor_设置了一个newConnectionCallback_,于是由上面的代码就可以知道,if (newConnectionCallback_)为真,就会执行这个回调函数,于是就会执行TcpServer::newConnection,我们去看下这个函数是在干嘛。

    void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr)
    {
        // 轮询算法选择一个subloop来管理对应的这个新连接
        EventLoop *ioLoop = threadPool_->getNextLoop(); 
        char buf[64] = {0};
        snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
        ++nextConnId_;
        std::string connName = name_ + buf;
    
        LOG_INFO("TcpServer::newConnection [%s] - new connection [%s] from %s \n",
            name_.c_str(), connName.c_str(), peerAddr.toIpPort().c_str());
    
        // 通过sockfd获取其绑定的本地ip和端口
        sockaddr_in local;
        ::bzero(&local, sizeof local);
        socklen_t addrlen = sizeof local;
        if (::getsockname(sockfd, (sockaddr*)&local, &addrlen) < 0)
        {
            LOG_ERROR("sockets::getLocalAddr");
        }
        InetAddress localAddr(local);
    
        // 根据连接成功的sockfd,创建TcpConnection
        TcpConnectionPtr conn(new TcpConnection(
                                ioLoop,
                                connName,
                                sockfd,   // Socket Channel
                                localAddr,
                                peerAddr));
        connections_[connName] = conn;
        // 下面的回调时用户设置给TcpServer,TcpServer又设置给TcpConnection,TcpConnetion又设置给Channel,Channel又设置给Poller,Poller通知channel调用这个回调
        conn->setConnectionCallback(connectionCallback_);
        conn->setMessageCallback(messageCallback_);
        conn->setWriteCompleteCallback(writeCompleteCallback_);
    
        // 设置了如何关闭连接的回调
        conn->setCloseCallback(
            std::bind(&TcpServer::removeConnection, this, std::placeholders::_1)
        );
    
        // 直接调用connectEstablished
        ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
    }
    

    这里就比较长了,我先说下大概他干了啥事情:首先通过轮询找到下一个subloop,然后将刚刚返回的connfd和对应的peerAddr以及localAddr构造为一个TcpConnectionsubloop,然后给这个conn设置了一系列的回调函数,比如读回调,写回调,断开回调等等。下一章我们来说下上面的代码最后几行在干嘛。

    自己的网址:www.shicoder.top
    欢迎加群聊天 452380935
    本文由博客一文多发平台 OpenWrite 发布!

    相关文章

      网友评论

          本文标题:muduo源码分析之TcpServer模块

          本文链接:https://www.haomeiwen.com/subject/auhxyrtx.html