# https://fastapi.tiangolo.com/zh/
# 对着敲的,差不多敲了12天了……学习速度太慢了,留痕留个脚印,算是笔记了
#!/usr/bin/python3
from fastapi import FastAPI,Query,Path,Body,Cookie,Header
from pydantic import BaseModel,Field,HttpUrl,EmailStr
import fastapi_cdn_host
#Query 为查询参数声明更多的校验和元数据的方式相同,你也可以使用 Path 为路径参数声明相同类型的校验和元数据。
# Field 在 Pydantic 模型内部声明校验和元数据
from typing_extensions import Annotated
# Application instance that the route decorators below attach to.
app = FastAPI()
# Patch the docs pages of `app` through fastapi_cdn_host.
# NOTE(review): presumably swaps the CDN used for the Swagger/ReDoc assets - confirm against the package docs.
fastapi_cdn_host.patch_docs(app)
# @app.get("/items/{item_id}")
# async def read_item(item_id: int):
# return {"item_id": item_id}
#有顺序的执行,所以同样路径的,前面放前面
# Routes are matched in declaration order, so the fixed path "/users/me"
# has to be registered before the parameterised "/users/{user_id}".
@app.get("/users/me")
async def read_user_me():
    """Return a placeholder payload for the current user."""
    return {"user_id": "the current user"}


@app.get("/users/{user_id}")
async def read_user(user_id: str):
    """Echo the ``user_id`` path parameter back to the caller."""
    return {"user_id": user_id}
from enum import Enum
# Enum枚举值,预设值 继承str和Enum
class ModelName(str, Enum):
    """Closed set of model names accepted as a path parameter.

    The ``str`` mix-in makes every member an actual string equal to its value.
    """

    alexnet = "alexnet"
    resnet = "resnet"
    lenet = "lenet"
# 路径参数带{}
@app.get("/models/{model_name}")
async def get_model(model_name: ModelName):
    """Return a canned message for the requested model.

    ``model_name`` is annotated with :class:`ModelName`, so the path segment
    is converted to one of its members before this body runs.
    """
    # A member can be matched by identity ...
    if model_name is ModelName.alexnet:
        return {"model_name": model_name, "message": "Deep Learning FTW!"}
    # ... or through its underlying string value.
    if model_name.value == "lenet":
        return {"model_name": model_name, "message": "LeCNN all the images"}
    return {"model_name": model_name, "message": "Have some residuals"}
# Starlette 内置工具 路径转换器¶:path /files//home/johndoe/myfile.txt
@app.get("/files/{file_path:path}")
async def read_file(file_path: str):
    """Echo ``file_path``; the ``:path`` converter lets it contain slashes."""
    return {"file_path": file_path}
#查询参数 可选参数+默认值 q: str | None = None
# FastAPI 不使用 Optional[str] 中的 Optional(只使用 str)
# In-memory stand-in for a database table, used to demonstrate skip/limit paging.
fake_items_db = [{"item_name": "Foo"}, {"item_name": "Bar"}, {"item_name": "Baz"}]


@app.get("/items/")
async def read_item(skip: int = 0, limit: int = 10):
    """Return the slice ``fake_items_db[skip : skip + limit]``.

    ``skip`` and ``limit`` are query parameters with defaults, so both are optional.
    """
    return fake_items_db[skip : skip + limit]
from typing import Union,List,Set,Dict,Any
# @app.get("/items/{item_id}")
# async def read_item(item_id: str, q: str | None = None):
# if q:
# return {"item_id": item_id, "q": q}
# return {"item_id": item_id}
# NOTE(review): no route decorator is applied here (the decorated variant above
# is commented out), so this coroutine is defined but not registered on `app`.
async def read_item(item_id: str, q: Union[str, None] = None):
    """Return ``item_id`` and, when ``q`` is truthy, ``q`` as well.

    The annotation of ``q`` used to be the list literal ``[str, None]``, which
    is not a type; it is now the intended ``Union[str, None]``.
    """
    if q:
        return {"item_id": item_id, "q": q}
    return {"item_id": item_id}
#请求体 BaseModel才行 请求体格式为 JSON 对象(即 Python 字典)
# {
# "name": "Foo",
# "price": 45.2
# }
# class Item(BaseModel):
# name: str
# description: str | None = None
# price: float
# tax: float | None = None
class Item(BaseModel):
    """Request-body model: ``name`` and ``price`` are required, the rest default to None."""

    name: str
    description: Union[str, None] = None
    price: float
    tax: Union[float, None] = None


@app.post("/items1/")
async def create_item(item: Item):
    """Return the validated request body unchanged."""
    return item
#Query过滤请求参数条件
@app.get("/items2/")
async def read_items(q: Union[str, None] = Query(max_length=50)):
    """List two fixed items and echo ``q`` (at most 50 characters) when truthy.

    NOTE(review): ``Query()`` is given no default, so ``q`` is a required
    query parameter even though its annotation allows ``None``.
    """
    results = {"items": [{"item_id": "Foo"}, {"item_id": "Bar"}]}
    if q:
        results.update({"q": q})
    return results
#Query+正则表达式过滤
@app.get("/items3/")
async def read_items(
    q: Union[str, None] = Query(
        min_length=3, max_length=50, pattern="^fixedquery$"
    ),
):
    """List two fixed items and echo ``q`` when truthy.

    ``q`` is constrained to 3-50 characters and must match ``^fixedquery$``.
    """
    results = {"items": [{"item_id": "Foo"}, {"item_id": "333"}]}
    if q:
        results.update({"q": q})
    return results
# Query 且需要声明一个值是必需的时,只需不声明默认参数
# 使用省略号(...)声明必需参数¶
@app.get("/items4/")
async def read_items(q: str = Query(default=..., min_length=3)):
    """List two fixed items and echo ``q`` when truthy.

    ``default=...`` (Ellipsis) marks ``q`` as required; it must be at least
    3 characters long.
    """
    results = {"items": [{"item_id": "Foo"}, {"item_id": "444"}]}
    if q:
        results.update({"q": q})
    return results
#声明一个参数可以接收None值,但它仍然是必需的。这将强制客户端发送一个值,即使该值是None
#title description 添加 描述--添加更多有关该参数的信息。
# deprecated 标识废弃的接口,不用删
# alias="item-query"别名参数
@app.get("/items5/")
async def read_items(
    q: Union[str, None] = Query(
        default=...,
        title="Query string",
        deprecated=True,
        alias="item-query",
        description="Query string for the items to search in the database that have a good match",
        min_length=3,
    ),
):
    """List two fixed items and echo ``q`` when truthy.

    The parameter is read from the URL under its alias ``item-query``, is
    declared required via ``default=...`` and is flagged as deprecated;
    ``title`` and ``description`` are metadata only.
    """
    results = {"items": [{"item_id": "Foo"}, {"item_id": "555"}]}
    if q:
        results.update({"q": q})
    return results
