fastapi

作者: dittoyy3991 | 来源:发表于2024-07-11 15:18 被阅读0次

https://fastapi.tiangolo.com/zh/
对着敲的,差不多敲了12天了。。。学习速度太慢了,留痕留个脚印,算是笔记了


#!/usr/bin/python3
from fastapi import FastAPI,Query,Path,Body,Cookie,Header
from pydantic import BaseModel,Field,HttpUrl,EmailStr
import fastapi_cdn_host

#Query 为查询参数声明更多的校验和元数据的方式相同,你也可以使用 Path 为路径参数声明相同类型的校验和元数据。
# Field 在 Pydantic 模型内部声明校验和元数据
from typing_extensions import Annotated

# Application instance that the route decorators below register on.
app = FastAPI()
# Hand the app to fastapi_cdn_host (presumably swaps the docs-page asset URLs — confirm against that package).
fastapi_cdn_host.patch_docs(app)


# @app.get("/items/{item_id}")
# async def read_item(item_id: int):
#     return {"item_id": item_id}

#有顺序的执行,所以同样路径的,前面放前面
@app.get("/users/me")
async def read_user_me():
    # Fixed path; it is declared before "/users/{user_id}" so it is matched first.
    payload = {"user_id": "the current user"}
    return payload


@app.get("/users/{user_id}")
async def read_user(user_id: str):
    # Echo the path segment back to the caller.
    return dict(user_id=user_id)


from enum import Enum
# Enum枚举值,预设值 继承str和Enum
class ModelName(str, Enum):
    # Closed set of accepted model names; inheriting from ``str`` makes
    # every member a string as well as an Enum member.
    alexnet = "alexnet"
    resnet = "resnet"
    lenet = "lenet"


# 路径参数带{}
@app.get("/models/{model_name}")
async def get_model(model_name: ModelName):
    # Choose the message for the requested model; anything that is neither
    # alexnet nor lenet falls through to the residuals message.
    if model_name is ModelName.alexnet:
        message = "Deep Learning FTW!"
    elif model_name.value == "lenet":
        message = "LeCNN all the images"
    else:
        message = "Have some residuals"
    return {"model_name": model_name, "message": message}

# Starlette 内置工具 路径转换器¶:path  /files//home/johndoe/myfile.txt
@app.get("/files/{file_path:path}")
async def read_file(file_path: str):
    # The ``:path`` converter lets the captured value contain slashes.
    captured = file_path
    return {"file_path": captured}


#查询参数  可选参数+默认值 q: str | None = None
# FastAPI 不使用 Optional[str] 中的 Optional(只使用 str)
# In-memory stand-in for a table of items.
fake_items_db = [{"item_name": label} for label in ("Foo", "Bar", "Baz")]


@app.get("/items/")
async def read_item(skip: int = 0, limit: int = 10):
    # Return at most ``limit`` entries starting at offset ``skip``.
    window_end = skip + limit
    return fake_items_db[skip:window_end]

from typing import Union,List,Set,Dict,Any


# @app.get("/items/{item_id}")
# async def read_item(item_id: str, q: str | None = None):
#     if q:
#         return {"item_id": item_id, "q": q}
#     return {"item_id": item_id}

async def read_item(item_id: str, q: Union[str, None] = None):
    """Return ``item_id`` and, when a non-empty ``q`` is supplied, ``q`` as well.

    The previous annotation ``[str, None]`` was a list literal rather than a
    type; ``Union[str, None]`` is the optional-string annotation used by the
    rest of this file.

    Note: this function carries no route decorator, so it is not registered
    on the application.

    Args:
        item_id: Identifier echoed back in the result.
        q: Optional value; included in the result only when truthy.

    Returns:
        A dict with ``item_id`` and, if ``q`` is truthy, ``q``.
    """
    response = {"item_id": item_id}
    if q:
        response["q"] = q
    return response

#请求体 BaseModel才行  请求体格式为 JSON 对象(即 Python 字典)
# {
#     "name": "Foo",
#     "price": 45.2
# }
# class Item(BaseModel):
#     name: str
#     description: str | None = None
#     price: float
#     tax: float | None = None
class Item(BaseModel):
    # Request-body model: ``name`` and ``price`` have no default, while
    # ``description`` and ``tax`` default to None.
    name: str
    description: Union[str, None] = None
    price: float
    tax: Union[float, None] = None


@app.post("/items1/")
async def create_item(item: Item):
    # Echo the parsed body model straight back.
    received = item
    return received


#Query过滤请求参数条件
@app.get("/items2/")
async def read_items(q: Union[str, None] = Query(default=None, max_length=50)):
    # ``default=None`` keeps ``q`` optional; ``Query()`` with no default
    # declares a required parameter even though the type allows None.
    # ``max_length=50`` caps the accepted string length.
    results = {"items": [{"item_id": "Foo"}, {"item_id": "Bar"}]}
    if q:
        # Only report the query back when a non-empty value was sent.
        results["q"] = q
    return results

#Query+正则表达式过滤
@app.get("/items3/")
async def read_items(
    q: Union[str, None] = Query(
        default=None, min_length=3, max_length=50, pattern="^fixedquery$"
    ),
):
    # ``default=None`` keeps ``q`` optional; without a default ``Query()``
    # declares a required parameter even though the type allows None.
    # When present, ``q`` must be 3-50 characters and match the regex.
    results = {"items": [{"item_id": "Foo"}, {"item_id": "333"}]}
    if q:
        results["q"] = q
    return results

# Query 且需要声明一个值是必需的时,只需不声明默认参数
# 使用省略号(...)声明必需参数¶
@app.get("/items4/")
async def read_items(q: str = Query(default=..., min_length=3)):
    # ``default=...`` (Ellipsis) declares ``q`` as required.
    found = [{"item_id": "Foo"}, {"item_id": "444"}]
    results = {"items": found}
    if q:
        results["q"] = q
    return results

#声明一个参数可以接收None值,但它仍然是必需的。这将强制客户端发送一个值,即使该值是None
#title description 添加 描述--添加更多有关该参数的信息。
# deprecated 标识废弃的接口,不用删
# alias="item-query"别名参数
@app.get("/items5/")
async def read_items(
    q: Union[str, None] = Query(
        default=...,
        title="Query string",
        deprecated=True,
        alias="item-query",
        description="Query string for the items to search in the database that have a good match",
        min_length=3,
    )
):
    # The parameter is read from the ``item-query`` alias and is flagged as
    # deprecated; ``default=...`` means it carries no default value.
    found = [{"item_id": "Foo"}, {"item_id": "555"}]
    results = {"items": found}
    if q:
        results["q"] = q
    return results

#Required代替省略号(... 必须必填
# ``Required`` was an alias for ``...`` in Pydantic v1 and no longer exists in
# Pydantic v2, where importing it raises an ImportError subclass. Fall back to
# Ellipsis so this module imports under both major versions.
try:
    from pydantic import Required
except ImportError:
    Required = ...


