美文网首页
skynet源码分析(8)--skynet的网络

skynet源码分析(8)--skynet的网络

作者: 天一阁图书管理员 | 来源:发表于2017-08-21 12:44 被阅读565次

    作者:shihuaping0918@163.com,转载请注明作者
    网络部分是一个服务器最基础最核心的部分,这个技术也已经是非常成熟了,现在已经很少有人自己实现一个网络相关的库了。skynet的网络库是自己实现的。

    网络底层的技术在windows上是完成端口(IOCP),在linux上是EPOLL,在mac/freebsd上是kqueue。这些技术都是能够承载高负载高并发网络请求的。IOCP和EPOLL都是异步IO,kqueue我不熟悉,就不敢评论了。

    实际上云风只实现了epoll和kqueue,windows上的变种请自行搜索吧。

    epoll和kqueue的实现分别在skynet_epoll.h和epoll_kqueue.h当中。epoll的函数其实就是epoll_create/epoll_ctl/epoll_del/epoll_wait这几个,要注意的是skynet中的epoll_create的参数是1024。所以连接数上不去的话很可能就是这里限制了。

    skynet在skynet_poll.h中根据平台的不同包含了不同的头文件,屏蔽了平台相关性。然后在socket_server.c中实现了网络服务的逻辑。

    然后skynet在skynet_socket.c中对socket_server.c中的逻辑再次做了一个封装,还添加了socket客户端相关的函数,就是connect/send/close之类的函数。

    为了方便lua层使用socket,在lua-socket.c中再将对skynet_socket.c进行了一次封装。这个封装就是c语言层和lua语言层的相互转换。目前只支持tcp和udp,基于tcp上的http/websocket之类统统是不支持的。

    前面讲到socket有一个单独的线程,这个线程的代码如下:

    static void *
    thread_socket(void *p) {
        struct monitor * m = p;
        skynet_initthread(THREAD_SOCKET);
        for (;;) {
            int r = skynet_socket_poll(); //看这里
            if (r==0)
                break;
            if (r<0) {
                CHECK_ABORT
                continue;
            }
            wakeup(m,0);
        }
        return NULL;
    }
    

    socket线程一直调用skynet_socket_poll,这和普通网络服务器写法是一样的。普通网络服务器也是创建socket,绑定socket,添加到epoll,然后epoll_wait等待事件的发生。

    skynet中的的连接会有一个状态流转的过程,可以理解为状态机。

    #define SOCKET_TYPE_LISTEN 3  //监听
    #define SOCKET_TYPE_CONNECTING 4 //连接中
    #define SOCKET_TYPE_CONNECTED 5 //已连接
    #define SOCKET_TYPE_HALFCLOSE 6 //半双工,半连接
    #define SOCKET_TYPE_PACCEPT 7 //有连接进来
    #define SOCKET_TYPE_BIND 8 //绑定
    

    再来重点看一下socket_server.c这个文件,它做了很多事情,代码量也比较大,有1800多行。这个文件要详细的讲呢,一是篇幅会特别大,二是大部分是网络操作,也和skynet本身没太大关联性。所以就不会细讲了,只会挑一些我认为比较重要的东西去讲。这一篇主要讲一下对网络的控制命令。

    下面列出来的都是消息类型,用一个字符来表示,很不直接,很不好记
    /*
        The first byte is TYPE
    
        S Start socket
        B Bind socket
        L Listen socket
        K Close socket
        O Connect to (Open)
        X Exit
        D Send package (high)
        P Send package (low)
        A Send UDP package
        T Set opt
        U Create UDP socket
        C set udp address
     */
    //请求消息头,上面的操作都是通过发送消息来实现的
    struct request_package {
        uint8_t header[8];  // 6 bytes dummy
        union {
            char buffer[256];
            struct request_open open;
            struct request_send send;
            struct request_send_udp send_udp;
            struct request_close close;
            struct request_listen listen;
            struct request_bind bind;
            struct request_start start;
            struct request_setopt setopt;
            struct request_udp udp;
            struct request_setudp set_udp;
        } u;
        uint8_t dummy[256];
    };
    
    这里就是处理请求的地方
    // return type
    static int
    ctrl_cmd(struct socket_server *ss, struct socket_message *result) {
        int fd = ss->recvctrl_fd;
        // the length of message is one byte, so 256+8 buffer size is enough.
        uint8_t buffer[256];
        uint8_t header[2];
        block_readpipe(fd, header, sizeof(header));
        int type = header[0];
        int len = header[1];
        block_readpipe(fd, buffer, len);
        // ctrl command only exist in local fd, so don't worry about endian.
        switch (type) {
        case 'S':
            return start_socket(ss,(struct request_start *)buffer, result);
        case 'B':
            return bind_socket(ss,(struct request_bind *)buffer, result);
        case 'L':
            return listen_socket(ss,(struct request_listen *)buffer, result);
        case 'K':
            return close_socket(ss,(struct request_close *)buffer, result);
        case 'O':
            return open_socket(ss, (struct request_open *)buffer, result);  //在这里面调用connect
        case 'X':
            result->opaque = 0;
            result->id = 0;
            result->ud = 0;
            result->data = NULL;
            return SOCKET_EXIT;
        case 'D':
            return send_socket(ss, (struct request_send *)buffer, result, PRIORITY_HIGH, NULL);
        case 'P':
            return send_socket(ss, (struct request_send *)buffer, result, PRIORITY_LOW, NULL);
        case 'A': {
            struct request_send_udp * rsu = (struct request_send_udp *)buffer;
            return send_socket(ss, &rsu->send, result, PRIORITY_HIGH, rsu->address);
        }
        case 'C':
            return set_udp_address(ss, (struct request_setudp *)buffer, result);
        case 'T':
            setopt_socket(ss, (struct request_setopt *)buffer);
            return -1;
        case 'U':
            add_udp_socket(ss, (struct request_udp *)buffer);
            return -1;
        default:
            fprintf(stderr, "socket-server: Unknown ctrl %c.\n",type);
            return -1;
        };
    
        return -1;
    }
    
    

    以‘O'为例分析一下这个流程,skynet是基于消息的,这个是它的设计理念。所以对socket的操作,也都是基于消息的。它是怎么做的呢,首先创建一个recvctrl_fd,把消息发到recvctrl_fd。然后每次socket_server_poll被调用的时候,会select这个recvctrl_fd,看有没消息。如果有就调用上面的crl_cmd函数。

    过程大致都讲清楚了,现在来看'O’这个命令,也就是打开一个连接是怎么一个过程。
    1.lua层的connect函数对应的是lconnect函数,

    static int
    lconnect(lua_State *L) {
        size_t sz = 0;
        const char * addr = luaL_checklstring(L,1,&sz);
        char tmp[sz];
        int port = 0;
        const char * host = address_port(L, tmp, addr, 2, &port);
        if (port == 0) {
            return luaL_error(L, "Invalid port");
        }
        struct skynet_context * ctx = lua_touserdata(L, lua_upvalueindex(1));
        int id = skynet_socket_connect(ctx, host, port);  //将Lua数据转为c数据以后调c函数
        lua_pushinteger(L, id);
    
        return 1;
    }
    
    

    2.lconnect函数调用了skynet_socket_connect函数。

    int 
    skynet_socket_connect(struct skynet_context *ctx, const char *host, int port) {
        uint32_t source = skynet_context_handle(ctx);
        return socket_server_connect(SOCKET_SERVER, source, host, port);
    }
    

    3.skynet_socket_connect函数又调了socket_server_connect

    int 
    socket_server_connect(struct socket_server *ss, uintptr_t opaque, const char * addr, int port) {
        struct request_package request;
        int len = open_request(ss, &request, opaque, addr, port);
        if (len < 0)
            return -1;
        send_request(ss, &request, 'O', sizeof(request.u.open) + len); //注意'O’
        return request.u.open.id;
    }
    

    4.socket_server_connect调用了一个叫send_request的函数

    static void
    send_request(struct socket_server *ss, struct request_package *request, char type, int len) {
        request->header[6] = (uint8_t)type;
        request->header[7] = (uint8_t)len;
        for (;;) {
    //注意write和ss->send_ctrl_fd
            ssize_t n = write(ss->sendctrl_fd, &request->header[6], len+2);
            if (n<0) {
                if (errno != EINTR) {
                    fprintf(stderr, "socket-server : send ctrl command error %s.\n", strerror(errno));
                }
                continue;
            }
            assert(n == len+2);
            return;
        }
    }
    

    到这个函数以后,比较清楚地看到,数据被发送到sendctrl_fd这个描述符上了。
    5.从send_request来看,数据明明是发到了sendctrl_fd上面,而上面讲的取数据是从recvctrl_fd上取的,明显对不上。这到底是怎么回事?是的,确实是这样,还有一段代码没有看的话,这个确实是解释不通的。

        int fd[2];
        poll_fd efd = sp_create();
        if (sp_invalid(efd)) {
            fprintf(stderr, "socket-server: create event pool failed.\n");
            return NULL;
        }
        if (pipe(fd)) { //管道操作,fd[0]为读取端,fd[1]为写入端
            sp_release(efd);
            fprintf(stderr, "socket-server: create socket pair failed.\n");
            return NULL;
        }
        if (sp_add(efd, fd[0], NULL)) {
            // add recvctrl_fd to event poll
            fprintf(stderr, "socket-server: can't add server fd to event pool.\n");
            close(fd[0]);
            close(fd[1]);
            sp_release(efd);
            return NULL;
        }
    
        struct socket_server *ss = MALLOC(sizeof(*ss));
        ss->event_fd = efd;
        ss->recvctrl_fd = fd[0];  //其它这个是管道的读取端
        ss->sendctrl_fd = fd[1]; //这个是管道的写入端
        ss->checkctrl = 1;
    

    6.上面的几行代码说明了sendctrl_fd是管道的写入端,recvctrl_fd是管道的读取端,这就解释了上面5的疑问。因为管道写入端的数据都会到读取端。所以从sendctrl_fd写进去,会到recvctrl_fd里。

    7.socket_server_poll中select函数检查recvctrl_fd,如果有消息,进入case控制语句,然后到'O‘,调用open_socket,在这个函数里,会调用大家熟悉的connect函数。打开一个连接到目标ip和端口的连接。

    为什么操作一个网络要费这么大的劲呢,绕来绕去非常的不直观。因为skynet是基于消息的,而且每个服务都有一个monitor,每个消息处理的时候要尽可能的短,这样才不会阻塞服务里其它的请求。而connect这种明显是阻塞的,当然也可以写成非阻塞的,但是非阻塞的话,你需要不断地挂起,因为非阻塞实际上是基于select技术来实现的。而不断地挂起,这个就很麻烦,写起来很痛苦而且很容易出错。因此云风把这些都放到网络线程中来做,这样就不会影响工作线程。但是这样做也有它的缺点,那就是网络线程可能会被阻塞,网络线程被阻塞就会导致服务无响应。或者导致大量的数据包积累,引起波峰。

    相关文章

      网友评论

          本文标题:skynet源码分析(8)--skynet的网络

          本文链接:https://www.haomeiwen.com/subject/oxsfdxtx.html