美文网首页
Acceptor——封装socket系统调用并集成到Reacto

Acceptor——封装socket系统调用并集成到Reacto

作者: 老杜振熙 | 来源:发表于2020-12-04 09:40 被阅读0次

    注:本文为阅读了muduo网络库源码及作者著作之后对于网络库的复现和笔记

    功能

    我们定义一个class Acceptor,其功能是:让服务器在指定的端口处进行监听,如果在端口监听到连接,则执行由class Acceptor的类用户注册的回调函数。

    底层API

    首先梳理一下与Acceptor相关的底层API调用。

    • int socket(int domain, int type, int protocol)
      用于创建本地socket fd,domain指示网络的通信所在域,通常选择AF_INET即可,代表IPV4;type指示socket fd类型,对于TCP协议,因为是流式协议,加上我们的网络库的非阻塞特性,而通常还需指定CLOSE ON EXECVE,所以通常需要输入SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXECprotocol指示具体协议,这里我们选用IPPROTO_TCP

    • int bind(int sockfd, const struct sockaddr *addr, socklen_t addrLen)
      用于将socket()生成的socket fd与服务器的监听地址(IP加端口)进行绑定。

    • int listen(sockfd, int backlog)
      对生成的套接字进行实际的监听,backlog指定能够同时容纳的最多监听个数。

    • int accept(int sockfd, struct sockaddr *peerAddr, socklen_t addrLen )
      用于监听到了“来电”之后,接受对应的TCP连接,返回对端(客户端)的socket fd,并将对端的地址填入到peerAddr中。

    模块拆分

    之前的Reactor模式是一个经典的IO多路复用模式,我们已经用一个class EventLoop抽象出了整个多路复用的网络模型,接下来就是将这个模型用起来,去构建实际的socket网络程序了。

    socket网络编程设计到了繁多的底层API接口,在现代C++特性之中,自然也要对其进行合理的封装,才能发挥出语言的最大优势。首先绘制一下服务端的对于TCP连接的接收过程,如图1所示。

    对于底层API而言,"socket() 到 bind() "是一条龙式的操作,因此可以定义一个class Socket来封装这个过程。服务器根据这一套流程建立本地的socket。随后当开始决定监听之后,则开始进行"listen()",当探测到连接请求的时候, 开始" accept()",得到对端的socket以及网络地址,然后就可以调用用户注册的回调函数了。需要注意的是图1中的handleRead()子框,这个子框才是Acceptor中的Channel的readable回调。原因其实很简单,因为对于服务器而言,当探测到有客户端发起连接请求之后,服务器的callback应该是先建立连接,再执行用户回调,那么“建立连接+执行用户回调”的整个过程才是“客户端发起连接请求”这个readable事件的回调。

    图1. Acceptor工作流程示意

    代码实战

    /* Socket.h */
    #ifndef SOCKET_H
    #define SOCKET_H
    
    #include "muduo/base/noncopyable.h"
    #include <muduo/net/InetAddress.h>
    #include <boost/noncopyable.hpp>
    
    class Socket: boost::noncopyable
    {
    private:
      const int sockfd_;
    
    public:
      int fd() const { return sockfd_; }
      void bindAddress(const muduo::net::InetAddress& localaddr);
      void listen();
    
      // fill the peeraddr and return peer connection fd
      // If failed, return -1;
      int accept(muduo::net::InetAddress* peeraddr);
    
      // SO_REUSEADDR
      void setReuseAddr(bool on);
    
      // SO_REUSEPORT
      void setReusePort(bool on);
      explicit Socket(const int &fd);
      ~Socket();
    };
    
    #endif /* SOCKET_H */
    
    
    #include "Socket.h"
    #include "muduo/base/Logging.h"
    #include "muduo/net/InetAddress.h"
    #include "SockOptions.h"
    
    #include <netinet/in.h>
    #include <netinet/tcp.h>
    #include <stdio.h>  // snprintf
    
    Socket::Socket(const int &fd):
      sockfd_(fd)
    {
    
    }
    
    Socket::~Socket(){
      sockoptions::close(sockfd_);
    }
    
    void Socket::bindAddress(const muduo::net::InetAddress &localaddr){
      sockoptions::bindOrDie(sockfd_, localaddr.getSockAddr());
    }
    
    void Socket::listen(){
      sockoptions::listenOrDie(sockfd_);
    }
    
    int Socket::accept(muduo::net::InetAddress *peeraddr){
      sockaddr_in6 tmpAddr;
      bzero(&tmpAddr, sizeof tmpAddr);
      int connfd = sockoptions::accept(sockfd_, &tmpAddr);
      peeraddr->setSockAddrInet6(tmpAddr);
      return connfd;
    }
    
    void Socket::setReuseAddr(bool on){
      int reused = on;
      int len = static_cast<socklen_t>(sizeof reused);
      int ret = ::setsockopt(sockfd_, SOL_SOCKET, SO_REUSEADDR, &reused, len);
      if(ret < 0){
        LOG_ERROR << "setReuseAddr falied";
      }
    }
    
    void Socket::setReusePort(bool on){
      int reused = on;
      int len = static_cast<socklen_t>(sizeof reused);
      int ret = ::setsockopt(sockfd_, SOL_SOCKET, SO_REUSEPORT, &reused, len);
      if(ret < 0){
        LOG_ERROR << "setReuseAddr falied";
      }
    }
    
    
    /* Acceptor.h */
    #ifndef ACCEPTOR_H
    #define ACCEPTOR_H
    
    #include <functional>
    #include <muduo/net/InetAddress.h>
    #include <memory>
    #include <boost/noncopyable.hpp>
    
    #include "../Reactor/Channel.h"
    #include "Socket.h"
    
    class EventLoop;
    
    class Acceptor : boost::noncopyable
    {
      // sending fd handle is not an ideal solution, better solution is 
      // sending a Socket object which uses RAII
      using ConnCallback = std::function<void (int, const muduo::net::InetAddress &)>;
      // using ConnCallback = std::function<void (Socket, const muduo::net::InetAddress &)>;
    private:
      EventLoop *loop_;
      ConnCallback cb_;
      std::unique_ptr<Socket> socket_;
      Channel socketChannel_;
      bool listening_;
    
    public:
      Acceptor(EventLoop *loop, const muduo::net::InetAddress & localAddr);
     ~Acceptor();
    
     bool isListening() const {
       return listening_;
     }
    
     void listen();
     void handleRead();
     void setNewConnectionCallback(const ConnCallback &func);
     
    };
    
    #endif /* ACCEPTOR_H */
    
    
    /* Accpetor.cc */
    #include <muduo/base/Logging.h>
    
    #include "../Reactor/EventLoop.h"
    #include "../Reactor/Channel.h"
    #include "Acceptor.h"
    #include "SockOptions.h"
    #include "Socket.h"
    
    Acceptor::Acceptor(EventLoop *loop, const muduo::net::InetAddress &localAddr)
      :loop_(loop),
      socket_(new Socket (sockoptions::createNonblockingOrDie(AF_INET))),
      socketChannel_(loop, socket_->fd()),
      listening_(false)
    {
      socket_->setReuseAddr(true);
      socket_->bindAddress(localAddr);
      socketChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
    }
    
    Acceptor::~Acceptor()
    {
    
    }
    
    void Acceptor::listen(){
      loop_->assertInLoopThread();
      listening_ = true;
      socket_->listen(); // call the socket API `listen()`
      socketChannel_.enableRead(); // ready to call the callback
    }
    
    void Acceptor::handleRead(){
      muduo::net::InetAddress addr; // this is the perr address
      int connfd = socket_->accept(&addr);
      if(connfd < 0){
        LOG_FATAL << "Acceptor - socket accept failed";
        return ;
      }
      if(cb_){
        cb_(connfd, addr);
      } else {
        LOG_ERROR << "Acceptor - NewConnectionCallback unset";
      }
    }
    
    void Acceptor::setNewConnectionCallback(const ConnCallback &func){
      cb_ = func;
    }
    
    
    
    /* sockOptions.h */
    #include <arpa/inet.h>
    
    
    namespace sockoptions
    {
    
    int createNonblockingOrDie(sa_family_t family);
    void bindOrDie(int sockfd, const struct sockaddr* addr);
    void listenOrDie(int sockfd);
    void close(int sockfd);
    int  accept(int sockfd, struct sockaddr_in6* addr);
    const struct sockaddr* sockaddr_cast(const struct sockaddr_in* addr);
    const struct sockaddr* sockaddr_cast(const struct sockaddr_in6* addr);
    struct sockaddr* sockaddr_cast(struct sockaddr_in6* addr);
    
    }
    
    
    /* sockOptions.h */
    #include "./SockOptions.h"
    
    #include "muduo/base/Logging.h"
    #include "muduo/base/Types.h"
    #include "muduo/net/Endian.h"
    
    #include <errno.h>
    #include <fcntl.h>
    #include <stdio.h>  // snprintf
    #include <sys/socket.h>
    #include <sys/uio.h>  // readv
    #include <unistd.h>
    
    int sockoptions::createNonblockingOrDie(sa_family_t family){
      // stream == tcp
      int sockfd = ::socket(family,
                            SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 
                            IPPROTO_TCP);
      if(sockfd < 0){
        LOG_FATAL << "Create socket failed!";
      }
      return sockfd;
    }
    
    void sockoptions::bindOrDie(int sockfd, const struct sockaddr *addr){
      socklen_t len = static_cast<socklen_t>(sizeof(*addr));
      int ret = ::bind(sockfd, addr, len);
      if(ret < 0){
        LOG_FATAL << "Bind address failed!";
      }
    }
    
    void sockoptions::listenOrDie(int sockfd){
      int ret = ::listen(sockfd, SOMAXCONN);
      if(ret < 0){
        LOG_FATAL << "Listen socket failed!";
      }
    }
    
    void sockoptions::close(int sockfd){
      int ret = ::close(sockfd);
      if(ret < 0){
        LOG_FATAL << "Close sockfd falied!";
      }
    }
    
    int sockoptions::accept(int sockfd, struct sockaddr_in6 *addr){
      struct sockaddr *sa = sockaddr_cast(addr);
      socklen_t len = static_cast<socklen_t>(sizeof(*addr));
      int connfd = ::accept(sockfd, sa, &len);
      if(connfd < 0){
        LOG_FATAL << "Accept socket failed!";
      }
      return connfd;
    }
    
    const struct sockaddr * sockoptions::sockaddr_cast(const struct sockaddr_in *addr){
      return static_cast<const struct sockaddr *>(muduo::implicit_cast<const void*>(addr));
    }
    
    const struct sockaddr * sockoptions::sockaddr_cast(const struct sockaddr_in6 *addr){
      return static_cast<const struct sockaddr *>(muduo::implicit_cast<const void*>(addr));
    }
    
    struct sockaddr * sockoptions::sockaddr_cast(struct sockaddr_in6 *addr){
      return static_cast<struct sockaddr *>(muduo::implicit_cast<void *>(addr));
    }
    
    
    
    /* main_Accpetor.cc */
    #include "../Reactor/EventLoop.h"
    #include "Acceptor.h"
    #include <muduo/net/InetAddress.h>
    
    void callbackFunc(int connfd, const muduo::net::InetAddress &addr){
      printf("A new connection comming from %s\n", addr.toIpPort().c_str());
      char msg[] = "Hello, I can hear you calling me\n";
      ::write(connfd, msg, sizeof msg);
    }
    
    int main(int argc, char *argv[])
    {
      EventLoop loop;
      muduo::net::InetAddress localAddr(2333);
      muduo::net::InetAddress localAddr2(3332);
      Acceptor acceptor(&loop, localAddr);
      Acceptor acceptor2(&loop, localAddr2);
      acceptor.setNewConnectionCallback(&callbackFunc);
      acceptor2.setNewConnectionCallback(&callbackFunc);
      acceptor.listen();
      acceptor2.listen();
      loop.loop();
      return 0;
    }
    
    

    运行结果

    Acceptor运行结果(新建一个终端去监听本地的2333端口和3332端口)

    实现过程中的一些知识点总结

    1. sockaddrsockaddr_in
      两者之间是相互补充的关系。大多数诸如::bind()的底层socket API使用struct sockaddr *作为入参类型,但sockaddr的小缺陷是,它将IP端口和IP地址混在了一个变量中,故赋值时不太方便。sockaddr_in弥补了这一问题,它将端口和地址进行了分离,同时为了适配sockaddr,又填充了一些不使用的变量,使得两种结构体的内存分布完全一致,因此两种类型的指针可以相互转换。总结一下:sockaddr_in简化了变量的赋值,sockaddr用于函数的传参。

    2. implicit_cast
      该类型转换应该是为了更安全的进行精确类型转换,当使用implicit_cast的时候,编译器会去检查该转换是否安全。(不过该cast暂时还没有纳入标准库)

    3. SOMAXCONN
      在调用::listen()的时候需要指定最多可以支持多少连接请求,为此系统定义了一个专门的宏SOMAXCONN用来表示系统所支持的最多请求个数。

    4. setsockopt()
      网络通信会在不同的层级或者协议之中拥有不同的设置选项,setsockopt()的功能就是将设置这些选项都抽象到了顶层的socket这一层级。比如说要设置socket底层API层级的某些选项,抑或是设置TCP协议中的某些选项,都通过该函数来执行。

    • FD_CLOEXEC
      默认情况下,使用fork() + execve()开启新的子进程时,父进程打开的文件描述符fd不会被关闭。为了让用户能够自己控制是否在execve()后关闭fd,系统设定了一个全局的变量,变量中的各个标志位代表不同的fd,如果设定了FD_CLOEXEC,则会在启动子进程时关闭对应的fd。相应的SOCK_CLOEXECTFD_CLOEXEC可以理解是对FD_CLOEXEC的继承,它们分别为timer fdsocket fd服务

    TcpServer

    TcpServer是Acceptor的直接使用者,其实只需要将TcpServer的Callback注册到Acceptor里面即可

    相关文章

      网友评论

          本文标题:Acceptor——封装socket系统调用并集成到Reacto

          本文链接:https://www.haomeiwen.com/subject/fciziktx.html