美文网首页
Fast API 实例

Fast API 实例

作者: 旭Louis | 来源:发表于2021-01-06 16:45 被阅读0次

FastAPI 学习笔记

官方文档:https://fastapi.tiangolo.com/

安装

pip install fastapi
pip install "uvicorn[standard]"

快速开始

from typing import Optional

from fastapi import FastAPI

# Application instance; the route decorators below register their handlers on it.
app = FastAPI()


@app.get("/")
def read_root():
    # Root endpoint: answer with a fixed greeting payload.
    greeting = {"Hello": "World"}
    return greeting


@app.get("/items/{item_id}")
def read_item(item_id: int, q: Optional[str] = None):
    # Echo the integer path parameter together with the optional `q` value.
    payload = dict(item_id=item_id, q=q)
    return payload

启动服务

uvicorn main:app --reload
image.png

常用例子集合

from datetime import datetime, time, timedelta
from enum import Enum
from typing import Optional, List, Set
from uuid import UUID

from fastapi import (
    FastAPI, Query, Path, Body, Cookie, Header, Form, File, UploadFile, Request,
    HTTPException,
)
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, HttpUrl, Field, EmailStr

app = FastAPI()
# Load static files: serve the local "static" directory under the /static path
app.mount("/static", StaticFiles(directory="static"), name="static")


@app.get("/")
async def root():
    # Root endpoint: reply with a constant greeting message.
    greeting = {"message": "Hello World"}
    return greeting

# In-memory list of sample items; the /items/ endpoint below returns slices of it.
fake_items_db = [{"item_name": "Foo"}, {"item_name": "Bar"}, {"item_name": "Baz"}]


@app.get("/items/")
async def read_item(skip: int = 0, limit: int = 10):
    # Pagination via query parameters: return the slice [skip, skip + limit).
    stop = skip + limit
    return fake_items_db[skip:stop]


@app.get("/v2/items/{item_id}")
async def read_item(item_id: str, q: Optional[str] = None):
    """
    q is an optional parameter
    """
    response = {"item_id": item_id}
    # Echo `q` back only when a non-empty value was supplied.
    if q:
        response["q"] = q
    return response


@app.get("/v3/items/{item_id}")
async def read_item(item_id: str, q: Optional[str] = None, short: bool = False):
    """
    short=1, true, True, on, yes are all truthy values <br/>
    <p>item_id: ID </p>
    <h3>:param q: string variable </h3>
    :param short: bool variable /n
    :return: text
    """
    result = {"item_id": item_id}
    if q:
        result["q"] = q
    # The long description is attached unless the caller asked for the short form.
    if short:
        return result
    result["description"] = "This is an amazing item that has a long description"
    return result


class ModelName(str, Enum):
    # Fixed set of model names accepted by the /models/{model_name} endpoint;
    # the `str` base makes every member a string as well as an enum member.
    alexnet = "alexnet"
    resnet = "resnet"
    lenet = "lenet"


@app.get("/models/{model_name}")
async def get_model(model_name: ModelName):
    # Choose the message for the requested model; anything that is neither
    # alexnet nor lenet falls through to the residuals message.
    if model_name == ModelName.alexnet:
        message = "Deep Learning FTW!"
    elif model_name.value == "lenet":
        message = "LeCNN all the images"
    else:
        message = "Have some residuals"
    return {"model_name": model_name, "message": message}


class Item(BaseModel):
    # Request-body model: `name` and `price` have no default, the rest default to None.
    name: str
    # A default of None allows the field to be left empty
    description: Optional[str] = None
    price: float
    tax: Optional[float] = None


@app.post("/v4/items/")
async def create_item(item: Item):
    # Return the submitted item as a dict, adding `price_with_tax` whenever a
    # tax value was supplied in the body.
    item_dict = item.dict()
    # Compare against None instead of relying on truthiness, so an explicit
    # tax of 0 still produces a `price_with_tax` entry.
    if item.tax is not None:
        item_dict["price_with_tax"] = item.price + item.tax
    return item_dict


@app.put("/v5/items/{item_id}")
async def create_item(item_id: int, item: Item, q: Optional[str] = None):
    # Combine the path id with the body fields; `q` is appended only when non-empty.
    merged = {"item_id": item_id}
    merged.update(item.dict())
    if q:
        merged["q"] = q
    return merged


