编程入门19:Python任务调度

作者: starglow_leo | 来源:发表于2018-11-05 12:09 被阅读51次

    上一篇:编程入门18:Python生产环境

    我们知道现今的操作系统都支持“多任务”(Multitasking),虽然计算机的中央处理器(CPU)在同一时刻只能运行一个程序(双核心的话就是两个),但是由于CPU的速度极快,每秒能执行几十亿条机器语言指令,因此系统可以划分出微秒级的时间片,通过合理的任务调度快速切换执行程序,从人类的角度看就是在同时运行了。

    对于操作系统来说,任务调度和资源分配的基本单元是“进程”(Process),不同程序在不同进程中运行。同一程序也可能会启动多个进程以高效地执行多任务,例如浏览器同时下载多个在线资源,IDE一边接受代码输入一边调用解释器检查语法等等——任务切换时需要记住各自进行到了哪一步,这种信息就称为“上下文”(Context)。在Windows系统中右击任务栏选择“任务管理器”即可显示当前进程列表,你会发现即使未打开任何应用,也有上百个进程正在后台运行。


    19_proc.png

    之前我们编写的代码都是单路运行的,例如以下程序依次执行了两个工作任务:

    """xwork_1.py 主程序依次执行多个任务
    """
    import time
    
    
    def work(tasknum):
        t1 = time.perf_counter()
        print(f"任务{tasknum}开始……")
        time.sleep(3)
        print(f"任务{tasknum}完成!耗时{time.perf_counter() - t1}秒。")
    
    
    def main():
        work(1)
        work(2)
    
    
    if __name__ == "__main__":
        t1 = time.perf_counter()
        main()
        print(f"主程序耗时{time.perf_counter() - t1}秒。")
    

    任务函数work()用time.sleep()模拟耗时3秒的操作,这样依次执行两个任务的总耗时就是6秒:

    PS D:\Test\pyStudy> python -u "d:\Test\pyStudy\basic\xwork_1.py"
    任务1开始……
    任务1完成!耗时3.0185625689999998秒。
    任务2开始……
    任务2完成!耗时3.0008498049999996秒。
    主程序耗时6.020425788秒。
    

    多个耗时操作如果彼此没有关联,就可以通过“并发”(Concurrent)来避免无谓的等待。下面就让我们尝试编写并发执行多个任务的程序——Python标准库提供了multiprocessing模块来实现多进程,你可以调用进程类构造器multiprocessing.Process()创建进程实例,再调用实例的start()方法启动之,这样任务就能在不同进程中并发执行了:

    """xwork_2.py 多个进程并发执行多个任务
    """
    import time
    import multiprocessing
    
    
    def work(tasknum):
        t1 = time.perf_counter()
        print(f"任务{tasknum}开始……")
        time.sleep(3)
        print(f"任务{tasknum}完成!耗时{time.perf_counter() - t1}秒。")
    
    
    def main():
        multiprocessing.Process(target=work, args=(1,)).start()
        multiprocessing.Process(target=work, args=(2,)).start()
    
    
    if __name__ == "__main__":
        t1 = time.perf_counter()
        main()
        print(f"主程序耗时{time.perf_counter() - t1}秒。")
    

    以上程序用两个进程并发执行两个任务,在3秒之后同时完成,而在默认进程中执行的主程序因为没有耗时操作所以率先结束了(如果你希望等其它进程都完成再退出主程序,可以在进程启动后再调用实例的join()方法):

    PS D:\Test\pyStudy> python -u "d:\Test\pyStudy\basic\xwork_2.py"
    主程序耗时0.06413474699999999秒。
    任务1开始……
    任务2开始……
    任务1完成!耗时2.993581393秒。
    任务2完成!耗时2.993595187秒。
    

    在单个进程内部也可以同时运行多个子任务,称为“线程”(Thread),线程相比进程更为轻量,建立和释放速度更快——通常耗时操作可分为两种:例如密码破解需要CPU进行大量运算,这称为CPU密集型应用;而网络爬虫主要处理数据的输入和输出(Input/Output),这称为IO密集型应用。前者宜采用多进程,后者则宜采用多线程。使用线程要引入标准库的threading模块,具体写法与使用进程类似:

    """xwork_3.py 多个线程并发执行多个任务
    """
    import time
    import threading
    import sys
    
    
    def work(tasknum):
        t1 = time.perf_counter()
        sys.stdout.write(f"任务{tasknum}开始……\n")
        time.sleep(3)
        sys.stdout.write(f"任务{tasknum}完成!耗时{time.perf_counter() - t1}秒。\n")
    
    
    def main():
        threading.Thread(target=work, args=(1,)).start()
        threading.Thread(target=work, args=(2,)).start()
    
    
    if __name__ == "__main__":
        t1 = time.perf_counter()
        main()
        print(f"主程序耗时{time.perf_counter() - t1}秒。")
    

    使用多线程要注意所谓“线程安全”问题——例如当多个线程都想用print()输出信息时,可能会因为没有抢到资源而出现异常。因此以上程序的工作函数输出信息用的是线程安全的sys.stdout.write(),运行结果如下:

    PS D:\Test\pyStudy> python -u "d:\Test\pyStudy\basic\xwork_3.py"
    任务1开始……
    任务2开始……
    主程序耗时0.0009290429999999975秒。
    任务2完成!耗时3.008762067秒。
    任务1完成!耗时3.0092474399999998秒。
    

    Python 3.4新增了一种更适合IO密集型应用的特性“异步IO”(Asynchronous I/O),在不开多进程或多线程的情况下也能实现多任务并发。简单来说,当程序发起一个普通IO操作时,它会“阻塞”(Block)当前任务的执行直到操作结束,而所谓异步IO就是不等待IO操作结束就继续执行,这需要创建一个内部事件循环来轮番处理所有任务的状态。异步IO调度任务的基本单元称为“协程”(Coroutine)——函数之类的程序构件可统称为子程序或“例程”(Routine),一般都是从起点进入从终点退出;而协程是一种特殊例程,在进入后可以多次中断转往其他操作再返回(其实就是之前介绍过的生成器),这样就能有任意多个任务在事件循环中被切换执行了。

    通过异步IO实现多任务并发需要引入标准库的asyncio模块并使用async/await语句:

    1. async def 定义协程函数用来返回协程对象/async with 指定异步上下文管理器用来生成可等待对象
    2. asyncio.create_task()/.gather() 将一个/多个协程打包为任务排入计划日程
    3. await 指定任务或其他可等待对象等待其完成并返回执行结果
    4. 在主程序中获取事件循环来运行作为顶层入口的协程函数

    具体写法如下所示:

    """xwork_4.py 多个协程并发执行多个任务
    """
    import time
    import asyncio
    
    
    async def work(tasknum):
        t1 = time.perf_counter()
        print(f"任务{tasknum}开始……")
        await asyncio.sleep(3)
        print(f"任务{tasknum}完成!耗时{time.perf_counter() - t1}秒。")
    
    
    async def main():
        tasks = asyncio.gather(work(1), work(2))
        await tasks
    
    
    if __name__ == "__main__":
        t1 = time.perf_counter()
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())
        # 在Spyder中用下面这句才行,因为IPyhon已启动事件循环
        # asyncio.run_coroutine_threadsafe(main(), loop)
        # 在Python 3.7中用下面这句即可,不必再去获取事件循环
        # asyncio.run(main())
        print(f"主程序耗时{time.perf_counter() - t1}秒。")
    

    请注意协程版work()要用asyncio.sleep()来模拟异步IO耗时操作,因为time.sleep()会阻塞事件循环。程序运行结果如下:

    PS D:\Test\pyStudy> python -u "d:\Test\pyStudy\basic\xwork_4.py"
    任务1开始……
    任务2开始……
    任务1完成!耗时3.0036561820000003秒。
    任务2完成!耗时3.0038537950000004秒。
    主程序耗时3.019819151秒。
    

    下面的示例是之前百度图片爬虫的协程版,程序比原来复杂一点但性能提升了许多倍——请注意urllib.request同样也是阻塞式的,所以要改用基于异步IO的第三方包aiohttp:

    """xwork_webcrawler.py 协程版百度图片搜索并批量下载
    """
    import asyncio
    import aiohttp
    from urllib.parse import quote
    import os
    import re
    import time
    url = "https://image.baidu.com/search/flip?tn=baiduimage&word="
    keyword = "CG原画"
    folder = "img"
    path = os.path.abspath(".")
    headers = {
        'User-Agent':
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:61.0) \
                Gecko/20100101 Firefox/61.0'
    }
    
    
    async def fetch(session, url):
        async with session.get(url, timeout=20) as res:
            return await res.text()
    
    
    async def store(session, url):
        t1 = time.perf_counter()
        async with session.get(url, timeout=20) as res:
            with open(os.path.join(path, folder, url.split("/")[-1]), "wb") as f:
                while True:
                    chunk = await res.content.read(512)
                    if not chunk:
                        break
                    f.write(chunk)
        print(f"保存图片{url}耗时{time.perf_counter() - t1}秒。")
    
    
    async def main():
        if not os.path.exists(folder):
            os.mkdir(folder)
        async with aiohttp.ClientSession(headers=headers) as session:
            t1 = time.perf_counter()
            html = await fetch(session, url + quote(keyword))
            links = re.findall(r'"objURL":"(.+?)"', html)
            print(f"提取文本耗时{time.perf_counter() - t1}秒。")
            for i in links:
                await asyncio.create_task(store(session, i))
    
    
    if __name__ == "__main__":
        t1 = time.perf_counter()
        try:
            loop = asyncio.get_event_loop()
            loop.run_until_complete(main())
        except Exception as e:
            print(repr(e))
        print(f"主程序耗时{time.perf_counter() - t1}秒。")
    

    你还可以尝试配合使用协程和进程,在发挥异步IO执行效率的同时充分利用CPU的多个核心。

    ——编程原来是这样……

    编程小提示:源码管理

    当你学到这里,可以算是完全入门了,但在开始实际编程之前,你还应当了解“源码管理”(Source Code Management,简称SCM)或者叫“版本管理”因为它适用于任何需要保留修改记录的项目——最流行的源码管理工具是Git,官网下载页 https://git-scm.com/downloads

    如果用Windows还可以下载图形界面的外壳TortoiseGit https://tortoisegit.org/

    要学习Git直接看官方文档就好,有中文版 https://git-scm.com/book/zh/v2

    许多网站提供免费Git服务,最大的一家是GitHub(https://github.com/),用国内的站点例如“码云”(https://gitee.com/)网速会快些——本教程的源码就放在这里,使用以下Git命令即可克隆到本机:

    git clone https://gitee.com/freesand/pyStudy.git
    

    如果你用VSCode,推荐安装这个插件GitLens:


    19_scm.png

    相关文章

      网友评论

        本文标题:编程入门19:Python任务调度

        本文链接:https://www.haomeiwen.com/subject/brvjxqtx.html