返回文章列表

SQLAlchemy複雜查詢與非同步程式設計

本文探討 SQLAlchemy 的進階用法,包含使用別名與條件表示式進行複雜查詢,以及在非同步環境下的最佳實踐。同時,文章也涵蓋了資料函式庫模型關聯設計與 Alembic 資料函式庫遷移組態,提供全面的資料函式庫操作。

資料函式庫 Web 開發

SQLAlchemy 作為一個功能強大的 Python ORM 框架,不僅提供了簡潔的資料函式庫操作介面,也支援進階的查詢技巧和非同步程式設計。在實際應用中,我們經常需要處理複雜的查詢邏輯,例如多表關聯、條件篩選、聚合計算等。此外,隨著非同步程式設計的普及,如何在非同步環境下高效地使用 SQLAlchemy 也成為了一個重要的議題。本文將結合實際案例,詳細介紹如何使用別名和條件表示式構建複雜查詢,以及如何在非同步應用程式中正確組態 SQLAlchemy 的 Session 和 relationship,並搭配 Alembic 進行資料函式庫遷移管理,確保資料函式庫操作的效能和穩定性。

在資料函式庫查詢中,使用別名可以簡化對同一表格多次查詢的操作,就像操作不同表格一樣。而條件表示式則允許根據不同條件傳回不同值,實作更靈活的查詢邏輯。例如,在統計文章瀏覽量時,可以根據文章型別或其他條件計算不同的指標。

使用別名與條件表示式進行複雜查詢

在處理複雜的資料函式庫查詢時,SQLAlchemy 提供了多種工具來簡化操作。其中,使用別名(alias)和條件表示式(CASE)是兩個非常有用的功能。本文將介紹如何使用這些功能來進行複雜的查詢。

使用別名

當需要對同一個表格進行多次查詢或操作時,使用別名可以讓我們像處理不同的表格一樣進行操作。例如,在處理部落格文章及其翻譯時,可以為 BlogArticle 建立一個別名 TranslatedBlogArticle

TranslatedBlogArticle = aliased(BlogArticle)

這樣,我們就可以將 TranslatedBlogArticleBlogArticle 當作兩個不同的表格來進行 join 操作。

條件表示式

SQL 中的 CASE 建構在 SQLAlchemy 中可以透過 case() 函式來實作。這個函式允許我們根據條件傳回不同的值。例如,在查詢原創文章及其翻譯文章的瀏覽量時,我們需要根據文章是否是原創來決定使用哪個 ID。

from sqlalchemy import case

original_id = case(
    (BlogArticle.translation_of == None, BlogArticle.id),
    else_=BlogArticle.translation_of_id
).label('original_id')

內容解密:

  1. 條件判斷case() 函式的第一個引數是一個 tuple,包含了條件和對應的值。在這個例子中,條件是 BlogArticle.translation_of == None,表示檢查文章是否是原創。如果是原創,則傳回 BlogArticle.id
  2. 預設值else_ 引數指定了當所有條件都不滿足時傳回的值。在這裡,如果文章不是原創(即它是翻譯文章),則傳回 BlogArticle.translation_of_id,也就是其原創文章的 ID。
  3. 標籤.label('original_id') 給這個計算出的欄位一個別名,方便後續查詢中使用。

結合別名與條件表示式進行複雜查詢

透過結合使用別名和條件表示式,我們可以實作更複雜的查詢。例如,查詢原創文章及其翻譯文章在某個時間段內的總瀏覽量。

page_views = func.count(BlogView.id).label('page_views')
q = (
    select(original_id, page_views)
    .join(BlogArticle.views)
    .where(BlogView.timestamp.between(datetime(2022, 11, 1), datetime(2022, 12, 1)))
    .group_by(original_id)
    .order_by(page_views.desc())
)

內容解密:

  1. 查詢目標:查詢原創文章及其翻譯文章在 2022 年 11 月的總瀏覽量。
  2. Join 操作:透過 .join(BlogArticle.views)BlogArticleBlogView 連線起來。
  3. 篩選條件:使用 .where() 方法篩選出指定時間範圍內的瀏覽記錄。
  4. 分組與排序:使用 .group_by() 將結果按原創文章 ID 分組,並使用 .order_by() 按瀏覽量降序排列。

