Python怎么测试异步接口

作者: 韩志超 | 来源:发表于2019-02-25 23:30 被阅读220次

    当业务处理比较耗时时, 接口一般会采用异步处理的方式, 这种异步处理的方式又叫Future模式.

    一般流程
    当你请求一个异步接口,接口会立刻返回你一个结果告诉你已经开始处理,结果中一般会包含一个任务id类似的东西用于追踪结果, 另外会提供一个查询结果的接口, 当结果未处理完查询接口会返回相应的"未完成"状态, 如果已经处理完,则会返回相应的数据.

    处理方法
    异步接口我们一般采取轮询的方法,每隔一定时间间隔取请求一下查询结果的接口,直到接口返回的状态是已完成/查询到指定数据或超时

    如果异步接口没有提供追踪id和查询接口,我们可以通过同样的方法轮询查取数据库数据或日志数据直到获取到指定结果或超时

    示例接口

    1. 创建订单接口

    请求地址
    http://115.28.108.130:5000/api/order/create/

    请求方法
    GET/POST

    请求格式
    POST: 表单

    参数 类型 说明
    user_id String 用户id
    goods_id String 商品id
    num int 数量
    amount float 总价

    响应示例
    缺少参数:

    {
        "msg": "参数缺失"
    }
    

    成功:

    {
        "order_id": "69561"
    }
    
    1. 获取订单结果接口

    请求地址
    http://115.28.108.130:5000/api/order/get_result/?order_id=***

    请求方法
    GET

    参数 类型 说明
    order_id String 订单id

    响应示例
    创建中:

    {}
    

    创建成功:

    {
        "amount": "20.0",
        "goods_id": "123",
        "num": "2",
        "user_id": "123"
    }
    

    Python实现方法

    import time
    import requests
    
    def create_order():
        url = "http://115.28.108.130:5000/api/order/create/"   # 异步接口
        data = {
            "user_id": "1234",
            "goods_id": "136",
            "num": 10,
            "amount": 20.00
        }
        res = requests.post(url=url, data=data)
        return res.json().get("order_id")  # 返回order_id用于追踪
    
    def get_order_result(interval = 1, time_out = 60):  # 设置了默认时间间隔和超时时间,可以修改
        order_id = create_order()
        # 查询结果接口
        url = "http://115.28.108.130:5000/api/order/get_result/?order_id={}".format(order_id) 
        start_time = time.time()  # 启动时间
        end_time = start_time + time_out  启动时间+超时时间=结束时间
        count = 1  # 计数器, 此处是为了显示更直观, 可以去掉
        while time.time() < end_time:   # 当未到结束时间时, 循环请求
            res = requests.get(url)  # 请求查询结果接口
            print(count, res.json())
            count += 1
            time.sleep(interval)   # 休眠指定时间
            if res.json():   # 如果有数据则退出循环
                break
        else:
            return None   # 正常退出(达到end_time, 非break退出) 返回None
        return res.json()  # break退出返回 接口数据
    
    
    if __name__ == '__main__':
        order_result = get_order_result()
        print(order_result)
    
    

    相关文章

      网友评论

        本文标题:Python怎么测试异步接口

        本文链接:https://www.haomeiwen.com/subject/lttoyqtx.html