美文网首页
python调用系统命令的简单小案例

python调用系统命令的简单小案例

作者: Joening | 来源:发表于2023-11-04 21:36 被阅读0次

    subprocess小案例

    调用命令

    import subprocess
    
    
    def run_cmd(cmd):
        process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, error = process.communicate()
        out = out.decode('utf-8')
        error = error.decode('utf-8')
        r = process.returncode
        return r, str(out), str(error)
    
    
    def command():
        cmd = "docker images "
        r, out, error = run_cmd(cmd)
        if r == 0:
            print(out)
        else:
            print(error)
    
    
    if __name__ == '__main__':
        command()
    
    

    push chart 小案例

    import os
    import sys
    import subprocess
    
    
    def run_cmd(cmd):
        process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, error = process.communicate()
        r = process.returncode
        return r, out, error
    
    
    def for_chart_name(chart_file_name):
        if not os.path.exists(chart_file_name):
            exit(1)
        with open(chart_file_name, mode='rt') as chart_name:
            chart_lst = list()
            for line in chart_name:
                if line.startswith("#") or not line:
                    continue
                line = line.strip()
                chart_lst.append(line)
            return chart_lst
    
    
    def command():
        chart_names = for_chart_name(chart_file_name)
        for chart_name in chart_names:
            error_list = list()
            chartName = chart_name.split(',')[0]
            chartVersion = chart_name.split(',')[-1]
            helmpull_cmd = "helm pull {}/{} --version {}".format("itg", chartName, chartVersion)
            r, out, error = run_cmd(helmpull_cmd)
            if r == 0:
                print("{} pull ok".format(chartName))
            else:
                print("{} pull error 失败原因是: {}".format(chartName, error.decode('utf-8')))
                error_list.append(error.decode('utf-8'))
                exit(-1)
            return error_list
    
    
    if __name__ == '__main__':
        basedir = os.path.dirname(os.path.realpath(sys.argv[0]))
        chart_file = "chart.txt"
        chart_file_name = os.path.join(basedir, chart_file)
        ret = command()
        print(ret)
    
    
    import subprocess
    import os
    
    images = '''
    flannel/flannel:v0.21.5
    flannel/flannel-cni-plugin:v1.1.2
    registry.aliyuncs.com/google_containers/kube-apiserver:v1.23.5
    registry.aliyuncs.com/google_containers/kube-proxy:v1.23.5
    registry.aliyuncs.com/google_containers/kube-controller-manager:v1.23.5
    registry.aliyuncs.com/google_containers/kube-scheduler:v1.23.5
    registry.aliyuncs.com/google_containers/etcd:3.5.1-0
    registry.aliyuncs.com/google_containers/coredns:v1.8.6
    registry.aliyuncs.com/google_containers/pause:3.6
    '''
    dest_dir = "/tmp/qiaoning"
    
    
    def run_cmd(cmd):
        process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, error = process.communicate()
        r = process.returncode
        return r, out, error
    
    
    def image(images, dest_dir):
        image_list = []
        for imagename in images.split():
            imagename = imagename.strip()
            image_list.append(imagename)
        if not os.path.exists(dest_dir):
            os.makedirs(dest_dir)
        if len(os.listdir(dest_dir)) > 1:
            print("请清理磁盘{}目录下的内容".format(dest_dir))
            exit(1)
        return image_list, dest_dir
    
    
    def command():
        image_name, destdir = image(images, dest_dir)
        for name in image_name:
            docker_pull_image_cmd = 'docker pull {}'.format(name)
            r, out, error = run_cmd(docker_pull_image_cmd)
            if r != 0:
                print(name, '错误原因是:', error.encode('utf-8'))
            newname = name.split('/')[-1].replace(':', '-')
            docker_save_image_cmd = 'docker save {} | gzip > {}/{}.tar.gz'.format(name, destdir, newname)
            r, out, error = run_cmd(docker_save_image_cmd)
            if r != 0:
                print(name, '错误原因是:', error.encode('utf-8'))
    
    
    if __name__ == '__main__':
        command()
    

    小案例

    import subprocess
    import os
    import argparse
    
    
    def commad(cmd):
        porcess = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, error = porcess.communicate()
        r = porcess.returncode
        return r, out, error
    
    
    def check_directory(dest_directory):
        if not os.path.exists(dest_directory):
            os.mkdir(dest_directory)
        if len(os.listdir(dest_directory)) > 1:
            print(dest_directory, "目录内容请清理")
            exit(2)
        return dest_directory
    
    
    def get_image_file_list(image_file_svc):
        if not os.path.exists(image_file_svc):
            print(image_file_svc, 'no such file or directory')
            exit(1)
        lst = list()
        with open(image_file_svc, mode='rt', encoding='utf-8') as file_name:
            for item in file_name:
                if item.startswith('#') or not item:
                    continue
                item = item.strip()
                lst.append(item)
        return lst
    
    
    def pull_image(dockerurl, username, password):
        print("[*** 检查docker二进制命令是否存在 ***]")
        images_list = get_image_file_list(image_file_svc)
        check_docker = "which %s" % 'docker'
        r, out, error = commad(check_docker)
        if r != 0:
            print('docker is not command')
            exit(1)
        print("[*** 登录docker私有仓库地址 地址为 %s ***]" % dockerurl)
        docker_login = f'docker login --username "{username}" --password "{password}" {dockerurl}'
        print(docker_login)
        # r, out, error = commad(docker_login)
        # if r != 0:
        #     print('docker login error 错误原因是:', error.decode('utf-8'))
        #     exit(1)
        print("[*** 开始pull镜像 ***]")
        for images in images_list:
            docke_pull_cmd = "docker pull {}".format(images)
            r, out, error = commad(docke_pull_cmd)
            if r == 0:
                print(images, 'is pull ok')
            else:
                print('错误原因是:', error.decode('utf8'))
    
    
    def save_image():
        dest_dir = check_directory(dest_directory)
        image_list = get_image_file_list(image_file_svc)
        print('[*** 开始打包镜像 打包到 {} ***]'.format(dest_dir))
        for image in image_list:
            new_image = image.split('/')[-1].replace(':', '-')
            docker_tag = f'docker save {image} | gzip > {dest_dir}{new_image}.tar.gz'
            r, out, error = commad(docker_tag)
            if r == 0:
                print(image, "save ok")
            else:
                print(image, 'save 失败原因是:', error.decode('utf-8'))
    
    
    if __name__ == '__main__':
        argars = argparse.ArgumentParser(description="this is python file")
        argars.add_argument('--image_file_svc', required=True, help='--image_file_svc')
        argars.add_argument('--username', required=True, help='--username')
        argars.add_argument('--password', required=True, help='--password')
        argars.add_argument('--dockerurl', required=True, help='--dockerurl')
        argars.add_argument('--dest_directory', required=True, help='--dest_directory')
        argars = argars.parse_args()
        username = argars.username
        password = argars.password
        dockerurl = argars.dockerurl
        dest_directory = argars.dest_directory
        image_file_svc = argars.image_file_svc
        print('========================================= 开始执行 ==============================')
        check_directory(dest_directory)
        pull_image(dockerurl, username, password)
        save_image()
        print('========================================= 执行结束 ===============================')
    

    改tag 推送镜像小案例

    #!/usr/bin/python3
    # -*- coding: utf-8 -*-
    import os
    import subprocess
    import sys
    
    
    def run_cmd(cmd):
        process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, error = process.communicate()
        r = process.returncode
        return r, str(out), str(error)
    
    
    def for_image_list(image_svc):
        with open(image_svc, mode='rt') as images:
            for image in images:
                image = image.strip()
                new_image = image.replace(old_suffix.strip(), new_suffix.strip())
                new_image = new_image.strip()
                print("docker tag {} {}".format(image, new_image))
                docker_tag_cmd = "docker tag {} {}".format(image, new_image)
                r, out, error = run_cmd(docker_tag_cmd)
                if r == 0:
                    print("{}执行成功".format(image))
                else:
                    print("{}执行失败 错误原因是: {}".format(image, error))
                    exit(1)
    
                print("=========================== 推送镜像 {}========================".format(new_image))
                print("docker push {}".format(new_image))
                docker_push_cmd = "docker push {}".format(new_image)
                r, out, error = run_cmd(docker_push_cmd)
                if r == 0:
                    print("{} 推送成功".format(new_image))
                else:
                    print("{} 推送失败".format(new_image))
                    exit(1)
    
    
    if __name__ == '__main__':
        basedir = os.path.dirname(os.path.realpath(sys.argv[0]))
        imageslist = "imagelist.txt"
        image_csv = os.path.join(basedir, imageslist)
        old_suffix = "hub-vpc.jdcloud.com"
        new_suffix = "xxxxx"
        for_image_list(image_csv)
    
    
    
    #!/usr/bin/python
    # -*- coding: utf-8 -*-
    import os
    import sys
    import subprocess
    import argparse
    
    
    def run_cmd(cmd):
        process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, error = process.communicate()
        r = process.returncode
        return r, str(out), str(error)
    
    
    def for_image_list(image_csv):
        if not os.path.exists(image_csv):
            print("{}文件不存在".format(image_csv))
            exit(1)
        image_list = list()
        with open(image_csv, mode='rt') as images:
            for image in images:
                if image.startswith('#') or not image:
                    continue
                image = image.strip()
                image_list.append(image)
        return image_list
    
    
    def image_tag_push_cmd(old_suffix, new_suffix):
        images = for_image_list(image_svc)
        for image in images:
            image = image.strip()
            new_image = image.replace(old_suffix, new_suffix)
            new_image = new_image.strip()
            image_tag_cmd = "docker tag {} {}".format(image, new_image)
            r, out, error = run_cmd(image_tag_cmd)
            if r != 0:
                print("{} tag失败 失败原因是: {}".format(image, error))
                exit(1)
            image_push_cmd = "docker push {}".format(new_image)
            r, out, error = run_cmd(image_push_cmd)
            if r != 0:
                print("{} push失败 失败原因是: {}".format(image, error))
    
    
    if __name__ == '__main__':
        basedir = os.path.dirname(os.path.realpath(sys.argv[0]))
        old_suffix = "hub-vpc.jdcloud.com"
        new_suffix = "xxxxxxxxx"
        argars = argparse.ArgumentParser(description="this is python file")
        argars.add_argument('--image_svc', required=True, help="请输入你要推送的镜像列表")
        argars = argars.parse_args()
        image_svc = argars.image_svc
        image_tag_push_cmd(old_suffix, new_suffix)
    
    

    python脚本的编写思路

    面向过程编程

    1. 导入各种外部库
    
    2. 设计各种全局变量
    
    3. 写一个函数完成某个功能
    4. 写一个函数完成某个功能
    5. 写一个函数完成某个功能
    6. 写一个函数完成某个功能
    7. 写一个函数完成某个功能
    8. ......
    
    9. 写一个main函数作为程序入口
    

    在面向过程编程中,许多重要的数据被放置在全局变量区,这样它们就可以被所有函数访问。每个函数都可以具有自己的局部变量数据,封装某些功能代码,无需重复编写,日后仅需调用函数即可。从代码的组织形式来看,面向函数编程的一般套路就是根据业务逻辑从上到下垒代码。

    面向对象编程

    1. 导入各种外部库
    
    2. 设计各种全局变量
    
    3. 决定你要的类
    4. 给每个类提供完整的一组操作
    5. 明确地使用继承来表现不同类之间的共同点
    6. ......
    
    7. 根据需要,决定是否写一个main函数作为程序入口
    

    在面向对象编程中,将函数和变量进一步封装成类。类是面向对象程序的基本元素,它将数据(类属性)和操作(类方法)紧密地连接在一起,并保护数据不会被外界的函数意外地改变。类和和类的实例(也称对象)是面向对象的核心概念,是与面向过程编程的根本区别。面向对象编程并非必须,而要看你的程序怎么设计方便,但是就目前来说,基本上都是使用面向对象编程。
    面向过程的编程思路通常包括以下几个关键概念:
    分解问题:首先需要将整个问题分解为一系列可以顺序执行的子任务或操作,每个子任务对应一个具体的处理过程。
    定义函数:针对每个子任务或操作,可以定义相应的函数来实现具体的逻辑。函数可以接受参数并返回结果,有助于模块化代码和提高重用性。
    流程控制:使用条件语句(如if-else语句)和循环结构(如for循环、while循环)来控制程序的执行流程,根据不同的条件执行不同的操作。
    数据处理:在面向过程的编程中,通常会对数据进行操作和处理,这可能涉及到数据的读取、修改、存储等操作。
    顺序执行:最终,整个程序按照一定的顺序执行各个子任务或操作,以完成整体的问题解决过程。

    面向过程的编程思路注重过程和操作的顺序和逻辑,通过将整个问题分解为可执行的步骤,并按照特定的顺序执行这些步骤来实现程序的功能。与面向对象编程相比,面向过程更加直观和直接,适用于一些简单的脚本和小型程序的编写。

    面向过程的简单小案例

    import os
    
    # 定义函数:获取指定目录下的所有文件
    def get_files_in_directory(directory):
        files = []
        for filename in os.listdir(directory):
            if os.path.isfile(os.path.join(directory, filename)):
                files.append(filename)
        return files
    
    # 定义函数:处理文件并输出结果
    def process_files(files):
        results = []
        for file in files:
            # 处理文件的具体操作(这里只是简单示意)
            result = f"Processed {file}"
            results.append(result)
        return results
    
    # 主程序
    def main():
        directory = "/path/to/directory"
        files = get_files_in_directory(directory)
        results = process_files(files)
        
        # 输出结果
        for result in results:
            print(result)
    
    # 调用主程序
    main()
    

    相关文章

      网友评论

          本文标题:python调用系统命令的简单小案例

          本文链接:https://www.haomeiwen.com/subject/jjtzidtx.html