返回文章列表

RetroFun資料函式庫擴充套件與部落格訪客行為追蹤

本文闡述如何擴充套件 RetroFun 資料函式庫以追蹤部落格文章與訪客行為,包含資料函式庫模型設計、匯入指令碼實作以及頁面分析查詢範例。透過新增 BlogArticle、BlogAuthor、BlogUser、BlogSession 和 BlogView 模型,實作了文章、作者、訪客、會話和瀏覽記錄的關聯,並利用

資料函式庫 後端開發

資料函式庫模型設計的重點在於BlogArticleBlogAuthorBlogUserBlogSession以及BlogView五個模型之間的關聯性設計,其中BlogArticleBlogAuthor是一對多關係,ProductBlogArticle也是一對多關係,CustomerBlogUser同樣是一對多關係,而BlogUserBlogSession也是一對多關係,最後BlogView模型則負責記錄BlogArticleBlogSession之間的多對多關係,允許重複瀏覽記錄以準確計算頁面瀏覽次數。匯入指令碼則示範瞭如何使用 Python 和 SQLAlchemy 將 CSV 檔案中的資料匯入到資料函式庫中,並利用分批提交和快取機制提高效率。頁面分析查詢範例展示瞭如何使用 SQLAlchemy 進行各種資料分析,例如計算特定時間段內的頁面瀏覽量、部落格文章瀏覽量排名以及按產品分類別的頁面瀏覽量統計等。此外,文章也簡要介紹了外連線的概念及其在不同資料函式庫系統中的支援情況。

擴充套件RetroFun資料函式庫以追蹤部落格文章與訪客行為

在真實世界中,許多公司都設有部落格來發表文章,以促進銷售。為了達到這個目的,我們需要擴充套件現有的資料函式庫,以儲存部落格文章的相關資訊,並追蹤訪客對這些文章的瀏覽行為。

部落格文章與作者模型設計

首先,我們來看看如何設計部落格文章和作者的資料模型。以下程式碼展示了新增的BlogArticleBlogAuthor模型:

class Product(Model):
    # ...
    blog_articles: WriteOnlyMapped['BlogArticle'] = relationship(
        back_populates='product')
    # ...

class BlogArticle(Model):
    __tablename__ = 'blog_articles'
    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(128), index=True)
    author_id: Mapped[int] = mapped_column(ForeignKey('blog_authors.id'), index=True)
    product_id: Mapped[Optional[int]] = mapped_column(ForeignKey('products.id'), index=True)
    timestamp: Mapped[datetime] = mapped_column(default=datetime.utcnow, index=True)
    author: Mapped['BlogAuthor'] = relationship(back_populates='articles')
    product: Mapped[Optional['Product']] = relationship(back_populates='blog_articles')

    def __repr__(self):
        return f'BlogArticle({self.id}, "{self.title}")'

class BlogAuthor(Model):
    __tablename__ = 'blog_authors'
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(64), index=True)
    articles: WriteOnlyMapped['BlogArticle'] = relationship(back_populates='author')

    def __repr__(self):
        return f'BlogAuthor({self.id}, "{self.name}")'

內容解密:

  1. BlogArticle模型儲存了部落格文章的標題、作者、相關產品以及發表時間等資訊。其中,author_idproduct_id分別是外部索引鍵,分別關聯到BlogAuthorProduct模型。
  2. BlogAuthor模型簡單地儲存了作者的名稱。
  3. BlogArticleBlogAuthor之間存在一對多的關係,因為一位作者可以撰寫多篇文章。
  4. 同樣地,ProductBlogArticle之間也存在一對多的關係,因為一個產品可以與多篇部落格文章相關聯。

部落格訪客與瀏覽行為追蹤

接下來,我們需要追蹤訪客對部落格文章的瀏覽行為。為此,我們引入了BlogUserBlogSession模型:

class Customer(Model):
    # ...
    blog_users: WriteOnlyMapped['BlogUser'] = relationship(back_populates='customer')
    # ...

class BlogUser(Model):
    __tablename__ = 'blog_users'
    id: Mapped[UUID] = mapped_column(default=uuid4, primary_key=True)
    customer_id: Mapped[Optional[UUID]] = mapped_column(ForeignKey('customers.id'), index=True)
    customer: Mapped[Optional['Customer']] = relationship(back_populates='blog_users')
    sessions: WriteOnlyMapped['BlogSession'] = relationship(back_populates='user')

    def __repr__(self):
        return f'BlogUser({self.id.hex})'