將原始 ID 轉換回實體

由於 original_id 是數字型別,我們需要將其轉換回對應的 BlogArticle 實體。為此,可以再次使用別名,並進行 join 操作。

OriginalBlogArticle = aliased(BlogArticle)
q = (
    select(OriginalBlogArticle, page_views)
    .join(BlogArticle.views)
    .join(OriginalBlogArticle, original_id == OriginalBlogArticle.id)
    .where(BlogView.timestamp.between(datetime(2022, 11, 1), datetime(2022, 12, 1)))
    .group_by(OriginalBlogArticle)
    .order_by(page_views.desc())
)

內容解密:

  1. 再次使用別名:建立 OriginalBlogArticle 作為 BlogArticle 的別名,以便將 original_id 與對應的實體進行匹配。
  2. 額外的 Join 操作:透過 .join(OriginalBlogArticle, ...)original_idOriginalBlogArticle.id 進行匹配。
  3. 結果:最終查詢結果包含了原創文章實體及其對應的總瀏覽量。

非同步SQLAlchemy程式設計

SQLAlchemy自1.4版本開始支援使用asyncio套件進行非同步程式設計,無論是在Core還是ORM模組中。這項改進將SQLAlchemy的功能帶到了現代應用程式中,例如使用FastAPI網頁框架開發的應用。

非同步與同步的主要差異

非同步程式設計模式引入了應用程式執行模型的一些差異。其中一個使非同步程式設計具有挑戰性的方面是函式著色(function coloring),這是一種隱喻,用於描述Python和其他語言在混契約步和非同步程式碼方面的限制。

簡而言之,函式著色意味著非同步應用程式應該避免長時間執行的同步函式,因為這些函式是阻塞的,會阻止並發。因此,使用SQLAlchemy的網頁應用程式需要所有層級的非同步程式碼,從上到下的層級可能包括:

  • 網頁伺服器
  • 網頁框架
  • 路由邏輯
  • SQLAlchemy會話(Session)
  • SQLAlchemy引擎(Engine)
  • 資料函式庫驅動程式

顯然,非同步網頁應用程式需要伺服器和框架也是非同步的,這個需求延伸到更低的層級。這意味著任何使用SQLAlchemy的非同步應用程式功能都必須是非同步函式,並且會話和引擎物件必須替換為非同步等效物件。最後,資料函式庫驅動程式也必須設計為非同步工作。

另一個重要的差異與隱式資料函式庫活動有關。SQLAlchemy ORM是一個高階資料函式庫框架,有時會自行決定發出資料函式庫查詢。最好的例子是組態了預設懶載入(lazy loader)的關係屬性,它們在第一次存取時隱式執行資料函式庫查詢以取得結果。

這些隱式行為在非同步應用程式中不能存在,因為非同步模型的函式著色限制。所有非同步資料函式庫活動必須發生在非同步函式內部。

非同步資料函式庫驅動程式

要能夠在非同步應用程式中使用SQLAlchemy,您必須使用相容的資料函式庫驅動程式。書中提到的普通資料函式庫驅動程式都不能非同步使用。

SQLite

Python直譯器附帶的sqlite模組不支援非同步模型。要能夠從非同步程式碼中使用SQLite,SQLAlchemy支援aiosqlite第三方套件,需要將其安裝到您的虛擬環境中,如下所示:

(venv) $ pip install aiosqlite

給予SQLAlchemy的資料函式庫連線URL需要修改以反映此驅動程式的使用。如下所示:

DATABASE_URL=sqlite+aiosqlite:///retrofun.sqlite

MySQL

使用MySQL或MariaDB時,SQLAlchemy 2.0支援兩個非同步驅動程式:aiomysql和asyncmy。

您必須將所選套件安裝到虛擬環境中。例如,以下是安裝aiomysql的方法:

(venv) $ pip install aiomysql

然後,資料函式庫連線URL的方言部分必須更改以反映所使用的驅動程式。例如:

DATABASE_URL=mysql+aiomysql://retrofun:my-password@localhost:3306/retrofun

PostgreSQL

