返回文章列表

SQLAlchemy 非同步操作與網頁整合

本文探討 SQLAlchemy 的非同步操作與網頁框架整合,解析隱式資料函式庫操作的非同步相容性問題,並提供查詢最佳化策略與 Flask、FastAPI 整合範例,有效提升網頁應用程式效能。

Web 開發 資料函式庫

SQLAlchemy 提供了成熟的 ORM 功能,但也引入了隱式資料函式庫操作的複雜性,尤其在非同步環境下可能導致關聯屬性懶載入問題。本文將探討如何透過初始化列表樣式關聯屬性或調整 autoflush 選項來避免這些問題,並深入研究非同步查詢的最佳化策略,例如使用 stream()stream_scalars() 處理大量資料,以提升效能。此外,文章也提供 SQLAlchemy 與 Flask 和 FastAPI 框架的整合範例,涵蓋模型序列化、反序列化和通用整合方法,協助開發者將 SQLAlchemy 無縫整合至網頁應用程式中。

隱式資料函式庫操作與非同步相容性問題解析

在對模型進行上述修改後,幾乎所有的隱式資料函式庫操作都被停用了。然而,在會話(session)中新增物件並執行flush()操作時,仍然存在一個隱式的情況。

flush() 操作與其影響

在會話的上下文中,flush()操作將會話中累積的所有未提交的變更寫入底層的資料函式庫交易,使其為資料函式庫所知。由於會話預設啟用了autoflush選項,SQLAlchemy通常會在執行資料函式庫查詢前自動發出flush()呼叫,以確保查詢結果包含會話中尚未提交的變更。

大多數情況下,flush()呼叫不會引起問題,但有一種特定情況除外。如果會話中有剛新增的物件,且這些物件具有未初始化的列表樣式關聯屬性,那麼當這些物件被flush()時,未初始化的關聯屬性將被標記為未載入,這意味著下次存取它們時將嘗試對其進行懶載入。

問題重現與解決方案

要理解這個問題如何影回應用程式,可以在非同步相容的Python會話中觸發此錯誤。首先,使用以下命令啟動一個支援await關鍵字的Python會話:

(venv) $ python -m asyncio

然後,可以按照以下步驟重現錯誤:

>>> from db import Session
>>> from models import Customer, Order
>>> session = Session()
>>> c = Customer(name='Susan') 
>>> o = Order(customer=c)
>>> session.add(o)
>>> o.order_items 
[]
>>> await session.flush() 

>>> o.order_items 
Traceback ...

有多種方法可以避免在會話flush()後發生關聯屬性的懶載入:

  1. 使用write_only載入機制:對於所有列表樣式的關聯屬性,使用write_only載入機制可以避免懶載入。

  2. 使用raise載入器:對於所有列表樣式的關聯屬性,使用raise載入器並在需要載入關聯屬性時透過options()子句覆寫此載入器,可以在嘗試懶載入時引發錯誤。

  3. 停用autoflush選項:停用會話的autoflush選項可以避免在查詢前自動執行flush(),但可能會導致查詢結果不包含會話中的未提交變更。

Session = async_sessionmaker(engine, expire_on_commit=False, autoflush=False)


