博客 / 詳情

返回

在Python+FastAPI項目中使用SqlAlchemy操作數據的幾種常見方式

在Python+FastAPI的後端項目中,我們往往很多時候需要對數據進行相關的處理,本篇隨筆介紹在Python+FastAPI項目中使用SqlAlchemy操作數據的幾種常見方式。

使用 FastAPI, SQLAlchemy, Pydantic構建後端項目的時候,其中數據庫訪問採用SQLAlchemy 的異步方式處理。一般我們在操作數據庫操作的時候,採用基類繼承的方式減少重複代碼,提高代碼複用性。不過我們在分析SQLAlchemy的時候,我們可以簡單的方式來剖析幾種常見的數據庫操作方式,來介紹SQLAlchemy的具體使用。

1、SQLAlchemy介紹

SQLAlchemy 是一個功能強大且靈活的 Python SQL 工具包和對象關係映射(ORM)庫。它被廣泛用於在 Python 項目中處理關係型數據庫的場景,既提供了高級的 ORM 功能,又保留了對底層 SQL 語句的強大控制力。SQLAlchemy 允許開發者通過 Python 代碼與數據庫進行交互,而無需直接編寫 SQL 語句,同時也支持直接使用原生 SQL 進行復雜查詢。下面是SQLAlchemy和我們常規數據庫對象的對應關係説明。
Engine    連接對象         驅動引擎
Session   連接池           事務  由此開始查詢
Model     表                   類定義
Column     列  
Query     若干行         可以鏈式添加多個條件
 
在使用SQLAlchemy時,通常會將其與數據庫對象對應起來。
SQLAlchemy: 使用 Table 對象或 Declarative Base 中的類來表示。
對應關係: 數據庫中的每一個表對應於SQLAlchemy中的一個類,該類繼承自 declarative_base()
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'  # 數據庫表名
    id = Column(Integer, primary_key=True)
    name = Column(String)
    email = Column(String)

數據庫列 (Database Column):使用 Column 對象來表示。每個數據庫表中的列在SQLAlchemy中表示為 Column 對象,並作為類的屬性定義。

id = Column(Integer, primary_key=True)
name = Column(String(50))

數據庫行 (Database Row):每個數據庫表的一個實例(對象)代表數據庫表中的一行。在SQLAlchemy中,通過實例化模型類來表示數據庫表中的一行。

new_user = User(id=1, name='John Doe', email='john@example.com')

主鍵 (Primary Key):使用 primary_key=True 參數定義主鍵。

id = Column(Integer, primary_key=True)

外鍵 (Foreign Key): 使用 ForeignKey 對象來表示。

from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship

class Address(Base):
    __tablename__ = 'addresses'
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey('users.id'))
    user = relationship('User')

關係 (Relationships): 使用 relationship 對象來表示。數據庫中表與表之間的關係在SQLAlchemy中通過 relationship 來定義。

addresses = relationship("Address", back_populates="user")

 

2、常規的單表處理

下面我們通過異步處理的方式,介紹如何在單表中操作相關的數據庫數據。

async def get(self, db: AsyncSession, id: Any) -> Any:
    """根據主鍵獲取一個對象"""

    if isinstance(id, str):
        query = select(self.model).filter(func.lower(self.model.id) == id.lower())
    else:
        query = select(self.model).filter(self.model.id == id)

    result = await db.execute(query)
    item = result.scalars().first()

    return item

如果我們需要強制對外鍵的類型進行匹配(如對於Postgresql的嚴格要求,數據比較的類型必須一致),那麼我們需要在基類或者CRUD類初始化的時候,獲得對應的主鍵類型。

class BaseCrud(Generic[ModelType, PrimaryKeyType, PageDtoType, DtoType]):
    """
    基礎CRUD操作類,傳入參數説明:
    * `ModelType`: SQLAlchemy 模型類
    * `PrimaryKeyType`: 限定主鍵的類型
    * `PageDtoType`: 分頁查詢輸入類
    * `DtoType`: 數據傳輸對象類,如新增、更新的單個對象DTO
    """

    def __init__(self, model: Type[ModelType]):
        """
        數據庫訪問操作的基類對象(CRUD).
        * `model`: A SQLAlchemy model class
        """
        
        self.model = model  # 模型類型

        # 運行期獲取主鍵字段類型
        pk_column = inspect(model).primary_key[0]
        self._pk_type = pk_column.type.python_type  # int / str

