前言
Ambari组件运维的脚本使用了Python。Python是一种脚本型语言,天生跨平台,不需要编译,开发和修改维护较为容易。Ambari使用Python对常见的运维命令进行了封装。这些运维命令大都位于ambari-common/src/main/python/resource_management/core/resources/
目录中。Ambari将这些命令的调用方法统一起来,用户不需要在脚本中编写系统命令(Linux shell或者Windows cmd脚本)。
最重要的是Ambari还实现了运维命令同时支持Linux和Windows系统。比如用户通过如下脚本创建一个文件:
File(path,
content=file_content,
owner=owner_user,
group=sample_group)
最终到系统层面执行的时候,Ambari会根据系统的类型,选择不同的执行方式。
本篇我们分析Ambari Python运维命令执行流程,以及Ambari如何实现同一套命令兼容不同操作系统。
阅读本文前需要提前了解的Python语言知识:
- Metaclass(元类)。Python元类是用于创建class的class。可以通过元类修改Python创建对象的过程。
- Descriptor(描述符)。描述符代理了class中属性的赋值和获取方式。类似于Java中setter和getter的作用。
执行逻辑分析
完整的代码逻辑结构和分析流程如下图所示:
Ambari Python 运维脚本逻辑结构和分析流程我们以操作文件的File
类为例,分析下它在Ambari中的执行流程。
File
这个类的源代码位于ambari-common/src/main/python/resource_management/core/resources/system.py
。代码和分析如下:
class File(Resource):
# 文件动作,可以是create和delete
action = ForcedListArgument(default="create")
# 操作文件的路径
# 默认值default可以配置为函数,返回File对象第一个不带key的参数值
path = ResourceArgument(default=lambda obj: obj.name)
# 是否备份操作文件
backup = ResourceArgument()
# 文件访问权限
mode = ResourceArgument()
# 文件所有者
owner = ResourceArgument()
# 文件所属group
group = ResourceArgument()
# 文件内容
content = ResourceArgument()
# whether to replace files with different content
# 如果path指向的文件存在,且内容和将要写入的内容不同,替换这个文件
replace = ResourceArgument(default=True)
# 文件编码
encoding = ResourceArgument()
"""
Grants x-bit for all the folders up-to the file
u - user who is owner
g - user from group
o - other users
a - all
The letters can be combined together.
"""
# 是否修改容纳此文件各层级目录的访问权限
cd_access = ResourceArgument()
actions = Resource.actions + ["create", "delete"]
我们可以看到File
中使用的参数都是通过XxxArgument
对象定义的。看名字应该是用来定义参数类型的。我们挑选几个有代表性的,分析源代码。
ResourceArgument
类位于ambari-common/src/main/python/resource_management/core/base.py
。代码如下:
class ResourceArgument(object):
# default默认值,required是否必填
def __init__(self, default=None, required=False):
self.required = False # Prevents the initial validate from failing
# default可以传入函数
if hasattr(default, '__call__'):
self.default = default
else:
# 校验default值
self.default = self.validate(default)
self.required = required
# 校验逻辑,如果必填,要求必须有值
def validate(self, value):
if self.required and value is None:
raise InvalidArgument("Required argument %s missing" % self.name)
return value
我们再来看看ForcedListArgument
的代码,它覆盖了父类的validate校验逻辑。如下所示:
class ForcedListArgument(ResourceArgument):
def validate(self, value):
# 先调用父类ResourceArgument的校验逻辑
value = super(ForcedListArgument, self).validate(value)
# 判断是否为tuple或者list类型
# 如果不是,转换成list类型
if not isinstance(value, (tuple, list)):
value = [value]
return value
除了上述两个类型的argument外还有BooleanArgument
,IntegerArgument
和PasswordArgument
等。Class代码和上面是类似的,不再赘述。这些类只是定义了参数的数据类型以及配套的校验方式,并不保存参数的值。大家可能会问它们的validate
方法在什么时候调用,别急,在后面分析到的时候会告诉大家。
我们回到File
类,它继承自Resource
。Resource
类代码第一行指定了metaclass。Resource
的metaclass为ResourceMetaclass
。Python的metaclass是用来创建对象的。metaclass可以在创建对象的时候对它做出修改。ResourceMetaclass
代码如下所示(位于ambari-common/src/main/python/resource_management/core/base.py
):
class ResourceMetaclass(type):
# def __new__(cls, name, bases, attrs):
# super_new = super(ResourceMetaclass, cls).__new__
# return super_new(cls, name, bases, attrs)
# 在创建Resource对象的时候执行
# 修改Resource对象的创建逻辑
def __init__(mcs, _name, bases, attrs):
# 获取父类中的_arguments属性,浅复制一份存储为Resource的_arguments属性(mcs实际上是Resource实例)
mcs._arguments = getattr(bases[0], '_arguments', {}).copy()
# 遍历类中定义的所有属性定义
# 可以从attrs拿到class中定义的所有属性,这个特性是下面__new__和__init__无法做到的
for key, value in list(attrs.items()):
# 如果属性是ResourceArgument类型
if isinstance(value, ResourceArgument):
# 将属性定义的name字段值设置为属性名
value.name = key
# 将是属性的key和value(XxxArgument对象本身)设置到_arguments中
# 即属性定义保存在了_arguments字典中
mcs._arguments[key] = value
# 将key包装为Accessor这个descriptor,存放到Resource实例中名字为key的属性中
setattr(mcs, key, Accessor(key))
ResourceMetaclass
的作用为将参数名和对应的参数定义存放到对象的_arguments
字典中。同时为这些参数值逐个配置Accessor
。Accessor
是描述符(descriptor)。它代理了属性值的访问和获取逻辑。Accessor
代码如下所示:
class Accessor(object):
def __init__(self, name):
# name为属性名称
self.name = name
def __get__(self, obj, cls):
try:
# name变量保存在持有Accessor对象的arguments字典中
# 从Resource对象的arguments字典获取属性值,key为属性名
# 和__set__方法对应
return obj.arguments[self.name]
except KeyError:
# 如果字典值不存在(该属性没有值)
# 获取属性定义的default(ResourceArgument的default变量)
val = obj._arguments[self.name].default
# default除了配置默认值外,还可以配置为函数
# 如果default能够被调用(说明它是函数)
if hasattr(val, '__call__'):
# 调用这个函数,传入obj
val = val(obj)
return val
# 给属性赋值的时候调用这个方法
def __set__(self, obj, value):
# 从Resource对象的_arguments字典中读取这个属性定义(XxxArgument),调用validate方法校验值
# 属性值保存在Resource对象的arguments字典中,key为属性名称
obj.arguments[self.name] = obj._arguments[self.name].validate(value)
Accessor
类相当于Java的set
和get
方法。将用户对XxxArgument
属性值的访问转化为对象内部arguments
字典内容的访问。真实的属性值在arguments
字典中保存(总结下,运维命令参数定义在_arguments
字典中,参数值在arguments
字典中)。同时在为属性赋值的时候调用其属性定义(XxxArgument
)中的validate
方法,对参数进行校验或者是转换后,再进行赋值。
分析完了metaclass和Accessor
,接下来我们分析Resource
类。代码如下:
class Resource(object):
# 配置metaclass为ResourceMetaclass 前面已分析过
__metaclass__ = ResourceMetaclass
# action是关键属性,Resource的子类会覆盖这个属性
# action决定了运维命令实际执行的方法名,类似Java反射方式动态查找方法名调用,后面分析
action = ForcedListArgument(default="nothing")
# 是否忽略错误
ignore_failures = BooleanArgument(default=False)
# 如果not_if条件(可以是boolean,函数或者是string形式的命令)返回true(或者命令执行后中状态码为0)
# 拒绝执行命令
not_if = ResourceArgument() # pass command e.g. not_if = ('ls','/root/jdk')
# 和not_if类似,如果返回false,拒绝执行命令。这两个相当于执行命令的前提条件
only_if = ResourceArgument() # pass command
# 执行命令前等待时间
initial_wait = ResourceArgument() # in seconds
# 默认actions为nothing,什么也不做
actions = ["nothing"]
# __new__方法修改创建实例的过程
# name为Resource对象的第一个不使用key传递的参数
# 比如前言中File用法示例的path参数
def __new__(cls, name, env=None, provider=None, **kwargs):
if isinstance(name, list):
# 如果name是list类型
# 复制一份到names_list
names_list = name[:]
while len(names_list) != 1:
# 递归,依次传入每个names_list的元素,创建Resource对象
cls(names_list.pop(0), env, provider, **kwargs)
# name取names_list最后一个元素
name = names_list[0]
# 获取Environment实例
env = env or Environment.get_instance()
# 获取provider实例
# provider是Ambari支持多系统的关键
# 可以在创建Resource实例的时候手工指定
# 如果没有指定,Ambari会通过find_provider方法找到匹配操作系统的provider
# 后面分析
provider = provider or getattr(cls, 'provider', None)
# 获取class名字
r_type = cls.__name__
if r_type not in env.resources:
env.resources[r_type] = {}
# 创建对象
obj = super(Resource, cls).__new__(cls)
# 将对象存放入env的resources字典中
env.resources[r_type][name] = obj
# 将对象存放到resource_list中
env.resource_list.append(obj)
return obj
# 初始化实例的时候调用,在__new__之后执行
def __init__(self, name, env=None, provider=None, **kwargs):
# 如果name是list类型,取最后一个
# 除了最后一个元素之外的会被递归调用创建对象,这里不用特殊处理,参见__new__方法
if isinstance(name, list):
name = name[-1]
# 如果实例有name属性,不再向下执行,一般情况到这里没有这个属性
if hasattr(self, 'name'):
return
# 创建env
self.env = env or Environment.get_instance()
self.name = name
# 获取provider
self.provider = provider or getattr(self, 'provider', None)
self.arguments = {}
# 逐个遍历__init__方法中的keyword参数
for key, value in kwargs.items():
try:
# 查找Resource实例是否定义了这个参数(init方法的kwargs的key必须和XxxArgument的名字相同)
# 如果没有定义,抛出异常
arg = self._arguments[key]
except KeyError:
raise Fail("%s received unsupported argument %s" % (self, key))
else:
try:
# 校验value,然后赋值给arguments[key]
# 和Accessor的__set__方法类似
self.arguments[key] = arg.validate(value)
except InvalidArgument, exc:
raise InvalidArgument("%s %s" % (self, exc))
# 如果不是测试模式,调用env的run方法执行
if not self.env.test_mode:
self.env.run()
def validate(self):
pass
def __repr__(self):
return unicode(self)
def __unicode__(self):
return u"%s[%s]" % (self.__class__.__name__, Logger._get_resource_name_repr(self.name))
# 序列化
def __getstate__(self):
return dict(
name=self.name,
provider=self.provider,
arguments=self.arguments,
env=self.env,
)
# 反序列化
def __setstate__(self, state):
self.name = state['name']
self.provider = state['provider']
self.arguments = state['arguments']
self.env = state['env']
self.validate()
通过上面分析发现实际执行运维命令的地方在Environment
。每个Resource
对象创建后都保存在了Environment
中。似乎谜底离我们不远了。我们继续分析Environment
的run
方法源代码(位于ambari-common/src/main/python/resource_management/core/environment.py
),如下所示:
def run(self):
# Run resource actions
# 遍历resource_list
# 所有的Resource对象会被加入这个list,详情见上面Resource代码分析
while self.resource_list:
# 取出并打印
resource = self.resource_list.pop(0)
Logger.info_resource(resource)
# 如果执行前需要等待,睡眠够等待时间
if resource.initial_wait:
time.sleep(resource.initial_wait)
# 检查not_if和only_if的条件
# _check_condition分别判断:
# 如果参数为boolean类型,直接返回
# 如果参数具有__call__(可调用),返回调用结果
# 如果参数是string,当作命令执行,执行返回的状态码如果为0返回True,其他情况返回False
if resource.not_if is not None and self._check_condition(
resource.not_if):
Logger.info("Skipping {0} due to not_if".format(resource))
continue
if resource.only_if is not None and not self._check_condition(
resource.only_if):
Logger.info("Skipping {0} due to only_if".format(resource))
continue
# 遍历resource对象中的action
# 根据是否忽略错误的配置,决定是否捕获处理异常
for action in resource.action:
if not resource.ignore_failures:
# 执行action
self.run_action(resource, action)
else:
try:
self.run_action(resource, action)
except Exception as ex:
Logger.info("Skipping failure of {0} due to ignore_failures. Failure reason: {1}".format(resource, ex.message))
pass
# Run delayed actions
# 最后才运行需要延迟执行的action
while self.delayed_actions:
action, resource = self.delayed_actions.pop()
self.run_action(resource, action)
通过上面Environment
代码的分析不难发现,Environment
将Ambari中所有创建出的Resource
对象组成了任务队列resource_list
,各个积压的任务(如果有的话)依次顺序执行。
接下来我们分析执行action的逻辑run_action
:
def run_action(self, resource, action):
# 通过find_provider方法查找系统和运维命令对应的provider
# resource.__class__.__name__是resource子类的名字
provider_class = find_provider(self, resource.__class__.__name__,
resource.provider)
# 创建出实例
provider = provider_class(resource)
try:
# 读取provider中名字为action_{action}方法
# action是从Resource的子类定义中获取到的
# 接下来分析provider代码的时候会发现大量action开头的方法
provider_action = getattr(provider, 'action_%s' % action)
except AttributeError:
raise Fail("%r does not implement action %s" % (provider, action))
# 执行provider_action
provider_action()
到这里为止我们接下来分析的重点为如何查找系统和运维命令对应的provider(find_provider
)以及provider的相关代码。
find_provider
的代码分析如下(位于ambari-common/src/main/python/resource_management/core/providers/__init__.py
):
PROVIDERS = dict(
# windows系统
winsrv=dict(
Service="resource_management.core.providers.windows.service.ServiceProvider",
ServiceConfig="resource_management.core.providers.windows.service.ServiceConfigProvider",
Execute="resource_management.core.providers.windows.system.ExecuteProvider",
File="resource_management.core.providers.windows.system.FileProvider",
Directory="resource_management.core.providers.windows.system.DirectoryProvider",
Package="resource_management.core.providers.package.choco.ChocoProvider"
),
# 默认情况(Linux系统)
default=dict(
File="resource_management.core.providers.system.FileProvider",
Directory="resource_management.core.providers.system.DirectoryProvider",
Link="resource_management.core.providers.system.LinkProvider",
Execute="resource_management.core.providers.system.ExecuteProvider",
ExecuteScript="resource_management.core.providers.system.ExecuteScriptProvider",
Mount="resource_management.core.providers.mount.MountProvider",
User="resource_management.core.providers.accounts.UserProvider",
Group="resource_management.core.providers.accounts.GroupProvider",
Service="resource_management.core.providers.service.ServiceProvider",
ServiceConfig="resource_management.core.providers.service.ServiceConfigProvider",
Package="resource_management.core.providers.packaging.PackageProvider",
),
)
def find_provider(env, resource, class_path=None):
# 这里传入的resource实际为resource子类的class名字
# 本文以File为例,File类继承了Resource,所以这里resource就是File
if not class_path:
# 获取所有的provider类全名
# 其中PROVIDERS为上面代码定义的类,主要为文件系统操作和命令执行,支持windows系统和linux系统
# LIBRARY_PROVIDERS位于ambari-common/src/main/python/resource_management/libraries/providers/__init__.py
# 主要为操作系统软件源管理和HDFS服务调用,模板生成和Xml文件操作等
providers = [PROVIDERS, LIBRARY_PROVIDERS]
# 遍历所有providers
for provider in providers:
os_family_provider = None
# system类定义位于ambari-common/src/main/python/resource_management/core/system.py
# os_family返回当前操作系统属于哪一系列的,比如redhat,debian等
# 找出适用于该系列操作系统的provider
if env.system.os_family in provider:
os_family_provider = provider[env.system.os_family]
else:
# take care of os extensions
# 同时也考虑操作系统所属的父系列(系列也分好多层级)是否属于某个provider的family
# 找出这个provider
for family in provider:
if OSCheck.is_in_family(env.system.os_family, family):
os_family_provider = provider[family]
# 找出适用于该系列操作系统的一系列provider中对应这个resource的provider
if os_family_provider and resource in os_family_provider:
class_path = os_family_provider[resource]
break
# 找出provider中名字为default的字典中,是否有resource对应的provider
if resource in provider["default"]:
class_path = provider["default"][resource]
break
try:
# 获取类的路径和类名称
mod_path, class_name = class_path.rsplit('.', 1)
except ValueError:
raise Fail("Unable to find provider for %s as %s" % (resource, class_path))
# 动态加载进来这个类
mod = __import__(mod_path, {}, {}, [class_name])
# 返回这个class
return getattr(mod, class_name)
上面的过程类似于Java的反射。通过System
类获取到运行环境的操作系统信息,然后根据这个信息找到专用于这个系统的一系列provider。接下来根据运维命令的class名称,找到对应的provider。以本文的File
为例。最终找到的provider为resource_management.core.providers.system.FileProvider
。我们接着分析它的代码(位于ambari-common/src/main/python/resource_management/core/providers/system.py
)。可以看到FileProvider
中定义了action_create
和action_delete
两个方法。回忆下Environment
的run_action
方法是怎么找到执行provider具体哪个方法的(前面已分析过):
provider_action = getattr(provider, 'action_%s' % action)
很明显,方法名字是action_
然后拼接Resource
子类的action
参数值。比如File
类,默认的action
参数值为create
,实际执行的就是FileProvider
类的action_create
方法。
FileProvider
的代码和分析如下:
class FileProvider(Provider):
# 创建文件的方法
def action_create(self):
# 找到path参数值
path = self.resource.path
# 判断路径是否是目录
# 这里sudo封装了一系列通过Python调用Linux shell命令的方法
if sudo.path_isdir(path):
raise Fail("Applying %s failed, directory with name %s exists" % (self.resource, path))
# 检查父目录是否存在
dirname = os.path.dirname(path)
if not sudo.path_isdir(dirname):
raise Fail("Applying %s failed, parent directory %s doesn't exist" % (self.resource, dirname))
# 是否写入
write = False
# 获取文件内容
content = self._get_content()
# 如果目标不存在,可以写入
if not sudo.path_exists(path):
write = True
reason = "it doesn't exist"
elif self.resource.replace:
# 到这里说明path指向的文件存在,需要替换
# 如果需要替换
if content is not None:
# 读取已经存在的文件原始内容
old_content = sudo.read_file(path, encoding=self.resource.encoding)
# 如果新老内容不同。需要写入
if content != old_content:
write = True
reason = "contents don't match"
if self.resource.backup:
# 如果需要备份,备份原文件内容到/tmp/resource_management/backup
self.resource.env.backup_file(path)
# 获取文件所有者和group信息
owner = self.resource.owner or 'root'
group = self.resource.group or 'root'
# 如果需要写入
if write:
Logger.info("Writing %s because %s" % (self.resource, reason))
def on_file_created(filename):
# 修改文件的元数据信息,包含所有者和group等权限信息
_ensure_metadata(filename, owner, group, mode=self.resource.mode, cd_access=self.resource.cd_access)
# sudo.create_file会在临时目录创建出文件,然后执行on_file_created方法,最后将创建好的文件移动到目标目录
Logger.info("Moving %s to %s" % (filename, path))
# 创建并写入文件
sudo.create_file(path, content, encoding=self.resource.encoding, on_file_created=on_file_created)
else:
# 如果不用写入,只需要修改所有者和group等权限信息
_ensure_metadata(path, owner, group, mode=self.resource.mode, cd_access=self.resource.cd_access)
# 删除文件的方法
def action_delete(self):
# 获取path
path = self.resource.path
# 如果是目录
if sudo.path_isdir(path):
raise Fail("Applying %s failed, %s is directory not file!" % (self.resource, path))
# 如果目标存在,删除之
if sudo.path_exists(path):
Logger.info("Deleting %s" % self.resource)
sudo.unlink(path)
# 获取文件内容
# 如果content是None,返回None
# 如果是string,返回这个string
# 如果可执行,返回执行结果
def _get_content(self):
content = self.resource.content
if content is None:
return None
elif isinstance(content, basestring):
return content
elif hasattr(content, "__call__"):
return content()
raise Fail("Unknown source type for %s: %r" % (self, content))
到这里为止我们以File
运维命令的执行全过程为例,分析完了Ambari是怎么使用同一套调用方式兼容多种不同系统的。Ambari的开发者灵活运用Python语言特性的方式非常值得我们学习。
本博客为作者原创,欢迎大家参与讨论和批评指正。如需转载请注明出处。
网友评论