FastAPI操作database

作者: 北邮郭大宝 | 来源:发表于2020-02-17 21:06 被阅读0次

    简单记录一下使用FastAPI完成对数据库的CRUD操作。在参考文档的基础上,增加了U、D部分,全部代码可以参考Github

    1. Installation

    pip install fastapi
    pip install uvicorn
    pip install sqlalchemy
    

    2. File Structure

    1581931091044.jpg

    其中:

    • crud.py 完成对数据库的CRUD操作
    • database.py 关于数据库相关配置
    • main.py fastapi路由部分
    • models.py 定义表结构
    • schemas.py 定义pydantic models信息,也就是schemas
    • init.py

    3. Details

    3.1 database.py

    首先需要定义数据库部分,作为demo,使用sqlite作为我们的数据库。这部分都是常规配置操作。

    from sqlalchemy import create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import sessionmaker
    
    SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
    
    engine = create_engine(
        SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
    )
    SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
    
    Base = declarative_base()
    

    3.2 models.py

    第二步需要定义我们的表结构,这里定义了User表存储用户数据,item表用于存储物品数据,两者关系是一对多。

    from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
    from sqlalchemy.orm import relationship
    
    from .database import Base
    
    
    class User(Base):
        __tablename__ = "users"
    
        id = Column(Integer, primary_key=True, index=True)
        email = Column(String, unique=True, index=True)
        hashed_password = Column(String)
        is_active = Column(Boolean, default=True)
    
        items = relationship("Item", back_populates="owner")
    
        def to_dict(self):
            return {c.name: getattr(self, c.name) for c in self.__table__.columns}
    
    
    class Item(Base):
        __tablename__ = "items"
    
        id = Column(Integer, primary_key=True, index=True)
        title = Column(String, index=True)
        description = Column(String, index=True)
        owner_id = Column(Integer, ForeignKey("users.id"))
    
        owner = relationship("User", back_populates="items")
    
        def to_dict(self):
            return {c.name: getattr(self, c.name) for c in self.__table__.columns}
    

    3.3 schemas.py

    第三步需要定义fastapi中schemas信息,后续路由、CRUD时都需要使用。由于增删改查需要不同的schema,所以官网的最佳实践一般都是通过继承解决。

    from typing import List
    
    from pydantic import BaseModel
    
    
    class ItemBase(BaseModel):
        title: str
        description: str = None
    
    
    class ItemCreate(ItemBase):
        pass
    
    
    class ItemUpdate(ItemBase):
        pass
    
    
    class Item(ItemBase):
        id: int
        owner_id: int
    
        class Config:
            orm_mode = True
    
    
    class UserBase(BaseModel):
        email: str
    
    
    class UserCreate(UserBase):
        password: str
    
    
    class UserUpdate(UserBase):
        is_active: bool
    
    
    class User(UserBase):
        id: int
        is_active: bool
        items: List[Item] = []
    
        class Config:
            orm_mode = True
    

    3.4 crud.py

    接下来就是实际完成CRUD,官网上没有对update、delete做实例展示,这里补充一下。

    from sqlalchemy.orm import Session
    
    from . import models, schemas
    
    
    def get_user(db: Session, user_id: int):
        return db.query(models.User).filter(models.User.id == user_id).first()
    
    
    def get_user_by_email(db: Session, email: str):
        return db.query(models.User).filter(models.User.email == email).first()
    
    
    def get_users(db: Session, skip: int = 0, limit: int = 100):
        return db.query(models.User).offset(skip).limit(limit).all()
    
    
    def create_user(db: Session, user: schemas.UserCreate):
        fake_hashed_password = user.password + "notreallyhashed"
        db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
        db.add(db_user)
        db.commit()
        db.refresh(db_user)
        return db_user
    
    
    def update_user(db: Session, user_id: int, update_user: schemas.UserUpdate):
        db_user = db.query(models.User).filter(models.User.id == user_id).first()
        if db_user:
            update_dict = update_user.dict(exclude_unset=True)
            for k, v in update_dict.items():
                setattr(db_user, k, v)
            db.commit()
            db.flush()
            db.refresh(db_user)
            return db_user
    
    
    def delete_user(db: Session, user_id: int):
        db_user = db.query(models.User).filter(models.User.id == user_id).first()
        if db_user:
            db.delete(db_user)
            db.commit()
            db.flush()
            return db_user
    
    
    def get_items(db: Session, skip: int = 0, limit: int = 100):
        return db.query(models.Item).offset(skip).limit(limit).all()
    
    
    def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
        db_item = models.Item(**item.dict(), owner_id=user_id)
        db.add(db_item)
        db.commit()
        db.refresh(db_item)
        return db_item
    
    
    def relate_user_item(db: Session, user_id: int, item_id: int):
        db_item = db.query(models.Item).filter(models.Item.id == item_id).first()
        if db_item:
            db_item.owner_id = user_id
            db.commit()
            db.flush()
            return db.query(models.User).filter(models.User.id == user_id).first()
    
    
    def update_item(db: Session, item_id: int, update_item: schemas.ItemUpdate):
        db_item = db.query(models.Item).filter(models.Item.id == item_id).first()
        if db_item:
            update_dict = update_item.dict(exclude_unset=True)
            for k, v in update_dict.items():
                setattr(db_item, k, v)
            db.commit()
            db.flush()
            db.refresh(db_item)
            return db_item
    
    
    def delete_item(db: Session, item_id: int):
        db_item = db.query(models.Item).filter(models.Item.id == item_id).first()
        if db_item:
            db.delete(db_item)
            db.commit()
            db.flush()
            return db_item
    

    3.5 main.py

    最后完成路由的配置,基本都是按照官网上来的,只是稍微补充了一些删和改的内容,为了让这个demo更加完善。

    from typing import List
    
    from fastapi import Depends, FastAPI, HTTPException
    from sqlalchemy.orm import Session
    
    from . import crud, models, schemas
    from .database import SessionLocal, engine
    
    models.Base.metadata.create_all(bind=engine)
    
    app = FastAPI()
    
    
    # Dependency
    def get_db():
        try:
            db = SessionLocal()
            yield db
        finally:
            db.close()
    
    
    @app.post("/users/", response_model=schemas.User)
    def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
        db_user = crud.get_user_by_email(db, email=user.email)
        if db_user:
            raise HTTPException(status_code=400, detail="Email already registered")
        return crud.create_user(db=db, user=user)
    
    
    @app.get("/users/", response_model=List[schemas.User])
    def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
        users = crud.get_users(db, skip=skip, limit=limit)
        return users
    
    
    @app.get("/users/{user_id}", response_model=schemas.User)
    def read_user(user_id: int, db: Session = Depends(get_db)):
        db_user = crud.get_user(db, user_id=user_id)
        if db_user is None:
            raise HTTPException(status_code=404, detail="User not found")
        return db_user
    
    
    @app.delete('/users/{user_id}', response_model=schemas.User)
    def delete_user(user_id: int, db: Session = Depends(get_db)):
        db_user = crud.delete_user(db, user_id=user_id)
        if db_user is None:
            raise HTTPException(status_code=404, detail="User not found")
        return db_user
    
    
    @app.put("/users/{user_id}", response_model=schemas.User)
    def update_user(user_id: int, update_user: schemas.UserUpdate, db: Session = Depends(get_db)):
        updated_user = crud.update_user(db, user_id, update_user)
        if updated_user is None:
            raise HTTPException(status_code=404, detail="User not found")
        return updated_user
    
    
    @app.post("/users/{user_id}/items/", response_model=schemas.Item)
    def create_item_for_user(
        user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
    ):
        return crud.create_user_item(db=db, item=item, user_id=user_id)
    
    
    @app.get("/items/", response_model=List[schemas.Item])
    def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
        items = crud.get_items(db, skip=skip, limit=limit)
        return items
    
    
    @app.put("/items/{user_id}/{item_id}/", response_model=schemas.User)
    def relate_user_item(user_id: int, item_id: int, db: Session = Depends(get_db)):
        user = crud.relate_user_item(db=db, item_id=item_id, user_id=user_id)
        return user
    
    
    @app.put("/items/{item_id}", response_model=schemas.Item)
    def update_item(item_id: int, update_item: schemas.ItemUpdate, db: Session = Depends(get_db)):
        updated_item = crud.update_item(db, item_id, update_item)
        if updated_item is None:
            raise HTTPException(status_code=404, detail="Item not found")
        return updated_item
    
    
    @app.delete('/items/{item_id}', response_model=schemas.Item)
    def delete_item(item_id: int, db: Session = Depends(get_db)):
        db_item = crud.delete_item(db, item_id=item_id)
        if db_item is None:
            raise HTTPException(status_code=404, detail="User not found")
        return db_item
    

    4. Tests

    在文件路径下执行,--reload当代码有修改时,可以自动加载。

    uvicorn sql_app.main:app --reload
    

    可以通过postman发送请求做测试,这里贴图一张。


    1581943691192.jpg

    最后fastapi还可以自动生成文档,可以访问http://127.0.0.1:8000/docs查阅。

    1581943651716.jpg

    相关文章

      网友评论

        本文标题:FastAPI操作database

        本文链接:https://www.haomeiwen.com/subject/hwvifhtx.html