最近一年一直从事webrtc的开发,webrtc的服务性能测试也达到了大多大厂宣扬的并发性能,甚至略超过声网。这两天在我们的工程上实现了比较简单的Webrtc的CDN调度算法和故障转移,其中涉及到了如何根据服务器的资源进行调度,比如上行下行带宽/CPU占有率/内存占有率等,于是采用了webrtc服务器主动上报节点信息和资源到etcd。
api7项目的王院生实现了https://github.com/api7/lua-resty-etcd,不过它是基于openresty的ngx.socket和ngx.json和ngx.base64的,我们是不能再引入openresty的,于是去掉了这些依赖,不过工程中关于API的封装还是参考了他的实现。
目前etcd-lua使用的是lua5.3,且仅在etcd API V3上测试过,没有测试过v2。
工程放开源在https://github.com/zhiyong0804/etcd-lua
写了个例子如下,本来想写个API接口介绍的,但是怎奈作为程序员最痛恨的就是“没有文档,没有注释,要写文档,要写注释”。如果有人需要使用的话,看这个例子和源码就能理解API接口怎么样了,本来也不复杂的。
json = require('cjson')
sr = require('systeminfo')
etcd = require('etcd').new({
hosts = {
"http://ip1:port1",
"http://ip2:port2",
"http://ip3:port3"
},
prefix = "/v3alpha"
})
local leaseInfo = {}
local system = {}
local net = "eth0"
local bandwidth = 100*1024*1024
local interface = "eth0"
local usage = {} -- 当前服务器资源占用配置
local started = false
local cpu_core = 0
local key = "/test/127.0.0.1"
function report_system_info()
local cpu_usage = sr.getCpuOccupy()
local mem_usage = sr.calcMemUsage()
local rx_usage, tx_usage = sr.getTxrxUsage(interface)
if not started then
system["cpu"] = 0
system["mem"] = 0
system["rx"] = 0
system["tx"] = 0
started = true
else
system["cpu"] = sr.calcCpuUsage(usage["cpu"], cpu_usage)
system["mem"] = mem_usage
system["rx"], system["tx"] = sr.calcTxrxUsage(usage["rx"], usage["tx"], rx_usage, tx_usage, bandwidth)
end
-- 将当前获取的所有系统信息保存起来,以便计算资源占有率
usage["cpu"] = cpu_usage
usage["mem"] = mem_usage
usage["rx"] = rx_usage
usage["tx"] = tx_usage
system["country_code"] = "CN"
system["updated"] = os.time()
if etcd ~= nil then
local res, status = etcd:set(key, system, {timeout = 1, lease=leaseInfo.ID})
print("res : "..dumpTable(res).." status : ")
etcd:keepalive(leaseInfo.ID)
end
print("collect system info" .. dumpTable(system))
local res, status = etcd:get(key, {timeout = 1, lease=leaseInfo.ID})
print("get key response : "..dumpTable(res).." status : "..status)
end
-- Methods
function init()
print("Initializing...")
if etcd == nil then
print("Initializ failed since etcd is nil")
return nil
end
print("Initialized")
-- 申请key的租约, 一秒钟, 租约ID由服务器分配
local res, status = etcd:grant(1)
if res == nil then
print("err is : " .. err)
end
print("grant etcd lease response : " .. dumpTable(res))
if status ~= 200 or res.ID == nil then
print("grant etcd key lease failed.")
return nil;
end
leaseInfo = {
ID = res.ID,
ttl = 1,
}
cpu_core = sr.getCpuCore()
report_system_info()
end
-- Helper for logging tables
-- https://stackoverflow.com/a/27028488
function dumpTable(o)
if type(o) == 'table' then
local s = '{ '
for k,v in pairs(o) do
if type(k) ~= 'number' then k = '"'..k..'"' end
s = s .. '['..k..'] = ' .. dumpTable(v) .. ','
end
return s .. '} '
else
return tostring(o)
end
end
return init()
网友评论