#Required代替省略号(... 必须必填
# ``Required`` is an alias for ``...`` (Ellipsis) that only exists in
# Pydantic v1. Pydantic v2 raises an ImportError subclass for it, so fall back
# to Ellipsis to keep this module importable on both major versions.
try:
    from pydantic import Required
except ImportError:
    Required = ...


@app.get("/items6/")
async def read_items(q: str = Query(default=Required, min_length=3)):
    """List two fixed items and echo ``q`` when truthy.

    ``q`` is required (``Required`` is Ellipsis) and at least 3 characters long.
    """
    results = {"items": [{"item_id": "Foo"}, {"item_id": "666"}]}
    if q:
        results.update({"q": q})
    return results
#可在 URL 中出现多次的查询参数 q ?q=foo&q=bar
# 要声明类型为 list 的查询参数,如上例所示,你需要显式地使用 Query,否则该参数将被解释为请求体。
@app.get("/items7/")
async def read_items(q: Union[List[str], None] = Query()):
    """Return every value of the repeatable query parameter ``q`` (``?q=a&q=b``).

    The explicit ``Query()`` is what makes the list a query parameter.
    """
    query_items = {"q": q}
    return query_items
#你也可以直接使用 list 代替 List [str]:
@app.get("/items8/")
async def read_items(q: list = Query(default=[])):
    """Return the values of ``q``; a bare ``list`` annotation leaves the element type unchecked."""
    query_items = {"q": q}
    return query_items
#
# 声明元数据¶
# 你可以声明与 Query 相同的所有参数。
@app.get("/items1/{item_id}")
async def read_items(
    item_id: Annotated[int, Path(title="The ID of the item to get")],
    q: Annotated[Union[str, None], Query(alias="item-query")] = None,
):
    """Return ``item_id`` and, when truthy, the optional ``item-query`` value.

    ``Path`` attaches metadata to the path parameter in the same way
    ``Query`` does for the query parameter.
    """
    results = {"item_id1": item_id}
    if q:
        results.update({"q": q})
    return results
#按需对参数排序的技巧 不会对该 * 做任何事情,但是它将知道之后的所有参数都应作为关键字参数(键值对),也被称为 kwargs,来调用。即使它们没有默认值。
# ge=1 大于(greater than)或等于(equal)1, gt=0大于(greater than), le=1000小于等于(less than or equal)lt:小于(less than)
@app.get("/items2/{item_id}")
async def read_items(
    *,
    item_id: int = Path(title="The ID of the item to get", gt=0, le=1000),
    q: str,
):
    """Return ``item_id`` (constrained to 0 < item_id <= 1000) and ``q`` when truthy.

    The leading ``*`` makes both parameters keyword-only, which is what allows
    ``q`` (no default) to follow ``item_id`` (has a default).
    """
    results = {"item_id2": item_id}
    if q:
        results.update({"q": q})
    return results
#多个请求体参数 Item Pydantic 模型参数
# {
# "name": "Foo",
# "description": "The pretender",
# "price": 42.0,
# "tax": 3.2
# }
class Item(BaseModel):
    """Item payload: ``name`` and ``price`` are required."""

    name: str
    description: Union[str, None] = None
    price: float
    tax: Union[float, None] = None


class User(BaseModel):
    """User payload: only ``username`` is required."""

    username: str
    full_name: Union[str, None] = None


@app.put("/items3/{item_id}")
async def update_item(
    item_id: Annotated[int, Path(title="The ID of the item to get", ge=0, le=1000)],
    q: Union[str, None] = None,
    item: Union[Item, None] = None,
):
    """Return ``item_id`` plus whichever of ``q`` / ``item`` were supplied.

    ``item_id`` is a path parameter (0-1000), ``q`` an optional query
    parameter and ``item`` an optional request body.
    """
    results = {"item_id": item_id}
    if q:
        results.update({"q": q})
    if item:
        results.update({"item": item})
    return results
## {
# "item": {
# "name": "Foo",
# "description": "The pretender",
# "price": 42.0,
# "tax": 3.2
# },
# "user": {
# "username": "dave",
# "full_name": "Dave Grohl"
# }
# }
@app.put("/items4/{item_id}")
async def update_item(item_id: int, item: Item, user: User):
    """Return the path id together with both body models.

    With two model parameters the JSON body is expected to hold one key per
    parameter name (``item`` and ``user``).
    """
    results = {"item_id": item_id, "item": item, "user": user}
    return results
#请求体中的单一值¶Body Body 同样具有与 Query、Path 以及其他后面将看到的类完全相同的额外校验和元数据参数。
# 扩展先前的模型Body,你可能决定除了 item 和 user 之外,还想在同一请求体中具有另一个键 importance。
# importance 用 Body() 声明,所以它属于请求体(JSON 中与 item、user 同级的键 "importance"),而不是查询参数;q 才是查询参数,例如 http://127.0.0.1:8000/items5/2?q=foo
@app.put("/items5/{item_id}")
async def update_item(
    item_id: int,
    item: Item,
    user: User,
    importance: Annotated[int, Body(title="The ID of the item to get", gt=0)],
    q: Union[str, None] = None,
):
    """Return the path id, both body models and the singular body value.

    ``importance`` is declared with ``Body`` and must be > 0, so it is read
    from the JSON body next to ``item`` and ``user``. ``q`` is an optional
    query parameter; it used to be accepted and then dropped, and is now
    echoed when truthy like in the sibling routes.
    """
    results = {"item_id": item_id, "item": item, "user": user, "importance": importance}
    if q:
        results.update({"q": q})
    return results
#Body(embed=True)一个拥有 item 键并在值中包含模型内容的 JSON 假设你只有一个来自 Pydantic 模型 Item 的请求体参数 item。
#在仅声明了一个请求体参数的情况下,将原本的请求体嵌入到一个键中
# {
# "name": "Foo",
# "description": "The pretender",
# "price": 42.0,
# "tax": 3.2
# }
class Item(BaseModel):
    """Item payload: ``name`` and ``price`` are required."""

    name: str
    description: Union[str, None] = None
    price: float
    tax: Union[float, None] = None


@app.put("/items6/{item_id}")
async def update_item(item_id: int, item: Annotated[Item, Body(embed=True)]):
    """Return the path id and the item.

    ``embed=True`` makes the single body parameter expected under an
    ``"item"`` key instead of at the top level of the JSON body.
    """
    results = {"item_id": item_id, "item": item}
    return results