@app.get("/items6/")
async def read_items(q: str = Query(default=Required, min_length=3)):
    # ``default=Required`` (Ellipsis) declares ``q`` as a required parameter
    # of at least three characters.
    results = {"items": [{"item_id": "Foo"}, {"item_id": "666"}]}
    if q:
        results["q"] = q
    return results

#可在 URL 中出现多次的查询参数 q ?q=foo&q=bar
# 要声明类型为 list 的查询参数,如上例所示,你需要显式地使用 Query,否则该参数将被解释为请求体。
@app.get("/items7/")
async def read_items(q: Union[List[str], None] = Query(default=None)):
    # ``default=None`` keeps the repeated ``q`` parameter optional; a bare
    # ``Query()`` would make it required. Explicit ``Query`` is what makes a
    # list-typed parameter a query parameter rather than a request body.
    query_items = {"q": q}
    return query_items

#你也可以直接使用 list 代替 List [str]:
@app.get("/items8/")
async def read_items(q: list = Query(default=[])):
    # Untyped ``list`` variant: the element type is not declared.
    return dict(q=q)
#
# 声明元数据¶
# 你可以声明与 Query 相同的所有参数。
@app.get("/items1/{item_id}")
async def read_items(
    item_id: Annotated[int, Path(title="The ID of the item to get")],
    q: Annotated[Union[str, None], Query(alias="item-query")] = None,
):
    # ``item_id`` comes from the path; ``q`` is read from the ``item-query``
    # alias and is only echoed back when non-empty.
    response = {"item_id1": item_id}
    if q:
        response["q"] = q
    return response

#按需对参数排序的技巧  不会对该 * 做任何事情,但是它将知道之后的所有参数都应作为关键字参数(键值对),也被称为 kwargs,来调用。即使它们没有默认值。
# ge=1 大于(greater than)或等于(equal)1, gt=0大于(greater than), le=1000小于等于(less than or equal)lt:小于(less than)
@app.get("/items2/{item_id}")
async def read_items(
    *,
    item_id: int = Path(title="The ID of the item to get", gt=0, le=1000),
    q: str,
):
    # The leading ``*`` makes every parameter keyword-only, which allows the
    # default-less ``q`` to follow ``item_id``. ``item_id`` must satisfy
    # 0 < item_id <= 1000.
    response = {"item_id2": item_id}
    if q:
        response["q"] = q
    return response

#多个请求体参数 Item Pydantic 模型参数
# {
#     "name": "Foo",
#     "description": "The pretender",
#     "price": 42.0,
#     "tax": 3.2
# }
class Item(BaseModel):
    # Rebinds the module-level name ``Item``; same fields as the earlier
    # definition: ``name``/``price`` without defaults, the rest default to None.
    name: str
    description: Union[str, None] = None
    price: float
    tax: Union[float, None] = None

class User(BaseModel):
    # Second body model used next to ``Item`` in the multi-body routes below.
    username: str
    full_name: Union[str, None] = None

@app.put("/items3/{item_id}")
async def update_item(
    item_id: Annotated[int, Path(title="The ID of the item to get", ge=0, le=1000)],
    q: Union[str, None] = None,
    item: Union[Item, None] = None,
):
    # ``item_id`` must satisfy 0 <= item_id <= 1000; ``q`` and ``item`` are
    # both optional and only appear in the response when truthy.
    response = {"item_id": item_id}
    if q:
        response["q"] = q
    if item:
        response["item"] = item
    return response

## {
#     "item": {
#         "name": "Foo",
#         "description": "The pretender",
#         "price": 42.0,
#         "tax": 3.2
#     },
#     "user": {
#         "username": "dave",
#         "full_name": "Dave Grohl"
#     }
# }
@app.put("/items4/{item_id}")
async def update_item(item_id: int, item: Item, user: User):
    # Two model parameters are declared, matching the ``item`` / ``user``
    # keyed payload shown in the example above.
    return dict(item_id=item_id, item=item, user=user)

#请求体中的单一值¶Body Body 同样具有与 Query、Path 以及其他后面将看到的类完全相同的额外校验和元数据参数。
# 扩展先前的模型Body,你可能决定除了 item 和 user 之外,还想在同一请求体中具有另一个键 importance。
# 注意:importance 用 Body() 声明,所以它属于请求体,要和 item、user 一样作为 JSON 里的一个键发送(例如 "importance": 5),而不是写成查询参数 ?importance=5

@app.put("/items5/{item_id}")
async def update_item(
    item_id: int,
    item: Item,
    user: User,
    importance: Annotated[int, Body(title="The ID of the item to get", gt=0)],
    q: Union[str, None] = None,
):
    # ``importance`` is declared with ``Body`` and must be greater than 0.
    # ``q`` is accepted but not included in the response.
    return dict(item_id=item_id, item=item, user=user, importance=importance)


#Body(embed=True)一个拥有 item 键并在值中包含模型内容的 JSON 假设你只有一个来自 Pydantic 模型 Item 的请求体参数 item。
#在仅声明了一个请求体参数的情况下,将原本的请求体嵌入到一个键中
# {
#     "name": "Foo",
#     "description": "The pretender",
#     "price": 42.0,
#     "tax": 3.2
# }

class Item(BaseModel):
    # Rebinds ``Item`` again with the same four fields; used by the
    # ``Body(embed=True)`` route that follows.
    name: str
    description: Union[str, None] = None
    price: float
    tax: Union[float, None] = None

@app.put("/items6/{item_id}")
async def update_item(item_id: int, item: Annotated[Item, Body(embed=True)]):
    # ``embed=True`` is set on the single body parameter ``item``.
    return dict(item_id=item_id, item=item)


#Field使用 Query、Path 都是 Params 的子类,而 Params 类又是 Pydantic 中 FieldInfo 的子类。
# 模型属性的类型、默认值及 Field 的代码结构与路径操作函数的参数相同,只不过是用 Field 替换了Path、Query、Body。
class Item(BaseModel):
    # Model whose fields carry validation and metadata through ``Field``.
    name: str
    # ``default=None`` keeps the nullable description optional; a ``Field``
    # without a default is required under Pydantic v2.
    description: Union[str, None] = Field(
        default=None, title="The description of the item", max_length=30
    )
    # The description text now states the bound that ``gt=2`` enforces.
    price: float = Field(gt=2, description="The price must be greater than 2")
    tax: Union[float, None] = None



@app.put("/items7/{item_id}")
async def update_item(item_id: int, item: Annotated[Item, Body(embed=True)]):
    # Same shape as /items6/, but ``Item`` here is the Field-validated model.
    return dict(item_id=item_id, item=item)


#请求体嵌套
# {
#     "name": "Foo",
#     "description": "The pretender",
#     "price": 42.0,
#     "tax": 3.2,
#     "tags": ["rock", "metal", "bar"],
#     "tags2": ["rock", "metal", "bar"],
#  "images": [
#         {
#             "url": "http://example.com/baz.jpg",
#             "name": "The Foo live"
#         },
#         {
#             "url": "http://example.com/dave.jpg",
#             "name": "The Baz"
#         }
#     ]
# }