對於PostgreSQL,asyncpg驅動程式目前是唯一的非同步選項。

與所有其他資料函式庫一樣,需要安裝套件:

(venv) $ pip install asyncpg

您的資料函式庫連線URL必須在方言部分包含asyncpg。例如:

DATABASE_URL=postgresql+asyncpg://retrofun:my-password@localhost:5432/retrofun

引擎、後設資料和會話

SQLAlchemy附帶了一個非同步擴充套件,提供替代的引擎和會話物件。這些物件具有與您在前幾章中使用的常規物件相同的介面,但其方法是可等待的(awaitable)。

下面可以看到適合非同步應用程式的db.py版本。如果您打算在電腦上執行此程式碼,您可以為非同步程式碼建立一個新的專案目錄,以便在需要時保留前幾章的程式碼。

import os
from dotenv import load_dotenv
from sqlalchemy import MetaData
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase

class Model(DeclarativeBase):
    metadata = MetaData(naming_convention={
        "ix": "ix_%(column_0_label)s",
        "uq": "uq_%(table_name)s_%(column_0_name)s",
        "ck": "ck_%(table_name)s_%(constraint_name)s",
        "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
        "pk": "pk_%(table_name)s",
    })

load_dotenv()
engine = create_async_engine(os.environ['DATABASE_URL'])
Session = async_sessionmaker(engine, expire_on_commit=False)

正如您所見,與同步版本沒有太多的差異。這個版本使用create_async_engine()而不是create_engine(),並且使用async_sessionmaker而不是sessionmaker

另一個差異是為會話組態的expire_on_commit=False選項。這停用了預設的SQLAlchemy行為,即在提交後過期所有例項屬性。

內容解密:

  1. create_async_engine():用於建立非同步引擎物件。
  2. async_sessionmaker:用於建立非同步會話類別。
  3. expire_on_commit=False:停用提交後過期所有例項屬性。

這段程式碼展示瞭如何組態SQLAlchemy以支援非同步操作,包括選擇合適的資料函式庫驅動程式和設定非同步引擎及會話。這對於開發現代網頁應用程式至關重要,因為它允許更高效的並發處理和更好的效能。

SQLAlchemy 非同步程式設計中的 Session 與 Relationship 組態最佳實踐

在開發非同步應用程式時,SQLAlchemy 的使用方式需要進行一些調整,以確保其與非同步程式設計模型的相容性。本文將探討在非同步環境中使用 SQLAlchemy 時,如何正確組態 Sessionrelationship,以避免潛在的問題。

Session 組態與過期模型處理

在非同步應用程式中,Session 的組態至關重要。特別是當使用 expire_on_commit=False 選項時,可以防止模型在提交後被標記為過期。這樣做的好處是避免了隱式的資料函式庫查詢,因為在非同步應用中,這種查詢可能會導致問題。

內容解密:

  • 使用 expire_on_commit=False 可以防止模型在提交後被標記為過期,從而避免隱式的資料函式庫查詢。
  • 這種做法的缺點是,如果使用長期存活的 Session 且多次提交,可能會導致模型資料過時。
  • 為瞭解決這個問題,可以使用較短生命週期的 Session,或是手動使用 session.expunge()session.refresh() 方法來更新模型。
from sqlalchemy.ext.asyncio import AsyncSession

async def get_session() -> AsyncSession:
    async with AsyncSession(engine, expire_on_commit=False) as session:
        # 使用 session 進行資料函式庫操作
        pass

進一步說明:

  1. 防止模型過期:透過設定 expire_on_commit=False,可以確保模型在提交後不會被標記為過期,從而避免不必要的資料函式庫查詢。
  2. 處理長期 Session:對於長期存活的 Session,需要手動管理模型的更新,以避免資料過時的問題。

非同步環境中的 MetaData 操作

在非同步環境中,MetaData 例項的操作需要特別注意。雖然 MetaData 沒有非同步版本,但可以使用 run_sync() 方法來執行同步的資料函式庫程式碼,如 create_all()drop_all()

async with engine.begin() as connection:
    await connection.run_sync(Model.metadata.drop_all)
    await connection.run_sync(Model.metadata.create_all)