因此對於單表的Get方法,我們修改下,讓他匹配主鍵的類型進行比較,這樣過對於嚴格類型判斷的Postgresql也正常匹配了。

    async def get(self, db: AsyncSession, id: PrimaryKeyType) -> Optional[ModelType]:
        """根據主鍵獲取一個對象"""
        
        #對id的主鍵進行類型轉換,self._pk_type在構造函數的初始化中獲取
        try:
            id = self._pk_type(id)
        except Exception:
            raise ValueError(f"Invalid primary key type: {id}")
        
        if isinstance(id, str):
            query = select(self.model).filter(func.lower(self.model.id) == id.lower())
        else:
            query = select(self.model).filter(self.model.id == id)

        result = await db.execute(query)
        item = result.scalars().first()

        return item

對於刪除的數據,我們也可以類似的處理對比進行了。

from sqlalchemy.orm import Session, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import delete as sa_delete, update as sa_update

async def delete_byid(self, db: AsyncSession, id: PrimaryKeyType) -> bool:
    """根據主鍵刪除一個對象
    
    :param id: 主鍵值
    """
    
    #對id的主鍵進行類型轉換,self._pk_type在構造函數的初始化中獲取
    try:
        id = self._pk_type(id)
    except Exception:
        raise ValueError(f"Invalid primary key type: {id}")
    
    del_query: sa_delete
    if isinstance(id, str):
        del_query = sa_delete(self.model).where(
            func.lower(self.model.id) == id.lower()
        )
    else:
        del_query = sa_delete(self.model).where(self.model.id == id)

    result = await db.execute(del_query)

    await db.commit()
    return result.rowcount > 0

對於提供多條件的查詢或者過濾,我們可以使用where函數或者filter函數,在 SQLAlchemy 中,select(...).where(...) 和 select(...).filter(...) 都用於構造查詢條件,如下所示等效。

query = select(self.model).where(self.model.id == id)

query = select(self.model).filter(self.model.id == id)

我們可以通過sqlAlchemy的and_和or_函數來進行組合多個條件。

from sqlalchemy import ( Table,Column,and_,or_,asc,desc,select,func,distinct,text, Integer)

....
    match expression:
        case "and":
            query = await db.execute(
                select(self.model)
                .filter(and_(*where_list))
                .order_by(*order_by_list)
            )
        case "or":
            query = await db.execute(
                select(self.model).filter(or_(*where_list)).order_by(*order_by_list)
            )

Python的SqlAlchemy提供 InstrumentedAttribute 對象來操作多個條件,如我們對於一些多條件的處理,可以利用它來傳遞多個參數。

    async def get_all_by_attributes(
        self, db: AsyncSession, *attributes: InstrumentedAttribute, sorting: str = ""
    ) -> List[ModelType] | None:
        """根據列名稱和值獲取相關的對象列表

        :param sorting: 格式:name asc 或 name asc,age desc
        :param attributes: SQLAlchemy InstrumentedAttribute objects,可以輸入多個條件
        例子:User.id != 1 或者 User.username == "JohnDoe"
        """

        order_by_list = parse_sort_string(sorting, self.model)
        query = select(self.model).filter(and_(*attributes)).order_by(*order_by_list)
        
        result = await db.execute(query)
        return result.scalars().all()

例如,對於 模型 Material 對象,我們對它進行多個條件的查詢處理,如下所示,紅色部分為 *attributes: InstrumentedAttribute 參數。

items = await super().get_all_by_attributes(
    db,
    Material.id == vercol.id,
    Material.vercol == vercol.vercol,
    Material.ischecked == 0,
    Material.status == 0,
)

同樣我們可以利用它來獲取數量,或者判斷多條件的記錄是否存在。

image

 在數據插入或者更新的操作中,我們可以接受對象類型或者字典類型的參數對象,因此方法如下所示。

   async def update(self, db: AsyncSession, obj_in: DtoType | dict[str, Any]) -> bool:
        """更新對象
        
        :param obj_in: 對象輸入數據,可以是 DTO 對象或字典
        """
        try:
            if isinstance(obj_in, dict):
                obj_id = obj_in.get("id")
                if obj_id is None:
                    raise ValueError("id is required for update")
                update_data = obj_in
            else:
                obj_id = obj_in.id
                # update_data = vars(obj_in)  
                update_data = obj_in.model_dump(exclude_unset=True)

            query = select(self.model).filter(self.model.id == obj_id)
            result = await db.execute(query)
            db_obj = result.scalars().first()

            if db_obj:
                # 更新對象字段
                for field, value in update_data.items():
                    # 跳過以 "_" 開頭的私有屬性
                    if field.startswith("_"):
                        continue
                    setattr(db_obj, field, value)

                # 處理更新前的回調處理
                self.on_before_update(update_data, db_obj)
                # 提交事務
                await db.commit()
                return True
            else:
                return False
        except SQLAlchemyError as e:
            self.logger.error(f"update 操作出現錯誤: {e}")
            await db.rollback()  # 確保在出錯時回滾事務
            return False