#Field使用 Query、Path 都是 Params 的子类,而 Params 类又是 Pydantic 中 FieldInfo 的子类。
# 模型属性的类型、默认值及 Field 的代码结构与路径操作函数的参数相同,只不过是用 Field 替换了Path、Query、Body。
class Item(BaseModel):
    """Item payload whose attributes carry validation and metadata through ``Field``."""

    name: str
    # No default is passed to Field, so the key must be present (None is allowed).
    description: Union[str, None] = Field(
        title="The description of the item", max_length=30
    )
    # NOTE(review): the constraint is gt=2 while the description text says "greater than zero".
    price: float = Field(gt=2, description="The price must be greater than zero")
    tax: Union[float, None] = None


@app.put("/items7/{item_id}")
async def update_item(item_id: int, item: Annotated[Item, Body(embed=True)]):
    """Return the path id and the item, which is read from the ``"item"`` key of the body."""
    results = {"item_id": item_id, "item": item}
    return results
#请求体嵌套
# {
# "name": "Foo",
# "description": "The pretender",
# "price": 42.0,
# "tax": 3.2,
# "tags": ["rock", "metal", "bar"],
# "tags2": ["rock", "metal", "bar"],
# "images": [
# {
# "url": "http://example.com/baz.jpg",
# "name": "The Foo live"
# },
# {
# "url": "http://example.com/dave.jpg",
# "name": "The Baz"
# }
# ]
# }
class Image(BaseModel):
    """Nested sub-model; ``url`` is validated as an HTTP(S) URL."""

    url: HttpUrl
    name: str


class Item(BaseModel):
    """Item payload with nested collections.

    The list-of-images attribute is named ``images`` to match the key used by
    the documented example payload; under its former name ``image`` the
    ``"images"`` key sent by clients was silently ignored.
    """

    name: str
    description: Union[str, None] = None
    price: float
    tax: Union[float, None] = None
    tags: list = []  # untyped list; use List[str] to enforce string elements
    tags2: Set[str] = set()  # a set de-duplicates the incoming values
    images: Union[List[Image], None] = None


@app.put("/items11/{item_id}")
async def update_item(item_id: int, item: Item):
    """Return the path id and the parsed, nested item."""
    results = {"item_id": item_id, "item": item}
    return results
#深度嵌套,循环套啊
# {
# "name": "Foo",
# "description": "The pretender",
# "price":66,
# "items": [
# {
# "name": "Foo",
# "description": "The pretender",
# "price": 42.0,
# "tax": 3.2,
# "tags": ["rock", "metal", "bar"],
# "tags2": ["rock", "metal", "bar"],
# "images": [
# {
# "url": "http://example.com/baz.jpg",
# "name": "The Foo live"
# },
# {
# "url": "http://example.com/dave.jpg",
# "name": "The Baz"
# }
# ]
# }
# ]
# }
class Offer(BaseModel):
    """Offer payload that nests a list of ``Item`` models."""

    name: str
    description: Union[str, None] = None
    price: float
    items: List[Item]


@app.post("/offers/")
async def create_offer(offer: Offer):
    """Return the validated offer unchanged."""
    return offer
@app.post("/images/multiple/")
async def create_multiple_images(*, images: List[Image]):
    """Accept a JSON array of images as the whole body and return it.

    The previous body ended with ``return images.url``; ``images`` is a list,
    so that raised AttributeError on every request. The loop before it only
    evaluated attributes without using them and has been dropped.
    """
    return images
#Dict JSON 仅支持将 str 作为键。但是 Pydantic 具有自动转换数据的功能 接收一些尚且未知的键
# {
# "7":"4.56",
# "8": "890."
# }
@app.post("/index-weights/")
async def create_index_weights(weights: Dict[int, float]):
    """Return the body parsed as a mapping with ``int`` keys and ``float`` values.

    JSON object keys are always strings; the ``Dict[int, float]`` annotation
    is what requests their conversion.
    """
    return weights
#可以使用 model_config 中的 json_schema_extra 为 Pydantic 模型声明一个示例(Pydantic v1 中对应 Config 和 schema_extra)
class Item2(BaseModel):
    """Item payload that ships an example through ``model_config``."""

    name: str
    description: Union[str, None] = None
    price: float
    tax: Union[float, None] = None

    # The "examples" entry is added to the generated JSON schema.
    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "name": "Foo",
                    "description": "A very nice Item",
                    "price": 35.4,
                    "tax": 3.2,
                }
            ]
        }
    }


@app.put("/items01/{item_id}")
async def update_item(item_id: int, item: Item2):
    """Return the path id and the item."""
    results = {"item_id": item_id, "item": item}
    return results
#Field example 传递的那些额外参数不会添加任何验证,只会添加注释,用于文档的目的。
class Item3(BaseModel):
    """Item payload with per-field ``examples`` metadata.

    None of the ``Field`` calls passes a default, so all four keys are
    required in the request body.
    """

    name: str = Field(examples=["Foo"])
    description: Union[str, None] = Field(examples=["A very nice Item"])
    price: float = Field(examples=[35.4])
    tax: Union[float, None] = Field(examples=[3.2])


# Field, Path, Query, Body and the other factory functions all accept such
# extra arguments.
@app.put("/items02/{item_id}")
async def update_item(item_id: int, item: Item3):
    """Return the path id and the item."""
    results = {"item_id": item_id, "item": item}
    return results
# Body 额外参数 请求体举例子examples
@app.put("/items03/{item_id}")
async def update_item(
    item_id: int,
    item: Annotated[
        Item3,
        Body(
            examples=[
                {
                    "name": "Foo",
                    "description": "A very nice Item",
                    "price": 35.4,
                    "tax": 3.2,
                }
            ]
        ),
    ],
):
    """Return the path id and the item; the body example is declared on ``Body``."""
    results = {"item_id": item_id, "item": item}
    return results
#额外数据类型 74CBEAE1-859B-54BF-EEB8-EC3E60E65426
from datetime import datetime, date, time, timedelta
from uuid import UUID
@app.put("/items001/{item_id}")
async def read_items(
    item_id: UUID,
    start_datetime: Annotated[datetime, Body()],
    end_datetime: Annotated[datetime, Body()],
    process_after: Annotated[timedelta, Body()],
    repeat_at: Annotated[Union[time, None], Body()] = None,
):
    """Echo the inputs plus two derived values.

    ``start_process`` is ``start_datetime + process_after`` and ``duration``
    is ``end_datetime - start_process``. Every parameter except ``item_id``
    is read from the JSON body.
    """
    start_process = start_datetime + process_after
    duration = end_datetime - start_process
    return {
        "item_id": item_id,
        "start_datetime": start_datetime,
        "end_datetime": end_datetime,
        "process_after": process_after,
        "repeat_at": repeat_at,
        "start_process": start_process,
        "duration": duration,
    }
#导入cookie ,header Cookie 、Path 、Query 是兄弟类,都继承自共用的 Param 类。
@app.get("/items002/")
async def read_items(ads_id: Annotated[Union[str, None], Cookie()] = None):
    """Return the value of the optional ``ads_id`` cookie."""
    return {"ads_id": ads_id}
