开放封闭原则
在一个已经正常运行的项目中,对项目功能的增加,需要遵循开放封闭原则.
简单来说,它规定已经实现的功能代码不允许被修改,但可以被扩展,即:
封闭:已实现的功能代码块不应该被修改
开放:对现有功能的扩展开放
不改变调用原则
对已经功能的增改,不能修改原模块(函数)的调用方式,因为一个模块(函数)可能已经在N个地方使用了,如果改变了调用方式,就需要修改N个地方
作用
装饰器的作用就是在不修改原函数的情况下添加新功能,相当于高阶函数(将函数作为参数传入)和嵌套函数的结合.
装饰器基本格式:
def 装饰器函数(传入的函数):
def 执行的嵌套函数(传入函数的参数):
装饰器语句
...
return 传入的函数(传入函数的参数)
...
装饰器语句
return 返回的嵌套函数
@装饰器函数
def 原函数
原函数模块...
无参数的装饰器
def w1(func):
def inner():
# 验证1
# 验证2
# 验证3
return func()
return inner
@w1
def f1():
print 'f1'
参数格式已知的装饰器
def w1(func):
def inner(arg1,arg2,arg3):
# 验证1
# 验证2
# 验证3
return func(arg1,arg2,arg3)
return inner
@w1
def f1(arg1,arg2,arg3):
print 'f1'
参数格式未知的装饰器
def w1(func):
def inner(*args,**kwargs):
# 验证1
# 验证2
# 验证3
return func(*args,**kwargs)
return inner
@w1
def f1(arg1,arg2,arg3):
print 'f1'
多个装饰器装饰一个函数
def w1(func):
def inner(*args,**kwargs):
# 验证1
# 验证2
# 验证3
return func(*args,**kwargs)
return inner
def w2(func):
def inner(*args,**kwargs):
# 验证1
# 验证2
# 验证3
return func(*args,**kwargs)
return inner
@w1
@w2
def f1(arg1,arg2,arg3):
print 'f1'
闭包函数
闭包的意义:返回的函数对象,不仅仅是一个函数对象,在该函数外还包裹了一层作用域,这使得,该函数无论在何处调用,优先使用自己外层包裹的作用域
应用领域:延迟计算(原来我们是传参,现在我们是包起来)
from urllib.request import urlopen
def index(url):
def get():
return urlopen(url).read()
return get
baidu=index('http://www.baidu.com')
print(baidu().decode('utf-8'))
python读取文件小案例
- 文件内容
[mysqld]
port=3306
socket =/tmp/mysql.sock
basedir=/tmp/mysql/basedir
datadir=/tmp/mysql/datadir
- 读取内容代码
import configparser
class BaseConfig(object):
def __init__(self, path):
config = configparser.ConfigParser()
config.read(path)
self.port = config.get("mysqld", "port")
self.socket = config.get("mysqld", "socket")
self.basedir = config.get("mysqld", "basedir")
self.datadir = config.get("mysqld", "datadir")
- 主程序中读取数据
#!/usr/bin/env python3
import subprocess
import os
from config import BaseConfig
import sys
def run_cmd(cmd):
process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, error = process.communicate()
out = out.decode("utf-8")
error = error.decode("utf-8")
r = process.returncode
return r, out, error
def get_values(path):
baseconfig_ini = BaseConfig(path)
port = baseconfig_ini.port
socket = baseconfig_ini.socket
basedir = baseconfig_ini.basedir
datadir = baseconfig_ini.datadir
if not os.path.exists(basedir) and not os.path.exists(datadir):
os.makedirs(basedir)
os.makedirs(datadir)
else:
if len(basedir) > 1 and len(datadir) > 1:
rm_dir = "rm -rf {} {}".format(basedir, datadir)
r, out, error = run_cmd(rm_dir)
if r != 0:
print("rm -rf {} {} error".format(basedir, datadir))
exit(-1)
os.makedirs(basedir)
os.makedirs(datadir)
return basedir, datadir
if __name__ == '__main__':
baseconfig = os.path.dirname(os.path.realpath(sys.argv[0]))
path = os.path.join(baseconfig, "mysql.ini")
basedir, datadir = get_values(path)
print(basedir)
print(datadir)
综合案例
[root@image-zhongzhuanjiqi pull]# cat config.py
import configparser
class BaseConfig(object):
def __init__(self, path):
config = configparser.ConfigParser()
config.read(path)
self.image_svc = config.get("harbor", "image_svc")
self.dest_dir = config.get("harbor", "dest_dir")
[root@image-zhongzhuanjiqi pull]# cat harbor.ini
[harbor]
image_svc=/root/qiaoning/.shell/pull/image.txt
dest_dir=/tmp/qiaoning
[root@image-zhongzhuanjiqi pull]# cat pull_image.py
from logger import logger
import os
import sys
import subprocess
import argparse
from config import BaseConfig
from concurrent.futures import ProcessPoolExecutor
def run_cmd(cmd):
process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, error = process.communicate()
out = out.decode("utf-8")
error = error.decode("utf-8")
r = process.returncode
return r, out, error
def get_image_list(image_file):
if not os.path.exists(image_file):
logger.info("{} is not such file".format(image_file))
sys.exit()
image_list = []
with open(image_file, mode='rt') as images:
for image in images:
if image[0].startswith("#") and not image:
continue
image = image.strip()
image_list.append(image)
image_list = set(image_list)
return image_list
def pull_image(image):
pull_image_cmd = "docker pull {}".format(image)
r, out, error = run_cmd(pull_image_cmd)
if r == 0:
logger.info("{} is pull ok".format(image))
else:
logger.error("{} is pull error error: {}".format(image, error))
sys.exit()
def save_image(image, dest_dir, new_image):
save_image_cmd = "docker save {} | gzip > {}/{}.tar".format(image, dest_dir, new_image)
r, out, error = run_cmd(save_image_cmd)
if r == 0:
logger.info("{} is save image ok".format(image))
else:
logger.error("{} is save error error: {}".format(image, error))
sys.exit()
def mutl_pull_image(config_path):
baseconfig_ini = BaseConfig(config_path)
image_file = baseconfig_ini.image_file
images = get_image_list(image_file)
ps_pool = ProcessPoolExecutor(mutl_number)
pull_list = []
for image in images:
image = image.strip()
result = ps_pool.submit(pull_image, image)
pull_list.append(result)
ps_pool.shutdown()
def mutl_save_image(config_path):
baseconfig_ini = BaseConfig(config_path)
dest_dir = baseconfig_ini.dest_dir
image_file = baseconfig_ini.image_file
images = get_image_list(image_file)
if os.path.exists(dest_dir):
os.removedirs(dest_dir)
os.makedirs(dest_dir)
ps_pool = ProcessPoolExecutor(max_workers=mutl_number)
save_image_list = []
for image in images:
image = image.strip()
new_image = image.split("/")[-1].replace(":", "-")
result = ps_pool.submit(save_image, image, dest_dir, new_image)
save_image_list.append(result)
ps_pool.shutdown()
if __name__ == '__main__':
argars = argparse.ArgumentParser(description="this is python scripts")
argars.add_argument("--config_path", type=str, help="--config_path", required=True)
argars.add_argument("--mutl_number", type=int, help="--mutl_number", required=False, default=10)
args = argars.parse_args()
config_path = args.config_path
mutl_number = args.mutl_number
mutl_pull_image(config_path)
mutl_save_image(config_path)
网友评论