我們在插入或者更新數據的時候,一般會默認更新一些字段,如創建人,創建日期、編輯人,編輯日期等信息,我們可以把它單獨作為一個可以給子類重寫的函數,基類做一些默認的處理。

    def on_before_update(self, update_data: dict[str, Any], db_obj: ModelType) -> None:
        """更新對象前的回調函數,子類可以重寫此方法

        可通過 setattr(db_obj, field, value) 設置字段值
        """
        
        setattr(db_obj, "edittime", datetime.now())
        user :CurrentUserIns  = get_current_user()
        if user:
            setattr(db_obj, "editor", user.fullname)
            setattr(db_obj, "editor_id", user.id)

            setattr(db_obj, "company_id", user.company_id)
            setattr(db_obj, "companyname", user.companyname)

有時候,如果我們需要獲取某個字段非重複的列表,用來做為動態下拉列表的數據,那麼我們可以通過下面函數封裝下。

    async def get_field_list(self, db: AsyncSession, field_name: str) -> Iterable[str]:
        """獲取指定字段值的唯一列表

        :param field_name: 字段名稱
        """

        field = getattr(self.model, field_name)
        query = select(distinct(field))
        result = await db.execute(query)
        return result.scalars().all()

 

3、多表聯合的處理操作 

多表操作,也是我們經常碰到的處理方式,如對於字典類型和字典項目,他們是兩個表,需要聯合起來獲取數據,那麼就需要多表的聯合操作。

image

 如下是字典CRUD類中,聯合字典類型獲取數據的記錄處理。

    async def get_dict_by_typename(self, db: AsyncSession, dicttype_name: str) -> dict:
        """根據字典類型名稱獲取所有該類型的字典列表集合"""
        result = await db.execute(
            select(self.model)
            .join(DictType, DictType.id == self.model.dicttype_id)  # 關聯字典類型表
            .filter(DictType.name == dicttype_name)  # 過濾字典類型名稱
            .order_by(DictData.seq)  # 排序
        )
        items = result.scalars().all()

        dict = {}
        for info in items:
            if info.name not in dict:
                dict[info.name] = info.value

        return dict

如果我們需要對某個表的遞歸獲取樹列表,可以如下處理

    async def get_tree(self, db: AsyncSession, pid: str) -> list[DictType]:
        """獲取字典類型一級列表及其下面的內容"""

        # 使用三元運算符將 pid 設為 "-1"(如果 pid 是 null 或空白)或保持原值
        pid = "-1" if not pid or pid.strip() == "" else pid
        result = await db.execute(
            select(self.model)
            .filter(self.model.pid == pid)
            .options(selectinload(DictType.children))
        )
        nodes = result.scalars().all()
        return nodes

我們來假設用户和文章的示例表結構(ORM 模型,如下所示。

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String)
    email = Column(String)
    articles = relationship("Article", back_populates="author")


class Article(Base):
    __tablename__ = "articles"
    id = Column(Integer, primary_key=True, index=True)
    title = Column(String)
    content = Column(Text)
    user_id = Column(Integer, ForeignKey("users.id"))
    author = relationship("User", back_populates="articles")

我們可以通過下面函數處理獲得相關的記錄集合。

async def get_user_articles(db: AsyncSession):
    stmt = (
        select(User, Article)
        .join(Article, Article.user_id == User.id)
    )
    result = await db.execute(stmt)
    return result.all()   # [(User(), Article()), ...]

如果需要可以使用outer_join函數處理

async def get_users_with_articles(db: AsyncSession):
    stmt = (
        select(User, Article)
        .outerjoin(Article, Article.user_id == User.id)
    )
    result = await db.execute(stmt)
    return result.all()  # 用户即便沒有文章也會出現

 

如果我們需要獲取有文章的所有用户,如下所示。

async def get_users_with_articles(db: AsyncSession):
    stmt = select(User).options(selectinload(User.articles))  # 自動 load 關聯
    result = await db.execute(stmt)
    return result.scalars().all()

selectinload 會執行兩次 SQL,但效率高,不會產生笛卡爾積,非常適合集合查詢。

多表鏈式 Join的處理,可以獲得兩個表的不同信息進行組合。

async def get_articles_with_author(db: AsyncSession):
    stmt = (
        select(Article.title, User.name.label("author"))
        .join(User, Article.user_id == User.id)
    )
    rows = await db.execute(stmt)
    return rows.mappings().all()  # 以 dict 形式返回 [{'title':..., 'author':...}]