class BlogSession(Model):
    __tablename__ = 'blog_sessions'
    id: Mapped[UUID] = mapped_column(default=uuid4, primary_key=True)
    user_id: Mapped[UUID] = mapped_column(ForeignKey('blog_users.id'), index=True)
    user: Mapped['BlogUser'] = relationship(back_populates='sessions')

    def __repr__(self):
        return f'BlogSession({self.id.hex})'

內容解密:

  1. BlogUser模型代表了部落格的訪客,使用UUID作為主鍵,以避免使用連續的數字ID可能帶來的安全問題。
  2. BlogSession模型代表了一次瀏覽會話,同樣使用UUID作為主鍵。每當訪客存取部落格時,都會建立一個新的會話,並與訪客的BlogUser記錄關聯起來。
  3. CustomerBlogUser之間存在一對多的關係,因為一位客戶可能會從不同的裝置存取部落格,從而建立多個BlogUser記錄。
  4. 透過將CustomerBlogUser關聯起來,可以更好地分析客戶的行為,並產生更有價值的報告。

匯入部落格分析資料的資料函式庫設計與實作

在開發部落格分析功能時,需要設計一個能夠記錄使用者行為的資料函式庫結構。這個結構需要能夠處理使用者在不同裝置上的行為,例如手機和筆電,並且記錄使用者瀏覽的文章。

資料函式庫模型設計

為了實作上述功能,需要建立三個主要的資料函式庫模型:BlogArticleBlogSessionBlogView

BlogArticle 模型

BlogArticle 模型代表部落格中的文章。它包含文章的標題、作者、產品等資訊。

BlogSession 模型

BlogSession 模型代表使用者的瀏覽會話。它包含使用者的 ID、裝置資訊等。

BlogView 模型

BlogView 模型代表使用者對文章的瀏覽記錄。它是一個多對多的關聯表,記錄了文章和會話之間的關聯。

class BlogView(Model):
    __tablename__ = 'blog_views'
    id: Mapped[int] = mapped_column(primary_key=True)
    article_id: Mapped[int] = mapped_column(ForeignKey('blog_articles.id'))
    session_id: Mapped[UUID] = mapped_column(ForeignKey('blog_sessions.id'))
    timestamp: Mapped[datetime] = mapped_column(default=datetime.utcnow, index=True)
    article: Mapped['BlogArticle'] = relationship(back_populates='views')
    session: Mapped['BlogSession'] = relationship(back_populates='views')

內容解密:

  • id 欄位是主鍵,用於唯一標識每筆瀏覽記錄。
  • article_idsession_id 欄位分別是對 BlogArticleBlogSession 模型的外部索引鍵,記錄了文章和會話之間的關聯。
  • timestamp 欄位記錄了瀏覽發生的時間,預設為 UTC 現在時間。
  • articlesession 欄位是對應的 BlogArticleBlogSession 物件,用於導航關聯資料。

允許重複瀏覽記錄的設計考量

在這個多對多關聯中,需要允許重複的瀏覽記錄,以準確計算頁面瀏覽次數。因此,不使用由兩個外部索引鍵組成的複合主鍵,而是使用一個獨立的自動遞增 ID 作為主鍵。

匯入指令碼實作

為了測試和分析,需要匯入一些樣本資料。以下是一個匯入文章和作者資料的指令碼:

def main():
    with Session() as session:
        with session.begin():
            # 清除現有資料
            session.execute(delete(BlogView))
            session.execute(delete(BlogSession))
            session.execute(delete(BlogUser))
            session.execute(delete(BlogArticle))
            session.execute(delete(BlogAuthor))

    with Session() as session:
        with session.begin():
            all_authors = {}
            all_products = {}
            with open('articles.csv') as f:
                reader = csv.DictReader(f)
                for row in reader:
                    # 建立作者和產品物件
                    author = all_authors.get(row['author'])
                    if author is None:
                        author = BlogAuthor(name=row['author'])
                        all_authors[author.name] = author
                    
                    product = None
                    if row['product']:
                        product = all_products.get(row['product'])
                        if product is None:
                            product = session.scalar(select(Product).where(Product.name == row['product']))
                            all_products[product.name] = product
                    
                    # 建立文章物件
                    article = BlogArticle(
                        title=row['title'],
                        author=author,
                        product=product,
                        timestamp=datetime.strptime(row['timestamp'], '%Y-%m-%d %H:%M:%S')
                    )
                    session.add(article)

if __name__ == '__main__':
    main()

內容解密:

  • 首先清除現有的相關資料,以確保資料的一致性。
  • 然後從 CSV 檔案中讀取資料,建立作者、產品和文章物件,並將它們新增到資料函式庫會話中。
  • 使用 csv.DictReader 讀取 CSV 檔案,可以方便地存取每一列的資料。
  • 作者和產品物件會被快取,以避免重複建立相同的物件。

