美文网首页
实现简单的 协程异步并发池

实现简单的 协程异步并发池

作者: JZ莫问 | 来源:发表于2020-01-16 17:37 被阅读0次
    # -*- coding:utf-8 -*-
    '''asyncio 学习'''
    import aiohttp
    import asyncio
    from threading import Thread
    import time,os,random
    class myasync(object):
        def __init__(self):
            self.pool = 1000
            self.new_loop = asyncio.new_event_loop()
            self.thread = Thread(target=self.start_loop,args=(self.new_loop,)) # 创建新的线程
            # 它独立于控制终端并且周期性地执行某种任务或等待处理某些发生的事件。也就是说守护线程不依赖于终端,但是依赖于系统,与系统“同生共死”。
            # setDaemon(True): 当主线程退出时,后台线程随机退出;
            # setDaemon(False)(默认情况): 当主线程退出时,若前台线程还未结束,则等待所有线程结束,相当于在程序末尾加入join().
            self.thread.setDaemon(True) # 守护进程
            self.thread.start() # 开始进程
            self.tasks = []
    
    
        def start_loop(self,loop):
            while True:
                try:
                    asyncio.set_event_loop(loop)
                    loop.run_forever()
                except Exception as EX:
                    print(repr(EX),' <==')
                else:
                    break
    
        # 进入异步中
        def add(self,fun):
            while True:
                if self.jc():
                    task = asyncio.run_coroutine_threadsafe(fun, self.new_loop)
                    self.tasks.append(task)
                    return "OK"
                else:
                    time.sleep(2)
    
        # 检测tasks是否还有
        def jc(self,):
            while True:
                if len(self.tasks) < self.pool:
                    return True
                for n,task in enumerate(self.tasks):
                    if task.done():
                        # print(task) # <Future at 0x27348f2f9b0 state=finished returned int> 运行完成的
                        del self.tasks[n]
                        return True
                else:
                    return False
    
    
        # 检测是否运行完成
        def join(self,):
            while True:
                for task in self.tasks:
                    # print(dir(task))
                    # print(task.result()) # 运行结果?_? # 添加时进行运行操作 return 结果 进行阻塞?_?
                    # xx = task.result()
                    # print(task.exception()) # 返回Task的异常?_? # 添加时进行运行操作
                    # print(task.done()) # 运行完成 # 检测task是否运行完成
                    # print(task.add_done_callback(self.hd)) # 添加完成的回调????
                    # print(task.cancel) # 取消 <bound method Future.cancel of <Future at 0x268d6776a90 state=pending>>
                    # print(task.cancelled) # 取消 <bound method Future.cancelled of <Future at 0x204f92b5cc0 state=pending>>
                    # print(task.running()) # False 是否运行?
                    # 没有运行完成的继续运行运行完成才break
                    if not task.done():
                        break
                else:
                    print(task.exception(),'Error')
                    return "OK"
                time.sleep(1)
    
        def __del__(self):
            self.new_loop.close()
    
    async def gg(xnum):
        xx = random.randint(1,10)
        print('time:',xx,'key:',xnum)
        await asyncio.sleep(xx)
        print('OK ====>> :',xx,xnum)
        path = os.getcwd() + '\\test\\'
        with open(path+str(xnum)+'.txt','ab+') as f:
            f.write(str(xnum).encode('utf-8'))
            f.write('\n'.encode('utf-8'))
    
    # Pending -> Pending:Runing -> Finished
    
    if __name__ == '__main__':
        t2 = time.time()
        xx = myasync()
        for x in range(10000):
            xx.add(gg(x))
        xx.join()
        print(time.time() - t2)
    

    特别感谢白大佬

    相关文章

      网友评论

          本文标题:实现简单的 协程异步并发池

          本文链接:https://www.haomeiwen.com/subject/kcxqzctx.html