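SCFCLI and the VSCode plugin have been around for a long time, and plenty of people use them: with a YAML file you have already written, you can deploy a function straight to production, which is very convenient. Sharp-eyed readers, however, will have noticed that SCFCLI and the VSCode plugin do not seem to have been updated for a while, and that the Cloud Functions team is pushing hard for everyone to use Serverless Framework. SCFCLI was updated frequently back in June; these days updates are slow.
Observant readers will also have noticed that Serverless Framework seems easier to use than SCFCLI. It can deploy not only functions but also APIGW, COS, CDN and more. Most importantly, Serverless Framework lets you quickly deploy common frameworks such as Express, Flask and Django directly to cloud functions, which is a huge convenience.
So, to everyone still using SCFCLI: feeling envious yet?
Some of you, after itching to try it, have finally made up your minds to drop SCFCLI and start using Serverless Framework. That raises a question: the two tools write their YAML differently, completely differently in fact, so how do you quickly convert your existing SCFCLI/VSCode plugin YAML into Serverless Framework YAML?
Here are a few simple ways to do it: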
Converting via a web page
As a community enthusiast, I naturally have to provide a simple web page for this:
http://serverless.0duzhan.com/app/scf_2_serverless/
On this page you only need to paste in your SCFCLI-based YAML and it is converted right away.
The conversion produces both the Component YAML and the Plugin YAML.
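Converting via the API yourself
Endpoints:
- Component: http://service-8d3fi753-1256773370.bj.apigw.tencentcs.com/release/scf_2_serverless/components/
- Plugin: http://service-8d3fi753-1256773370.bj.apigw.tencentcs.com/release/scf_2_serverless/plugin/
Input parameter: yaml, a string holding the contents of your existing SCFCLI/VSCode plugin YAML
Output parameters: error, a boolean indicating whether something went wrong; result, a string holding the result (the error message when error is true, the new YAML when it is false)
Taking Python as an example, converting an existing YAML into Component YAML looks like this: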
import json
import urllib.request

# Read the existing SCFCLI / VSCode plugin YAML
with open("template.yaml", 'r') as f:
    yamlData = f.read()

url = "http://service-8d3fi753-1256773370.bj.apigw.tencentcs.com/release/scf_2_serverless/components/"
data = {
    "yaml": yamlData
}

# POST the YAML as a JSON body and decode the JSON reply
request = urllib.request.Request(url=url, data=json.dumps(data).encode("utf-8"))
yamlResult = json.loads(urllib.request.urlopen(request).read().decode("utf-8"))
print(yamlResult)

# "result" holds the converted YAML, or the error message when "error" is true
with open("output.yaml", "w") as f:
    f.write(yamlResult['result'])
That turns your existing SCFCLI YAML into Serverless Component YAML. The Plugin conversion works the same way; you only need to swap in the other URL.
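Since the two endpoints differ only in the last path segment, the request can be wrapped in a small helper. This is just a sketch: the names endpoint_for and convert, and the opener parameter, are my own inventions for illustration and are not part of the service. It assumes the service answers with the error/result fields described above.

```python
import json
import urllib.request

BASE_URL = "http://service-8d3fi753-1256773370.bj.apigw.tencentcs.com/release/scf_2_serverless/"

# Path suffixes taken from the two endpoint addresses listed earlier
ENDPOINTS = {"component": "components/", "plugin": "plugin/"}


def endpoint_for(target):
    """Return the conversion URL for 'component' or 'plugin'."""
    if target not in ENDPOINTS:
        raise ValueError("target must be 'component' or 'plugin'")
    return BASE_URL + ENDPOINTS[target]


def convert(yaml_text, target="component", opener=urllib.request.urlopen):
    """POST the SCFCLI YAML and return the converted YAML string.

    `opener` is a hypothetical hook so the function can be exercised
    without network access; by default it is urllib's urlopen.
    """
    body = json.dumps({"yaml": yaml_text}).encode("utf-8")
    request = urllib.request.Request(url=endpoint_for(target), data=body)
    with opener(request) as response:
        reply = json.loads(response.read().decode("utf-8"))
    if reply["error"]:
        # When error is true, result carries the error message
        raise RuntimeError(reply["result"])
    return reply["result"]
```

With that in place, convert(yamlData, "plugin") would request the Plugin YAML and convert(yamlData) the Component YAML, raising an exception instead of silently writing an error message to the output file.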
Writing your own local script
import yaml  # PyYAML, needed by doComponents and doPlugin below


def getBaseFunctionComponents(functionInformation, functionName=None, tempInputs=None):
    # Map an SCFCLI function definition (or the Globals block) to Component inputs
    tempInputs = tempInputs if tempInputs else {}
    if functionName:
        tempInputs["name"] = functionName
    if isinstance(functionInformation, dict):
        for eveFunctionKey, eveFunctionValue in functionInformation.items():
            if eveFunctionKey not in ["Events", "Environment", "VpcConfig", "Type", "Role"]:
                # Plain properties only need their first letter lower-cased
                tempKey = str.lower(eveFunctionKey[0]) + eveFunctionKey[1:]
                tempInputs[tempKey] = eveFunctionValue
            elif eveFunctionKey == "Environment":
                if eveFunctionValue and "Variables" in eveFunctionValue:
                    tempEnvironment = {
                        "variables": eveFunctionValue["Variables"]
                    }
                    tempInputs["environment"] = tempEnvironment
            elif eveFunctionKey == "VpcConfig":
                tempVpcConfig = {}
                if eveFunctionValue and "SubnetId" in eveFunctionValue:
                    tempVpcConfig["subnetId"] = eveFunctionValue["SubnetId"]
                if eveFunctionValue and "VpcId" in eveFunctionValue:
                    tempVpcConfig["vpcId"] = eveFunctionValue["VpcId"]
                tempInputs["vpcConfig"] = tempVpcConfig
            elif eveFunctionKey == "Events":
                tempEvents = []
                if isinstance(eveFunctionValue, dict):
                    for eveEventKey, eveEventValue in eveFunctionValue.items():
                        if isinstance(eveEventValue["Properties"], dict):
                            tempEvent = {}
                            if eveEventValue["Type"] == "APIGW":
                                tempEvent['apigw'] = {
                                    "name": eveEventKey,
                                    "parameters": {}
                                }
                                tempParameter = {}
                                # functionName is None when the Globals block is processed
                                tempEndpoints = {"path": "/" + (functionName or "")}
                                for eveParameterKey, eveParameterValue in eveEventValue["Properties"].items():
                                    if eveParameterKey == "StageName":
                                        tempParameter["environment"] = eveParameterValue
                                    elif eveParameterKey == "ServiceId" and eveParameterValue:
                                        tempParameter["serviceId"] = eveParameterValue
                                    elif eveParameterKey == "HttpMethod":
                                        tempEndpoints["method"] = eveParameterValue
                                    elif eveParameterKey == "IntegratedResponse":
                                        tempEndpoints["function"] = {"isIntegratedResponse": eveParameterValue}
                                tempParameter["endpoints"] = [tempEndpoints, ]
                                tempEvent['apigw']["parameters"] = tempParameter
                            elif eveEventValue["Type"] == "COS":
                                tempEvent['cos'] = {
                                    "name": eveEventKey,
                                    "parameters": {}
                                }
                                tempParameter = {}
                                for eveParameterKey, eveParameterValue in eveEventValue["Properties"].items():
                                    if eveParameterKey == "Filter":
                                        # Filter is nested, so its keys are renamed as well
                                        tempFilter = {}
                                        for eveFilterKey, eveFilterValue in eveParameterValue.items():
                                            tempKey = str.lower(eveFilterKey[0]) + eveFilterKey[1:]
                                            tempFilter[tempKey] = eveFilterValue
                                        tempParameter["filter"] = tempFilter
                                    else:
                                        tempKey = str.lower(eveParameterKey[0]) + eveParameterKey[1:]
                                        tempParameter[tempKey] = eveParameterValue
                                tempEvent['cos']["parameters"] = tempParameter
                            elif eveEventValue["Type"] in ["Timer", "CMQ", "CKafka"]:
                                # These three triggers are converted the same way:
                                # "Timer" -> "timer", "CMQ" -> "cmq", "CKafka" -> "ckafka"
                                tempType = str.lower(eveEventValue["Type"])
                                tempEvent[tempType] = {
                                    "name": eveEventKey,
                                    "parameters": {}
                                }
                                tempParameter = {}
                                for eveParameterKey, eveParameterValue in eveEventValue["Properties"].items():
                                    tempKey = str.lower(eveParameterKey[0]) + eveParameterKey[1:]
                                    tempParameter[tempKey] = eveParameterValue
                                tempEvent[tempType]["parameters"] = tempParameter
                            tempEvents.append(tempEvent)
                if tempEvents:
                    tempInputs["events"] = tempEvents
    return tempInputs


def getFunctionComponents(functionName, function, tempInputs):
    # Build one Component entry for a resource of type TencentCloud::Serverless::Function
    isFunction = False
    if isinstance(function, dict):
        for eveKey, eveValue in function.items():
            if eveKey == "Type" and eveValue == "TencentCloud::Serverless::Function":
                isFunction = True
    if isFunction:
        for eveKey, eveValue in function.items():
            if eveKey == "Type" and eveValue == "TencentCloud::Serverless::Function":
                continue
            else:
                tempInputs = getBaseFunctionComponents(eveValue, functionName, tempInputs)
        serverlessPluginYaml = {
            "component": '@serverless/tencent-scf',
            "inputs": tempInputs
        }
        return serverlessPluginYaml
    else:
        return False


def getEventPlugin(eventType, eventName, eventSource, deepList=None):
    # Convert one trigger; keys listed in deepList are nested dicts whose keys are renamed too
    tempEvent = {}
    tempEvent[eventType] = {
        "name": eventName,
        "parameters": {}
    }
    tempParameter = {}
    for eveParameterKey, eveParameterValue in eventSource["Properties"].items():
        tempKey = str.lower(eveParameterKey[0]) + eveParameterKey[1:]
        if deepList and eveParameterKey in deepList:
            tempDeepData = {}
            for eveFilterKey, eveFilterValue in eveParameterValue.items():
                tempThisKey = str.lower(eveFilterKey[0]) + eveFilterKey[1:]
                tempDeepData[tempThisKey] = eveFilterValue
            tempParameter[tempKey] = tempDeepData
            continue
        tempParameter[tempKey] = eveParameterValue
    tempEvent[eventType]["parameters"] = tempParameter
    return tempEvent


def getBaseFunctionPlugin(functionInformation, functionName=None, tempInputs=None):
    # Map an SCFCLI function definition (or the Globals block) to Plugin settings
    tempInputs = tempInputs if tempInputs else {}
    if functionName:
        tempInputs["name"] = functionName
    if isinstance(functionInformation, dict):
        for eveFunctionKey, eveFunctionValue in functionInformation.items():
            if eveFunctionKey not in ["Events", "Environment", "VpcConfig", "Type", "Role"]:
                tempKey = str.lower(eveFunctionKey[0]) + eveFunctionKey[1:]
                tempInputs[tempKey] = eveFunctionValue
            elif eveFunctionKey == "Environment":
                if eveFunctionValue and "Variables" in eveFunctionValue:
                    tempEnvironment = {
                        "variables": eveFunctionValue["Variables"]
                    }
                    tempInputs["environment"] = tempEnvironment
            elif eveFunctionKey == "VpcConfig":
                tempVpcConfig = {}
                if eveFunctionValue and "SubnetId" in eveFunctionValue:
                    tempVpcConfig["subnetId"] = eveFunctionValue["SubnetId"]
                if eveFunctionValue and "VpcId" in eveFunctionValue:
                    tempVpcConfig["vpcId"] = eveFunctionValue["VpcId"]
                tempInputs["vpcConfig"] = tempVpcConfig
            elif eveFunctionKey == "Events":
                tempEvents = []
                if isinstance(eveFunctionValue, dict):
                    for eveEventKey, eveEventValue in eveFunctionValue.items():
                        if isinstance(eveEventValue["Properties"], dict):
                            tempEvent = {}
                            if eveEventValue["Type"] == "APIGW":
                                tempEvent = getEventPlugin("apigw", eveEventKey, eveEventValue)
                            elif eveEventValue["Type"] == "COS":
                                tempEvent = getEventPlugin("cos", eveEventKey, eveEventValue, ["Filter"])
                            elif eveEventValue["Type"] == "Timer":
                                tempEvent = getEventPlugin("timer", eveEventKey, eveEventValue)
                            elif eveEventValue["Type"] == "CMQ":
                                # getEventPlugin already returns {"cmq": {...}}
                                tempEvent = getEventPlugin("cmq", eveEventKey, eveEventValue)
                            elif eveEventValue["Type"] == "CKafka":
                                tempEvent = getEventPlugin("ckafka", eveEventKey, eveEventValue)
                            tempEvents.append(tempEvent)
                if tempEvents:
                    tempInputs["events"] = tempEvents
    return tempInputs


def getFunctionPlugin(functionName, function, tempInputs):
    # Build one Plugin function entry for a resource of type TencentCloud::Serverless::Function
    isFunction = False
    if isinstance(function, dict):
        for eveKey, eveValue in function.items():
            if eveKey == "Type" and eveValue == "TencentCloud::Serverless::Function":
                isFunction = True
    if isFunction:
        for eveKey, eveValue in function.items():
            if eveKey == "Type" and eveValue == "TencentCloud::Serverless::Function":
                continue
            else:
                return getBaseFunctionPlugin(eveValue, functionName, tempInputs)
    else:
        return False


def doComponents(scfYaml):
    # Convert an SCFCLI YAML string into Serverless Component YAML
    try:
        yamlData = yaml.safe_load(scfYaml)
        functions = {}
        # Settings under Globals become the defaults for every function
        inputs = {}
        if "Globals" in yamlData:
            inputs = getBaseFunctionComponents(yamlData["Globals"]["Function"])
        if isinstance(yamlData['Resources'], dict):
            for eveKey, eveValue in yamlData['Resources'].items():
                for eveNamespaceKey, eveNamespaceValue in eveValue.items():
                    tempInputs = inputs.copy()
                    if eveNamespaceKey == "Type" and eveNamespaceValue == "TencentCloud::Serverless::Namespace":
                        continue
                    tempFunction = getFunctionComponents(eveNamespaceKey, eveNamespaceValue, tempInputs)
                    if tempFunction:
                        functions[eveNamespaceKey] = tempFunction
        return {
            "error": False,
            "result": yaml.safe_dump(functions)
        }
    except Exception:
        return {
            "error": True,
            "result": "Failed to convert the SCF YAML to Serverless Component YAML"
        }


def doPlugin(scfYaml):
    # Convert an SCFCLI YAML string into Serverless Plugin YAML
    try:
        yamlData = yaml.safe_load(scfYaml)

        # Get the provider
        print("Getting provider")
        provider = {}
        if "Globals" in yamlData:
            provider = getBaseFunctionPlugin(yamlData["Globals"]["Function"])
        provider["name"] = "tencent"
        provider["credentials"] = "~/credentials"

        # Get the service
        print("Getting service")
        service = "Tencent-Serverless-Framework"

        # Get the plugins
        print("Getting plugin")
        plugin = ["serverless-tencent-scf"]

        # Get the functions
        print("Getting functions")
        functions = {}
        if isinstance(yamlData['Resources'], dict):
            for eveKey, eveValue in yamlData['Resources'].items():
                for eveNamespaceKey, eveNamespaceValue in eveValue.items():
                    tempInputs = {}
                    if eveNamespaceKey == "Type" and eveNamespaceValue == "TencentCloud::Serverless::Namespace":
                        continue
                    tempFunction = getFunctionPlugin(eveNamespaceKey, eveNamespaceValue, tempInputs)
                    if tempFunction:
                        functions[eveNamespaceKey] = tempFunction

        serverlessJson = {
            "service": service,
            "provider": provider,
            "plugins": plugin,
            "functions": functions
        }
        return {
            "error": False,
            "result": yaml.safe_dump(serverlessJson)
        }
    except Exception as e:
        print(e)
        return {
            "error": True,
            "result": "Failed to convert the SCF YAML to Serverless Plugin YAML"
        }
Yes, it really is that blunt: straight to the code, no warm-up.
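Since the script comes with almost no explanation, here is the one rule most of it boils down to, as a rough sketch: SCFCLI keys start with an upper-case letter, the Serverless Framework keys start with a lower-case one, and a few nested blocks (such as the COS Filter) get the same treatment one level down. The helper names lower_first and rename_keys and the sample trigger properties are made up for illustration; they are not taken from the script.

```python
def lower_first(key):
    """'MemorySize' -> 'memorySize'; an empty string is returned unchanged."""
    return key[:1].lower() + key[1:]


def rename_keys(properties, nested=()):
    """Lower-case the first letter of every key.

    Keys listed in `nested` hold dicts whose own keys are renamed as well,
    which mirrors how the script treats the COS "Filter" block.
    """
    renamed = {}
    for key, value in properties.items():
        if key in nested and isinstance(value, dict):
            value = {lower_first(k): v for k, v in value.items()}
        renamed[lower_first(key)] = value
    return renamed


# Hypothetical COS trigger properties, invented for this example
cos_properties = {
    "Bucket": "example-bucket",
    "Events": "cos:ObjectCreated:*",
    "Filter": {"Prefix": "uploads/", "Suffix": ".jpg"},
}
print(rename_keys(cos_properties, nested=("Filter",)))
```

Everything else in the script is bookkeeping around this rule: walking Resources, special-casing Environment, VpcConfig and the APIGW trigger, and wrapping the result in the Component or Plugin layout.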
Usage is simple. To convert to Plugin YAML:
pluginYaml = doPlugin(scfYaml)
To convert to Component YAML:
componentYaml = doComponents(scfYaml)
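Both functions return a dict with error and result rather than the YAML itself, so it is worth checking error before writing anything to disk. A minimal sketch of that step follows; save_conversion is a hypothetical helper of mine, and the output file name is whatever you choose.

```python
def save_conversion(conversion, path):
    """Write conversion["result"] to `path`, or raise if the converter reported an error.

    `conversion` is the dict returned by doPlugin or doComponents:
    {"error": bool, "result": str}.
    """
    if conversion["error"]:
        # On failure, "result" holds the error message instead of YAML
        raise ValueError(conversion["result"])
    with open(path, "w", encoding="utf-8") as f:
        f.write(conversion["result"])
    return path
```

For example, save_conversion(doComponents(scfYaml), "serverless.yaml") would write the Component YAML, or fail loudly instead of leaving an error message inside the file.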
The code is not pretty, but it works, so please go easy on me. It is also open source; see: https://github.com/anycodes/ServerlessPractice/tree/master/scf_2_serverless