美文网首页
aiohttp异步编程(一)

aiohttp异步编程(一)

作者: 阿尼奥赛哟 | 来源:发表于2020-06-22 17:34 被阅读0次

    1. 进程、线程、协程的概念

    进程:CPU是系统进行资源分配和调度的基本单位,进程是程序执行的实体,是线程执行的容器。

    线程:轻量级进程,是程序执行流的最小单元。在单个程序中同时运行多个线程完成不同的工作,称为多线程。在多线程OS中,通常是在一个进程中包括多个线程,每个线程都是作为利用CPU的基本单位,是花费最小开销的实体。在同一进程中的各个线程,都可以共享该进程所拥有的资源。

    协程:类似于线程,线程的调度由系统决定,协程的切换由自己决定。

    协程和线程区别:协程避免了无意义的调度,由此可以提高性能,但也因此,程序员必须自己承担调度的责任,同时,协程也失去了标准线程使用多CPU的能力。


    2.python 异步编程框架 asyncio

     查看python 官方文档:

    https://docs.python.org/zh-cn/3.8/library/asyncio-task.html#asyncio.run

    3.7之前的asyncio模块使用理解:

    将多个协程进一步封装到一个task对象中,task就是一个储存任务的盒子。此时,装在盒子里的任务并没有真正的运行,需要把它接入到一个监视器中使它运行,同时监视器还要持续不断的盯着盒子里的任务运行到了哪一步,这个持续不断的监视器就用一个循环对象loop来实现。

    3.7之后的协程,直接使用run调用协程:

    以下代码段,会打印 "hello",等待 1 秒,再打印 "world":

    使用create_task()将协程打包成任务


    3. aiohttp基于asyncio的http异步框架

    异步io的好处在于避免的线程的开销和切换,而且我们都知道python其实是没有多线程的,只是通过底层线层锁实现的多线程。另一个好处在于避免io操作(包含网络传输)的堵塞时间。

    asyncio可以实现单线程并发IO操作。如果仅用在客户端,发挥的威力不大。如果把asyncio用在服务器端,例如Web服务器,由于HTTP连接就是IO操作,因此可以用单线程+coroutine实现多用户的高并发支持。

    asyncio实现了TCP、UDP、SSL等协议,aiohttp则是基于asyncio实现的HTTP框架。

    (1)aiohttp作为服务器端的一个简单的demo。

    import argparse

    from aiohttp import web

    import asyncio

    import base64

    import logging

    import uvloop

    import time,datetime

    import json

    import requests

    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

    routes = web.RouteTableDef()

    @routes.get('/')

    async def hello(request):

      return web.Response(text="Hello, world")

    # 定义一个路由映射,接收网址参数,post方式

    @routes.post('/demo1/{name}')

    async def demo1(request):  # 异步监听,只要一有握手就开始触发,此时网址参数中的name就已经知道了,但是前端可能还没有完全post完数据。

      name = request.match_info.get('name', "Anonymous") # 获取name

      print(datetime.datetime.now())  # 触发视图函数的时间

      data = await request.post()  # 等待post数据完成接收,只有接收完成才能进行后续操作.data['key']获取参数

      print(datetime.datetime.now())  # 接收post数据完成的时间

      logging.info('safety dect request start %s' % datetime.datetime.now())

      result = {'name':name,'key':data['key']}

      logging.info('safety dect request finish %s, %s' % (datetime.datetime.now(),json.dumps(result)))

      return web.json_response(result)

    # 定义一个路由映射,设计到io操作

    @routes.post('/demo2')

    async def demo2(request):  # 异步监听,只要一有握手就开始触发,此时网址参数中的name就已经知道了,但是前端可能还没有完全post完数据。

      data = await request.post()  # 等待post数据完成接收,只有接收完成才能进行后续操作.data['key']获取参数

      logging.info('safety dect request start %s' % datetime.datetime.now())

      res = requests.post('http://www.baidu.com')  # 网路id,会自动切换到其他协成上

      logging.info('safety dect request finish %s' % res.test)

      return web.Response(text="welcome")

    if __name__ == '__main__':

      logging.info('server start')

      app = web.Application()

      app.add_routes(routes)

      web.run_app(app,host='0.0.0.0',port=8080)

      logging.info('server close')

    (2)aiohttp客户端。

    aiohttp的另一个主要作用是作为异步客户端,用来解决高并发请求的情况。比如现在我要模拟一个高并发请求来测试我的服务器负载情况。所以需要在python里模拟高并发。高并发可以有多种方式,比如多线程,但是由于python本质上是没有多线程的,通过底层线程锁实现的多线程。在模型高并发时,具有线程切换和线程开销的损耗。所以我们就可以使用多协成来实现高并发。我们就可以使用aiohttp来模拟高并发客户端。demo如下,用来模拟多个客户端向指定服务器post图片。

    # 异步并发客户端

    class Asyncio_Client(object):

      def __init__(self):

        self.loop=asyncio.get_event_loop()

        self.tasks=[]

      # 将异步函数介入任务列表。后续参数直接传给异步函数

      def set_task(self,task_fun,num,*args):

        for i in range(num):

          self.tasks.append(task_fun(*args))

      # 运行,获取返回结果

      def run(self):

        back=[]

        try:

          f = asyncio.wait(self.tasks)  # 创建future

          self.loop.run_until_complete(f) # 等待future完成

        finally:

          pass

    # 服务器高并发压力测试

    class Test_Load():

      total_time=0 # 总耗时

      total_payload=0 # 总负载

      total_num=0 # 总并发数

      all_time=[]

      # 创建一个异步任务,本地测试,所以post和接收几乎不损耗时间,可以等待完成,主要耗时为算法模块

      async def task_func1(self,session):

        begin = time.time()

        # print('开始发送:', begin)

        file=open(self.image, 'rb')

        fsize = os.path.getsize(self.image)

        self.total_payload+=fsize/(1024*1024)

        data = {"image_id": "2", 'image':file}

        r = await session.post(self.url,data=data) #只post,不接收

        result = await r.json()

        self.total_num+=1

        # print(result)

        end = time.time()

        # print('接收完成:', end,',index=',self.total_num)

        self.all_time.append(end-begin)

      # 负载测试

      def test_safety(self):

        print('test begin')

        async_client = Asyncio_Client() # 创建客户端

        session = aiohttp.ClientSession()

        for i in range(10): # 执行10次

          self.all_time=[]

          self.total_num=0

          self.total_payload=0

          self.image = 'xxxx.jpg' # 设置测试nayizhang

          print('测试图片:', self.image)

          begin = time.time()

          async_client.set_task(self.task_func1,self.num,session) # 设置并发任务

          async_client.run()  # 执行任务

          end=time.time()

          self.all_time.sort(reverse=True)

          print(self.all_time)

          print('并发数量(个):',self.total_num)

          print('总耗时(s):',end-begin)

          print('最大时延(s):',self.all_time[0])

          print('最小时延(s):', self.all_time[len(self.all_time)-1])

          print('top-90%时延(s):', self.all_time[int(len(self.all_time)*0.1)])

          print('平均耗时(s/个):',sum(self.all_time)/self.total_num)

          print('支持并发率(个/s):',self.total_num/(end-begin))

          print('总负载(MB):',self.total_payload)

          print('吞吐率(MB/S):',self.total_payload/(end-begin))  # 吞吐率受上行下行带宽,服务器带宽,服务器算法性能诸多影响

          time.sleep(3)

        session.close()

        print('test finish')

    相关文章

      网友评论

          本文标题:aiohttp异步编程(一)

          本文链接:https://www.haomeiwen.com/subject/nngxfktx.html