class Image(BaseModel):
    # Sub-model nested inside ``Item``; ``url`` is typed as ``HttpUrl``.
    url: HttpUrl
    name: str

class Item(BaseModel):
    # Item with nested collections, matching the example payload above.
    name: str
    description: Union[str, None] = None
    price: float
    tax: Union[float, None] = None
    tags: list = []  # untyped list; ``List[str]`` would constrain elements to strings
    tags2: Set[str] = set()
    # Named ``images`` to match the ``"images"`` key in the documented
    # payload; under the previous name ``image`` that key was not mapped to
    # this field and the value stayed None.
    images: Union[List[Image], None] = None


@app.put("/items11/{item_id}")
async def update_item(item_id: int, item: Item):
    # Return the path id together with the parsed nested item.
    return dict(item_id=item_id, item=item)

#深度嵌套,循环套啊
# {
#     "name": "Foo",
#     "description": "The pretender",
#     "price":66,
#     "items": [
#         {
#     "name": "Foo",
#     "description": "The pretender",
#     "price": 42.0,
#     "tax": 3.2,
#     "tags": ["rock", "metal", "bar"],
#     "tags2": ["rock", "metal", "bar"],
#  "images": [
#         {
#             "url": "http://example.com/baz.jpg",
#             "name": "The Foo live"
#         },
#         {
#             "url": "http://example.com/dave.jpg",
#             "name": "The Baz"
#         }
#     ]
# }
#     ]
# }
class Offer(BaseModel):
    # Deeper nesting: an offer holds a list of ``Item`` models.
    name: str
    description: Union[str, None] = None
    price: float
    items: List[Item]


@app.post("/offers/")
async def create_offer(offer: Offer):
    # Echo the parsed offer back.
    parsed = offer
    return parsed

@app.post("/images/multiple/")
async def create_multiple_images(*, images: List[Image]):
    # ``images`` is a list of Image models. The previous ``images.url``
    # raised AttributeError because a list has no ``url`` attribute, and the
    # loop before it only evaluated attributes without using them.
    return images

#Dict   JSON 仅支持将 str 作为键。但是 Pydantic 具有自动转换数据的功能 接收一些尚且未知的键
# {
#     "7":"4.56",
#     "8": "890."
# }
@app.post("/index-weights/")
async def create_index_weights(weights: Dict[int, float]):
    # Body is a mapping declared with int keys and float values.
    parsed = weights
    return parsed


#可以为 Pydantic 模型声明一个示例:Pydantic v1 使用 Config 和 schema_extra,Pydantic v2(下面的写法)使用 model_config 和 json_schema_extra
class Item2(BaseModel):
    # Item model that ships one example payload through ``model_config``.
    name: str
    description: Union[str, None] = None
    price: float
    tax: Union[float, None] = None

    # ``json_schema_extra`` holds the ``examples`` list for this model.
    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "name": "Foo",
                    "description": "A very nice Item",
                    "price": 35.4,
                    "tax": 3.2,
                }
            ]
        }
    }


@app.put("/items01/{item_id}")
async def update_item(item_id: int, item: Item2):
    # Return the path id together with the parsed body.
    return dict(item_id=item_id, item=item)

#Field example 传递的那些额外参数不会添加任何验证,只会添加注释,用于文档的目的。
class Item3(BaseModel):
    # Per-field examples supplied through ``Field(examples=...)``.
    name: str = Field(examples=["Foo"])
    # ``default=None`` keeps the nullable fields optional; a ``Field``
    # without a default is required under Pydantic v2.
    description: Union[str, None] = Field(default=None, examples=["A very nice Item"])
    price: float = Field(examples=[35.4])
    tax: Union[float, None] = Field(default=None, examples=[3.2])

#在 Field, Path, Query, Body 和其他你之后将会看到的工厂函数  field额外参数
@app.put("/items02/{item_id}")
async def update_item(item_id: int, item: Item3):
    # Return the path id together with the parsed body.
    return dict(item_id=item_id, item=item)

# Body 额外参数 请求体举例子examples
@app.put("/items03/{item_id}")
async def update_item(
    item_id: int,
    item: Annotated[
        Item3,
        Body(
            examples=[
                {
                    "name": "Foo",
                    "description": "A very nice Item",
                    "price": 35.4,
                    "tax": 3.2,
                },
            ],
        ),
    ],
):
    # The example payload is attached to the body parameter via ``Body``.
    return dict(item_id=item_id, item=item)


#额外数据类型 74CBEAE1-859B-54BF-EEB8-EC3E60E65426
from datetime import datetime, date, time, timedelta
from uuid import UUID

@app.put("/items001/{item_id}")
async def read_items(
    item_id: UUID,
    start_datetime: Annotated[datetime, Body()],
    end_datetime: Annotated[datetime, Body()],
    process_after: Annotated[timedelta, Body()],
    repeat_at: Annotated[Union[time, None], Body()] = None,
):
    # Processing starts ``process_after`` after ``start_datetime``; the
    # duration is what remains until ``end_datetime``.
    begins = start_datetime + process_after
    remaining = end_datetime - begins
    return dict(
        item_id=item_id,
        start_datetime=start_datetime,
        end_datetime=end_datetime,
        process_after=process_after,
        repeat_at=repeat_at,
        start_process=begins,
        duration=remaining,
    )

#导入cookie ,header   Cookie 、Path 、Query 是兄弟类,都继承自共用的 Param 类。
@app.get("/items002/")
async def read_items(ads_id: Annotated[Union[str, None], Cookie()] = None):
    # ``ads_id`` is declared as an optional cookie.
    return dict(ads_id=ads_id)

# 重复请求头,不用担心变量中的下划线 无需把首字母大写
# 类型声明中可以使用 list 定义多个请求头。
# 使用 Python list 可以接收重复请求头所有的值。
# ,FastAPI 可以自动转换。如需禁用下划线自动转换为连字符,可以把 Header 的 convert_underscores 参数设置为 False
# NOTE(review): with ``convert_underscores=False`` the parameter name is
# presumably used verbatim as the header name (``user_agent``), so this likely
# does not read the standard ``User-Agent`` header — confirm against the
# FastAPI Header documentation.
@app.get("/items003/")
async def read_items(user_agent: Annotated[Union[str, None], Header(convert_underscores=False)] = None):
    return {"User-Agent": user_agent}
@app.get("/items004/")
async def read_items(x_token: Annotated[Union[List[str], None], Header()] = None):
    # List-typed header parameter: collects the values into a list.
    collected = x_token
    return {"X-Token values": collected}


# response_model是「装饰器」方法(get,post 等)的一个参数。不像之前的所有参数和请求体,它不属于路径操作函数。
#响应模型
class Item01(BaseModel):
    # Response model used by the ``response_model=`` routes below.
    name: str
    description: Union[str, None] = None
    price: float
    tax: Union[float, None] = None
    tags: List[str] = []


@app.post("/items005/", response_model=Item01)
async def create_item(item: Item) -> Any:
    # The decorator's ``response_model`` (Item01) is declared separately from
    # the ``Any`` return annotation.
    received = item
    return received

