aioflask

作者: 是东东 | 来源:发表于2022-12-10 19:58 被阅读0次
    from flask import Flask, jsonify, has_request_context, copy_current_request_context, request
    from functools import wraps
    from concurrent.futures import Future, ThreadPoolExecutor
    import asyncio
    import time
    
    # The Flask application object; the route below is registered on it.
    app = Flask(__name__)
    
    
    def run_async(func):
        """Wrap coroutine function *func* so it can be called synchronously.

        Each call of the returned wrapper runs ``func(*args, **kwargs)`` to
        completion on a brand-new asyncio event loop inside a single-use worker
        thread, and blocks the caller until the coroutine finishes.

        If a Flask request context is active when the wrapper is called, it is
        copied into the worker thread so that ``request`` and friends remain
        usable inside the coroutine.

        Returns:
            A synchronous function with the same signature as *func* that
            returns the coroutine's result.

        Raises:
            Whatever the coroutine raises; the exception is re-raised in the
            calling thread.
        """
        @wraps(func)
        def _wrapper(*args, **kwargs):
            def _run():
                # A fresh loop per call; it is always closed, even on failure.
                loop = asyncio.new_event_loop()
                try:
                    return loop.run_until_complete(func(*args, **kwargs))
                finally:
                    loop.close()

            runner = _run
            if has_request_context():
                # Must be wrapped here, in the request thread, so the copied
                # context is the one belonging to the current request.
                runner = copy_current_request_context(_run)

            # The context manager shuts the executor down once the call is
            # done, so worker threads do not accumulate one per request.
            # The future returned by submit() already carries the result or
            # the exception, so no separate hand-made Future is needed.
            with ThreadPoolExecutor(max_workers=1) as loop_executor:
                return loop_executor.submit(runner).result()

        return _wrapper
    
    
    # Reference timestamp taken when the module is loaded; the request handler
    # prints elapsed seconds relative to it.
    start_time = time.time()
    
    @app.route("/", methods=["POST"])
    @run_async
    async def report_result_success():
        """Echo the POSTed JSON body back after a one-second asynchronous wait.

        Prints the elapsed time since module load before and after the wait.

        Returns:
            A JSON response containing the parsed request body.
        """
        print(f'{time.time() - start_time}>> 注册中: ')
        await asyncio.sleep(1)
        print(f'{time.time() - start_time}>> 注册成功: ')
        # Always serialize explicitly: returning the parsed body as-is makes the
        # response depend on its JSON type (a string would be sent back as
        # text/html, null or a number would raise TypeError in Flask).
        return jsonify(request.json)
    
    
    # When executed as a script, start Flask's built-in server on all
    # interfaces (0.0.0.0), port 80, with debug mode disabled.
    if __name__ == '__main__':
        app.run(host='0.0.0.0', port=80, debug=False)
    
    

    相关文章

      网友评论

          本文标题:aioflask

          本文链接:https://www.haomeiwen.com/subject/bvsxqdtx.html