# 重复请求头,不用担心变量中的下划线 无需把首字母大写
# 类型声明中可以使用 list 定义多个请求头。
# 使用 Python list 可以接收重复请求头所有的值。
# ,FastAPI 可以自动转换。如需禁用下划线自动转换为连字符,可以把 Header 的 convert_underscores 参数设置为 False
@app.get("/items003/")
async def read_items(
    user_agent: Annotated[Union[str, None], Header(convert_underscores=False)] = None,
):
    """Return the ``user_agent`` header value under the key ``"User-Agent"``.

    ``convert_underscores=False`` turns off the automatic underscore-to-hyphen
    conversion of the parameter name.
    """
    return {"User-Agent": user_agent}


@app.get("/items004/")
async def read_items(x_token: Annotated[Union[List[str], None], Header()] = None):
    """Return every value of a repeated header as a list."""
    return {"X-Token values": x_token}
# response_model是「装饰器」方法(get,post 等)的一个参数。不像之前的所有参数和请求体,它不属于路径操作函数。
#响应模型
class Item01(BaseModel):
    """Response model used to shape what the routes below return."""

    name: str
    description: Union[str, None] = None
    price: float
    tax: Union[float, None] = None
    tags: List[str] = []


@app.post("/items005/", response_model=Item01)
async def create_item(item: Item) -> Any:
    """Return the received item; the response is serialised through ``Item01``."""
    return item
@app.get("/items006/", response_model=List[Item01])
async def read_items() -> Any:
    """Return two plain dicts; the response is serialised as a list of ``Item01``."""
    return [
        {"name": "Portal Gun", "price": 42.0},
        {"name": "Plumbus", "price": 32.0},
    ]
#返回与输入相同的数据
# { "username": "st111r",
# "password": "str",
# "email": "ee@2.com",
# "full_name":"wrw" }
class UserIn(BaseModel):
    """Incoming user payload, including the plaintext password."""

    username: str
    password: str
    email: EmailStr
    full_name: Union[str, None] = None


class UserOut(BaseModel):
    """Outgoing user payload: the same fields as ``UserIn`` minus ``password``."""

    username: str
    email: EmailStr
    full_name: Union[str, None] = None


# Don't do this in production!
# FastAPI (through Pydantic) filters out everything that is not declared on
# the output model.
@app.post("/user1/", response_model=UserOut)
async def create_user(user: UserIn) -> UserIn:
    """Return the received user; ``response_model=UserOut`` omits the password."""
    return user
#来省略某些属性
#返回不包含
# response_model_exclude_defaults=True
# response_model_exclude_none=True
# response_model_by_alias
#仅包含
# response_model_include=["name", "description"] 使用[]
# response_model_include = {"name", "description"}
class Item6(BaseModel):
    """Response model with defaults, used to show the include/exclude options."""

    name: str
    description: Union[str, None] = None
    price: float
    tax: float = 10.5
    tags: List[str] = []


# NOTE(review): the name ``items`` is rebound several times further down the
# file, and the route below resolves it at request time, not at definition time.
items = {
    "foo": {"name": "Foo", "price": 50.2},
    "bar": {"name": "Bar", "description": "The bartenders", "price": 62, "tax": 20.2},
    "baz": {"name": "Baz", "description": None, "price": 50.2, "tax": 10.5, "tags": []},
}


# response_model_exclude_unset=True    -> omit fields that were never explicitly set
# response_model_exclude_defaults=True -> omit fields equal to their default
# response_model_exclude_none=True     -> omit fields whose value is None
@app.get(
    "/items61/{item_id}",
    response_model=Item6,
    response_model_exclude_unset=True,
    response_model_include={"name", "description"},
    response_model_exclude={"tax"},
)
async def read_item(item_id: str):
    """Return ``items[item_id]``, shaped by the decorator's response options."""
    return items[item_id]
class UserInDB(BaseModel):
    """Stored form of a user: holds ``hashed_password`` instead of ``password``."""

    username: str
    hashed_password: str
    email: EmailStr
    full_name: Union[str, None] = None
def fake_password_hasher(raw_password: str):
    """Return ``raw_password`` prefixed with ``"supersecret"``.

    Placeholder only: this is string concatenation, not a hash.
    """
    return "supersecret" + raw_password
def fake_save_user(user_in: UserIn):
    """Build the stored representation of ``user_in`` and pretend to save it.

    Args:
        user_in: Incoming user, including the plaintext password.

    Returns:
        A ``UserInDB`` built from the fields of ``user_in`` plus the value
        returned by ``fake_password_hasher``.

    The earlier debug prints of the plaintext password, the hash and the full
    stored record were removed so that credentials do not reach stdout/logs.
    """
    hashed_password = fake_password_hasher(user_in.password)
    user_in_db = UserInDB(**user_in.dict(), hashed_password=hashed_password)
    print("User saved! ..not really")
    return user_in_db
# user_in = UserIn(username="john", password="secret", email="john.doe@example.com")
# user_dict = user_in.dict() #Pydantic 模型支持 .dict() 方法,能返回包含模型数据的字典。
# print(user_dict) 输出的就是 Python 字典:
# {
# 'username': 'john',
# 'password': 'secret',
# 'email': 'john.doe@example.com',
# 'full_name': None,
# }
# UserInDB(**user_dict)
@app.post("/user011/", response_model=UserOut)
async def create_user(user_in: UserIn):
    """Save the user through ``fake_save_user`` and return the stored record.

    ``response_model=UserOut`` keeps ``hashed_password`` out of the response.
    """
    user_saved = fake_save_user(user_in)
    return user_saved
#union 响应可以声明为两种类型的 Union 类型,即该响应可以是两种类型中的任意类型
class BaseItem(BaseModel):
    """Fields shared by every item kind."""

    description: str
    type: str


class CarItem(BaseItem):
    """Item whose ``type`` defaults to ``"car"``."""

    type: str = "car"


class PlaneItem(BaseItem):
    """Item whose ``type`` defaults to ``"plane"`` and which also needs ``size``."""

    type: str = "plane"
    size: int


# Dedicated store for this route. A route body looks module globals up at
# request time, and the shared name ``items`` is rebound by later sections of
# this file, so reading ``items`` here made "item1"/"item2" raise KeyError.
union_items = {
    "item1": {"description": "All my friends drive a low rider", "type": "car"},
    "item2": {
        "description": "Music is my aeroplane, it's my aeroplane",
        "type": "plane",
        "size": 5,
    },
}


@app.get("/items62/{item_id}", response_model=Union[PlaneItem, CarItem])
async def read_item(item_id: str):
    """Return the stored item; the response may be a ``PlaneItem`` or a ``CarItem``."""
    return union_items[item_id]
