美文网首页
skynet集群学习

skynet集群学习

作者: lindx | 来源:发表于2018-05-17 20:09 被阅读0次

    在了解cluster之前,先看看example下的cluster1.luacluster2.lua例子 ,为了方便理解,我对这两个例子做了相应的修改:

    --cluster1.lua
    local skynet = require "skynet"
    local cluster = require "skynet.cluster"
    local snax = require "skynet.snax"
    require "skynet.manager"
    
    
    skynet.start(function()
        cluster.reload {
            db = "127.0.0.1:2528",
            db2 = "127.0.0.1:2529",
        }
        local sdb = skynet.newservice("simpledb")
        skynet.name("sdb", sdb)
    
        print(skynet.call(sdb, "lua", "SET", "a", "foobar"))
    
        cluster.open "db"
        cluster.open "db2"
    end)
    
    --cluster2.lua
    local skynet = require "skynet"
    local cluster = require "skynet.cluster"
    
    skynet.start(function()
        print(cluster.call("db", "sdb", "GET", "a"))
    end)
    

    现在就来具体分析了解一下

    --cluster1.lua
        cluster.reload {
            db = "127.0.0.1:2528",
            db2 = "127.0.0.1:2529",
        }
    
    --clusterd.lua
    local function loadconfig(tmp)
        ...
        for name,address in pairs(tmp) do
            assert(address == false or type(address) == "string")
            if node_address[name] ~= address then
                -- address changed,用rawget是为了不触对发元表的访问
                if rawget(node_channel, name) then
                    node_channel[name] = nil    -- reset connection
                end
                node_address[name] = address
            end
            ...
        end
    end
    
    function command.reload(source, config)
        loadconfig(config)
        skynet.ret(skynet.pack(nil)) 
    end
    

    cluster.reload 的作用主要是先将节点名和与其相当于的地址保存到表node_address中,目的是为了后续发起远程请求用到,如cluster.send或者cluster.call。

    --cluster1.lua
    local sdb = skynet.newservice("simpledb")
    skynet.name("sdb", sdb)
    
    
    --clusterd.lua
    local register_name = {}
    function command.register(source, name, addr)
        assert(register_name[name] == nil)
        addr = addr or source
        local old_name = register_name[addr]
        if old_name then
            register_name[old_name] = nil
        end
        register_name[addr] = name
        register_name[name] = addr
        skynet.ret(nil)
        skynet.error(string.format("Register [%s] :%08x", name, addr))
    end
    
    
    

    创建一个simpledb服务,并为 sdb 服务的 addr 起一个别名"sdb",这里我做了稍微的修改,不使用原来例子的 cluster.register("sdb", sdb) 。主要方便 cluster2.lua 用节点名 + 服务名来做远程访问。

    --cluster1.lua
        cluster.open "db"
        cluster.open "db2"
    
    --cluster.lua
    function cluster.open(port)
        if type(port) == "string" then
            skynet.call(clusterd, "lua", "listen", port)
        else
            skynet.call(clusterd, "lua", "listen", "0.0.0.0", port)
        end
    end
    
    --clusterd.lua
    function command.listen(source, addr, port)
        local gate = skynet.newservice("gate")
        if port == nil then
            local address = assert(node_address[addr], addr .. " is down")
            addr, port = string.match(address, "([^:]+):(.*)$")
        end
        skynet.call(gate, "lua", "open", { address = addr, port = port })
        skynet.ret(skynet.pack(nil))
    end
    
    --gate.lua
    ...
    gateserver.start(handler)
    
    --gateserver.lua
    function gateserver.start(handler)
        assert(handler.message)
        assert(handler.connect)
    
        function CMD.open( source, conf )
            ...
            skynet.error(string.format("Listen on %s:%d", address, port))
            socket = socketdriver.listen(address, port)  --监听ip地址和port端口号
            socketdriver.start(socket)
            if handler.open then
                return handler.open(source, conf)
            end
        end
    ...
    
    
    

    接下来是 cluster.open "db" 和 "db2" ,通过node_address来获取其之前保存 db 和 db2 的addr,然后创建gate网关,调用gate 的open方法,因为在gate的消息分发函数是写在gateserver.lua文件里面的,所以 skynet.call(gate, "lua", "open", { address = addr, port = port }) 其实是跑到了 gateserver.lua 里面的open方法中,当调用完成时,就开始监听地址和端口号了。

    接下来再看看 cluster2.lua 是如何远程调用 cluster1.luad 的:

    --cluster2.lua
    print(cluster.call("db", "sdb", "GET", "a"))
    

    这里的远程调用也很简单,只需要知道cluster1.lua 节点的监听地址和它提供了哪些服务(通过 skynet.name 起的别名来查找)。

    接着,再看看 cluster.call("db", "sdb", "GET", "a") 是如何发送数据给 cluster1节点的sdb服务的。

    --cluster.lua
    function cluster.call(node, address, ...)
        return skynet.call(clusterd, "lua", "req", node, address, skynet.pack(...))
    end
    
    --clusterd.lua
    local function send_request(source, node, addr, msg, sz)
        local session = node_session[node] or 1
        -- msg is a local pointer, cluster.packrequest will free it
        local request, new_session, padding = cluster.packrequest(addr, session, msg, sz)
        node_session[node] = new_session
    
        -- node_channel[node] may yield or throw error
        local c = node_channel[node]
    
        return c:request(request, session, padding)
    end
    
    function command.req(...)
        local ok, msg, sz = pcall(send_request, ...)
        if ok then
            if type(msg) == "table" then
                skynet.ret(cluster.concat(msg))
            else
                skynet.ret(msg)
            end
        else
            skynet.error(msg)
            skynet.response()(false)
        end
    end
    

    这里,先对用户的数据进行第一层打包skynet.pack(...),对于skynet.pack 是如何打包数据的,由于篇幅有限,将再以后的章节中具体再描述。大家只需要先知道它对数据打包后会返回 一个用户自定义类型 msg 和长度 sz,就好了。
    cluster.packrequest(addr, session, msg, sz) 就是对 skynet.pack 打包后得到的 msg 再一次打包,其实也就是加上头部信息,并重新放到一块新的内存中。

    --lua-cluster.c
    //宏定义
    #define TEMP_LENGTH 0x8200    //十进制 33280
    #define MULTI_PART 0x8000     //十进制 32768
    
    // 对session打包,占用buf 4个字节
    static void
    fill_uint32(uint8_t * buf, uint32_t n) {
        buf[0] = n & 0xff;
        buf[1] = (n >> 8) & 0xff;
        buf[2] = (n >> 16) & 0xff;
        buf[3] = (n >> 24) & 0xff;
    }
    
    //对消息长度打包,占用buf 2个字节
    static void
    fill_header(lua_State *L, uint8_t *buf, int sz) {
        assert(sz < 0x10000);
        buf[0] = (sz >> 8) & 0xff;  //sz 左移8位,得到高8位数据
        buf[1] = sz & 0xff;         //sz & 0xff,屏蔽高位数据,得到sz低8位数据
    }
    
    static int
    packreq_string(lua_State *L, int session, void * msg, uint32_t sz, int is_push) {
        size_t namelen = 0;
        const char *name = lua_tolstring(L, 1, &namelen);
        if (name == NULL || namelen < 1 || namelen > 255) {
            skynet_free(msg);
            luaL_error(L, "name is too long %s", name);
        }
    
        uint8_t buf[TEMP_LENGTH];
        if (sz < MULTI_PART) {
            fill_header(L, buf, sz+6+namelen);
            buf[2] = 0x80;
            buf[3] = (uint8_t)namelen;
            memcpy(buf+4, name, namelen);
            fill_uint32(buf+4+namelen, is_push ? 0 : (uint32_t)session);
            memcpy(buf+8+namelen,msg,sz);
    
            lua_pushlstring(L, (const char *)buf, sz+8+namelen);
            return 0;
        } else {
            int part = (sz - 1) / MULTI_PART + 1;
            fill_header(L, buf, 10+namelen);
            buf[2] = is_push ? 0xc1 : 0x81; // multi push or request
            buf[3] = (uint8_t)namelen;
            memcpy(buf+4, name, namelen);
            fill_uint32(buf+4+namelen, (uint32_t)session);
            fill_uint32(buf+8+namelen, sz);
    
            lua_pushlstring(L, (const char *)buf, 12+namelen);
            return part;
        }
    }
    
    static int
    packrequest(lua_State *L, int is_push) {
        void *msg = lua_touserdata(L,3);
        if (msg == NULL) {
            return luaL_error(L, "Invalid request message");
        }
        uint32_t sz = (uint32_t)luaL_checkinteger(L,4);
        int session = luaL_checkinteger(L,2);
        ...
        int addr_type = lua_type(L,1);
        int multipak;
        ...
        multipak = packreq_string(L, session, msg, sz, is_push);
        ...
        uint32_t new_session = (uint32_t)session + 1;
        ...
        lua_pushinteger(L, new_session);
        ...
        skynet_free(msg);
        return 2;
        ...
    }
    
    static int
    lpackrequest(lua_State *L) {
        return packrequest(L, 0);
    }
    
    static int
    lpackpush(lua_State *L) {
        return packrequest(L, 1);
    }
    

    由于 packrequest 函数会对 addr 地址进行判断,是否是数字或者是字符串,然后再按其类型打包。这里就以 addr 是字符串为例,先从 cluster.packrequest(addr, session, msg, sz) 中获取第3个参数 msg,判断如果 msg 为空的活,就没有必要再进行打包了,接着再获取第4个参数 sz,第2个参数 session,这里会对session进行加一操作,得到一个新的new_session,目的就是 new_session 用来标识远程会话记录,在上上面的代码中有所体现

    node_session[node] = new_session

    packreq_string(L, session, msg, sz, is_push); 会对 sz 长度进行判断,如果大于 MULTI_PART (32k字节)的话,并且 packrequest的第二个参数是 0 的话就是,表明 rpc 是一次请求 + 响应过程,那么 buf[2] = 0x81。如果是1,表示这一次请求是推送的,不需要有回复,那么 buf[2] = 0xc1,并且会根据 part 进行多次发送。如果 sz 小于32 k字节,那么就好办了, buf 存储的内容如下:

    • 第0~1个字节 : 度信息(msg消息长度+5+namelen服务名长度)
    • 第2个字节 : type类型
    • 第3个字节:服务名长度
    • 第4~namelen个字节(namelen个字节):服务名
    • 第4+namelen~4+namelen+4个字节(4个字节):session
    • 之后就是存储 msg 消息的内容了

    此时,整个 c 层调用完之后,将会得到 buf 和 new_session(padding只有在sz大于32k时才存在)。

    --clusterd.lua
    local function open_channel(t, key)
        ...
        local address = node_address[key]    --在
        ...
        if address then
            local host, port = string.match(address, "([^:]+):(.*)$")
            c = sc.channel {
                host = host,
                port = tonumber(port),
                response = read_response,
                nodelay = true,
            }
            succ, err = pcall(c.connect, c, true)    -- 发起远程连接
            if succ then
                t[key] = c
                ct.channel = c
            end
        else
            err = "cluster node [" .. key .. "] is down."
        end
        ...
        return c
    end
    
    --设置 node_channel 元表为 open_channel 
    local node_channel = setmetatable({}, { __index = open_channel })
    
    local function send_request(source, node, addr, msg, sz)
        local session = node_session[node] or 1
        -- msg is a local pointer, cluster.packrequest will free it
        local request, new_session, padding = cluster.packrequest(addr, session, msg, sz)
        node_session[node] = new_session
    
        --再对下面两行代码进行分析
        -- node_channel[node] may yield or throw error
        local c = node_channel[node]
    
        return c:request(request, session, padding)
    end
    

    此时,在执行 local c = node_channel[node] 的时候,就已经发起了远程连接的请求了。为什么呢,在学习lua语法时,有个元表的概念,如果索引 key 在本 table 中找不到,并且存在元表的情况下,那么它会去元表再找一次。此时,就会调用到 open_channel 方法,发起远程连接。
    连接请求的发送函数在socketchannel.lua文件中:

    --socketchannel.lua
    local function connect_once(self)
        ...
        local fd,err = socket.open(self.__host, self.__port)  --调用 c底层的网络API
        ...
    end
    
    local function try_connect(self , once)
        local t = 0
        while not self.__closed do
            local ok, err = connect_once(self)
            ...
        end
    end
    
    local function block_connect(self, once)
        ...
        if #self.__connecting > 0 then
            -- connecting in other coroutine
            local co = coroutine.running()
            table.insert(self.__connecting, co)
            skynet.wait(co)
        else
            self.__connecting[1] = true
            err = try_connect(self, once)
            self.__connecting[1] = nil
            for i=2, #self.__connecting do
                local co = self.__connecting[i]
                self.__connecting[i] = nil
                skynet.wakeup(co)
            end
        end
        ...
    end
    
    function channel:connect(once)
        ...
        return block_connect(self, once)
    end
    

    紧接着再看看 c:request(request, session, padding) 又调用了哪些函数:

    --socketchannel.lua
    function channel:request(request, response, padding)
        assert(block_connect(self, true))   -- connect once 由于之前已经发起连接过了,这里不会再去连接,大家可以放心。
        local fd = self.__sock[1]
    
        if padding then
            -- padding may be a table, to support multi part request
            -- multi part request use low priority socket write
            -- now socket_lwrite returns as socket_write
            if not socket_lwrite(fd , request) then
                sock_err(self)
            end
            --这里就将之前大于32k的数据包分多次发送
            for _,v in ipairs(padding) do
                if not socket_lwrite(fd, v) then
                    sock_err(self)
                end
            end
        else
            --小于32k 的数据包一次发送完
            if not socket_write(fd , request) then
                sock_err(self)
            end
        end
    
        if response == nil then
            -- no response
            return
        end
        --发送完数据,那么就要挂起当前协程,等待对方响应消息了。
        return wait_for_response(self, response)
    end
    

    在等待函数 wait_for_response() 中,又做了哪些事呢。

    --socketchannel.lua
    local function wait_for_response(self, response)
        local co = coroutine.running()
        push_response(self, response, co)
        skynet.wait(co)  --挂起当前协程
    
        local result = self.__result[co]  -- 存放 本次 co 的错误码
        self.__result[co] = nil
        local result_data = self.__result_data[co]  --存放远程服务返回的数据,这里就是你最想要的结果数据了
        self.__result_data[co] = nil
    
        if result == socket_error then
            if result_data then
                error(result_data)
            else
                error(socket_error)
            end
        else
            assert(result, result_data)
            return result_data  --如果远程调用没有错误,就返回数据,这个数据还是经过打包的。
        end
    end
    

    那么大家可能会问,既然挂起了,在什么时候会被唤醒呢,还记得之前讲过的
    local function open_channel(t, key) 函数吗, 这个函数里面有这么一段代码:

    --clusterd.lua
    c = sc.channel {
        host = host,
        port = tonumber(port),
        response = read_response,  --设置读取响应结果的回调函数
        nodelay = true,
    }
    

    就是这个读取响应函数起的作用。接着再来仔细看看:

    --clusterd.lua
    local function read_response(sock)
        local sz = socket.header(sock:read(2))     --阻塞的读取socket数据
        local msg = sock:read(sz)                  --读取内容
        return cluster.unpackresponse(msg)  -- session, ok, data, padding 稍后介绍到
    end
    
    --socketchannel.lua
    local function dispatch_by_session(self)
        local response = self.__response
        -- response() return session
        while self.__sock do
            --这里的 response 函数,就是之前设置的 read_response 函数了。
            --这里会一直阻塞,直到回调函数返回,等待结果。
            local ok , session, result_ok, result_data, padding = pcall(response, self.__sock)  --这里的result_data就是对方响应的内容了,经skynet.pack打包。
            if ok and session then
                local co = self.__thread[session]
                if co then
                    if padding and result_ok then
                        -- If padding is true, append result_data to a table (self.__result_data[co])
                        local result = self.__result_data[co] or {}
                        self.__result_data[co] = result
                        table.insert(result, result_data)
                    else
                        self.__thread[session] = nil
                        self.__result[co] = result_ok
                        if result_ok and self.__result_data[co] then
                            table.insert(self.__result_data[co], result_data)
                        else
                            self.__result_data[co] = result_data
                        end
                        skynet.wakeup(co)    --在这里被换醒了,wait_for_response 函数就可以往下走了
                    end
                ...
            end
        end
        exit_thread(self)
    end
    
    local function dispatch_function(self)
        if self.__response then
            return dispatch_by_session  --假设需要有响应结果,那么就会返回这个函数(根据cluster.call决定)
        else
            return dispatch_by_order  --假设不需要有响应结果,那么就会返回这个函数(根据cluster.send决定)
        end
    end
    
    local function connect_once(self)
        ...
        --fork一个协程出来,在下一帧执行
        --这里就是要等待响应结果的关键入口
        self.__dispatch_thread = skynet.fork(dispatch_function(self), self) 
        ...
    end
    

    这里,又涉及到一个 c层的关键调用,read_response 函数中的 cluster.unpackresponse(msg),看看它做了些什么:

    //lua-cluster.c
    static int
    lunpackresponse(lua_State *L) {
        size_t sz;
        const char * buf = luaL_checklstring(L, 1, &sz);
        if (sz < 5) {
            return 0;
        }
        uint32_t session = unpack_uint32((const uint8_t *)buf);  //session占4个字节,跟打包一一对应
        lua_pushinteger(L, (lua_Integer)session);            //将session压入栈中,作为函数的第一个返回数据
        switch(buf[4]) {
        case 0: // error
            lua_pushboolean(L, 0);
            lua_pushlstring(L, buf+5, sz-5);
            return 3;
        case 1: // ok
        case 4: // multi end
            lua_pushboolean(L, 1);
            lua_pushlstring(L, buf+5, sz-5);
            return 3;
        case 2: // multi begin
            if (sz != 9) {
                return 0;
            }
            sz = unpack_uint32((const uint8_t *)buf+5);
            lua_pushboolean(L, 1);
            lua_pushinteger(L, sz);
            lua_pushboolean(L, 1);
            return 4;
        case 3: // multi part
            lua_pushboolean(L, 1);
            lua_pushlstring(L, buf+5, sz-5);
            lua_pushboolean(L, 1);
            return 4;
        default:
            return 0;
        }
    }
    

    lunpackresponse 函数主要对 msg 内容进行第一层解包,主要是根据头部消息来解。包头有:

    • 0~3个字节:session
    • 第4个字节:type
    • 第5个字节开始:skynet.pack打包的内容了
    • 最后一个字节(可能有,也可能没有,主要数据包不超过32k,就不会有):padding

    到了这里,再一层一层的返回,我们就可以看到返回的结果了:

    skynet.lua
    function skynet.call(addr, typename, ...)
        local p = proto[typename]
        local session = c.send(addr, p.id , nil , p.pack(...))
        if session == nil then
            error("call to invalid address " .. skynet.address(addr))
        end
        return p.unpack(yield_call(addr, session))    --这里再进行第二层解包,最后就是用户想要的远程响应结果了
    end
    
    --clusterd.lua
    function command.req(...)
        local ok, msg, sz = pcall(send_request, ...)
        if ok then
            --数据原路返回
            if type(msg) == "table" then
                skynet.ret(cluster.concat(msg))
            else
                skynet.ret(msg)
            end
        ...
    end
    
    --cluster.lua
    function cluster.call(node, address, ...)
        -- skynet.pack(...) will free by cluster.core.packrequest
        return skynet.call(clusterd, "lua", "req", node, address, skynet.pack(...))
    end
    

    好了,到此,我们可以了解到 cluster2.lua 是如何发起请求数据,以及如何获取响应结果了,也完成了远程调用的一半内容了。

    接下来,再看看cluster1.lua 在接收到数据后是如何转发到相应的服务,以及服务是如何回消息的。
    之前也有提到过,cluster.open "db" 最终会创建 gate 网关来监听。

    --gate.lua
    function handler.message(fd, msg, sz)
        -- recv a package, forward it
        local c = connection[fd]
        local agent = c.agent        --由于之前clusterd.lua在创建 gate 服务时,并没有指定 agent,所以这里的 agent 是 nil
        if agent then
            skynet.redirect(agent, c.client, "client", 1, msg, sz)
        else
            skynet.send(watchdog, "lua", "socket", "data", fd, netpack.tostring(msg, sz))  --转发到 clusterd.lua 的socket方法
        end
    end
    
    
    --gateserver.lua
        local function dispatch_msg(fd, msg, sz)
            if connection[fd] then
                handler.message(fd, msg, sz)    --回调 gate.lua 的 message 方法
            else
                skynet.error(string.format("Drop message from fd (%d) : %s", fd, netpack.tostring(msg,sz)))
            end
        end
    
        MSG.data = dispatch_msg
    
        --注册 socket消息
        skynet.register_protocol {
            name = "socket",
            id = skynet.PTYPE_SOCKET,   -- PTYPE_SOCKET = 6
            unpack = function ( msg, sz )
                return netpack.filter( queue, msg, sz)
            end,
            dispatch = function (_, _, q, type, ...)
                queue = q
                if type then
                    MSG[type](...)    --设置回调函数
                end
            end
        }
    
    --clusterd.lua
    function command.listen(source, addr, port)
        local gate = skynet.newservice("gate")
        ...
        skynet.call(gate, "lua", "open", { address = addr, port = port })
    end
    

    这里再次回顾一下 clusterd.lua 是如何创建 gate 服务的。以及如何接收远程发送过来的消息。接下来,就看看gate 再接收消息后,clusterd.lua又是如何来处理的。

    --clusterd.lua
    function command.socket(source, subcmd, fd, msg)
        if subcmd == "data" then
            local sz
            local addr, session, msg, padding, is_push = cluster.unpackrequest(msg)
            if padding then                    --(1)
                local requests = large_request[fd]
                if requests == nil then
                    requests = {}
                    large_request[fd] = requests
                end
                local req = requests[session] or { addr = addr , is_push = is_push }
                requests[session] = req
                table.insert(req, msg)
                return
            else    
                local requests = large_request[fd]
                if requests then
                    local req = requests[session]
                    if req then
                        requests[session] = nil
                        table.insert(req, msg)
                        msg,sz = cluster.concat(req)
                        addr = req.addr
                        is_push = req.is_push
                    end
                end
                if not msg then
                    local response = cluster.packresponse(session, false, "Invalid large req")
                    socket.write(fd, response)
                    return
                end
            end
            local ok, response
            if addr == 0 then
                local name = skynet.unpack(msg, sz)
                local addr = register_name[name]
                if addr then
                    ok = true
                    msg, sz = skynet.pack(addr)
                else
                    ok = false
                    msg = "name not found"
                end
            elseif is_push then        --(2)
                skynet.rawsend(addr, "lua", msg, sz)
                return  -- no response
            else    --(3)
                ok , msg, sz = pcall(skynet.rawcall, addr, "lua", msg, sz) 
            end
            if ok then     
                response = cluster.packresponse(session, true, msg, sz)
                if type(response) == "table" then
                    for _, v in ipairs(response) do
                        socket.lwrite(fd, v)
                    end
                else
                    socket.write(fd, response)
                end
            else
                response = cluster.packresponse(session, false, msg)  --根据 session 返回给对应的请求方
                socket.write(fd, response)
            end
        elseif subcmd == "open" then
            skynet.error(string.format("socket accept from %s", msg))
            skynet.call(source, "lua", "accept", fd)
        else
            large_request[fd] = nil
            skynet.error(string.format("socket %s %d %s", subcmd, fd, msg or ""))
        end
    end
    

    为了方便起见,这里假设padding 为 nil,数据包不超过32k,那么就不会走流程(1)处代码。如果是对方节点发起的请求是 cluster.send 方式(推送方式),则走流程(2)。如果是 cluster.call 方式(请求响应),则走流程(3)。
    对于流程(2),调用 skynet.rawsend(addr, "lua", msg, sz), 就是对消息进行派发,发送给指定的 addr 服务。addr 可以是字符串也可以是数字,但对于我们之前说的,addr 就是 "sdb" 字符串。它不需要响应,所以这里直接返回,就是 (3)上一行代码 return -- no response
    对于流程(3),在调用 ok , msg, sz = pcall(skynet.rawcall, addr, "lua", msg, sz) 完后,会得到响应消息。如果调用成功后,那么就会对 msg 进行打包,加上头部消息,从而通过 socket.write(fd, response) 发送回去,这样就完成了一次远程过程调用。

    对于skynet.rawsendskynet.rawcall 不是很了解的,可以先看看 skynet源码赏析

    现在对 cluster.unpackrequest(msg) 进行分析,看看是如何解包的。

    //lua-cluster.c
    static int
    unpackreq_string(lua_State *L, const uint8_t * buf, int sz) {
        if (sz < 2) {
            return luaL_error(L, "Invalid cluster message (size=%d)", sz);
        }
        size_t namesz = buf[1];  //获取服务名长度
        if (sz < namesz + 6) {
            return luaL_error(L, "Invalid cluster message (size=%d)", sz);
        }
        lua_pushlstring(L, (const char *)buf+2, namesz);   //返回服务名
        uint32_t session = unpack_uint32(buf + namesz + 2); 
        lua_pushinteger(L, (uint32_t)session);   //返回session
        lua_pushlstring(L, (const char *)buf+2+namesz+4, sz - namesz - 6);  //返回消息内容 msg
        if (session == 0) {
            lua_pushnil(L);
            lua_pushboolean(L,1);   // is_push, no reponse
            return 5;
        }
    
        return 3;
    }
    
    static int
    lunpackrequest(lua_State *L) {
        size_t ssz;
        const char *msg = luaL_checklstring(L,1,&ssz);
        int sz = (int)ssz;
        switch (msg[0]) {
        ...
        case '\x80':  //地址是一个字符串,且内容不超过 32k
            return unpackreq_string(L, (const uint8_t *)msg, sz);
        ...
        }
    }
    

    与之前讲到的 packreq_string(L, session, msg, sz, is_push); 相对应。先获取服务名长度namesz。再通过namesz获取服务名,之后就是 session,最后就是消息体了。
    再看看 cluster.packresponse(session, false, msg),是如何对 msg 打包加上头部的吧。

    //lua-cluster.c
    static int
    lpackresponse(lua_State *L) {
        uint32_t session = (uint32_t)luaL_checkinteger(L,1);
        // clusterd.lua:command.socket call lpackresponse,
        // and the msg/sz is return by skynet.rawcall , so don't free(msg)
        int ok = lua_toboolean(L,2);
        void * msg;
        size_t sz;
        
        if (lua_type(L,3) == LUA_TSTRING) {  //
            msg = (void *)lua_tolstring(L, 3, &sz);  //msg指向消息体
        } 
        ...
        //接下来就是打包头部信息了 
        uint8_t buf[TEMP_LENGTH];
        fill_header(L, buf, sz+5);
        fill_uint32(buf+2, session);
        buf[6] = ok;
        memcpy(buf+7,msg,sz);
        lua_pushlstring(L, (const char *)buf, sz+7);
    
        return 1;
    }
    
    

    头部信息有:

    • 0~1个字节:消息长度
    • 2~5个字节:session
    • 第6个字节:状态码
    • 第7个字节起:msg消息

    到此,就将完了skynet集群部分大概是如何建立、以及如何相互通信的了。当然,还有一些细节部分没仔细分析,不过对于大家来说,应该不是什么难事了(o´ω`o)。

    相关文章

      网友评论

          本文标题:skynet集群学习

          本文链接:https://www.haomeiwen.com/subject/kfavdftx.html