@app.get("/v6/items/")
async def read_items(q: Optional[str] = Query(None, min_length=3, max_length=50)):
    # Query(...) can be used for validation (here: length limits on `q`).
    payload = {"items": [{"item_id": "Foo"}, {"item_id": "Bar"}]}
    if not q:
        return payload
    payload["q"] = q
    return payload


@app.get("/v7/items/")
async def read_items(q: str = Query(..., min_length=3)):
    # `...` as the default marks `q` as a required parameter.
    item_list = [{"item_id": name} for name in ("Foo", "Bar")]
    response = {"items": item_list}
    if q:
        response["q"] = q
    return response


@app.get("/v8/items/")
async def read_items(q: Optional[List[str]] = Query(None)):
    # Each value in the list is validated to be a string.
    return dict(q=q)


@app.get("/v9/items/{item_id}")
async def read_items(
    *,
    item_id: int = Path(..., title="The ID of the item to get", ge=0, le=1000),
    q: str,
    size: float = Query(..., gt=0, lt=10.5)
):
    # `size` is declared with range constraints but is not part of the response.
    response = dict(item_id=item_id)
    if q:
        response["q"] = q
    return response


class User(BaseModel):
    # User model used as a second body parameter; `full_name` defaults to None.
    username: str
    full_name: Optional[str] = None


@app.put("/v10/items/{item_id}")
async def update_item(
    item_id: int, item: Item, user: User, importance: int = Body(...)
):
    # Multiple body parameters: Body(...) places `importance` inside the request
    # body instead of treating it as a single standalone value.
    return dict(item_id=item_id, item=item, user=user, importance=importance)


class Image(BaseModel):
    # `url` carries URL-format validation
    url: HttpUrl
    name: str


class Item2(BaseModel):
    # Item model with a set of tags and an optional list of nested Image models.
    name: str
    description: Optional[str] = None
    price: float
    tax: Optional[float] = None
    tags: Set[str] = set()
    images: Optional[List[Image]] = None

    class Config:
        # Supplementary schema information for the model: an example payload
        schema_extra = {
            "example": {
                "name": "Foo",
                "description": "A very nice Item",
                "price": 35.4,
                "tax": 3.2,
            }
        }


@app.put("/v11/items/{item_id}")
async def update_item(item_id: int, item: Item2):
    # Body with nested models (Item2 contains a list of Image models).
    return dict(item_id=item_id, item=item)


class Item3(BaseModel):
    # The extra arguments passed to Field add no validation; they only add
    # annotations, for documentation purposes
    name: str = Field(..., example="Aoo")
    description: Optional[str] = Field(None, example="描述")
    price: float = Field(..., example=5.4)
    tax: Optional[float] = Field(None, example=3.2)


@app.put("/v12/items/{item_id}")
async def update_item(
    item_id: int,
    item: Item3 = Body(
        ...,
        example={
            "name": "Foo",
            "description": "A very nice Item",
            "price": 35.4,
            "tax": 3.2,
        },
    ),
):
    # The body parameter is declared with an `example` payload; the handler
    # simply returns the path id together with the parsed item.
    return dict(item_id=item_id, item=item)


@app.put("/v14/items/{item_id}")
async def read_items(
    item_id: UUID,
    start_datetime: Optional[datetime] = Body(None),
    end_datetime: Optional[datetime] = Body(None),
    repeat_at: Optional[time] = Body(None),
    process_after: Optional[timedelta] = Body(None),
):
    # Parameters of other types: dates, times, time deltas and UUIDs.
    # Every body field defaults to None, so the derived values are computed
    # only when their inputs are present; otherwise they are reported as None
    # instead of failing with TypeError on None arithmetic.
    start_process = None
    if start_datetime is not None and process_after is not None:
        start_process = start_datetime + process_after

    duration = None
    if end_datetime is not None and start_process is not None:
        duration = end_datetime - start_process

    return {
        "item_id": item_id,
        "start_datetime": start_datetime,
        "end_datetime": end_datetime,
        "repeat_at": repeat_at,
        "process_after": process_after,
        "start_process": start_process,
        "duration": duration,
    }


@app.get("/v15/items/")
async def read_items(ads_id: Optional[str] = Cookie(None)):
    # Cookie usage: `ads_id` is declared as a cookie parameter defaulting to None.
    return dict(ads_id=ads_id)


@app.get("/v15/headers/")
async def read_items(user_agent: Optional[str] = Header(None)):
    # Header parameter usage: `user_agent` is declared as a header parameter.
    # Registered under its own path: the cookie example above already owns
    # GET /v15/items/, and a second handler for the same method and path would
    # never be reached because the first matching route handles the request.
    return {"User-Agent": user_agent}