@app.get("/items006/", response_model=List[Item01])
async def read_items() -> Any:
    # Plain dicts are returned; the declared response model is List[Item01].
    portal_gun = {"name": "Portal Gun", "price": 42.0}
    plumbus = {"name": "Plumbus", "price": 32.0}
    return [portal_gun, plumbus]

#返回与输入相同的数据
   # { "username": "st111r",
   #  "password": "str",
   #  "email": "ee@2.com",
   #  "full_name":"wrw" }
class UserIn(BaseModel):
    # Inbound user payload; the only model here that carries ``password``.
    username: str
    password: str
    email: EmailStr
    full_name: Union[str, None] = None

class UserOut(BaseModel):
    # Outbound user shape: the same fields as ``UserIn`` minus ``password``.
    username: str
    email: EmailStr
    full_name: Union[str, None] = None

# Don't do this in production!
# 因此,FastAPI 将会负责过滤掉未在输出模型中声明的所有数据(使用 Pydantic)。
@app.post("/user1/", response_model=UserOut)
async def create_user(user: UserIn) -> UserIn:
    # Returns the inbound model unchanged; the declared response model is
    # ``UserOut``, which has no ``password`` field.
    inbound = user
    return inbound

#来省略某些属性
#返回不包含
# response_model_exclude_defaults=True
# response_model_exclude_none=True
# response_model_by_alias

#仅包含
# response_model_include=["name", "description"]  使用[]
# response_model_include = {"name", "description"}
class Item6(BaseModel):
    # Model with non-None defaults (``tax``, ``tags``), used to show the
    # ``response_model_exclude_*`` / ``include`` options.
    name: str
    description: Union[str, None] = None
    price: float
    tax: float = 10.5
    tags: List[str] = []


items = {
    "foo": {"name": "Foo", "price": 50.2},
    "bar": {
        "name": "Bar",
        "description": "The bartenders",
        "price": 62,
        "tax": 20.2,
    },
    "baz": {
        "name": "Baz",
        "description": None,
        "price": 50.2,
        "tax": 10.5,
        "tags": [],
    },
}

# response_model_exclude_unset=True: omit fields that were never explicitly set
# response_model_exclude_defaults=True: omit fields equal to their default
# response_model_exclude_none=True: omit fields whose value is None


@app.get(
    "/items61/{item_id}",
    response_model=Item6,
    response_model_exclude_unset=True,
    response_model_include={"name", "description"},
    response_model_exclude={"tax"},
)
async def read_item(item_id: str):
    # Plain lookup; an unknown ``item_id`` raises KeyError.
    stored = items[item_id]
    return stored


class UserInDB(BaseModel):
    # Stored user shape: holds ``hashed_password`` instead of ``password``.
    username: str
    hashed_password: str
    email: EmailStr
    full_name: Union[str, None] = None


def fake_password_hasher(raw_password: str):
    """Return ``raw_password`` prefixed with the literal "supersecret".

    This is plain string concatenation, not a real hash.
    """
    prefix = "supersecret"
    return prefix + raw_password


def fake_save_user(user_in: UserIn):
    """Build the stored representation of ``user_in`` (nothing is persisted).

    The plaintext password, the derived hash and the stored model are no
    longer printed: writing credentials to stdout leaks them into logs.

    Args:
        user_in: Inbound user whose ``password`` is passed to
            ``fake_password_hasher``.

    Returns:
        A ``UserInDB`` built from the inbound fields plus ``hashed_password``.
    """
    hashed_password = fake_password_hasher(user_in.password)
    # ``.dict()`` also yields ``password``; ``UserInDB`` declares no such field.
    user_in_db = UserInDB(**user_in.dict(), hashed_password=hashed_password)
    print("User saved! ..not really")
    return user_in_db

# user_in = UserIn(username="john", password="secret", email="john.doe@example.com")
# user_dict = user_in.dict() #Pydantic 模型支持 .dict() 方法,能返回包含模型数据的字典。
# print(user_dict) 输出的就是 Python 字典:
# {
#     'username': 'john',
#     'password': 'secret',
#     'email': 'john.doe@example.com',
#     'full_name': None,
# }
# UserInDB(**user_dict)


@app.post("/user011/", response_model=UserOut)
async def create_user(user_in: UserIn):
    # Delegate to ``fake_save_user``; the response model is ``UserOut``.
    return fake_save_user(user_in)


#union 响应可以声明为两种类型的 Union 类型,即该响应可以是两种类型中的任意类型
class BaseItem(BaseModel):
    # Common base of ``CarItem`` and ``PlaneItem``.
    description: str
    type: str


class CarItem(BaseItem):
    # Defaults ``type`` to "car"; adds no further fields.
    type: str = "car"


class PlaneItem(BaseItem):
    # Defaults ``type`` to "plane" and adds ``size``, which has no default.
    type: str = "plane"
    size: int


# Kept under its own name: the global ``items`` is reassigned further down
# this file, and a handler resolves globals at request time, so reading
# ``items`` here would pick up whichever assignment ran last.
union_items = {
    "item1": {"description": "All my friends drive a low rider", "type": "car"},
    "item2": {
        "description": "Music is my aeroplane, it's my aeroplane",
        "type": "plane",
        "size": 5,
    },
}


@app.get("/items62/{item_id}", response_model=Union[PlaneItem, CarItem])
async def read_item(item_id: str):
    # The response may be either model; ``PlaneItem`` is listed first in the
    # Union. An unknown ``item_id`` raises KeyError.
    return union_items[item_id]


#由对象列表构成的响应
# Kept under its own name: the global ``items`` is reassigned to a dict
# further down this file, and the handler resolves globals at request time,
# so returning ``items`` would not return this list.
item8_records = [
    {"name": "Foo", "description": "There comes my hero"},
    {"name": "Red", "description": "It's my aeroplane"},
]


class Item8(BaseModel):
    # Element model of the list response below.
    name: str
    description: str


@app.get("/items63/", response_model=List[Item8])
async def read_items():
    # Return the whole list; the declared response model is List[Item8].
    return item8_records

#任意的 dict 都能用于声明响应,只要声明键和值的类型,无需使用 Pydantic 模型
@app.get("/keyword-weights/", response_model=Dict[str, float], status_code=201)
async def read_keyword_weights():
    # A bare dict type serves as the response model; no Pydantic class is used.
    weights = {"foo": 2.3, "bar": 3.4}
    return weights

# 响应状态码 status_code=status.HTTP_201_CREATED 也可以使用 from starlette import status。
from fastapi import FastAPI, status
@app.post("/items/", status_code=status.HTTP_201_CREATED)
async def create_item(name: str):
    # Status code comes from the named constant in ``status``.
    return dict(name=name)

# pip install python-multipart 表单数据 Form  放在请求body里的,不是查询参数里的
# 表单数据的「媒体类型」编码一般为 application/x-www-form-urlencoded。
# 但包含文件的表单编码为 multipart/form-data。文件处理详见下节。
from fastapi import FastAPI, Form
@app.post("/login21/")
async def login(username: str = Form(), password: str = Form()):
    # Both values are form fields; only the username is echoed back.
    return dict(username=username)

