如何通过lua启动lua服务
从skynet.newservice看起
以main.lua为例,看skynet如何启动一个lua服务:
skynet.error("Server start")
skynet.uniqueservice("protoloader")
if not skynet.getenv "daemon" then
local console = skynet.newservice("console")
end
skynet.newservice("debug_console",8000)
skynet.newservice("simpledb")
local watchdog = skynet.newservice("watchdog")
skynet.call(watchdog, "lua", "start", {
port = 8888,
maxclient = max_client,
nodelay = true,
})
skynet.error("Watchdog listen on", 8888)
skynet.exit()
end)
skynet.newservice("watchdog")启动名为watchdog的服务,我们接下来看一下整个启动流程:
1)skynet.newservice的定义如下:
function skynet.newservice(name, ...)
return skynet.call(".launcher", "lua" , "LAUNCH", "snlua", name, ...)
end
表示给.launcher发送一个lua类型的消息,后面跟的是消息内容
这里关注两个地方:
1 .launcher是什么服务
在bootstrap.lua中会启动launcher服务,这里.launcher就是我们启动skynet时启动的launcher服务。newservice就是给launcher发送一个LAUNCH消息。
local launcher = assert(skynet.launch("snlua","launcher"))
skynet.name(".launcher", launcher)
2 sktnet.call()
function skynet.call(addr, typename, ...)
local tag = session_coroutine_tracetag[running_thread]
if tag then
c.trace(tag, "call", 2)
c.send(addr, skynet.PTYPE_TRACE, 0, tag)
end
local p = proto[typename]
local session = c.send(addr, p.id , nil , p.pack(...)) --------①
if session == nil then
error("call to invalid address " .. skynet.address(addr))
end
return p.unpack(yield_call(addr, session)) ----------------②
end
在skynet.call里面,我们可以发现,①处我们会调用send函数发送消息。c.send此时会调用lua与c++进行交互。在lua-skynet.lua中调用lsend函数。调用send_mesaage函数。
static int
lsend(lua_State *L) {
return send_message(L, 0, 2);
}
static int
send_message(lua_State *L, int source, int idx_type) {
struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
uint32_t dest = (uint32_t)lua_tointeger(L, 1);
const char * dest_string = NULL;
if (dest == 0) {
if (lua_type(L,1) == LUA_TNUMBER) {
return luaL_error(L, "Invalid service address 0");
}
dest_string = get_dest_string(L, 1);
}
int type = luaL_checkinteger(L, idx_type+0);
int session = 0;
if (lua_isnil(L,idx_type+1)) {
type |= PTYPE_TAG_ALLOCSESSION;
} else {
session = luaL_checkinteger(L,idx_type+1);
}
int mtype = lua_type(L,idx_type+2);
switch (mtype) {
case LUA_TSTRING: {
size_t len = 0;
void * msg = (void *)lua_tolstring(L,idx_type+2,&len);
if (len == 0) {
msg = NULL;
}
if (dest_string) {
session = skynet_sendname(context, source, dest_string, type, session , msg, len);------①
} else {
session = skynet_send(context, source, dest, type, session , msg, len); -------②
}
break;
}
case LUA_TLIGHTUSERDATA: {
void * msg = lua_touserdata(L,idx_type+2);
int size = luaL_checkinteger(L,idx_type+3);
if (dest_string) {
session = skynet_sendname(context, source, dest_string, type | PTYPE_TAG_DONTCOPY, session, msg, size);
} else {
session = skynet_send(context, source, dest, type | PTYPE_TAG_DONTCOPY, session, msg, size);
}
break;
}
default:
luaL_error(L, "invalid param %s", lua_typename(L, lua_type(L,idx_type+2)));
}
if (session < 0) {
if (session == -2) {
// package is too large
lua_pushboolean(L, 0);
return 1;
}
// send to invalid address
// todo: maybe throw an error would be better
return 0;
}
lua_pushinteger(L,session);
return 1;
}
其中,①skynet_sendname里面也会调用skynet_send()函数。②就是讲该消息skynet_context_push到launcher服务对应的消息队列中。在工作线程中会一直调用skynet_context_pop函数,当取到对应的消息时,调用launcher服务的callback函数。具体的disptach函数处理过程上一章节已经讲过。这里不再讲。
接收到的是LAUNCHER消息,我们调用
---launcher.lua
function command.LAUNCH(_, service, ...)
launch_service(service, ...)
return NORET
end
---launcher.lua
local function launch_service(service, ...)
local param = table.concat({...}, " ")
local inst = skynet.launch(service, param) -------------①
local session = skynet.context()
local response = skynet.response()
if inst then
services[inst] = service .. " " .. param
instance[inst] = response
launch_session[inst] = session
else
response(false)
return
end
return inst
end
最后调用到launch_service中的①。我们接下来看一下①是如何处理的。
---manager.lua
function skynet.launch(...)
local addr = c.command("LAUNCH", table.concat({...}," "))-------①
if addr then
return tonumber("0x" .. string.sub(addr , 2))
end
end
①会去调用lua与c++交互里面的lcommand函数进行处理
---lua-skynet.c
static int
lcommand(lua_State *L) {
struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
const char * cmd = luaL_checkstring(L,1);
const char * result;
const char * parm = NULL;
if (lua_gettop(L) == 2) {
parm = luaL_checkstring(L,2);
}
result = skynet_command(context, cmd, parm); ------①
if (result) {
lua_pushstring(L, result);
return 1;
}
return 0;
}
①会调用
---lua-skynet.c
static struct command_func cmd_funcs[] = {
{ "TIMEOUT", cmd_timeout },
{ "REG", cmd_reg },
{ "QUERY", cmd_query },
{ "NAME", cmd_name },
{ "EXIT", cmd_exit },
{ "KILL", cmd_kill },
{ "LAUNCH", cmd_launch },
{ "GETENV", cmd_getenv },
{ "SETENV", cmd_setenv },
{ "STARTTIME", cmd_starttime },
{ "ABORT", cmd_abort },
{ "MONITOR", cmd_monitor },
{ "STAT", cmd_stat },
{ "LOGON", cmd_logon },
{ "LOGOFF", cmd_logoff },
{ "SIGNAL", cmd_signal },
{ NULL, NULL },
};
const char *
skynet_command(struct skynet_context * context, const char * cmd , const char * param) {
struct command_func * method = &cmd_funcs[0];
while(method->name) {
if (strcmp(cmd, method->name) == 0) {
return method->func(context, param);--------①
}
++method;
}
return NULL;
}
①最后会执行cmd_launch函数。
---skynet_server.c
static const char *
cmd_launch(struct skynet_context * context, const char * param) {
size_t sz = strlen(param);
char tmp[sz+1];
strcpy(tmp,param);
char * args = tmp;
char * mod = strsep(&args, " \t\r\n");
args = strsep(&args, "\r\n");
struct skynet_context * inst = skynet_context_new(mod,args);
if (inst == NULL) {
return NULL;
} else {
id_to_hex(context->result, inst->handle);
return context->result;
}
}
最后会调用skynet_context_new()启动watchdog服务。
skynet如何启动一个lua服务
在启动skynet的时候会通过skynet_main调用skynet_start
//skynet_start.c
void
skynet_start(struct skynet_config * config) {
// register SIGHUP for log file reopen
struct sigaction sa;
sa.sa_handler = &handle_hup;
sa.sa_flags = SA_RESTART;
sigfillset(&sa.sa_mask);
sigaction(SIGHUP, &sa, NULL);
if (config->daemon) {
if (daemon_init(config->daemon)) {
exit(1);
}
}
skynet_harbor_init(config->harbor);
skynet_handle_init(config->harbor);
skynet_mq_init(); //-----------①初始化消息队列
skynet_module_init(config->module_path);//--------------② 初始化module
skynet_timer_init(); //------------③ //初始化定时器
skynet_socket_init(); //-----------④ //初始化socket
skynet_profile_enable(config->profile);
struct skynet_context *ctx = skynet_context_new(config->logservice, config->logger);//----⑤启动logger服务
if (ctx == NULL) {
fprintf(stderr, "Can't launch %s service\n", config->logservice);
exit(1);
}
skynet_handle_namehandle(skynet_context_handle(ctx), "logger");
bootstrap(ctx, config->bootstrap);//-------------------⑥
start(config->thread);//-------------------------------⑦
// harbor_exit may call socket send, so it should exit before socket_free
skynet_harbor_exit();
skynet_socket_free();
if (config->daemon) {
daemon_exit(config->daemon);
}
}
skynet_start是启动skynet服务器的入口程序,接下来将主要讲①--⑦这几个部分
先讲bootstrap函数
//skynet_start.c
static void
bootstrap(struct skynet_context * logger, const char * cmdline) {
int sz = strlen(cmdline);
char name[sz+1];
char args[sz+1];
sscanf(cmdline, "%s %s", name, args); //name:snlua args: bootstrap
struct skynet_context *ctx = skynet_context_new(name, args);//-----------①
if (ctx == NULL) {
skynet_error(NULL, "Bootstrap error : %s\n", cmdline);
skynet_context_dispatchall(logger);
exit(1);
}
}
此处调用skynet_context_new函数启动新的服务,根据conf配置文件,服务名为snlua,参数为bootstrap。
先来分析一下skynet_context_new
skynet_server.c
struct skynet_context *
skynet_context_new(const char * name, const char *param) {
struct skynet_module * mod = skynet_module_query(name); //---①从module列表中查找出对应得module
if (mod == NULL)
return NULL;
void *inst = skynet_module_instance_create(mod); //---②创建mod对应实例
if (inst == NULL)
return NULL;
struct skynet_context * ctx = skynet_malloc(sizeof(*ctx)); //----③建立一个context,用于关联mod,inst和对应消息队列等信息
CHECKCALLING_INIT(ctx)
ctx->mod = mod;
ctx->instance = inst;
ctx->ref = 2;
ctx->cb = NULL;
ctx->cb_ud = NULL;
ctx->session_id = 0;
ctx->logfile = NULL;
ctx->init = false;
ctx->endless = false;
ctx->cpu_cost = 0;
ctx->cpu_start = 0;
ctx->message_count = 0;
ctx->profile = G_NODE.profile;
// Should set to 0 first to avoid skynet_handle_retireall get an uninitialized handle
ctx->handle = 0;
ctx->handle = skynet_handle_register(ctx); //----④
struct message_queue * queue = ctx->queue = skynet_mq_create(ctx->handle);//----⑤创建该服务的消息队列
// init function maybe use ctx->handle, so it must init at last
context_inc();
CHECKCALLING_BEGIN(ctx)
int r = skynet_module_instance_init(mod, inst, ctx, param);//----⑥module初始化
CHECKCALLING_END(ctx)
if (r == 0) {
struct skynet_context * ret = skynet_context_release(ctx);
if (ret) {
ctx->init = true;
}
skynet_globalmq_push(queue);
if (ret) {
skynet_error(ret, "LAUNCH %s %s", name, param ? param : "");
}
return ret;
} else {
skynet_error(ctx, "FAILED launch %s", name);
uint32_t handle = ctx->handle;
skynet_context_release(ctx);
skynet_handle_retire(handle);
struct drop_t d = { handle };
skynet_mq_release(queue, drop_message, &d);
return NULL;
}
}
skynet_context_new经历下面几个步骤:
1 从modules列表中,查找对应的服务模块,如果找到则返回,否则到modules的path中去查找对应的so库,创建一个skynet_module对象,将so库加载到内存,并将访问该so库的句柄和skynet_module对象关联(_try_open做了这件事),
//skynet_module.c
struct skynet_module *
skynet_module_query(const char * name) {
struct skynet_module * result = _query(name);
if (result)
return result;
SPIN_LOCK(M)
result = _query(name); // double check
if (result == NULL && M->count < MAX_MODULE_TYPE) {
int index = M->count;
void * dl = _try_open(M,name); //----①
if (dl) {
M->m[index].name = name;
M->m[index].module = dl;
if (open_sym(&M->m[index]) == 0) {//------②
M->m[index].name = skynet_strdup(name);
M->count ++;
result = &M->m[index];
}
}
}
SPIN_UNLOCK(M)
return result;
}
//skynet_module.c
static int
open_sym(struct skynet_module *mod) {
mod->create = get_api(mod, "_create");
mod->init = get_api(mod, "_init");
mod->release = get_api(mod, "_release");
mod->signal = get_api(mod, "_signal");
return mod->init == NULL;
}
其中,①会将so中的module加载到内存,②将so库中的xxx_create,xxx_init,xxx_signal,xxx_release四个函数地址赋值给skynet_module的create、init、signal和release四个函数中,这样这个skynet_module对象,就能调用so库中,对应的四个接口。
2 ②创建一个服务实例即skynet_context对象,他包含一个次级消息队列指针,服务模块指针(skynet_module对象,便于他访问module自定义的create、init、signal和release函数),由服务模块调用create接口创建的数据实例等。
3 ④将新创建的服务实例(skynet_context对象)注册到全局的服务列表中
//skynet_handle.c
struct handle_storage {
struct rwlock lock;
uint32_t harbor;
uint32_t handle_index; //最后一个skynet_context的下标 + 1
int slot_size; //格子总数
struct skynet_context ** slot;
int name_cap;
int name_count;
struct handle_name *name;
};
static struct handle_storage *H = NULL;
uint32_t
skynet_handle_register(struct skynet_context *ctx) {
struct handle_storage *s = H;
rwlock_wlock(&s->lock);
for (;;) {
int i;
uint32_t handle = s->handle_index;
for (i=0;i<s->slot_size;i++,handle++) {
if (handle > HANDLE_MASK) {
// 0 is reserved
handle = 1;
}
int hash = handle & (s->slot_size-1);
if (s->slot[hash] == NULL) {
s->slot[hash] = ctx;
s->handle_index = handle + 1;
rwlock_wunlock(&s->lock);
handle |= s->harbor;
return handle;
}
}
assert((s->slot_size*2 - 1) <= HANDLE_MASK);
struct skynet_context ** new_slot = skynet_malloc(s->slot_size * 2 * sizeof(struct skynet_context *));
memset(new_slot, 0, s->slot_size * 2 * sizeof(struct skynet_context *));
for (i=0;i<s->slot_size;i++) {
int hash = skynet_context_handle(s->slot[i]) & (s->slot_size * 2 - 1);
assert(new_slot[hash] == NULL);
new_slot[hash] = s->slot[i];
}
skynet_free(s->slot);
s->slot = new_slot;
s->slot_size *= 2;
}
}
4 ⑤初始化服务模块(skynet_module创建的数据实例),并在初始化函数中,注册新创建的skynet_context实例的callback函数
//service_snlua.c
static int
launch_cb(struct skynet_context * context, void *ud, int type, int session, uint32_t source , const void * msg, size_t sz) {
assert(type == 0 && session == 0);
struct snlua *l = ud;
skynet_callback(context, NULL, NULL);
int err = init_cb(l, context, msg, sz);
if (err) {
skynet_command(context, "EXIT", NULL);
}
return 0;
}
int
snlua_init(struct snlua *l, struct skynet_context *ctx, const char * args) {
int sz = strlen(args);
char * tmp = skynet_malloc(sz);
memcpy(tmp, args, sz);
skynet_callback(ctx, l , launch_cb);
const char * self = skynet_command(ctx, "REG", NULL);
uint32_t handle_id = strtoul(self+1, NULL, 16);//字符串转换为长整型 16进制
// it must be first message
skynet_send(ctx, 0, handle_id, PTYPE_TAG_DONTCOPY,0, tmp, sz);
return 0;
}
在bootstrap启动的是snlua服务,看一下这个服务是如何初始化的,首先设置callback函数,然后给自己发送一条协议,收到之后调用callback。
这里将launch_cb作为该snlua服务的callback函数,完成注册以后,向自己发送了一个消息,本snlua服务在接收到消息以后,就会调用launch_cb函数,此时,snlua服务的回调函数会被赋空值,并进行一次snlua绑定的lua_State的初始化
static int
init_cb(struct snlua *l, struct skynet_context *ctx, const char * args, size_t sz) {
lua_State *L = l->L;
l->ctx = ctx;
lua_gc(L, LUA_GCSTOP, 0); //LUA_GCSTOP停止gc LUA_GCRESTART重启 gc LUA_GCCOLLECT执行gc
lua_pushboolean(L, 1); /* signal for libraries to ignore env. vars. */
lua_setfield(L, LUA_REGISTRYINDEX, "LUA_NOENV");
luaL_openlibs(L);
lua_pushlightuserdata(L, ctx);
lua_setfield(L, LUA_REGISTRYINDEX, "skynet_context");
luaL_requiref(L, "skynet.codecache", codecache , 0);
lua_pop(L,1);
const char *path = optstring(ctx, "lua_path","./lualib/?.lua;./lualib/?/init.lua");
lua_pushstring(L, path);
lua_setglobal(L, "LUA_PATH");
const char *cpath = optstring(ctx, "lua_cpath","./luaclib/?.so");
lua_pushstring(L, cpath);
lua_setglobal(L, "LUA_CPATH");
const char *service = optstring(ctx, "luaservice", "./service/?.lua");
lua_pushstring(L, service);
lua_setglobal(L, "LUA_SERVICE");
const char *preload = skynet_command(ctx, "GETENV", "preload");
lua_pushstring(L, preload);
lua_setglobal(L, "LUA_PRELOAD");
lua_pushcfunction(L, traceback);
assert(lua_gettop(L) == 1);
const char * loader = optstring(ctx, "lualoader", "./lualib/loader.lua");
int r = luaL_loadfile(L,loader);
if (r != LUA_OK) {
skynet_error(ctx, "Can't load %s : %s", loader, lua_tostring(L, -1));
report_launcher_error(ctx);
return 1;
}
lua_pushlstring(L, args, sz);
r = lua_pcall(L,1,0,1);
if (r != LUA_OK) {
skynet_error(ctx, "lua loader error : %s", lua_tostring(L, -1));
report_launcher_error(ctx);
return 1;
}
lua_settop(L,0);
if (lua_getfield(L, LUA_REGISTRYINDEX, "memlimit") == LUA_TNUMBER) {
size_t limit = lua_tointeger(L, -1);
l->mem_limit = limit;
skynet_error(ctx, "Set memory limit to %.2f M", (float)limit / (1024 * 1024));
lua_pushnil(L);
lua_setfield(L, LUA_REGISTRYINDEX, "memlimit");
}
lua_pop(L, 1);
lua_gc(L, LUA_GCRESTART, 0);
return 0;
}
c初始化lua_State,先是将服务指针,skynet_context保存起来,以方便lua层调c的时候使用,然后就是一些配置设置,如设置lua服务脚本的存放路径,c服务so库的存放路径,lualib的存放路径等(加载和调用的时候,回到这些路径里找),然后该lua_State会加载一个用于执行指定脚本的loader.lua脚本,并将参数传给这个脚本(参数就是snlua服务绑定的lua脚本名称和传给这个脚本的参数拼起来的字符串),这里应该就是bootstrap.lua这个脚本。我们下面专门分析这个脚本。
5 将该服务实例(skynet_context实例)的次级消息队列,插入到全局消息队列中。
经过上面的步骤,一个c服务模块就被创建出来了,在回调函数被指定以后,其他服务发送给他的消息,会被pop出来,最终传给服务对应的callback函数,最后达到驱动服务的目的。
bootstrap.lua的启动
在前面skynet_start中调用bootstrap已经加载了bootstrap.lua这个服务,我们接下来看一下这个脚本是怎么执行的。
----bootstrap.lua
skynet.start(function()
local sharestring = tonumber(skynet.getenv "sharestring" or 4096)
memory.ssexpand(sharestring)
local standalone = skynet.getenv "standalone"
local launcher = assert(skynet.launch("snlua","launcher"))----------①
skynet.name(".launcher", launcher)--------------------②
local harbor_id = tonumber(skynet.getenv "harbor" or 0)
if harbor_id == 0 then
assert(standalone == nil)
standalone = true
skynet.setenv("standalone", "true")
local ok, slave = pcall(skynet.newservice, "cdummy")
if not ok then
skynet.abort()
end
skynet.name(".cslave", slave)
else
if standalone then
if not pcall(skynet.newservice,"cmaster") then
skynet.abort()
end
end
local ok, slave = pcall(skynet.newservice, "cslave")
if not ok then
skynet.abort()
end
skynet.name(".cslave", slave)
end
if standalone then
local datacenter = skynet.newservice "datacenterd"
skynet.name("DATACENTER", datacenter)
end
skynet.newservice "service_mgr"-----------------③
pcall(skynet.newservice,skynet.getenv "start" or "main")
skynet.exit()
end)
---skynet.lua
function skynet.start(start_func)
c.callback(skynet.dispatch_message)
init_thread = skynet.timeout(0, function()
skynet.init_service(start_func)
init_thread = nil
end)
end
调用执行skynet.start(),由于在上面把回调函数置为NULL,首选调用c.callback设置回调函数。然后skynet.init_service(start_func)执行start_func函数。首先会启动launcher服务①,还会启动service_mgr服务③。
launcher服务是在skynet刚刚启动的时候就创建出来的,所有的lua服务都是通过launcher服务启动,launcher服务会用来管理当前所有的lua服务。
网友评论