#由对象列表构成的响应
# Dedicated store for this route. The shared name ``items`` is rebound to a
# dict further down the file, and the route resolves the name at request
# time, so it would return that dict and fail ``List[Item8]`` validation.
listed_items = [
    {"name": "Foo", "description": "There comes my hero"},
    {"name": "Red", "description": "It's my aeroplane"},
]


class Item8(BaseModel):
    """Element type of the list response."""

    name: str
    description: str


@app.get("/items63/", response_model=List[Item8])
async def read_items():
    """Return the stored list; each element is serialised as ``Item8``."""
    return listed_items
#任意的 dict 都能用于声明响应,只要声明键和值的类型,无需使用 Pydantic 模型
@app.get("/keyword-weights/", response_model=Dict[str, float], status_code=201)
async def read_keyword_weights():
    """Return a fixed str-to-float mapping with status code 201.

    The response type is declared with ``Dict[str, float]`` instead of a
    Pydantic model.
    """
    return {"foo": 2.3, "bar": 3.4}
# 响应状态码 status_code=status.HTTP_201_CREATED 也可以使用 from starlette import status。
from fastapi import FastAPI, status
@app.post("/items/", status_code=status.HTTP_201_CREATED)
async def create_item(name: str):
    """Echo the ``name`` query parameter with status code 201."""
    return {"name": name}
# pip install python-multipart 表单数据 Form 放在请求body里的,不是查询参数里的
# 表单数据的「媒体类型」编码一般为 application/x-www-form-urlencoded。
# 但包含文件的表单编码为 multipart/form-data。文件处理详见下节。
from fastapi import FastAPI, Form
@app.post("/login21/")
async def login(username: str = Form(), password: str = Form()):
    """Read ``username`` and ``password`` as form fields and return the username only."""
    return {"username": username}
#请求文件 File UploadFile 与 bytes 相比有更多优势:
# filename:上传文件名字符串(str),例如, myimage.jpg;
# content_type:内容类型(MIME 类型 / 媒体类型)字符串(str),例如,image/jpeg;
# file: SpooledTemporaryFile( file-like 对象)。其实就是 Python文件,可直接传递给其他预期 file-like 对象的函数或支持库。
# 不包含文件时,表单数据一般用 application/x-www-form-urlencoded「媒体类型」编码。
# 但表单包含文件时,编码为 multipart/form-data。使用了 File
from fastapi import FastAPI, File, UploadFile
@app.post("/files/")
async def create_file(file: bytes = File()):
    """Receive the uploaded file as ``bytes`` and return its length."""
    return {"file_size": len(file)}


@app.post("/uploadfile/")
async def create_upload_file(file: UploadFile = File()):
    """Receive the upload as an ``UploadFile`` and return its filename."""
    return {"filename": file.filename}
@app.post("/files1/")
async def create_file(
    file: Union[bytes, None] = File(description="A file read as UploadFile"),
):
    """Return the size of the uploaded bytes, or a message when ``file`` is falsy.

    NOTE(review): ``File()`` is given no default, so the field is still
    required despite the ``None``-able annotation.
    """
    if not file:
        return {"message": "No file sent"}
    return {"file_size": len(file)}


@app.post("/uploadfile1/")
async def create_upload_file(
    file: Union[UploadFile, None] = None,
):
    """Return the filename of the optional upload, or a message when none was sent."""
    if not file:
        return {"message": "No upload file sent"}
    return {"filename": file.filename}
#多文件上传
from fastapi.responses import HTMLResponse
@app.post("/files3/")
async def create_files(files: List[bytes] = File()):
    """Return the size of each uploaded file."""
    return {"file_sizes": [len(file) for file in files]}


@app.post("/uploadfiles3/")
async def create_upload_files(files: List[UploadFile]):
    """Return the filename of each uploaded file."""
    return {"filenames": [file.filename for file in files]}
@app.get("/")
async def main():
    """Serve an HTML page with one multi-file form per upload endpoint."""
    # The markup is kept flush-left so the served text stays exactly as written.
    content = """
<body>
<form action="/files3/" enctype="multipart/form-data" method="post">
<input name="files" type="file" multiple>
<input type="submit">
</form>
<form action="/uploadfiles3/" enctype="multipart/form-data" method="post">
<input name="files" type="file" multiple>
<input type="submit">
</form>
</body>
"""
    return HTMLResponse(content=content)
#请求表单与文件
@app.post("/files00/")
async def create_file(
    file: bytes = File(), fileb: UploadFile = File(), token: str = Form()
):
    """Accept two files and a form field in a single request.

    Returns the byte length of ``file``, the ``token`` field and the content
    type reported for ``fileb``.
    """
    return {
        "file_size": len(file),
        "token": token,
        "fileb_content_type": fileb.content_type,
    }
#处理错误4xx HTTPException HTTPException 是额外包含了和 API 有关数据的常规 Python 异常。因为是 Python 异常,所以不能 return,只能 raise
from fastapi import FastAPI, HTTPException
# Dedicated store for the two error-handling demos. The shared name ``items``
# is rebound further down the file and routes resolve it at request time, so
# "foo" would otherwise return the later record instead of this string.
error_demo_items = {"foo": "The Foo Wrestlers"}


@app.get("/items30/{item_id}")
async def read_item(item_id: str):
    """Return the stored item.

    Raises:
        HTTPException: 404 when ``item_id`` is not in the store.
    """
    if item_id not in error_demo_items:
        raise HTTPException(status_code=404, detail="Item not found")
    return {"item": error_demo_items[item_id]}


@app.get("/items-header/{item_id}")
async def read_item_header(item_id: str):
    """Return the stored item.

    Raises:
        HTTPException: 404 carrying a custom ``X-Error`` response header
            when ``item_id`` is not in the store.
    """
    if item_id not in error_demo_items:
        raise HTTPException(
            status_code=404,
            detail="Item not found",
            headers={"X-Error": "There goes my error"},
        )
    return {"item": error_demo_items[item_id]}
#全局的 自定义异常 UnicornException 可以用 @app.exception_handler() 添加自定义异常控制器:
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
class UnicornException(Exception):
    """Application-specific error that carries the offending ``name``."""

    def __init__(self, name: str):
        self.name = name
@app.exception_handler(UnicornException)
async def unicorn_exception_handler(request: Request, exc: UnicornException):
    """Convert a ``UnicornException`` into a 418 JSON response."""
    return JSONResponse(
        status_code=418,
        content={"message": f"Oops! {exc.name} did something. There goes a rainbow..."},
    )


@app.get("/unicorns/{name}")
async def read_unicorn(name: str):
    """Echo ``name``.

    Raises:
        UnicornException: when ``name`` is ``"yolo"``.
    """
    if name == "yolo":
        raise UnicornException(name=name)
    return {"unicorn_name": name}