帶篩選條件與分頁的處理實現,如下所示

async def search_articles(db: AsyncSession, keyword: str, page: int = 1, size: int = 10):
    stmt = (
        select(Article, User.name.label("author"))
        .join(User)
        .filter(Article.title.contains(keyword))
        .offset((page - 1) * size)
        .limit(size)
    )
    result = await db.execute(stmt)
    return result.mappings().all()

 

對於權限管理系統來説,一般有用户、角色,以及用户角色的中間表,我們來看看這個在SQLAlchemy最佳實踐是如何的操作。

from sqlalchemy import Table, Column, Integer, ForeignKey
from sqlalchemy.orm import relationship, Mapped, mapped_column
from database import Base

# --- 中間表寫法 ---
role_user = Table(
    "role_user",
    Base.metadata,
    Column("user_id", ForeignKey("users.id"), primary_key=True),
    Column("role_id", ForeignKey("roles.id"), primary_key=True)
)

class User(Base):
    __tablename__ = "users"
    id: Mapped[int] = mapped_column(primary_key=True)
    username: Mapped[str] = mapped_column()

    roles: Mapped[list["Role"]] = relationship(
        secondary=role_user,
        back_populates="users",
        lazy="selectin"
    )

class Role(Base):
    __tablename__ = "roles"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column()

    users: Mapped[list[User]] = relationship(
        secondary=role_user,
        back_populates="roles",
        lazy="selectin"
    )

在 SQLAlchemy 聲明多對多關係時,secondary 參數既可以填 字符串形式的表名,也可以填 已經定義好的中間表對象(Table 對象)。

① econdary="role_user" —— 使用字符串表名

roles = relationship("Role", secondary="role_user", back_populates="users")

② secondary=role_user —— 傳入中間表對象(推薦方式)

role_user = Table(
    "role_user",
    Base.metadata,
    Column("user_id", ForeignKey("users.id"), primary_key=True),
    Column("role_id", ForeignKey("roles.id"), primary_key=True)
)

roles = relationship("Role", secondary=role_user, back_populates="users")

對於如果獲取對應角色的用户記錄,我們可以通過下面方式獲取(通過連接中間表的方式)

async def get_users_by_role(db: AsyncSession, role_id: int) -> list[User]:
    stmt = (
        select(User)
        .join(role_user, role_user.c.user_id == User.id)
        .where(role_user.c.role_id == role_id)
    )
    result = await db.execute(stmt)
    return result.scalars().all()

也可以下面的方式進行處理(使用 relationship any()),效果是一樣的。

select(User).filter(User.roles.any(id=role_id))

如果需要寫入用户、角色的關聯關係,我們可以使用下面方法來通過中間表進行判斷並寫入記錄。

from sqlalchemy import select, insert

async def add_users_to_role(db: AsyncSession, role_id: int, user_ids: list[int]):
    # 1️⃣ 查詢已有關聯 user_id
    stmt = select(role_user.c.user_id).where(role_user.c.role_id == role_id)
    res = await db.execute(stmt)
    existing_user_ids = {row[0] for row in res.fetchall()}

    # 2️⃣ 過濾出新的 user_id
    new_user_ids = [uid for uid in user_ids if uid not in existing_user_ids]
    if not new_user_ids:
        return 0  # 沒有新增

    # 3️⃣ 批量插入
    values = [{"user_id": uid, "role_id": role_id} for uid in new_user_ids]
    stmt = insert(role_user).values(values)
    await db.execute(stmt)
    await db.commit()

    return len(new_user_ids)

如果只是單個記錄的插入,可以利用下面的方式處理。

from sqlalchemy import select, insert

async def add_user_to_role(db: AsyncSession, role_id: int, user_id: int) -> bool:
    # 1️⃣ 檢查是否已存在關聯
    stmt = select(role_user).where(
        role_user.c.role_id == role_id,
        role_user.c.user_id == user_id
    )
    res = await db.execute(stmt)
    exists = res.first()

    if exists:
        return False  # 已存在,不再插入

    # 2️⃣ 插入記錄
    stmt = insert(role_user).values(user_id=user_id, role_id=role_id)
    await db.execute(stmt)
    await db.commit()

    return True

以上就是對於在Python+FastAPI的後端項目中使用SqlAlchemy操作數據的幾種常見方式,包括單表處理,多表關聯、中間表的數據維護和定義等內容,是我們在操作常規數據的時候,經常碰到的幾種方式。

希望上文對你有所啓發和幫助,感謝閲讀。

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.