內容解密:

  • 使用 run_sync() 方法可以在非同步環境中執行同步的資料函式庫操作。
  • 這種方法對於執行 create_all()drop_all() 等操作非常有用。

Relationship Loaders 的組態

在非同步應用程式中,模型的 relationship 組態需要仔細檢查。預設的懶載入行為(lazy='select')與非同步程式設計不相容,因此需要更改為其他載入策略。

載入策略選擇:

  • Eager Load:使用 joinedselectin 載入策略,可以在查詢父模型時一併載入相關模型。
  • Explicit Load:使用 lazy='raise' 可以強制顯式載入相關模型,避免隱式的資料函式庫查詢。
class Product(Model):
    # ...
    manufacturer: Mapped['Manufacturer'] = relationship(
        lazy='joined', innerjoin=True, back_populates='products')
    countries: Mapped[list['Country']] = relationship(
        lazy='selectin', secondary=ProductCountry, back_populates='products')

內容解密:

  1. 選擇合適的載入策略:根據關係的型別和需求,選擇適合的載入策略,如 joinedselectinwrite_only
  2. 避免懶載入:透過設定 lazy='raise',可以強制顯式載入相關模型,避免潛在的隱式查詢問題。

資料函式庫模型關聯與Alembic組態最佳實踐

在現代軟體開發中,資料函式庫的設計與管理是至關重要的一環。本文將探討如何使用SQLAlchemy進行資料函式庫模型關聯的設計,以及如何組態Alembic進行資料函式庫遷移。

資料函式庫模型關聯設計

關聯型別與實作

在SQLAlchemy中,資料函式庫模型之間的關聯是透過relationship函式來建立的。以下是一個典型的例子:

class Order(Model):
    # ...
    customer: Mapped['Customer'] = relationship(
        lazy='joined', innerjoin=True, back_populates='orders')
    order_items: Mapped[list['OrderItem']] = relationship(
        lazy='selectin', back_populates='order')
    # ...

內容解密:

  1. lazy='joined'表示關聯物件會在查詢主物件時一併載入,適合用於需要頻繁存取的關聯資料。
  2. innerjoin=True指定使用內連線查詢,確保只有存在關聯的資料才會被載入。
  3. back_populates引數用於雙向關聯的設定,使兩邊的模型都能互相參照。

Alembic組態與資料函式庫遷移

初始化Alembic倉儲

在使用Alembic進行資料函式庫遷移之前,需要先初始化一個遷移倉儲。對於非同步資料函式庫操作,可以使用以下命令:

(venv) $ alembic init -t async migrations

組態專案資料函式庫

env.py檔案中,需要進行一些設定以使Alembic能夠正確地與專案的資料函式庫互動:

from db import Model, engine
import models

target_metadata = Model.metadata
config.set_main_option("sqlalchemy.url", engine.url.render_as_string(
    hide_password=False))

內容解密:

  1. 匯入必要的模組,包括Modelengine,以便設定資料函式庫連線。
  2. target_metadata = Model.metadata將SQLAlchemy的模型元資料指定給target_metadata,使Alembic能夠識別模型的結構。
  3. 設定sqlalchemy.url選項以指定資料函式庫連線字串。

啟用批次遷移模式

為了確保相容性,特別是在使用SQLite時,啟用批次遷移模式是必要的:

def do_run_migrations(connection: Connection) -> None:
    context.configure(
        connection=connection, 
        target_metadata=target_metadata,
        render_as_batch=True)
    with context.begin_transaction():
        context.run_migrations()

內容解密:

  1. render_as_batch=True啟用批次遷移模式,這對於某些資料函式庫(如SQLite)是必要的,因為它們對遷移操作的支援有限。
  2. 在組態完成後,Alembic就可以自動產生初始的資料函式庫遷移指令碼。

實際操作與注意事項

  1. 環境變陣列態:確保.env檔案中的DATABASE_URL變數正確組態為非同步資料函式庫驅動。
  2. 資料函式庫遷移命令
    • 產生初始遷移指令碼:(venv) $ alembic revision --autogenerate -m "initial migration"
    • 執行遷移:(venv) $ alembic upgrade head