#覆盖异常 请求中包含无效数据时,FastAPI 内部会触发 RequestValidationError。该异常也内置了默认异常处理器。
# 覆盖默认异常处理器时需要导入 RequestValidationError,并用 @app.exception_handler(RequestValidationError) 装饰异常处理器
from fastapi.exceptions import RequestValidationError
from fastapi.responses import PlainTextResponse
from starlette.exceptions import HTTPException as StarletteHTTPException
from fastapi.exception_handlers import (
http_exception_handler,
request_validation_exception_handler,
)
# Deliberately NOT named ``http_exception_handler``: that name is imported
# from fastapi.exception_handlers just above, and redefining it here made the
# later ``await http_exception_handler(request, exc)`` call reach this
# plain-text override instead of the imported handler.
@app.exception_handler(StarletteHTTPException)
async def plain_text_http_exception_handler(request, exc):
    """Render an HTTP error as plain text with the original status code."""
    return PlainTextResponse(str(exc.detail), status_code=exc.status_code)


@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request, exc):
    """Render a request validation error as plain text with status 400."""
    return PlainTextResponse(str(exc), status_code=400)


@app.get("/items34/{item_id}")
async def read_item(item_id: int):
    """Echo ``item_id``.

    Raises:
        HTTPException: 418 when ``item_id`` is 3.
    """
    if item_id == 3:
        raise HTTPException(status_code=418, detail="Nope! I don't like 3.")
    return {"item_id": item_id}
# RequestValidationError 包含其接收到的无效数据请求的 body 收到的响应包含 body 信息,并说明数据是无效的:
from fastapi.encoders import jsonable_encoder
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    """Return a 422 JSON response holding the validation errors and the received body."""
    print(exc.body)
    return JSONResponse(
        status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
        content=jsonable_encoder({"detail": exc.errors(), "body": exc.body}),
    )


class Item11(BaseModel):
    """Minimal body model used to trigger validation errors."""

    title: str
    size: int


@app.post("/items36/")
async def create_item(item: Item11):
    """Return the validated body unchanged."""
    return item
@app.exception_handler(StarletteHTTPException)
async def custom_http_exception_handler(request, exc):
    """Log the HTTP error, then delegate to the module-level ``http_exception_handler``."""
    print(f"OMG! An HTTP error!: {repr(exc)}")
    return await http_exception_handler(request, exc)


@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request, exc):
    """Log the validation error, then delegate to ``request_validation_exception_handler``."""
    print(f"OMG! The client sent invalid data!: {exc}")
    return await request_validation_exception_handler(request, exc)


@app.get("/items35/{item_id}")
async def read_item(item_id: int):
    """Echo ``item_id``.

    Raises:
        HTTPException: 418 when ``item_id`` is 3.
    """
    if item_id == 3:
        raise HTTPException(status_code=418, detail="Nope! I don't like 3.")
    return {"item_id": item_id}
#路径操作配置
# tags 参数的值是由 str 组成的 list (一般只有一个 str ),tags 用于为路径操作添加标签:
# 路径装饰器还支持 summary 和 description 这两个参数:
# 描述内容比较长且占用多行时,可以在函数的 docstring 中声明路径操作的描述,FastAPI 支持从文档字符串中读取描述内容。
# response_description 只用于描述响应,description 一般则用于描述路径操作。 响应描述 response_description="The created item",
# deprecated 参数可以把路径操作标记为弃用,无需直接删除:
class Item17(BaseModel):
    """Item payload whose ``tags`` form a set of unique strings."""

    name: str
    description: Union[str, None] = None
    price: float
    tax: Union[float, None] = None
    tags: Set[str] = set()


@app.post(
    "/items70/",
    response_model=Item17,
    tags=["items"],
    status_code=status.HTTP_201_CREATED,
    summary="Create an item",
    description="Create an item with all the information, name, description, price, tax and a set of unique tags",
    response_description="The created item",
)
async def create_item(item: Item17):
    """
    Create an item with all the information:
    - **name**: each item must have a name
    - **description**: a long description
    - **price**: required
    - **tax**: if the item doesn't have tax, you can omit this
    - **tags**: a set of unique tag strings for this item
    """
    return item
@app.get("/items71/", tags=["items"])
async def read_items():
    """Return one fixed item; tagged ``items`` for the docs."""
    return [{"name": "Foo", "price": 42}]


@app.get("/users72/", tags=["users"], deprecated=True)
async def read_users():
    """Return one fixed user; the operation is marked deprecated, not removed."""
    return [{"username": "johndoe"}]
#将数据类型(如Pydantic模型)转换为与JSON兼容的数据类型(如dict、list等)。
# FastAPI提供了jsonable_encoder()函数
# In-memory store keyed by the ``id`` path parameter.
fake_db = {}


class Item18(BaseModel):
    """Body model containing a ``datetime``, which is not JSON-native."""

    title: str
    timestamp: datetime
    description: Union[str, None] = None


@app.put("/items118/{id}")
def update_item(id: str, item: Item18):
    """Store the JSON-compatible form of ``item`` under ``id`` and return the whole store.

    ``jsonable_encoder`` converts the model into plain JSON-compatible data
    before it is saved.

    The signature previously carried ``response_model=Item18`` as a function
    parameter, where FastAPI reads it as a request parameter rather than as
    route configuration. It was removed instead of moved to the decorator
    because the returned value is the whole ``fake_db`` mapping, which does
    not have the shape of ``Item18``.
    """
    print(item)
    json_compatible_item_data = jsonable_encoder(item)
    fake_db[id] = json_compatible_item_data
    print(fake_db)
    return fake_db
#
class Item20(BaseModel):
    """Item model in which every field has a default, enabling partial updates."""

    name: Union[str, None] = None
    description: Union[str, None] = None
    price: Union[float, None] = None
    tax: float = 10.5
    tags: List[str] = []


# Store read and mutated by the GET/PUT/PATCH routes that follow.
items = {
    "foo": {"name": "Foo", "price": 50.2},
    "bar": {"name": "Bar", "description": "The bartenders", "price": 62, "tax": 20.2},
    "baz": {"name": "Baz", "description": None, "price": 50.2, "tax": 10.5, "tags": []},
}
@app.get("/items201/{item_id}", response_model=Item20)
async def read_item(item_id: str):
    """Return ``items[item_id]`` serialised through ``Item20``."""
    return items[item_id]
#PUT 用于接收替换现有数据的数据。
@app.put("/items202/{item_id}", response_model=Item20)
async def update_item(item_id: str, item: Item20):
    """Replace the stored item with the JSON-compatible form of ``item``.

    PUT semantics: fields missing from the request are stored with the model
    defaults. The prints are debugging output.
    """
    print(item)
    update_item_encoded = jsonable_encoder(item)
    print(update_item_encoded)
    print(item)
    items[item_id] = update_item_encoded
    print(update_item_encoded)
    return update_item_encoded