#请求文件 File UploadFile 与 bytes 相比有更多优势:
# filename:上传文件名字符串(str),例如, myimage.jpg;
# content_type:内容类型(MIME 类型 / 媒体类型)字符串(str),例如,image/jpeg;
# file: SpooledTemporaryFile( file-like 对象)。其实就是 Python文件,可直接传递给其他预期 file-like 对象的函数或支持库。
# 不包含文件时,表单数据一般用 application/x-www-form-urlencoded「媒体类型」编码。
# 但表单包含文件时,编码为 multipart/form-data。使用了 File
from fastapi import FastAPI, File, UploadFile
@app.post("/files/")
async def create_file(file: bytes = File()):
    # ``file`` is received as bytes; report its length.
    size = len(file)
    return {"file_size": size}

@app.post("/uploadfile/")
async def create_upload_file(file: UploadFile = File()):
    # ``UploadFile`` exposes the uploaded file's name.
    uploaded_name = file.filename
    return {"filename": uploaded_name}

@app.post("/files1/")
async def create_file(
    file: Union[bytes, None] = File(default=None, description="A file read as bytes"),
):
    # ``default=None`` makes the upload optional, which is what the
    # "No file sent" branch below expects; a ``File()`` without a default
    # declares a required field. The description now names the real
    # parameter type (bytes, not UploadFile).
    if not file:
        return {"message": "No file sent"}
    return {"file_size": len(file)}


@app.post("/uploadfile1/")
async def create_upload_file(file: Union[UploadFile, None] = None):
    # Optional upload: report the filename only when a file was sent.
    if file:
        return {"filename": file.filename}
    return {"message": "No upload file sent"}

#多文件上传
from fastapi.responses import HTMLResponse

@app.post("/files3/")
async def create_files(files: List[bytes] = File()):
    # Several files received as bytes; report each length in order.
    sizes = list(map(len, files))
    return {"file_sizes": sizes}


@app.post("/uploadfiles3/")
async def create_upload_files(files: List[UploadFile]):
    # Several ``UploadFile`` objects; report each filename in order.
    names = []
    for upload in files:
        names.append(upload.filename)
    return {"filenames": names}


@app.get("/")
async def main():
    # Minimal page with two multipart forms, one posting to /files3/ and one
    # to /uploadfiles3/.
    page_html = """
<body>
<form action="/files3/" enctype="multipart/form-data" method="post">
<input name="files" type="file" multiple>
<input type="submit">
</form>
<form action="/uploadfiles3/" enctype="multipart/form-data" method="post">
<input name="files" type="file" multiple>
<input type="submit">
</form>
</body>
    """
    return HTMLResponse(content=page_html)

#请求表单与文件
@app.post("/files00/")
async def create_file(
    file: bytes = File(),
    fileb: UploadFile = File(),
    token: str = Form(),
):
    # Files and a plain form field arrive in the same request.
    return dict(
        file_size=len(file),
        token=token,
        fileb_content_type=fileb.content_type,
    )


#处理错误4xx HTTPException HTTPException 是额外包含了和 API 有关数据的常规 Python 异常。因为是 Python 异常,所以不能 return,只能 raise
from fastapi import FastAPI, HTTPException

# Kept under its own name: the global ``items`` is reassigned further down
# this file, and handlers resolve globals at request time, so reading
# ``items`` here would not see this mapping.
error_demo_items = {"foo": "The Foo Wrestlers"}


@app.get("/items30/{item_id}")
async def read_item(item_id: str):
    # HTTPException is an exception, so it is raised rather than returned.
    if item_id not in error_demo_items:
        raise HTTPException(status_code=404, detail="Item not found")
    return {"item": error_demo_items[item_id]}


# Same lookup, but the 404 response also carries a custom header.
@app.get("/items-header/{item_id}")
async def read_item_header(item_id: str):
    if item_id not in error_demo_items:
        raise HTTPException(
            status_code=404,
            detail="Item not found",
            headers={"X-Error": "There goes my error"},
        )
    return {"item": error_demo_items[item_id]}

#全局的 自定义异常 UnicornException 可以用 @app.exception_handler() 添加自定义异常控制器:
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
class UnicornException(Exception):
    """Application error that carries the offending unicorn's ``name``."""

    def __init__(self, name: str):
        # Forward the name to Exception so ``exc.args`` and ``str(exc)`` are
        # populated; without this call they stay empty when ``name`` is
        # passed by keyword.
        super().__init__(name)
        self.name = name


@app.exception_handler(UnicornException)
async def unicorn_exception_handler(request: Request, exc: UnicornException):
    """Turn a ``UnicornException`` into a 418 JSON response naming the unicorn."""
    message = f"Oops! {exc.name} did something. There goes a rainbow..."
    return JSONResponse(status_code=418, content={"message": message})


@app.get("/unicorns/{name}")
async def read_unicorn(name: str):
    # Every name except "yolo" is echoed back; "yolo" raises the custom error.
    if name != "yolo":
        return {"unicorn_name": name}
    raise UnicornException(name=name)

#覆盖异常 请求中包含无效数据时,FastAPI 内部会触发 RequestValidationError。该异常也内置了默认异常处理器。

# 覆盖默认异常处理器时需要导入 RequestValidationError,并用 @app.exception_handler(RequestValidationError) 装饰异常处理器
from fastapi.exceptions import RequestValidationError
from fastapi.responses import PlainTextResponse
from starlette.exceptions import HTTPException as StarletteHTTPException
from fastapi.exception_handlers import (
    http_exception_handler,
    request_validation_exception_handler,
)
@app.exception_handler(StarletteHTTPException)
async def http_exception_handler(request, exc):
    """Render an HTTP error as plain text using its detail and status code.

    Note: this definition rebinds the module-level name
    ``http_exception_handler`` imported from ``fastapi.exception_handlers``
    just above.
    """
    body = str(exc.detail)
    return PlainTextResponse(body, status_code=exc.status_code)


@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request, exc):
    """Answer request-validation failures with the error text as plain text, status 400."""
    message = str(exc)
    return PlainTextResponse(message, status_code=400)


@app.get("/items34/{item_id}")
async def read_item(item_id: int):
    # Every id except 3 is echoed back; 3 triggers a 418 error.
    if item_id != 3:
        return {"item_id": item_id}
    raise HTTPException(status_code=418, detail="Nope! I don't like 3.")


# RequestValidationError 包含其接收到的无效数据请求的 body 收到的响应包含 body 信息,并说明数据是无效的:
from fastapi.encoders import jsonable_encoder

@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    """Return a 422 JSON response with the validation errors and the received body."""
    print(exc.body)
    payload = {"detail": exc.errors(), "body": exc.body}
    return JSONResponse(
        status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
        content=jsonable_encoder(payload),
    )

class Item11(BaseModel):
    # Two fields without defaults; body model for the /items36/ route.
    title: str
    size: int


@app.post("/items36/")
async def create_item(item: Item11):
    # Echo the parsed body back.
    parsed = item
    return parsed