4. **初始化列表樣式關聯屬性**:在會話`flush()`前初始化所有列表樣式的關聯屬性,可以使SQLAlchemy在`flush()`時保留其值。

   ```python
>>> o = Order(customer=c, order_items=[]) 
>>> session.add(o)
>>> await session.flush()
>>> o.order_items 
[]

程式碼實作:自動初始化列表關聯屬性

為了避免每次建立新物件時都需要手動初始化關聯屬性,可以擴充Model類別以自動初始化所有根據列表的關聯屬性。以下是一個範例實作:

# Listing 41 db.py: Initialize all list relationships
from sqlalchemy import event, inspect
# ...

@event.listens_for(Model, "init", propagate=True)
def init_relationships(tgt, arg, kw):
    mapper = inspect(tgt.__class__)
    for arg in mapper.relationships:
        if arg.collection_class is None and arg.uselist:
            continue # skip write-only and similar relationships
        if arg.key not in kw:
            kw.setdefault(
                arg.key, None if not arg.uselist else arg.collection_class())

內容解密:

  1. @event.listens_for(Model, "init", propagate=True):此裝飾器註冊init_relationships函式作為事件處理器,當Model例項被建立時觸發。

  2. mapper = inspect(tgt.__class__):使用SQLAlchemy的inspect()函式對模型類別進行內省,以找出需要初始化的關聯屬性。

  3. for arg in mapper.relationships::遍歷模型類別的所有關聯屬性。

  4. kw.setdefault(arg.key, None if not arg.uselist else arg.collection_class()):如果關聯屬性未在建構函式中被初始化,則將其預設為空列表(如果是列表樣式的關聯屬性)。

這個解決方案可以有效地避免因flush()導致的懶載入問題,同時保持了程式碼的清晰和可維護性。

非同步資料函式庫操作與查詢最佳化

在現代軟體開發中,非同步程式設計已成為提升應用程式效能的重要技術手段。特別是在資料函式庫操作領域,非同步查詢能夠顯著改善系統的平行處理能力與整體效能表現。本文將探討如何在SQLAlchemy框架下實作非同步資料函式庫操作,並重點分析查詢最佳化策略。

非同步匯入指令碼的結構調整

要使資料匯入指令碼適應非同步環境,需要對其結構進行特定修改。首先,必須匯入asyncio函式庫並定義非同步的main函式:

import asyncio
async def main():
    # 匯入邏輯實作
    pass

if __name__ == '__main__':
    asyncio.run(main())

內容解密:

  1. asyncio.run(main())用於啟動非同步事件迴圈並執行main函式。
  2. 資料函式庫session操作需改用async with語法以支援非同步上下文管理。
  3. 所有涉及資料函式庫的操作(如session.execute()session.scalar()session.commit())都必須使用await關鍵字進行等待。

非同步查詢的實作與最佳化

在SQLAlchemy的非同步環境中,查詢陳述式的建構方式保持不變,但執行方式需要調整為非同步模式:

>>> from sqlalchemy import select
>>> from db import Session
>>> from models import Product, Customer, Order
>>> session = Session()

>>> c64 = await session.scalar(
    select(Product)
    .where(Product.name == 'Commodore 64'))
>>> c64
Product(41, "Commodore 64")

內容解密:

  1. 查詢建構過程與同步模式相同,但執行時需使用await關鍵字。
  2. session.scalar()session.scalars()方法需要非同步等待其執行結果。
  3. 對於大型查詢結果集,使用stream()stream_scalars()方法可實作更高效的非同步迭代處理。

串流結果處理機制

在處理大量資料查詢時,傳統的結果集傳回方式可能導致效能問題。SQLAlchemy提供了串流處理機制來最佳化這一過程:

>>> r = await session.stream_scalars(
    c.orders.select()
    .order_by(Order.timestamp.desc())
    .limit(2))
>>> [order async for order in r]
[Order(eaf9c1386a514c9781bdd849f7e99787), Order(db2c90dcc4ae4072b12a58496f47f5cf)]

內容解密:

  1. stream_scalars()方法傳回一個支援非同步迭代的結果物件。
  2. 使用async for迴圈可以逐一處理查詢結果,有效降低記憶體佔用。
  3. 在處理大型結果集時,建議優先使用串流處理方法以提升效能。

最佳實踐與效能考量

在實際開發過程中,應根據具體業務場景選擇適當的查詢執行方法:

  • 當預期查詢傳回多欄位結果時,建議使用stream()方法。
  • 當查詢結果為單一值時,優先使用stream_scalars()方法。
  • 對於確定會傳回單一結果的查詢,可安全使用scalar()scalar_one()scalar_or_none()方法。

綜上所述,透過合理採用非同步查詢機制並結合串流處理技術,可以顯著提升資料函式庫操作的效能與系統整體的平行處理能力。開發人員應深入理解這些技術特點,並根據實際業務需求進行適當的最佳化設計。

SQLAlchemy 與網頁應用程式整合

SQLAlchemy 是 Python 網頁伺服器中新增資料函式庫支援的最佳選擇之一,無論是建立傳統的網頁應用程式還是與網頁前端或智慧型手機應用程式配合使用的網頁 API。在本章中,將示範與 Flask 和 FastAPI 兩個最流行的 Python 網頁框架的範例整合。

通用整合方法

如果您正在尋找將 SQLAlchemy 整合到網頁應用程式中的最簡單方法,那麼您應該考慮不使用任何整合。按照本文中介紹的結構,您可以將 db.pymodels.py 模組新增到您的應用程式中,然後使用 session 上下文管理器在需要的地方包含資料函式庫功能。

這種方法不僅適用於網頁應用程式,也適用於其他型別的 Python 應用程式,並且具有不需要任何額外依賴或擴充套件的優勢。

SQLAlchemy 整合技術

雖然上一節中建議的不使用整合的方法對於許多專案來說應該足夠好,但您可能更喜歡使用一種封裝和簡化資料函式庫功能的方法,這種方法對於您選擇的網頁框架來說很方便。下面的章節將討論一些應該考慮的實作細節。

SQLAlchemy 匯入的消除歧義

SQLAlchemy 有時會匯出許多類別和函式,這可能會令人感到繁瑣。考慮一下前一章中 models.py 模組中使用的 SQLAlchemy 匯入:

from sqlalchemy import String, Text, ForeignKey, Table, Column
from sqlalchemy.orm import Mapped, WriteOnlyMapped, mapped_column, relationship

如果您需要執行資料函式庫查詢,那麼您必須匯入 Sessionselect() 函式和其他一些符號,具體取決於您的需求。

在應用程式中擁有這些長長的匯入清單有兩個缺點。首先,維護這些匯入清單很耗時,但更重要的是,一些具有相當通用名稱的匯入(如 select)可能會與其他依賴項或應用程式本身的符號發生衝突。

一個有用的技術可以解決這兩個問題,即透過只匯入父模組來命名所有匯入。對於 SQLAlchemy ORM 應用程式,通常有兩個父模組:sqlalchemysqlalchemy.orm,因此可以直接匯入:

import sqlalchemy as sa
import sqlalchemy.orm as so

現在,符號分別以 sa.so. 為字首,用於 SQLAlchemy Core 和 ORM。以下是使用這種樣式的 Product 模型範例:

class Product(Model):
    __tablename__ = 'products'
    id: so.Mapped[int] = so.mapped_column(primary_key=True)
    name: so.Mapped[str] = so.mapped_column(
        sa.String(64), index=True, unique=True)
    # ...

模型序列化

網頁應用程式和網頁 API 特有的需求是將模型傳送給請求它們的客戶端。為了透過網路傳送這些實體,它們必須被序列化,這是一個將 Python 模型例項從其內部二進位表示轉換為可以傳輸的字串或位元組序列,然後在另一端重建的過程。

最常用的序列化格式是 JavaScript 物件表示法(JSON)。以下是 RetroFun 資料函式庫中的一個 Product 實體在序列化為 JSON 格式後可能的外觀:

{
    "id": 41,
    "name": "Commodore 64",
    "manufacturer": {
        "id": 14,
        "name": "Commodore"
    },
    "countries": [
        {
            "id": 3,
            "name": "USA"
        }
    ],
    "year": 1982,
    "cpu": "6510"
}

JSON 格式在語法上與 Python 字典、列表和基本型別(如整數和字串)相似。事實上,Python 標準函式庫中的 json 模組可以將具有上述結構的 Python 字典渲染為對應的 JSON 序列化表示,該表示作為字串傳回,可以在對客戶端的回應中傳送。

反序列化

序列化的反向過程稱為反序列化。在 JSON 的情況下,它由 JSON 解碼器執行。所有語言和技術堆積疊都有 JSON 解碼器,因此當客戶端接收到回應時,它可以使用 JSON 解碼器將 JSON 表示轉換回其本地資料結構。

Flask 與 FastAPI 整合範例

本章將提供 Flask 和 FastAPI 這兩個流行的 Python 網頁框架與 SQLAlchemy 整合的範例。這些範例將展示如何使用 SQLAlchemy 與網頁框架配合,以實作資料函式庫功能。

程式碼範例:使用 SQLAlchemy 與 Flask 整合

from flask import Flask, jsonify
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///example.db"
db = SQLAlchemy(app)

class Product(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), index=True, unique=True)

@app.route("/products", methods=["GET"])
def get_products():
    products = Product.query.all()
    return jsonify([{"id": p.id, "name": p.name} for p in products])

if __name__ == "__main__":
    app.run(debug=True)

#### 內容解密:

此範例展示瞭如何使用 Flask 和 SQLAlchemy 建立一個簡單的網頁 API。我們首先建立了一個 Flask 應用程式,並組態了 SQLAlchemy 使用 SQLite 資料函式庫。然後,我們定義了一個 Product 模型,並建立了一個路由來取得所有產品的列表。最後,我們使用 jsonify 將產品列表序列化為 JSON 格式並傳回給客戶端。

程式碼範例:使用 SQLAlchemy 與 FastAPI 整合

from fastapi import FastAPI
from pydantic import BaseModel
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

app = FastAPI()

# SQLAlchemy setup
SQLALCHEMY_DATABASE_URL = "sqlite:///example.db"
engine = create_engine(SQLALCHEMY_DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()

class Product(Base):
    __tablename__ = "products"
    id = Column(Integer, primary_key=True)
    name = Column(String, index=True, unique=True)

Base.metadata.create_all(bind=engine)

class ProductSchema(BaseModel):
    id: int
    name: str

@app.get("/products/")
def read_products():
    db = SessionLocal()
    products = db.query(Product).all()
    return [{"id": p.id, "name": p.name} for p in products]

#### 內容解密:

此範例展示瞭如何使用 FastAPI 和 SQLAlchemy 建立一個簡單的網頁 API。我們首先建立了一個 FastAPI 應用程式,並組態了 SQLAlchemy 使用 SQLite 資料函式庫。然後,我們定義了一個 Product 模型,並建立了一個路由來取得所有產品的列表。最後,我們使用 Pydantic 的 BaseModel 將產品列表序列化為 JSON 格式並傳回給客戶端。