Ansible的runner源码剖析第三部分(run函数的学习)
(1)run函数学习(api函数入口程序,非常重要)
def run(self):
'''1.判断主机列表是否存在'''
if not self.run_hosts:
self.run_hosts = self.inventory.list_hosts(self.pattern)
# hosts文件就是一堆包含ip的列表['192.168.1.101']
hosts = self.run_hosts
#空主机直接返回结果
if len(hosts) == 0:
self.callbacks.on_no_hosts()
return dict(contacted={}, dark={})
'''2.全局多进程变量,为实例自己'''
global multiprocessing_runner
multiprocessing_runner = self
results = None
# Check if this is an action plugin. Some of them are designed
# to be ran once per group of hosts. Example module: pause,
# run once per hostgroup, rather than pausing once per each
# host.
'''3.通过模块名找到插件'''
p = utils.plugins.action_loader.get(self.module_name, self)
'''4.优化进程数量,如果进程数量大于主机数量,按照主机数量来跑多进程'''
if self.forks == 0 or self.forks > len(hosts):
self.forks = len(hosts)
if (p and (getattr(p, 'BYPASS_HOST_LOOP', None)) or self.run_once):
# Expose the current hostgroup to the bypassing plugins
self.host_set = hosts
# We aren't iterating over all the hosts in this
# group. So, just choose the "delegate_to" host if that is defined and is
# one of the targeted hosts, otherwise pick the first host in our group to
# construct the conn object with.
if self.delegate_to is not None and self.delegate_to in hosts:
host = self.delegate_to
else:
host = hosts[0]
result_data = self._executor(host, None).result
# Create a ResultData item for each host in this group
# using the returned result. If we didn't do this we would
# get false reports of dark hosts.
results = [ ReturnData(host=h, result=result_data, comm_ok=True) \
for h in hosts ]
del self.host_set
'''5.把主机列表传递给函数去跑结果'''
elif self.forks > 1:
try:
'''去跑主机列表结果'''
results = self._parallel_exec(hosts)
except IOError, ie:
print ie.errno
if ie.errno == 32:
# broken pipe from Ctrl+C
raise errors.AnsibleError("interrupted")
raise
else:
results = [ self._executor(h, None) for h in hosts ]
#6.结果处理
return self._partition_results(results)
(2)_parallel_exec函数学习(跑主机结果列表函数)
def _parallel_exec(self, hosts):
''' handles mulitprocessing when more than 1 fork is required '''
manager = multiprocessing.Manager()
'''1任务队列'''
job_queue = manager.Queue()
for host in hosts:
job_queue.put(host)
'''2.结果队列'''
result_queue = manager.Queue()
try:
fileno = sys.stdin.fileno()
except ValueError:
fileno = None
workers = []
'''4.起forks进程数的进程去执行_executor_hook函数,函数的参数是任务队列,结果队列,以及new_stdin,按照最上面例子定义forks=5,则此处self.forks就是5'''
for i in range(self.forks):
new_stdin = None
if fileno is not None:
try:
new_stdin = os.fdopen(os.dup(fileno))
except OSError, e:
# couldn't dupe stdin, most likely because it's
# not a valid file descriptor, so we just rely on
# using the one that was passed in
pass
'''起5个进程同时跑_executor_hook函数,参数分别是任务队列job_queue,结果队列result_queue,new_stdin'''
prc = multiprocessing.Process(target=_executor_hook,
args=(job_queue, result_queue, new_stdin))
prc.start()
workers.append(prc)
try:
'''6.保证每个子进程都执行完毕'''
for worker in workers:
worker.join()
except KeyboardInterrupt:
for worker in workers:
worker.terminate()
worker.join()
results = []
try:
'''7.结果队列不为空的话,不断取出结果,放到result里'''
while not result_queue.empty():
results.append(result_queue.get(block=False))
except socket.error:
raise errors.AnsibleError("<interrupted>")
return results
(3)_executor_hook函数
def _executor_hook(job_queue, result_queue, new_stdin):
# attempt workaround of https://github.com/newsapps/beeswithmachineguns/issues/17
# this function also not present in CentOS 6
if HAS_ATFORK:
atfork()
signal.signal(signal.SIGINT, signal.SIG_IGN)
'''1.任务队列不为空,不断跑任务'''
while not job_queue.empty():
try:
'''2.取一个IP出来,然后去跑IP,结果为return_data'''
host = job_queue.get(block=False)
'''3.调用全局变量下的实例方法_executor,结果输出到结果队列中'''
return_data = multiprocessing_runner._executor(host, new_stdin)
result_queue.put(return_data)
except Queue.Empty:
pass
except:
traceback.print_exc()
(4)_executor函数
def _executor(self, host, new_stdin):
''' handler for multiprocessing library '''
try:
fileno = sys.stdin.fileno()
except ValueError:
fileno = None
try:
self._new_stdin = new_stdin
if not new_stdin and fileno is not None:
try:
self._new_stdin = os.fdopen(os.dup(fileno))
except OSError, e:
# couldn't dupe stdin, most likely because it's
# not a valid file descriptor, so we just rely on
# using the one that was passed in
pass
'''1.正常传递host,去跑数据'''
exec_rc = self._executor_internal(host, new_stdin)
'''2.如果返回类型不对,抛出异常'''
if type(exec_rc) != ReturnData:
raise Exception("unexpected return type: %s" % type(exec_rc))
# redundant, right?
'''3.如果没有联通的话,callbacks调用不可达函数处理数据'''
if not exec_rc.comm_ok:
self.callbacks.on_unreachable(host, exec_rc.result)
return exec_rc
except errors.AnsibleError, ae:
msg = str(ae)
self.callbacks.on_unreachable(host, msg)
return ReturnData(host=host, comm_ok=False, result=dict(failed=True, msg=msg))
except Exception:
msg = traceback.format_exc()
self.callbacks.on_unreachable(host, msg)
return ReturnData(host=host, comm_ok=False, result=dict(failed=True, msg=msg))
(5)_executor_internal函数
def _executor_internal(self, host, new_stdin):
''' executes any module one or more times '''
# We build the proper injected dictionary for all future
# templating operations in this run
inject = self.get_inject_vars(host)
# Then we selectively merge some variable dictionaries down to a
# single dictionary, used to template the HostVars for this host
temp_vars = self.inventory.get_variables(host, vault_password=self.vault_pass)
temp_vars = utils.combine_vars(temp_vars, inject['combined_cache'] )
temp_vars = utils.combine_vars(temp_vars, {'groups': inject['groups']})
temp_vars = utils.combine_vars(temp_vars, self.play_vars)
temp_vars = utils.combine_vars(temp_vars, self.play_file_vars)
temp_vars = utils.combine_vars(temp_vars, self.extra_vars)
hostvars = HostVars(temp_vars, self.inventory, vault_password=self.vault_pass)
# and we save the HostVars in the injected dictionary so they
# may be referenced from playbooks/templates
inject['hostvars'] = hostvars
host_connection = inject.get('ansible_connection', self.transport)
if host_connection in [ 'paramiko', 'ssh', 'accelerate' ]:
port = hostvars.get('ansible_ssh_port', self.remote_port)
if port is None:
port = C.DEFAULT_REMOTE_PORT
else:
# fireball, local, etc
port = self.remote_port
if self.inventory.basedir() is not None:
inject['inventory_dir'] = self.inventory.basedir()
if self.inventory.src() is not None:
inject['inventory_file'] = self.inventory.src()
# could be already set by playbook code
inject.setdefault('ansible_version', utils.version_info(gitinfo=False))
# allow with_foo to work in playbooks...
items = None
items_plugin = self.module_vars.get('items_lookup_plugin', None)
if items_plugin is not None and items_plugin in utils.plugins.lookup_loader:
basedir = self.basedir
if '_original_file' in inject:
basedir = os.path.dirname(inject['_original_file'])
filesdir = os.path.join(basedir, '..', 'files')
if os.path.exists(filesdir):
basedir = filesdir
try:
items_terms = self.module_vars.get('items_lookup_terms', '')
items_terms = template.template(basedir, items_terms, inject)
items = utils.plugins.lookup_loader.get(items_plugin, runner=self, basedir=basedir).run(items_terms, inject=inject)
except errors.AnsibleUndefinedVariable, e:
if 'has no attribute' in str(e):
# the undefined variable was an attribute of a variable that does
# exist, so try and run this through the conditional check to see
# if the user wanted to skip something on being undefined
if utils.check_conditional(self.conditional, self.basedir, inject, fail_on_undefined=True):
# the conditional check passed, so we have to fail here
raise
else:
# the conditional failed, so we skip this task
result = utils.jsonify(dict(changed=False, skipped=True))
self.callbacks.on_skipped(host, None)
return ReturnData(host=host, result=result)
except errors.AnsibleError, e:
raise
except Exception, e:
raise errors.AnsibleError("Unexpected error while executing task: %s" % str(e))
# strip out any jinja2 template syntax within
# the data returned by the lookup plugin
items = utils._clean_data_struct(items, from_remote=True)
if items is None:
items = []
else:
if type(items) != list:
raise errors.AnsibleError("lookup plugins have to return a list: %r" % items)
if len(items) and utils.is_list_of_strings(items) and self.module_name in [ 'apt', 'yum', 'pkgng', 'zypper' ]:
# hack for apt, yum, and pkgng so that with_items maps back into a single module call
use_these_items = []
for x in items:
inject['item'] = x
if not self.conditional or utils.check_conditional(self.conditional, self.basedir, inject, fail_on_undefined=self.error_on_undefined_vars):
use_these_items.append(x)
inject['item'] = ",".join(use_these_items)
items = None
def _safe_template_complex_args(args, inject):
# Ensure the complex args here are a dictionary, but
# first template them if they contain a variable
returned_args = args
if isinstance(args, basestring):
# If the complex_args were evaluated to a dictionary and there are
# more keys in the templated version than the evaled version, some
# param inserted additional keys (the template() call also runs
# safe_eval on the var if it looks like it's a datastructure). If the
# evaled_args are not a dict, it's most likely a whole variable (ie.
# args: {{var}}), in which case there's no way to detect the proper
# count of params in the dictionary.
templated_args = template.template(self.basedir, args, inject, convert_bare=True)
evaled_args = utils.safe_eval(args)
if isinstance(evaled_args, dict) and len(evaled_args) > 0 and len(evaled_args) != len(templated_args):
raise errors.AnsibleError("a variable tried to insert extra parameters into the args for this task")
# set the returned_args to the templated_args
returned_args = templated_args
# and a final check to make sure the complex args are a dict
if returned_args is not None and not isinstance(returned_args, dict):
raise errors.AnsibleError("args must be a dictionary, received %s" % returned_args)
return returned_args
# logic to decide how to run things depends on whether with_items is used
if items is None:
complex_args = _safe_template_complex_args(self.complex_args, inject)
return self._executor_internal_inner(host, self.module_name, self.module_args, inject, port, complex_args=complex_args)
elif len(items) > 0:
# executing using with_items, so make multiple calls
# TODO: refactor
if self.background > 0:
raise errors.AnsibleError("lookup plugins (with_*) cannot be used with async tasks")
all_comm_ok = True
all_changed = False
all_failed = False
results = []
for x in items:
# use a fresh inject for each item
this_inject = inject.copy()
this_inject['item'] = x
complex_args = _safe_template_complex_args(self.complex_args, this_inject)
'''调用用别的函数这次是真的去跑数据了'''
result = self._executor_internal_inner(
host,
self.module_name,
self.module_args,
this_inject,
port,
complex_args=complex_args
)
if 'stdout' in result.result and 'stdout_lines' not in result.result:
result.result['stdout_lines'] = result.result['stdout'].splitlines()
results.append(result.result)
if result.comm_ok == False:
all_comm_ok = False
all_failed = True
break
for x in results:
if x.get('changed') == True:
all_changed = True
if (x.get('failed') == True) or ('failed_when_result' in x and [x['failed_when_result']] or [('rc' in x) and (x['rc'] != 0)])[0]:
all_failed = True
break
msg = 'All items completed'
if all_failed:
msg = "One or more items failed."
rd_result = dict(failed=all_failed, changed=all_changed, results=results, msg=msg)
if not all_failed:
del rd_result['failed']
return ReturnData(host=host, comm_ok=all_comm_ok, result=rd_result)
else:
self.callbacks.on_skipped(host, None)
return ReturnData(host=host, comm_ok=True, result=dict(changed=False, skipped=True))
# *****************************************************
(6)_executor_internal_inner函数
def _executor_internal_inner(self, host, module_name, module_args, inject, port, is_chained=False, complex_args=None):
''' decides how to invoke a module '''
# late processing of parameterized become_user (with_items,..)
if self.become_user_var is not None:
self.become_user = template.template(self.basedir, self.become_user_var, inject)
# module_name may be dynamic (but cannot contain {{ ansible_ssh_user }})
module_name = template.template(self.basedir, module_name, inject)
if module_name in utils.plugins.action_loader:
if self.background != 0:
raise errors.AnsibleError("async mode is not supported with the %s module" % module_name)
handler = utils.plugins.action_loader.get(module_name, self)
elif self.background == 0:
handler = utils.plugins.action_loader.get('normal', self)
else:
handler = utils.plugins.action_loader.get('async', self)
if type(self.conditional) != list:
self.conditional = [ self.conditional ]
for cond in self.conditional:
if not utils.check_conditional(cond, self.basedir, inject, fail_on_undefined=self.error_on_undefined_vars):
result = dict(changed=False, skipped=True)
if self.no_log:
result = utils.censor_unlogged_data(result)
self.callbacks.on_skipped(host, result)
else:
self.callbacks.on_skipped(host, inject.get('item',None))
return ReturnData(host=host, result=utils.jsonify(result))
if getattr(handler, 'setup', None) is not None:
handler.setup(module_name, inject)
conn = None
actual_host = inject.get('ansible_ssh_host', host)
# allow ansible_ssh_host to be templated
actual_host = template.template(self.basedir, actual_host, inject, fail_on_undefined=True)
actual_port = port
actual_user = inject.get('ansible_ssh_user', self.remote_user)
actual_pass = inject.get('ansible_ssh_pass', self.remote_pass)
actual_transport = inject.get('ansible_connection', self.transport)
actual_private_key_file = inject.get('ansible_ssh_private_key_file', self.private_key_file)
actual_private_key_file = template.template(self.basedir, actual_private_key_file, inject, fail_on_undefined=True)
self.become = utils.boolean(inject.get('ansible_become', inject.get('ansible_sudo', inject.get('ansible_su', self.become))))
self.become_user = inject.get('ansible_become_user', inject.get('ansible_sudo_user', inject.get('ansible_su_user',self.become_user)))
self.become_pass = inject.get('ansible_become_pass', inject.get('ansible_sudo_pass', inject.get('ansible_su_pass', self.become_pass)))
self.become_exe = inject.get('ansible_become_exe', inject.get('ansible_sudo_exe', self.become_exe))
self.become_method = inject.get('ansible_become_method', self.become_method)
# select default root user in case self.become requested
# but no user specified; happens e.g. in host vars when
# just ansible_become=True is specified
if self.become and self.become_user is None:
self.become_user = 'root'
if actual_private_key_file is not None:
actual_private_key_file = os.path.expanduser(actual_private_key_file)
if self.accelerate and actual_transport != 'local':
#Fix to get the inventory name of the host to accelerate plugin
if inject.get('ansible_ssh_host', None):
self.accelerate_inventory_host = host
else:
self.accelerate_inventory_host = None
# if we're using accelerated mode, force the
# transport to accelerate
actual_transport = "accelerate"
if not self.accelerate_port:
self.accelerate_port = C.ACCELERATE_PORT
actual_port = inject.get('ansible_ssh_port', port)
# the delegated host may have different SSH port configured, etc
# and we need to transfer those, and only those, variables
self.delegate_to = inject.get('delegate_to', None)
if self.delegate_to:
self.delegate_to = template.template(self.basedir, self.delegate_to, inject)
if self.delegate_to is not None:
delegate = self._compute_delegate(actual_pass, inject)
actual_transport = delegate['transport']
actual_host = delegate['ssh_host']
actual_port = delegate['port']
actual_user = delegate['user']
actual_pass = delegate['pass']
actual_private_key_file = delegate['private_key_file']
self.become_pass = delegate.get('become_pass',delegate.get('sudo_pass'))
inject = delegate['inject']
# set resolved delegate_to into inject so modules can call _remote_checksum
inject['delegate_to'] = self.delegate_to
# user/pass may still contain variables at this stage
actual_user = template.template(self.basedir, actual_user, inject)
try:
actual_pass = template.template(self.basedir, actual_pass, inject)
self.become_pass = template.template(self.basedir, self.become_pass, inject)
except:
# ignore password template errors, could be triggered by password charaters #10468
pass
# make actual_user available as __magic__ ansible_ssh_user variable
inject['ansible_ssh_user'] = actual_user
try:
if actual_transport == 'accelerate':
# for accelerate, we stuff both ports into a single
# variable so that we don't have to mangle other function
# calls just to accommodate this one case
actual_port = [actual_port, self.accelerate_port]
elif actual_port is not None:
actual_port = int(template.template(self.basedir, actual_port, inject))
except ValueError, e:
result = dict(failed=True, msg="FAILED: Configured port \"%s\" is not a valid port, expected integer" % actual_port)
return ReturnData(host=host, comm_ok=False, result=result)
try:
if self.delegate_to or host != actual_host:
delegate_host = host
else:
delegate_host = None
conn = self.connector.connect(actual_host, actual_port, actual_user, actual_pass, actual_transport, actual_private_key_file, delegate_host)
default_shell = getattr(conn, 'default_shell', '')
shell_type = inject.get('ansible_shell_type')
if not shell_type:
if default_shell:
shell_type = default_shell
else:
shell_type = os.path.basename(C.DEFAULT_EXECUTABLE)
shell_plugin = utils.plugins.shell_loader.get(shell_type)
if shell_plugin is None:
shell_plugin = utils.plugins.shell_loader.get('sh')
conn.shell = shell_plugin
except errors.AnsibleConnectionFailed, e:
result = dict(failed=True, msg="FAILED: %s" % str(e))
return ReturnData(host=host, comm_ok=False, result=result)
tmp = ''
# action plugins may DECLARE via TRANSFERS_FILES = True that they need a remote tmp path working dir
if self._early_needs_tmp_path(module_name, handler):
tmp = self._make_tmp_path(conn)
# allow module args to work as a dictionary
# though it is usually a string
if isinstance(module_args, dict):
module_args = utils.serialize_args(module_args)
# render module_args and complex_args templates
try:
# When templating module_args, we need to be careful to ensure
# that no variables inadvertently (or maliciously) add params
# to the list of args. We do this by counting the number of k=v
# pairs before and after templating.
num_args_pre = self._count_module_args(module_args, allow_dupes=True)
module_args = template.template(self.basedir, module_args, inject, fail_on_undefined=self.error_on_undefined_vars)
num_args_post = self._count_module_args(module_args)
if num_args_pre != num_args_post:
raise errors.AnsibleError("A variable inserted a new parameter into the module args. " + \
"Be sure to quote variables if they contain equal signs (for example: \"{{var}}\").")
# And we also make sure nothing added in special flags for things
# like the command/shell module (ie. #USE_SHELL)
if '#USE_SHELL' in module_args:
raise errors.AnsibleError("A variable tried to add #USE_SHELL to the module arguments.")
complex_args = template.template(self.basedir, complex_args, inject, fail_on_undefined=self.error_on_undefined_vars)
except jinja2.exceptions.UndefinedError, e:
raise errors.AnsibleUndefinedVariable("One or more undefined variables: %s" % str(e))
# filter omitted arguments out from complex_args
if complex_args:
complex_args = dict(filter(lambda x: x[1] != self.omit_token, complex_args.iteritems()))
# Filter omitted arguments out from module_args.
# We do this with split_args instead of parse_kv to ensure
# that things are not unquoted/requoted incorrectly
args = split_args(module_args)
final_args = []
for arg in args:
if '=' in arg:
k,v = arg.split('=', 1)
if unquote(v) != self.omit_token:
final_args.append(arg)
else:
# not a k=v param, append it
final_args.append(arg)
module_args = ' '.join(final_args)
result = handler.run(conn, tmp, module_name, module_args, inject, complex_args)
# Code for do until feature
until = self.module_vars.get('until', None)
if until is not None and result.comm_ok:
inject[self.module_vars.get('register')] = result.result
cond = template.template(self.basedir, until, inject, expand_lists=False)
if not utils.check_conditional(cond, self.basedir, inject, fail_on_undefined=self.error_on_undefined_vars):
retries = template.template(self.basedir, self.module_vars.get('retries'), inject, expand_lists=False)
delay = self.module_vars.get('delay')
for x in range(1, int(retries) + 1):
# template the delay, cast to float and sleep
delay = template.template(self.basedir, delay, inject, expand_lists=False)
delay = float(delay)
time.sleep(delay)
tmp = ''
if self._early_needs_tmp_path(module_name, handler):
tmp = self._make_tmp_path(conn)
result = handler.run(conn, tmp, module_name, module_args, inject, complex_args)
result.result['attempts'] = x
vv("Result from run %i is: %s" % (x, result.result))
inject[self.module_vars.get('register')] = result.result
cond = template.template(self.basedir, until, inject, expand_lists=False)
if utils.check_conditional(cond, self.basedir, inject, fail_on_undefined=self.error_on_undefined_vars):
break
if result.result['attempts'] == retries and not utils.check_conditional(cond, self.basedir, inject, fail_on_undefined=self.error_on_undefined_vars):
result.result['failed'] = True
result.result['msg'] = "Task failed as maximum retries was encountered"
else:
result.result['attempts'] = 0
conn.close()
if not result.comm_ok:
# connection or parsing errors...
self.callbacks.on_unreachable(host, result.result)
else:
data = result.result
# https://github.com/ansible/ansible/issues/4958
if hasattr(sys.stdout, "isatty"):
if "stdout" in data and sys.stdout.isatty():
if not string_functions.isprintable(data['stdout']):
data['stdout'] = ''
if 'item' in inject:
result.result['item'] = inject['item']
result.result['invocation'] = dict(
module_args=module_args,
module_name=module_name
)
changed_when = self.module_vars.get('changed_when')
failed_when = self.module_vars.get('failed_when')
if (changed_when is not None or failed_when is not None) and self.background == 0:
register = self.module_vars.get('register')
if register is not None:
if 'stdout' in data:
data['stdout_lines'] = data['stdout'].splitlines()
inject[register] = data
# only run the final checks if the async_status has finished,
# or if we're not running an async_status check at all
if (module_name == 'async_status' and "finished" in data) or module_name != 'async_status':
if changed_when is not None and 'skipped' not in data:
data['changed'] = utils.check_conditional(changed_when, self.basedir, inject, fail_on_undefined=self.error_on_undefined_vars)
if failed_when is not None and 'skipped' not in data:
data['failed_when_result'] = data['failed'] = utils.check_conditional(failed_when, self.basedir, inject, fail_on_undefined=self.error_on_undefined_vars)
if is_chained:
# no callbacks
return result
if 'skipped' in data:
self.callbacks.on_skipped(host, inject.get('item',None))
if self.no_log:
data = utils.censor_unlogged_data(data)
if not result.is_successful():
ignore_errors = self.module_vars.get('ignore_errors', False)
self.callbacks.on_failed(host, data, ignore_errors)
else:
if self.diff:
self.callbacks.on_file_diff(conn.host, result.diff)
self.callbacks.on_ok(host, data)
return result
(7)run函数中'
---3.通过模块名找到插件---
p = utils.plugins.action_loader.get(self.module_name, self)
其中action_loader是一个PluginLoader类的实例
action_loader = PluginLoader(
'ActionModule',
'ansible.runner.action_plugins',
C.DEFAULT_ACTION_PLUGIN_PATH,
'action_plugins'
)
class PluginLoader(object):
'''
PluginLoader loads plugins from the configured plugin directories.
It searches for plugins by iterating through the combined list of
play basedirs, configured paths, and the python path.
The first match is used.
'''
def __init__(self, class_name, package, config, subdir, aliases={}):
self.class_name = class_name
self.package = package
self.config = config
self.subdir = subdir
self.aliases = aliases
if not class_name in MODULE_CACHE:
MODULE_CACHE[class_name] = {}
if not class_name in PATH_CACHE:
PATH_CACHE[class_name] = None
if not class_name in PLUGIN_PATH_CACHE:
PLUGIN_PATH_CACHE[class_name] = {}
self._module_cache = MODULE_CACHE[class_name]
self._paths = PATH_CACHE[class_name]
self._plugin_path_cache = PLUGIN_PATH_CACHE[class_name]
self._extra_dirs = []
self._searched_paths = set()
'''get方法'''
def get(self, name, *args, **kwargs):
'''此处的name就是传递进去的模块名字'''
''' instantiates a plugin of the given name using arguments '''
if name in self.aliases:
name = self.aliases[name]
'''通过find_plugin方法去找到模块路径'''
path = self.find_plugin(name)
if path is None:
return None
if path not in self._module_cache:
self._module_cache[path] = imp.load_source('.'.join([self.package, name]), path)
'''根据模块路径,加载模块方法'''
return getattr(self._module_cache[path], self.class_name)(*args, **kwargs)
'''通过模块名找到模块路径'''
def find_plugin(self, name, suffixes=None):
''' Find a plugin named name '''
if not suffixes:
if self.class_name:
suffixes = ['.py']
else:
suffixes = ['.py', '']
#frozenset就是集合,可以当做set看
potential_names = frozenset('%s%s' % (name, s) for s in suffixes)
for full_name in potential_names:
if full_name in self._plugin_path_cache:
return self._plugin_path_cache[full_name]
found = None
for path in [p for p in self._get_paths() if p not in self._searched_paths]:
if os.path.isdir(path):
full_paths = (os.path.join(path, f) for f in os.listdir(path))
for full_path in (f for f in full_paths if os.path.isfile(f)):
for suffix in suffixes:
if full_path.endswith(suffix):
full_name = os.path.basename(full_path)
break
else: # Yes, this is a for-else: http://bit.ly/1ElPkyg
continue
if full_name not in self._plugin_path_cache:
self._plugin_path_cache[full_name] = full_path
self._searched_paths.add(path)
for full_name in potential_names:
if full_name in self._plugin_path_cache:
return self._plugin_path_cache[full_name]
# if nothing is found, try finding alias/deprecated
if not name.startswith('_'):
for alias_name in ('_%s' % n for n in potential_names):
# We've already cached all the paths at this point
if alias_name in self._plugin_path_cache:
return self._plugin_path_cache[alias_name]
return None
网友评论