@app.exception_handler(StarletteHTTPException)
async def custom_http_exception_handler(request, exc):
    """Log an HTTP error, then delegate to FastAPI's default HTTP handler.

    Args:
        request: The incoming request.
        exc: The raised HTTP exception.

    Returns:
        Whatever FastAPI's built-in ``http_exception_handler`` returns.
    """
    # Import under an alias: the module-level name ``http_exception_handler``
    # is rebound by a handler function defined earlier in this file, so a
    # bare call here would reach that function, not FastAPI's default.
    from fastapi.exception_handlers import (
        http_exception_handler as default_http_exception_handler,
    )

    print(f"OMG! An HTTP error!: {repr(exc)}")
    return await default_http_exception_handler(request, exc)


@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request, exc):
    """Log the validation failure, then defer to ``request_validation_exception_handler``."""
    print(f"OMG! The client sent invalid data!: {exc}")
    default_response = await request_validation_exception_handler(request, exc)
    return default_response


@app.get("/items35/{item_id}")
async def read_item(item_id: int):
    # Every id except 3 is echoed back; 3 triggers a 418 error.
    if item_id != 3:
        return {"item_id": item_id}
    raise HTTPException(status_code=418, detail="Nope! I don't like 3.")

#路径操作配置
# tags 参数的值是由 str 组成的 list (一般只有一个 str ),tags 用于为路径操作添加标签:
# 路径装饰器还支持 summary 和 description 这两个参数:
# 描述内容比较长且占用多行时,可以在函数的 docstring 中声明路径操作的描述,FastAPI 支持从文档字符串中读取描述内容。
# response_description 只用于描述响应,description 一般则用于描述路径操作。 响应描述 response_description="The created item",
# deprecated 参数可以把路径操作标记为弃用,无需直接删除:


class Item17(BaseModel):
    # Item whose ``tags`` is a set of strings defaulting to an empty set.
    name: str
    description: Union[str, None] = None
    price: float
    tax: Union[float, None] = None
    tags: Set[str] = set()


# Route configured with tags, status code, summary, description and response
# description in the decorator; the function also carries a docstring.
@app.post("/items70/", response_model=Item17,tags=["items"], status_code=status.HTTP_201_CREATED,summary="Create an item",
    description="Create an item with all the information, name, description, price, tax and a set of unique tags",response_description="The created item",)
async def create_item(item: Item17):
    """
    Create an item with all the information:

    - **name**: each item must have a name
    - **description**: a long description
    - **price**: required
    - **tax**: if the item doesn't have tax, you can omit this
    - **tags**: a set of unique tag strings for this item
    """
    # Echo the parsed item back.
    return item

@app.get("/items71/", tags=["items"])
async def read_items():
    # Fixed single-element list, tagged "items".
    only_item = {"name": "Foo", "price": 42}
    return [only_item]


@app.get("/users72/", tags=["users"], deprecated=True)
async def read_users():
    # Route flagged as deprecated but still served.
    only_user = {"username": "johndoe"}
    return [only_user]


#将数据类型(如Pydantic模型)转换为与JSON兼容的数据类型(如dict、list等)。
# FastAPI提供了jsonable_encoder()函数

# In-memory store filled by the /items118/{id} route below.
fake_db = {}


class Item18(BaseModel):
    # Carries a ``datetime`` field, which is why the route below runs the
    # model through ``jsonable_encoder`` before storing it.
    title: str
    timestamp: datetime
    description: Union[str, None] = None


@app.put("/items118/{id}", response_model=Item18)
def update_item(id: str, item: Item18):
    # ``response_model`` is a decorator argument; it used to sit in the
    # function signature, where it was treated as an ordinary parameter.
    # The handler now returns the stored entry so the response matches the
    # declared model rather than the whole store.
    print(item)
    # Convert the model into JSON-compatible data before storing it.
    json_compatible_item_data = jsonable_encoder(item)
    fake_db[id] = json_compatible_item_data
    print(fake_db)
    return json_compatible_item_data


#
class Item20(BaseModel):
    # Every field has a default, so partial payloads are accepted (used by
    # the PUT/PATCH routes below).
    name: Union[str, None] = None
    description: Union[str, None] = None
    price: Union[float, None] = None
    tax: float = 10.5
    tags: List[str] = []


items = {
    "foo": {"name": "Foo", "price": 50.2},
    "bar": {
        "name": "Bar",
        "description": "The bartenders",
        "price": 62,
        "tax": 20.2,
    },
    "baz": {
        "name": "Baz",
        "description": None,
        "price": 50.2,
        "tax": 10.5,
        "tags": [],
    },
}


@app.get("/items201/{item_id}", response_model=Item20)
async def read_item(item_id: str):
    # Plain lookup; an unknown ``item_id`` raises KeyError.
    stored = items[item_id]
    return stored


#PUT 用于接收替换现有数据的数据。
@app.put("/items202/{item_id}", response_model=Item20)
async def update_item(item_id: str, item: Item20):
    # PUT replaces the stored entry with the encoded form of the whole model.
    print(item)
    encoded = jsonable_encoder(item)
    print(encoded)
    print(item)
    items[item_id] = encoded
    print(encoded)
    return encoded
#只更新用户设置过的值,不用模型中的默认值覆盖已存储过的值。
#patch 更新部分数据时,可以在 Pydantic 模型的 .dict() 中使用 exclude_unset 参数。
@app.patch("/items203/{item_id}", response_model=Item20)
async def update_item(item_id: str, item: Item20):
    # Partial update: only fields the client explicitly sent overwrite the
    # stored values. An unknown ``item_id`` raises KeyError.
    stored_item_data = items[item_id]
    # Rebuild the stored data with ``Item20``, the model this route accepts
    # and returns; ``Item`` is a different model with different defaults.
    stored_item_model = Item20(**stored_item_data)
    # ``exclude_unset=True`` drops fields that were not in the request.
    update_data = item.dict(exclude_unset=True)
    updated_item = stored_item_model.copy(update=update_data)
    items[item_id] = jsonable_encoder(updated_item)
    return updated_item

# patch 接下来,用 .copy() 为已有模型创建调用 update 参数的副本,该参数为包含更新数据的 dict。
@app.patch("/items204/{item_id}", response_model=Item20)
async def update_item(item_id: str, item: Item20):
    # Partial update via ``.copy(update=...)``: the stored model is copied
    # with the client-sent fields applied. An unknown ``item_id`` raises
    # KeyError.
    stored_item_data = items[item_id]
    # Rebuild the stored data with ``Item20``, the model this route accepts
    # and returns; ``Item`` is a different model with different defaults.
    stored_item_model = Item20(**stored_item_data)
    update_data = item.dict(exclude_unset=True)
    updated_item = stored_item_model.copy(update=update_data)
    items[item_id] = jsonable_encoder(updated_item)
    return updated_item

