美文网首页Python学习
Python学习笔记-第12天:异步编程(2)和单元测试

Python学习笔记-第12天:异步编程(2)和单元测试

作者: 6d1bf2ffc4f3 | 来源:发表于2019-12-19 18:09 被阅读0次

    第十二天 异步编程(2)和单元测试

    今天计划学习Python的多线程编程异步编程,学习项目及练习源码地址:
    GitHub源码

    协程

    参见昨天的学习记录

    无阻塞

    异步程序依然会假死freezing

    freezing案例:

    import asyncio
    import time
    import threading
    
    #定义一个异步操作
    async def hello1(a,b):
        print(f"异步函数开始执行")
        await asyncio.sleep(3)
        print("异步函数执行结束")
        return a+b
    
    #在一个异步操作里面调用另一个异步操作
    async def main():
        c=await hello1(10,20)
        print(c)
        print("主函数执行")
    
    loop = asyncio.get_event_loop()
    tasks = [main()]
    loop.run_until_complete(asyncio.wait(tasks))
    
    loop.close()
    
    '''运行结果为:
    异步函数开始执行(在此处要等待3秒)
    异步函数执行结束
    30
    主函数执行
    '''
    

    例子中,hello1是一个耗时3s的异步任务,main也是一个异步方法,但是main需要调用hello1的返回值,所以必须登台hello1执行完成才能继续执行main,这说明异步也是会有阻塞的。

    而之前定义的异步函数不用等待是因为事件循环将所有的异步操作‘gather’起来,在多个操作间不同的游走切换,来回调用所有没有等待。

    也可以理解为,事件循环只有一个异步操作在处理,没有可以切换执行的目标,所以只能等待当前的操作完成。

    多线程+asyncio解决调用时freezing

    为了让一个协程函数在不同的线程中执行,我们可以使用以下两个函数:

    1. loop.call_soon_threadsafe(callback, *args),这是一个很底层的API接口,一般很少使用
    2. asyncio.run_coroutine_threadsafe(coroutine,loop) 第一个参数为需要异步执行的协程函数,第二个loop参数为在新线程中创建的事件循环loop,注意一定要是在新线程中创建哦,该函数的返回值是一个concurrent.futures.Future类的对象,用来获取协程的返回结果。 future = asyncio.run_coroutine_threadsafe(coro_func(), loop) 在新线程中运行协程result = future.result()等待获取Future的结果

    示例代码:

    import asyncio 
    
    import asyncio,time,threading
    
    #需要执行的耗时异步任务
    async def func(num):
        print(f'准备调用func,大约耗时{num}')
        await asyncio.sleep(num)
        print(f'耗时{num}之后,func函数运行结束')
    
    #定义一个专门创建事件循环loop的函数,在另一个线程中启动它
    def start_loop(loop):
        asyncio.set_event_loop(loop)
        loop.run_forever()
    
    #定义一个main函数
    def main():
        coroutine1 = func(3)
        coroutine2 = func(2)
        coroutine3 = func(1)
    
        new_loop = asyncio.new_event_loop()                        #在当前线程下创建时间循环,(未启用),在start_loop里面启动它
        t = threading.Thread(target=start_loop,args=(new_loop,))   #通过当前线程开启新的线程去启动事件循环
        t.start()
    
        asyncio.run_coroutine_threadsafe(coroutine1,new_loop)  #这几个是关键,代表在新线程中事件循环不断“游走”执行
        asyncio.run_coroutine_threadsafe(coroutine2,new_loop)
        asyncio.run_coroutine_threadsafe(coroutine3,new_loop)
    
        for i in "iloveu":
            print(str(i)+"    ")
    
    if __name__ == "__main__":
        main()
    
    '''运行结果为:
    i    准备调用func,大约耗时3
    l    准备调用func,大约耗时2
    o    准备调用func,大约耗时1
    v
    e
    u
    耗时1之后,func函数运行结束
    耗时2之后,func函数运行结束
    耗时3之后,func函数运行结束
    '''
    

    第一步:定义需要异步执行的一系列操作,及一系列协程函数;

    第二步:在主线程中定义一个新的线程,然后在新线程中产生一个新的事件循环;

    第三步:在主线程中,通过asyncio.run_coroutine_threadsafe(coroutine,loop)这个方法,将一系列异步方法注册到新线程的loop里面去,这样就是新线程负责事件循环的执行。

    使用asyncio实现一个timer 定时器

    所谓的timer指的是,指定一个时间间隔,让某一个操作隔一个时间间隔执行一次,如此周而复始。很多编程语言都提供了专门的timer实现机制、包括C++、C#等。但是 Python 并没有原生支持 timer,不过可以用 asyncio.sleep 模拟。大致的思想如下,将timer定义为一个异步协程,然后通过事件循环去调用这个异步协程,让事件循环不断在这个协程中反反复调用,只不过隔几秒调用一次即可。简单的实现如下(本例基于python3.7:

    import asyncio
    async def delay(time):
        await asyncio.sleep(time)
    
    async def timer(time,function):
        while True:
            future=asyncio.ensure_future(delay(time))
            await future
            future.add_done_callback(function)
    
    def func(future):
        print('done')
    
    if __name__=='__main__':
        asyncio.run(timer(2,func))
    

    aiohttp模块

    asyncio可以实现单线程并发IO操作。如果仅用在客户端,发挥的威力不大。如果把asyncio用在服务器端,例如Web服务器,由于HTTP连接就是IO操作,因此可以用单线程+coroutine实现多用户的高并发支持。

    asyncio实现了TCP、UDP、SSL等协议,aiohttp则是基于asyncio实现的HTTP框架。

    • 安装

      pip3 install aiohttp

    • 示例代码

    import asyncio
    
    from aiohttp import web
    
    async def index(request):
        await asyncio.sleep(0.5)
        return web.Response(body=b'<h1>Index</h1>')
    
    async def hello(request):
        await asyncio.sleep(0.5)
        text = '<h1>hello, {}!</h1>'.format(request.match_info['name'])
        return web.Response(body=text.encode('utf-8'))
    
    async def init(loop):
        app = web.Application(loop=loop)
        app.router.add_route('GET', '/', index)
        app.router.add_route('GET', '/hello/{name}', hello)
        srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000)
        print('Server started at http://127.0.0.1:8000...')
        return srv
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(init(loop))
    loop.run_forever()
    

    aiomysql

    Python3.7+ 下的一个异步操作mysql数据的模块,官方地址

    示例:

    #coding:utf-8
    
    import aiomysql
    import asyncio
    import logging
    import traceback
    '''
    mysql 异步版本
    '''
    
    logobj = logging.getLogger('mysql')
    
    class Pmysql:
        __connection = None
    
        def __init__(self):
            self.cursor = None
            self.connection = None
    
        @staticmethod
        async def getconnection():
            if Pmysql.__connection == None:
                conn = await aiomysql.connect(
                    host='127.0.0.1',
                    port=3306,
                    user='root',
                    password='123456',
                    db='mytest',
                    )
                if conn:
                    Pmysql.__connection = conn
                    return conn
                else:
                    raise("connect to mysql error ")
            else:
                return Pmysql.__connection
    
        async def query(self,query,args=None):
            self.cursor = await self.connection.cursor()
            await self.cursor.execute(query,args)
            r = await self.cursor.fetchall()
            await self.cursor.close()
            return r
    
    
    async def test():
        conn = await Pmysql.getconnection()
        mysqlobj.connection = conn
        await conn.ping()
        r = await mysqlobj.query("select * from person")
        for i in r:
            print(i)
        conn.close()
    
    if __name__ == '__main__':
        mysqlobj = Pmysql()
        loop = asyncio.get_event_loop()
        loop.run_until_complete(test())
    

    aioredis

    redis异步操作库,官方地址

    示例:

    import aioredis
    import asyncio
    
    class Redis:
        _redis = None
    
        async def get_redis_pool(self, *args, **kwargs):
            if not self._redis:
                self._redis = await aioredis.create_redis_pool(*args, **kwargs)
            return self._redis
    
        async def close(self):
            if self._redis:
                self._redis.close()
                await self._redis.wait_closed()
    
    
    async def get_value(key):
        redis = Redis()
        r = await redis.get_redis_pool(('127.0.0.1', 6379), db=7, encoding='utf-8')
        value = await r.get(key)
        print(f'{key!r}: {value!r}')
        await redis.close()         
    
    if __name__ == '__main__':
        asyncio.run(get_value('key'))  # need python3.7
    

    测试

    单元测试

    单元测试是用来对一个模块、一个函数或者一个类来进行正确性检验的测试工作。

    Python自带的unittest模块可以很方便的让我们编写单元测试。

    编写单元测试时,我们需要编写一个测试类,从unittest.TestCase继承。
    以test开头的方法就是测试方法,不以test开头的方法不被认为是测试方法,测试的时候不会被执行。

    对每一类测试都需要编写一个test_xxx()方法。由于unittest.TestCase提供了很多内置的条件判断,我们只需要调用这些方法就可以断言输出是否是我们所期望的。

    代码示例:

    '''
    定义一个要测试的类
    mydict.py
    '''
    class MyDict(dict):
    
        def __init__(self, **kw):
            super().__init__(**kw)
    
        def __getattr__(self, key):
            try:
                return self[key]
            except KeyError:
                raise AttributeError(r"'Dict' object has no attribute '%s'" % key)
    
        def __setattr__(self, key, value):
            self[key] = value
    

    编写单元测试:

    import unittest
    
    from mydict import MyDict
    
    class TestDict(unittest.TestCase):
    
        def test_init(self):
            d = MyDict(a=1, b='test')
            self.assertEqual(d.a, 1)
            self.assertEqual(d.b, 'test')
            self.assertTrue(isinstance(d, dict))
    
        def test_key(self):
            d = MyDict()
            d['key'] = 'value'
            self.assertEqual(d.key, 'value')
    
        def test_attr(self):
            d = MyDict()
            d.key = 'value'
            self.assertTrue('key' in d)
            self.assertEqual(d['key'], 'value')
    
        def test_keyerror(self):
            d = MyDict()
            with self.assertRaises(KeyError):
                value = d['empty']
    
        def test_attrerror(self):
            d = MyDict()
            with self.assertRaises(AttributeError):
                value = d.empty
    

    运行单元测试

    一旦编写好单元测试,我们就可以运行单元测试。最简单的运行方式是在mydict_test.py的最后加上两行代码:

    if __name__ == '__main__':
        unittest.main()
    

    另一种方法是在命令行通过参数-m unittest直接运行单元测试:

    python -m unittest mydict_test

    这是推荐的做法,因为这样可以一次批量运行很多单元测试,并且,有很多工具可以自动来运行这些单元测试。

    setUp() tearDown()在每次执行之前准备环境,或者在每次执行完之后需要进行一些清理。比如执行前需要连接数据库,执行完成之后需要还原数据、断开连接。

    如果想要在所有case执行之前准备一次环境,并在所有case执行结束之后再清理环境,我们可以用 setUpClass() 与 tearDownClass()

    跳过某个case需要用到skip装饰器一共有三个:unittest.skip(reason)、unittest.skipIf(condition, reason)、unittest.skipUnless(condition, reason),skip无条件跳过,skipIf当condition为True时跳过,skipUnless当condition为False时跳过。

    在VS Code中对Python进行单元测试

    Python扩展支持使用Python的内置unittest框架以及pytest和Nose进行单元测试。要使用pytest和Nose,必须将它们安装到当前的Python环境中(即,在pythonPath设置中标识的环境,请参阅环境)。

    使用Python:Discover Unit Tests根据当前所选测试框架的发现模式扫描项目以进行测试(请参阅测试发现。一旦发现,VS Code提供了多种运行测试的方法(请参阅运行测试)。

    单元测试输出显示在Python Test Log面板中,包括未安装测试框架时导致的错误。

    在settings.json中进行设置:

    {
        "python.pythonPath": "/usr/local/bin/python3",
        "python.testing.unittestEnabled": true,
        "python.testing.unittestArgs": [
            "-v",
            "-s",
            "./src/tests",
            "-p",
            "test_*.py"
        ],
        "python.testing.pytestEnabled": false,
        "python.testing.nosetestsEnabled": false,
    }
    

    Unittest配置设置

    设置 默认 描述
    unittestEnabled false 指定是否为单元测试启用UnitTest。
    unittestArgs ["-v", "-s", ".", "-p", "test.py"] 传递给unittest的参数,其中由空格分隔的每个元素是列表中的单独项。有关默认值的说明,请参见下文。
    CWD 空值 指定单元测试的可选工作目录。
    outputWindow "Python Test Log" 用于单元测试输出的窗口。
    promptToConfigure true 指定VS代码是否在发现潜在测试时提示配置测试框架。
    DEBUGPORT 3000 用于调试UnitTest测试的端口号。
    autoTestDiscoverOnSaveEnabled true 指定在保存单元测试文件时是启用还是禁用自动运行测试发现。

    UnitTest的默认参数如下:

    -v设置默认详细程度。删除此参数以获得更简单的输出。

    -s .指定用于发现测试的起始目录。如果您在“test”文件夹中进行了测试,则可以将其更改为-s test("-s", "test"在arguments数组中)。

    -p test.py是用于查找测试的发现模式。在这种情况下,它.py是包含单词“test” 的任何文件。如果以不同的方式命名测试文件,例如在每个文件名后附加“_test”,则使用类似于*_test.py数组的相应参数的模式。

    要在第一次失败时停止测试运​​行,请将fail fast选项添加"-f"到arguments数组中。

    文档测试

    如果你经常阅读Python的官方文档,可以看到很多文档都有示例代码。比如re模块就带了很多示例代码:

    >>> import re
    >>> m = re.search('(?<=abc)def', 'abcdef')
    >>> m.group(0)
    'def'
    

    可以把这些示例代码在Python的交互式环境下输入并执行,结果与文档中的示例代码显示的一致。

    这些代码与其他说明可以写在注释中,然后,由一些工具来自动生成文档。既然这些代码本身就可以粘贴出来直接运行,那么,可不可以自动执行写在注释中的这些代码呢?

    答案是肯定的。

    当我们编写注释时,如果写上这样的注释:

    def abs(n):
        '''
        Function to get absolute value of number.
        
        Example:
        
        >>> abs(1)
        1
        >>> abs(-1)
        1
        >>> abs(0)
        0
        '''
        return n if n >= 0 else (-n)
    

    无疑更明确地告诉函数的调用者该函数的期望输入和输出。

    并且,Python内置的“文档测试”(doctest)模块可以直接提取注释中的代码并执行测试。

    doctest严格按照Python交互式命令行的输入和输出来判断测试结果是否正确。只有测试异常的时候,可以用...表示中间一大段烦人的输出。

    小结

    今天主要学习了Pyton的异步编程,并简单了解了下相关的常用模块。并针对单元测试进行了详细的了解,单元测试很重要。明天打算开始学习下Pyton的函数式编程。

    相关文章

      网友评论

        本文标题:Python学习笔记-第12天:异步编程(2)和单元测试

        本文链接:https://www.haomeiwen.com/subject/ogkvnctx.html