另一個指令碼用於匯入頁面瀏覽記錄、部落格使用者和會話的資料:

def main():
    with Session() as session:
        with session.begin():
            # 清除現有資料
            session.execute(delete(BlogView))
            session.execute(delete(BlogSession))
            session.execute(delete(BlogUser))

    with Session() as session:
        all_articles = {}
        all_customers = {}
        all_blog_users = {}
        all_blog_sessions = {}
        with open('views.csv') as f:
            reader = csv.DictReader(f)
            for row in reader:
                # 建立使用者和會話物件
                user = all_blog_users.get(row['user'])
                if user is None:
                    # ...

內容解密:

  • 與前一個指令碼類別似,首先清除現有的相關資料。
  • 然後從 CSV 檔案中讀取資料,建立使用者、會話和頁面瀏覽記錄物件。
  • 使用快取來避免重複建立相同的物件。

資料匯入與頁面分析查詢

匯入大量資料的處理方式

在處理大量資料匯入時,採用適當的資料函式庫會話管理至關重要。以下範例展示瞭如何有效地匯入大資料集到資料函式庫中:

i = 0
for row in reader:
    # ... 匯入資料的邏輯
    i += 1
    if i % 100 == 0:
        print(i)
        session.commit()
print(i)
session.commit()

內容解密:

  1. 計數器使用:變數 i 用於追蹤已匯入的資料行數。
  2. 分批提交:每當 i 是 100 的倍數時,執行 session.commit() 以提交變更。這種分批處理的方式避免了一次性提交大量資料,減少記憶體消耗並提高效率。
  3. 進度顯示:透過列印 i 的值,可以觀察到匯入的進度。
  4. 最終提交:迴圈結束後,再次執行 session.commit() 以確保最後一批資料被提交。

頁面分析查詢範例

計算特定時間段內的頁面瀏覽總數

>>> from datetime import datetime
>>> from sqlalchemy import select, func
>>> from db import Session
>>> from models import BlogArticle, BlogView, Product
>>> session = Session()
>>> q = (select(func.count(BlogView.id))
.where(BlogView.timestamp.between(
    datetime(2022, 11, 1), datetime(2022, 12, 1))))
>>> session.scalar(q)
4034

內容解密:

  1. 查詢特定時間段內的瀏覽量:使用 BlogView.timestamp.between() 方法篩選出特定時間段內的瀏覽記錄。
  2. func.count() 用於計數:計算篩選出的瀏覽記錄總數。

部落格文章瀏覽量排名查詢

>>> page_views = func.count(BlogView.id).label(None)
>>> q = (select(BlogArticle.title, page_views)
.join(BlogArticle.views)
.where(BlogView.timestamp.between(
    datetime(2022, 11, 1), datetime(2022, 12, 1)))
.group_by(BlogArticle)
.order_by(page_views.desc(), BlogArticle.title))
>>> session.execute(q).all()
[('Boy itself fish traditional', 57), ..., ('Still defense foreign social', 1)]

內容解密:

  1. 多表連線查詢:透過 join(BlogArticle.views)BlogArticleBlogView 表連線起來。
  2. 分組與排序:使用 group_by() 對部落格文章進行分組,並透過 order_by() 按瀏覽量降序排序。

按產品分類別的頁面瀏覽量查詢

>>> q = (select(Product.name, page_views)
.join(Product.blog_articles)
.join(BlogArticle.views)
.where(BlogView.timestamp.between(
    datetime(2022, 11, 1), datetime(2022, 12, 1)))
.group_by(Product)
.order_by(page_views.desc()))
>>> session.execute(q).all()
[('ZX Spectrum', 1096), ('Commodore 64', 1056), ('Apple II', 349),
('TRS-80 Color Computer', 349), ('Amiga', 301), ('BBC Micro', 180), ('TI-99/4A', 133),
('Commodore 128', 77)]

內容解密:

  1. 多層連線查詢:透過連線 ProductBlogArticleBlogView 表,實作按產品分類別統計瀏覽量。
  2. inner join 的限制:由於採用了 inner join,未與任何產品關聯的部落格文章將不會被包含在結果中。

外連線(Outer Join)介紹

外連線允許查詢結果包含一側或兩側表中不匹配的記錄。主要有三種型別:

  • 全外連線(Full Outer Join):包含左右兩側表中所有記錄。
  • 左外連線(Left Outer Join):包含左側表中所有記錄。
  • 右外連線(Right Outer Join):包含右側表中所有記錄。

注意事項:

不同資料函式庫系統對外連線的支援程度不同。例如,SQLite 在版本 3.39.0 之後才支援全外連線和右外連線。MySQL 目前不支援全外連線。