#Depends创建 依赖项就是一个函数,且可以使用与路径操作函数相同的参数:
# 这里只能传给 Depends 一个参数。
# 且该参数必须是可调用对象,比如函数
from fastapi import Depends, FastAPI
async def common_parameters(
    q: Union[str, None] = None, skip: int = 0, limit: int = 100
):
    """Bundle the shared ``q`` / ``skip`` / ``limit`` parameters into one dict."""
    return dict(q=q, skip=skip, limit=limit)

@app.get("/items300/")
async def read_items(commons: dict = Depends(common_parameters)):
    # ``commons`` is the dict produced by the ``common_parameters`` dependency.
    shared = commons
    return shared


@app.get("/users300/")
async def read_users(commons: dict = Depends(common_parameters)):
    # Reuses the same dependency as /items300/.
    shared = commons
    return shared

#中间件 要创建中间件你可以在函数的顶部使用装饰器 @app.middleware("http").
import time

@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    """Time the downstream handling and report it in ``X-Process-Time``."""
    started = time.time()
    response = await call_next(request)
    elapsed = time.time() - started
    response.headers["X-Process-Time"] = str(elapsed)
    return response

#oauth2
from fastapi.security import OAuth2PasswordBearer
# Bearer-token scheme used as a dependency below; ``tokenUrl`` is "token".
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

#并不实用
@app.get("/items400/")
async def read_items(token: str = Depends(oauth2_scheme)):
    # Hand back the token string supplied by the ``oauth2_scheme`` dependency.
    return dict(token=token)

class User(BaseModel):
    # Rebinds the module-level name ``User``; only ``username`` has no default.
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None


def fake_decode_token(token):
    """Build a fixed ``User`` whose username is ``token`` + "fakedecoded"."""
    decoded_name = token + "fakedecoded"
    return User(
        username=decoded_name,
        email="john@example.com",
        full_name="John Doe",
    )


async def get_current_user(token: str = Depends(oauth2_scheme)):
    """Dependency: turn the bearer token into a user via ``fake_decode_token``."""
    return fake_decode_token(token)


@app.get("/users1/me")
async def read_users_me(current_user: User = Depends(get_current_user)):
    # Return the user resolved by the ``get_current_user`` dependency.
    me = current_user
    return me

from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
# In-memory user table keyed by username. The ``hashed_password`` values have
# the "fakehashed" + password shape that ``fake_hash_password`` produces.
# NOTE(review): this name is reassigned further down the file.
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "fakehashedsecret",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Wonderson",
        "email": "alice@example.com",
        "hashed_password": "fakehashedsecret2",
        "disabled": True,
    },
}

# A second ``FastAPI()`` instance used to be created here. That rebound
# ``app`` and discarded every route, exception handler and middleware
# registered above, so the routes below now register on the application
# created at the top of the file.


def fake_hash_password(password: str):
    """Return ``password`` prefixed with the literal "fakehashed" (not a real hash)."""
    prefix = "fakehashed"
    return prefix + password


# Rebinds ``oauth2_scheme`` with an identical configuration (``tokenUrl`` "token").
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


class User(BaseModel):
    # Public user fields; ``UserInDB`` below extends this with the hash.
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None


class UserInDB(User):
    # Stored form of ``User``: adds the password hash.
    hashed_password: str


def get_user(db, username: str):
    """Return a ``UserInDB`` built from ``db[username]``, or None when absent."""
    if username not in db:
        return None
    return UserInDB(**db[username])


def fake_decode_token(token):
    """Treat the token as a username and look it up in ``fake_users_db``.

    This provides no security at all: the token is used as the key directly.
    """
    return get_user(fake_users_db, token)


async def get_current_user(token: str = Depends(oauth2_scheme)):
    """Dependency: resolve the bearer token to a user or raise a 401 error."""
    user = fake_decode_token(token)
    if user:
        return user
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid authentication credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )


async def get_current_active_user(current_user: User = Depends(get_current_user)):
    """Dependency: pass the current user through unless it is disabled (400)."""
    if not current_user.disabled:
        return current_user
    raise HTTPException(status_code=400, detail="Inactive user")


@app.post("/token1")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    # Look the user up by the submitted username, then compare the fake hash
    # of the submitted password with the stored one. Both failure paths
    # answer with the same 400 error.
    record = fake_users_db.get(form_data.username)
    if not record:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    user = UserInDB(**record)
    submitted_hash = fake_hash_password(form_data.password)
    if submitted_hash != user.hashed_password:
        raise HTTPException(status_code=400, detail="Incorrect username or password")

    # The "token" is simply the username.
    return dict(access_token=user.username, token_type="bearer")


@app.get("/users20/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    # Return the user resolved by the ``get_current_active_user`` dependency.
    me = current_user
    return me

# 安装 passlib¶
# Passlib 是处理密码哈希的 Python 包。
#
# 它支持很多安全哈希算法及配套工具。
from datetime import datetime, timedelta, timezone
import jwt
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext

# to get a string like this run:
# openssl rand -hex 32
# NOTE(review): the JWT signing key is hard-coded in source; a real
# deployment would presumably load it from configuration instead.
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
# Algorithm name passed to ``jwt.encode`` / ``jwt.decode`` below.
ALGORITHM = "HS256"
# Token lifetime in minutes; not referenced by the code visible in this part of the file.
ACCESS_TOKEN_EXPIRE_MINUTES = 30


# Rebinds ``fake_users_db``: a single user whose ``hashed_password`` is a
# "$2b$"-prefixed hash string (checked through ``pwd_context`` below).
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    }
}


class Token(BaseModel):
    # Shape of a token response: the token string and its type.
    access_token: str
    token_type: str


class TokenData(BaseModel):
    # Holds the username read from a decoded token's "sub" claim.
    username: Union[str, None] = None


class User(BaseModel):
    # Rebinds ``User`` with the same four fields as the previous definition.
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None


class UserInDB(User):
    # Stored form of a user: the User fields plus the password hash that
    # authenticate_user passes to verify_password.
    hashed_password: str


# Password-hashing context used by verify_password / get_password_hash.
# bcrypt is the only configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# Dependency that pulls the bearer token out of the Authorization header for
# get_current_user. `tokenUrl` is the login URL advertised to the docs UI.
# NOTE(review): no "/token" route is visible in this part of the file (the
# live login route is /token1 and the JWT one, /token4, is commented out) -
# confirm which URL should be advertised here.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# The application object is deliberately NOT re-created here. `app` is built
# once at the top of the file; a second `app = FastAPI()` at this point would
# rebind the name to an empty application, so every route registered above
# (and the docs patch applied to the first instance) would be missing from the
# app that is eventually served.