#只更新用户设置过的值,不用模型中的默认值覆盖已存储过的值。
#patch 更新部分数据时,可以在 Pydantic 模型的 .dict() 中使用 exclude_unset 参数。
@app.patch("/items203/{item_id}", response_model=Item20)
async def update_item(item_id: str, item: Item20):
    """Apply a partial update to the stored item and return the result.

    Only the fields the client actually sent (``exclude_unset=True``) are
    copied over the stored record.

    The stored record is now rebuilt with ``Item20`` instead of ``Item``,
    which is an unrelated model: its ``tax`` defaults to ``None``, so the
    updated item could fail ``Item20`` response validation.
    """
    stored_item_data = items[item_id]
    stored_item_model = Item20(**stored_item_data)
    update_data = item.dict(exclude_unset=True)
    updated_item = stored_item_model.copy(update=update_data)
    items[item_id] = jsonable_encoder(updated_item)
    return updated_item
# patch 接下来,用 .copy() 为已有模型创建调用 update 参数的副本,该参数为包含更新数据的 dict。
@app.patch("/items204/{item_id}", response_model=Item20)
async def update_item(item_id: str, item: Item20):
    """Apply a partial update with ``.copy(update=...)`` and return the result.

    ``update_data`` holds only the fields the client sent; ``.copy`` produces
    a new model with those fields replaced.

    The stored record is now rebuilt with ``Item20`` instead of the unrelated
    ``Item`` model, so untouched fields keep the ``Item20`` defaults.
    """
    stored_item_data = items[item_id]
    stored_item_model = Item20(**stored_item_data)
    update_data = item.dict(exclude_unset=True)
    updated_item = stored_item_model.copy(update=update_data)
    items[item_id] = jsonable_encoder(updated_item)
    return updated_item
#Depends 创建依赖项:依赖项就是一个函数,且可以使用与路径操作函数相同的参数:
# 这里只能传给 Depends 一个参数。
# 且该参数必须是可调用对象,比如函数
from fastapi import Depends, FastAPI
async def common_parameters(
    q: Union[str, None] = None, skip: int = 0, limit: int = 100
):
    """Bundle the shared query parameters into a dict.

    Intended for use as a dependency via ``Depends(common_parameters)``.

    Returns:
        ``{"q": q, "skip": skip, "limit": limit}``
    """
    return {"q": q, "skip": skip, "limit": limit}
@app.get("/items300/")
async def read_items(commons: dict = Depends(common_parameters)):
    """Return the dict produced by the ``common_parameters`` dependency."""
    return commons


@app.get("/users300/")
async def read_users(commons: dict = Depends(common_parameters)):
    """Return the dict produced by the ``common_parameters`` dependency."""
    return commons
#中间件 要创建中间件你可以在函数的顶部使用装饰器 @app.middleware("http").
import time
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    """Add an ``X-Process-Time`` header with the seconds spent handling the request.

    The elapsed time is measured around ``call_next(request)``.
    """
    start_time = time.time()
    response = await call_next(request)
    process_time = time.time() - start_time
    response.headers["X-Process-Time"] = str(process_time)
    return response
#oauth2
from fastapi.security import OAuth2PasswordBearer
# Dependency that supplies the bearer token of the request; ``tokenUrl`` is
# the (relative) URL clients are told to obtain tokens from.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


# Not practically useful on its own.
@app.get("/items400/")
async def read_items(token: str = Depends(oauth2_scheme)):
    """Return the raw token provided by ``oauth2_scheme``."""
    return {"token": token}
class User(BaseModel):
    """User as exposed by the API."""

    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None


def fake_decode_token(token):
    """Return a fixed ``User`` whose username is ``token + "fakedecoded"``.

    Placeholder only: nothing is actually decoded or verified.
    """
    return User(
        username=token + "fakedecoded", email="john@example.com", full_name="John Doe"
    )


async def get_current_user(token: str = Depends(oauth2_scheme)):
    """Dependency: turn the token from ``oauth2_scheme`` into a ``User``."""
    user = fake_decode_token(token)
    return user


@app.get("/users1/me")
async def read_users_me(current_user: User = Depends(get_current_user)):
    """Return the user resolved by ``get_current_user``."""
    return current_user
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
# In-memory user table keyed by username. "hashed_password" uses the
# "fakehashed" + password scheme of this section.
fake_users_db = dict(
    johndoe={
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "fakehashedsecret",
        "disabled": False,
    },
    alice={
        "username": "alice",
        "full_name": "Alice Wonderson",
        "email": "alice@example.com",
        "hashed_password": "fakehashedsecret2",
        "disabled": True,
    },
)
# Intentionally no ``app = FastAPI()`` here: rebinding ``app`` would discard
# every route, exception handler and middleware registered on the instance
# created at the top of the file, so they would never be served.
def fake_hash_password(password: str):
    """Return ``password`` prefixed with ``"fakehashed"``.

    Placeholder only: this is string concatenation, not a hash.
    """
    return "fakehashed" + password
# ``tokenUrl`` has to name the login route of this section, which is
# registered as "/token1"; the previous value "token" matched no route.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token1")
class User(BaseModel):
    """User as exposed by the API."""

    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None


class UserInDB(User):
    """Stored user: ``User`` plus the password hash."""

    hashed_password: str
def get_user(db, username: str):
    """Return the ``UserInDB`` stored under ``username`` in ``db``, or ``None`` when absent."""
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)
    return None


def fake_decode_token(token):
    """Treat the token itself as a username and look it up in ``fake_users_db``."""
    # This doesn't provide any security at all
    # Check the next version
    user = get_user(fake_users_db, token)
    return user
async def get_current_user(token: str = Depends(oauth2_scheme)):
    """Dependency: resolve the bearer token to a user.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            ``fake_decode_token`` returns nothing for the token.
    """
    user = fake_decode_token(token)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user


async def get_current_active_user(current_user: User = Depends(get_current_user)):
    """Dependency: return the current user unless it is disabled.

    Raises:
        HTTPException: 400 when ``current_user.disabled`` is truthy.
    """
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user
@app.post("/token1")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    """Check the submitted credentials and return a bearer token.

    The token is simply the username (demo only).

    Raises:
        HTTPException: 400 when the username is unknown or the fake hash of
            the submitted password differs from the stored one.
    """
    user_dict = fake_users_db.get(form_data.username)
    if not user_dict:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    user = UserInDB(**user_dict)
    hashed_password = fake_hash_password(form_data.password)
    if hashed_password != user.hashed_password:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    return {"access_token": user.username, "token_type": "bearer"}