class Item5(BaseModel):
    # Response model for the /v16 endpoints; `tax` defaults to 10.5 when not given.
    name: str
    description: Optional[str] = None
    price: float
    tax: float = 10.5


# Sample item records keyed by item id; the /v16 endpoints below look items up here.
items = {
    "foo": {"name": "Foo", "price": 50.2},
    "bar": {"name": "Bar", "description": "The Bar fighters", "price": 62, "tax": 20.2},
    "baz": {
        "name": "Baz",
        "description": "There goes my baz",
        "price": 50.2,
        "tax": 10.5,
    },
}


@app.get(
    "/v16/items/{item_id}/name",
    response_model=Item5,
    response_model_include={"name", "description"},
)
async def read_item_name(item_id: str):
    # Filter private information: only `name` and `description` are included
    # in the response.
    # An unknown id is answered with 404 rather than letting the KeyError
    # surface as an internal server error.
    try:
        return items[item_id]
    except KeyError:
        raise HTTPException(status_code=404, detail="Item not found") from None


@app.get("/v16/items/{item_id}/public", response_model=Item5, response_model_exclude={"tax"})
async def read_item_public_data(item_id: str):
    # Filter private information: `tax` is excluded from the response.
    # An unknown id is answered with 404 rather than letting the KeyError
    # surface as an internal server error.
    try:
        return items[item_id]
    except KeyError:
        raise HTTPException(status_code=404, detail="Item not found") from None


class UserIn(BaseModel):
    # Input model for user creation; includes the `password` field.
    username: str
    password: str
    email: EmailStr
    full_name: Optional[str] = None


class UserOut(BaseModel):
    # Output model for user creation; declares no `password` field.
    username: str
    email: EmailStr
    full_name: Optional[str] = None


@app.post("/v3/user/", response_model=UserOut)
async def create_user(user: UserIn):
    # A dedicated output model (UserOut) is set as response_model to filter
    # private data; the handler itself returns the input object unchanged.
    created = user
    return created


@app.post("/login/")
async def login(username: str = Form(...), password: str = Form(...)):
    # Accept the data submitted through a form.
    # NOTE(review): the password is echoed back in the response body; acceptable
    # for a demo only.
    return dict(username=username, passwd=password)


@app.post("/single/file/")
async def create_file(file: bytes = File(...)):
    # The upload is received as bytes; report its length.
    size = len(file)
    return {"file_size": size}


@app.post("/single/uploadfile/")
async def create_upload_file(file: UploadFile = File(...)):
    # The upload is received as an UploadFile; report its filename.
    uploaded_name = file.filename
    return {"filename": uploaded_name}


@app.post("/files/")
async def create_files(files: List[bytes] = File(...)):
    # Several uploads received as bytes; report the length of each one in order.
    sizes = list(map(len, files))
    return {"file_sizes": sizes}


@app.post("/uploadfiles/")
async def create_upload_files(files: List[UploadFile] = File(...)):
    # Several uploads received as UploadFile objects; report each filename in order.
    names = []
    for upload in files:
        names.append(upload.filename)
    return {"filenames": names}


@app.get("/file_from")
async def main():
    # Serve a small HTML page with two multi-file upload forms, one posting to
    # /files/ and one to /uploadfiles/.
    page = """
<body>
<form action="/files/" enctype="multipart/form-data" method="post">
<input name="files" type="file" multiple>
<input type="submit">
</form>
<form action="/uploadfiles/" enctype="multipart/form-data" method="post">
<input name="files" type="file" multiple>
<input type="submit">
</form>
</body>
    """
    response = HTMLResponse(content=page)
    return response


@app.get("/v17/items/{item_id}")
def read_root(item_id: str, request: Request):
    # Take the Request object itself to get at the data of the incoming request.
    # `request.client` may be None when no client address is available, so
    # report None for the host instead of raising AttributeError.
    client = request.client
    client_host = client.host if client is not None else None
    return {"client_host": client_host, "item_id": item_id}

更多大型项目例子:https://fastapi.tiangolo.com/zh/project-generation/

代码地址:https://gitee.com/louis_chen_admin/fast_api_demo

相关文章

网友评论

      本文标题:Fast API 实例

      本文链接:https://www.haomeiwen.com/subject/lvxooktx.html