def verify_password(plain_password, hashed_password):
    """Check ``plain_password`` against ``hashed_password`` via ``pwd_context``.

    Returns whatever ``pwd_context.verify`` returns for the pair.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match


def get_password_hash(password):
    """Hash ``password`` with the module-level ``pwd_context`` and return the result."""
    hashed = pwd_context.hash(password)
    return hashed


def get_user(db, username: str):
    """Look ``username`` up in ``db`` and return it as a ``UserInDB``.

    Args:
        db: Mapping of username to a dict of ``UserInDB`` field values.
        username: Key to look up.

    Returns:
        A ``UserInDB`` built from the stored record, or ``None`` when the
        username is not a key of ``db``.
    """
    if username not in db:
        return None
    record = db[username]
    return UserInDB(**record)


def authenticate_user(fake_db, username: str, password: str):
    """Return the matching user when the credentials are valid, else ``False``.

    The user is fetched with ``get_user``; ``False`` is returned both when no
    such user exists and when ``verify_password`` rejects the password.
    """
    candidate = get_user(fake_db, username)
    if candidate and verify_password(password, candidate.hashed_password):
        return candidate
    return False


def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
    """Encode ``data`` plus an expiry claim as a signed JWT.

    Args:
        data: Claims to embed; the dict is copied, not modified.
        expires_delta: Lifetime of the token. A falsy value (``None`` or a
            zero-length delta) falls back to 15 minutes.

    Returns:
        The token produced by ``jwt.encode`` with ``SECRET_KEY`` and
        ``ALGORITHM``.
    """
    lifetime = expires_delta if expires_delta else timedelta(minutes=15)
    claims = data.copy()
    # "exp" is an aware UTC datetime: now plus the chosen lifetime.
    claims["exp"] = datetime.now(timezone.utc) + lifetime
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)


async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    """Resolve the bearer token to a user from ``fake_users_db``.

    The token is decoded with ``SECRET_KEY`` / ``ALGORITHM`` and its "sub"
    claim is taken as the username.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when the
            token cannot be decoded, has no "sub" claim, or names a user that
            is not in ``fake_users_db``.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Only the decode step can raise InvalidTokenError, so only it is guarded.
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except InvalidTokenError:
        raise unauthorized

    subject = claims.get("sub")
    if subject is None:
        raise unauthorized
    token_data = TokenData(username=subject)

    found = get_user(fake_users_db, username=token_data.username)
    if found is None:
        raise unauthorized
    return found


async def get_current_active_user(
    current_user: Annotated[User, Depends(get_current_user)],
):
    """Dependency that yields the authenticated user only if it is not disabled.

    Raises:
        HTTPException: status 400 ("Inactive user") when ``current_user.disabled``
            is truthy.
    """
    if not current_user.disabled:
        return current_user
    raise HTTPException(status_code=400, detail="Inactive user")


# @app.post("/token4")
# async def login_for_access_token(
#     form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
# ) -> Token:
#     user = authenticate_user(fake_users_db, form_data.username, form_data.password)
#     if not user:
#         raise HTTPException(
#             status_code=status.HTTP_401_UNAUTHORIZED,
#             detail="Incorrect username or password",
#             headers={"WWW-Authenticate": "Bearer"},
#         )
#     access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
#     access_token = create_access_token(
#         data={"sub": user.username}, expires_delta=access_token_expires
#     )
#     return Token(access_token=access_token, token_type="bearer")


@app.get("/users42/me/", response_model=User)
async def read_users_me(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    # The dependency chain has already authenticated the caller; the
    # response_model limits the output to the User fields.
    me = current_user
    return me


@app.get("/users41/me/items/")
async def read_own_items(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    # Single hard-coded item, owned by whoever the token resolved to.
    owned_item = {"item_id": "Foo", "owner": current_user.username}
    return [owned_item]


# CORS (cross-origin resource sharing) - CORSMiddleware options:
# allow_origins - list of origins allowed to make cross-origin requests
# allow_origin_regex - regex string; origins matching it may make cross-origin requests
# allow_methods - list of HTTP methods allowed for cross-origin requests
# allow_headers - list of HTTP request headers allowed for cross-origin requests
# allow_credentials - indicates that cookies are supported for cross-origin requests
# expose_headers - response headers that should be made accessible to the browser
# max_age - maximum time, in seconds, that browsers may cache CORS responses; defaults to 600
from fastapi.middleware.cors import CORSMiddleware

# Origins allowed to call this API from a browser.
origins = [
    "http://localhost.tiangolo.com",
    "https://localhost.tiangolo.com",
    "http://localhost",
    "http://localhost:8080",
]

# Install the middleware on the application: only the origins listed above are
# allowed, credentials are enabled, and every method and header is accepted.
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.get("/cors1/")
async def main():
    # Fixed greeting payload.
    greeting = {"message": "Hello World"}
    return greeting


# Jinja2Templates
# pip install jinja2
# pip install aiofiles  # for use with FastAPI
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates

# Serve the local "static" directory under the /static URL prefix.
app.mount("/static", StaticFiles(directory="static"), name="static")


# Template loader rooted at the local "templates" directory.
templates = Jinja2Templates(directory="templates")


@app.get("/items0001/{id}", response_class=HTMLResponse)
async def read_item(request: Request, id: str):
    # Render item.html, exposing the path parameter to the template as "id".
    template_vars = {"id": id}
    return templates.TemplateResponse(
        request=request,
        name="item.html",
        context=template_vars,
    )

from fastapi import FastAPI, Response
@app.get("/headers-and-object/")
def get_headers(response: Response):
    # Add a custom header through the injected Response parameter while still
    # returning a plain dict as the body.
    body = {"message": "Hello World"}
    response.headers["X-Cat-Dog"] = "alone in the world"
    return body

@app.get("/headers/")
def get_headers():
    # Build the response object directly so body and headers are set together.
    return JSONResponse(
        content={"message": "Hello World"},
        headers={"X-Cat-Dog": "alone in the world", "Content-Language": "en-US"},
    )


#可选参数
from typing import Optional

# Path parameter + optional parameter
@app.get("/items7101/{item_id}")
async def read_item(item_id: str, name: Optional[str] = None):
    # item_id comes from the path; name defaults to None when omitted.
    return dict(item_id=item_id, name=name)
# Required parameter + optional parameter
@app.get("/items7101")
async def read_item(item_id: str, name: Optional[str] = None):
    # item_id has no default and is not part of the path; name defaults to None.
    return dict(item_id=item_id, name=name)


# Routers - similar to blueprints
from fastapi import APIRouter
# Routes below are declared on this router rather than directly on `app`.
router = APIRouter()


@router.get("/users071/", tags=["users"])
async def read_users():
    # Fixed demo list of users.
    usernames = ("Rick", "Morty")
    return [{"username": name} for name in usernames]


@router.get("/users071/me", tags=["users"])
async def read_user_me():
    # Declared before the "/users071/{username}" route so that "me" is not
    # captured as a username.
    return dict(username="fakecurrentuser")


@router.get("/users071/{username}", tags=["users"])
async def read_user(username: str):
    # Echo the username taken from the path.
    return dict(username=username)

# Attach the routes declared on `router` to the application.
app.include_router(router)


if __name__ == '__main__':
    # Start a local development server when the file is executed directly.
    import uvicorn

    # `debug=True` is no longer passed: uvicorn removed that option in 0.18,
    # and uvicorn.run() raises TypeError for keyword arguments it does not know.
    uvicorn.run(app=app, host="127.0.0.1", port=8000)

相关文章

网友评论

      本文标题:fastapi

      本文链接:https://www.haomeiwen.com/subject/jzmpcjtx.html