@app.get("/users20/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    """Return the authenticated, non-disabled user."""
    return current_user
# 安装 passlib¶
# Passlib 是处理密码哈希的 Python 包。
#
# 它支持很多安全哈希算法及配套工具。
from datetime import datetime, timedelta, timezone
import jwt
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
# to get a string like this run:
# openssl rand -hex 32
# Key used to sign and verify the JWTs below.
# NOTE(review): the secret is hard-coded in source; load it from configuration
# or the environment before any real deployment.
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
# Signing algorithm passed to jwt.encode / jwt.decode.
ALGORITHM = "HS256"
# NOTE(review): only referenced by the commented-out /token4 handler in this file.
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# In-memory user table for the JWT section, keyed by username.
fake_users_db = dict(
    johndoe={
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    },
)
class Token(BaseModel):
    """Shape of a token response."""

    access_token: str
    token_type: str


class TokenData(BaseModel):
    """Data extracted from a decoded token."""

    username: Union[str, None] = None


class User(BaseModel):
    """User as exposed by the API."""

    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None


class UserInDB(User):
    """Stored user: ``User`` plus the password hash."""

    hashed_password: str
# Passlib context used to hash and verify passwords with bcrypt.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# NOTE(review): no live route serves "token" in this file (the JWT login
# handler below is commented out).
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# Intentionally no ``app = FastAPI()`` here: rebinding ``app`` would discard
# every route, exception handler and middleware registered so far.
def verify_password(plain_password, hashed_password):
    """Return the result of ``pwd_context.verify(plain_password, hashed_password)``."""
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    """Return the hash of ``password`` produced by ``pwd_context``."""
    return pwd_context.hash(password)
def get_user(db, username: str):
    """Return the ``UserInDB`` stored under ``username`` in ``db``, or ``None`` when absent."""
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)
    return None


def authenticate_user(fake_db, username: str, password: str):
    """Return the user when the credentials check out, otherwise ``False``.

    ``False`` is returned both for an unknown username and for a password
    that ``verify_password`` rejects.
    """
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user
def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
    """Return a signed JWT holding ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed; copied, so the caller's dict is not modified.
        expires_delta: Lifetime of the token; 15 minutes when falsy.
    """
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    """Dependency: decode the JWT and return the matching stored user.

    Raises:
        HTTPException: 401 when the token cannot be decoded, carries no
            ``sub`` claim, or names a user missing from ``fake_users_db``.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except InvalidTokenError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user
async def get_current_active_user(
    current_user: Annotated[User, Depends(get_current_user)],
):
    """Dependency: return the current user unless it is disabled.

    Raises:
        HTTPException: 400 when ``current_user.disabled`` is truthy.
    """
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user
# @app.post("/token4")
# async def login_for_access_token(
# form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
# ) -> Token:
# user = authenticate_user(fake_users_db, form_data.username, form_data.password)
# if not user:
# raise HTTPException(
# status_code=status.HTTP_401_UNAUTHORIZED,
# detail="Incorrect username or password",
# headers={"WWW-Authenticate": "Bearer"},
# )
# access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
# access_token = create_access_token(
# data={"sub": user.username}, expires_delta=access_token_expires
# )
# return Token(access_token=access_token, token_type="bearer")
@app.get("/users42/me/", response_model=User)
async def read_users_me(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    """Return the authenticated, non-disabled user, serialised as ``User``."""
    return current_user


@app.get("/users41/me/items/")
async def read_own_items(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    """Return one fixed item owned by the authenticated user."""
    return [{"item_id": "Foo", "owner": current_user.username}]
#cors allow_origins - 一个允许跨域请求的源列表
# allow_origin_regex - 一个正则表达式字符串,匹配的源允许跨域请求
# allow_methods - 一个允许跨域请求的 HTTP 方法列表
# allow_headers - 一个允许跨域请求的 HTTP 请求头列表
# allow_credentials - 指示跨域请求支持 cookies
# expose_headers - 指示可以被浏览器访问的响应头
# max_age - 设定浏览器缓存 CORS 响应的最长时间,单位是秒。默认为 600
from fastapi.middleware.cors import CORSMiddleware
# Origins that are allowed to make cross-origin requests.
origins = [
    "http://localhost.tiangolo.com",
    "https://localhost.tiangolo.com",
    "http://localhost",
    "http://localhost:8080",
]

# Any method and any header are allowed for those origins, with credentials.
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.get("/cors1/")
async def main():
    """Return a fixed greeting; used to exercise the CORS middleware."""
    return {"message": "Hello World"}
# Jinja2Templates
# pip install Jinja2
# pip install aiofiles # 适用于FastAPI
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
# Serve files from ./static under /static and load templates from ./templates.
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")


@app.get("/items0001/{id}", response_class=HTMLResponse)
async def read_item(request: Request, id: str):
    """Render ``item.html`` with ``id`` in the template context."""
    return templates.TemplateResponse(
        request=request, name="item.html", context={"id": id}
    )
from fastapi import FastAPI, Response
@app.get("/headers-and-object/")
def get_headers(response: Response):
    """Set a custom header on the injected ``Response`` and return a plain dict."""
    response.headers["X-Cat-Dog"] = "alone in the world"
    return {"message": "Hello World"}


@app.get("/headers/")
def get_headers():
    """Return a ``JSONResponse`` built with explicit custom headers."""
    content = {"message": "Hello World"}
    headers = {"X-Cat-Dog": "alone in the world", "Content-Language": "en-US"}
    return JSONResponse(content=content, headers=headers)
#可选参数
from typing import Optional
# 路径参数+可选参数
@app.get("/items7101/{item_id}")
async def read_item(item_id: str, name: Optional[str] = None):
    """Return the ``item_id`` path parameter and the optional ``name`` query parameter."""
    return {"item_id": item_id, "name": name}


# Required parameter + optional parameter
@app.get("/items7101")
async def read_item(item_id: str, name: Optional[str] = None):
    """Return the required ``item_id`` and optional ``name``, both query parameters."""
    return {"item_id": item_id, "name": name}
#路由 类似于蓝图
from fastapi import APIRouter
# Routes are declared on a router and attached to the app afterwards.
router = APIRouter()


@router.get("/users071/", tags=["users"])
async def read_users():
    """Return two fixed users."""
    return [{"username": "Rick"}, {"username": "Morty"}]


# Declared before "/users071/{username}" so that "me" is not captured as a username.
@router.get("/users071/me", tags=["users"])
async def read_user_me():
    """Return a placeholder for the current user."""
    return {"username": "fakecurrentuser"}


@router.get("/users071/{username}", tags=["users"])
async def read_user(username: str):
    """Echo the ``username`` path parameter."""
    return {"username": username}


app.include_router(router)
if __name__ == '__main__':
    import uvicorn

    # ``debug`` is not an accepted keyword of ``uvicorn.run`` in current
    # releases and makes the call fail, so it is no longer passed.
    uvicorn.run(app=app, host="127.0.0.1", port=8000)
# 网友评论