在現代數據驅動的商業環境中,資料來源日益多樣化,其結構也愈趨複雜。企業在建構資料管道時,不僅需具備處理 XML 或 JSON 等半結構化資料的能力,更關鍵的挑戰在於如何確保流經管道的數據品質。不良數據可能導致錯誤的商業決策與模型預測,嚴重影響營運效益。因此,導入自動化的資料品質監控機制成為不可或缺的一環。本文將從技術實踐層面切入,先說明如何運用 Apache Spark 的內建函式有效解析與轉換複雜資料類型,接著銜接至資料品質管理的實務,探討如何利用 Deequ 函式庫建立系統性的資料驗證與分析框架,為建構高可靠性的資料基礎設施提供理論依據與操作指南。
第六章:深入理解資料轉換
處理複雜資料集類型
新欄位,其中包含已解析的XML。解析後的欄位類型為結構體陣列。玄貓現在需要從結構體內部展開陣列的元素,如下面的程式碼所示:
dfXmlFun
.select($"id", explode($"parsed.observation") // 展開 parsed.observation 陣列
.alias("observation"))
.groupBy("id") // 根據 id 進行分組
.agg(
max($"observation").alias("max_obs"), // 計算最大值
min($"observation").alias("min_obs"), // 計算最小值
avg($"observation").alias("avg_obs") // 計算平均值
).show(10)
這是輸出:
+---+-------+-------+------------------+
| id|max_obs|min_obs| avg_obs|
+---+-------+-------+------------------+
| 0| 8.3| 8.3| 8.3|
| 1| 9.9| 6.5| 8.2|
| 2| 9.0| 4.9| 6.8|
| 3| 8.8| 3.2|5.3999999999999995|
| 4| 5.7| 5.7| 5.7|
| 5| 6.4| 4.1| 5.300000000000001|
| 6| 3.2| 3.2| 3.2|
| 7| 7.5| 6.1| 6.8|
| 8| 10.0| 3.4| 7.959999999999999|
| 9| 9.2| 1.3| 5.88|
+---+-------+-------+------------------+
上述程式碼與從JSON檔案中展開observations的程式碼相同,除了這行:
.select($"id", explode($"parsed.observation"))
當玄貓定義要在explode操作中使用的欄位時,玄貓必須透過parsed.observation訪問結構體中的巢狀陣列。至此,玄貓結束了本節,玄貓學習了複雜類型,並看到了幾個如何使用它們的範例。
第七章:資料剖析與資料品質
當玄貓處理多個資料來源時,如果沒有適當的檢查,一些不良資料很容易通過。這可能會導致依賴上游資料準確性來建立模型、運行業務關鍵應用程式等的下游系統出現嚴重問題。為了使玄貓的資料管道具有彈性,玄貓必須實施資料品質檢查,以確保正在處理的資料符合以下要求:
- 完整性(Completeness):玄貓計畫用於即將到來的行銷活動的客戶資料集是否填寫了所有必需的屬性?
- 準確性(Accuracy):玄貓的客戶記錄的電子郵件地址和電話號碼是否準確?
- 一致性(Consistency):客戶資料在各個系統中是否一致?
- 有效性(Validity):玄貓的客戶記錄是否具有有效的郵遞區號?
- 唯一性(Uniqueness):玄貓的資料集中是否有相同客戶的多條記錄?
- 完整性(Integrity):玄貓的客戶資料是否可以在玄貓的組織中追蹤和連接?
玄貓將使用Deequ,它使得定義資料品質檢查、執行分析、根據資料建議約束、儲存指標以供將來參考等變得非常容易。它是一個最初由Amazon開發的開源函式庫。在本章中,玄貓將涵蓋以下主題:
- 了解Deequ的組成部分
- 執行資料分析
- 利用自動約束建議
- 定義約束
- 使用
MetricsRepository儲存指標 - 檢測異常
技術要求
如果玄貓尚未完成,玄貓建議玄貓按照第二章中介紹的步驟安裝VS Code或IntelliJ以及運行Scala程式所需的插件。玄貓還需要添加以下依賴項才能使用Deequ:
libraryDependencies +=
"com.amazon.deequ" %
"deequ" % "2.0.4-spark-3.3"
儘管玄貓可以在學習過程中選擇玄貓自己的資料,但玄貓將使用玄貓已載入的表格。
此圖示:複雜資料處理與資料品質管理流程
@startuml
!define DISABLE_LINK
!define PLANTUML_FORMAT svg
!theme _none_
skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100
package "複雜資料類型處理" as ComplexTypeHandling {
rectangle "dfXmlFun: DataFrame (Parsed XML)" as XmlParsedDF
component "select($"id", explode($"parsed.observation").alias(\"observation\"))" as ExplodeParsedXml
component "groupBy(\"id\").agg(max, min, avg)" as AggregateParsedXml
rectangle "dfAggregatedXml: DataFrame" as AggregatedXmlDF
}
package "資料品質管理 (Deequ)" as DataQualityManagement {
rectangle "原始資料集 (例如:客戶資料)" as RawCustomerData
component "Deequ 函式庫" as DeequLibrary
component "了解 Deequ 組成部分" as DeequComponents
component "執行資料分析" as DataAnalysis
component "利用自動約束建議" as AutoConstraintSuggestion
component "定義約束" as DefineConstraints
component "使用 MetricsRepository 儲存指標" as StoreMetrics
component "檢測異常" as AnomalyDetection
rectangle "資料品質報告與監控" as DataQualityReport
}
XmlParsedDF --> ExplodeParsedXml
ExplodeParsedXml --> AggregateParsedXml
AggregateParsedXml --> AggregatedXmlDF : 從 XML 提取並聚合觀察值
RawCustomerData --> DeequLibrary
DeequLibrary --> DeequComponents
DeequComponents --> DataAnalysis
DataAnalysis --> AutoConstraintSuggestion
AutoConstraintSuggestion --> DefineConstraints
DefineConstraints --> StoreMetrics
StoreMetrics --> AnomalyDetection
AnomalyDetection --> DataQualityReport : 確保資料品質的完整流程
note right of AggregatedXmlDF
- 成功從巢狀 XML 結構中提取並聚合數值資料
- 證明 Spark 處理複雜資料類型的能力
end note
note right of DataQualityReport
- 透過 Deequ 實施資料品質檢查,提升資料可靠性
- 涵蓋完整性、準確性、一致性、有效性、唯一性和完整性
end note
@enduml
看圖說話:
此圖示展示了從複雜資料類型處理到資料品質管理的兩個關鍵階段。在複雜資料類型處理部分,玄貓從一個已解析XML的dfXmlFun DataFrame開始。透過explode函數,玄貓將巢狀結構中的parsed.observation陣列展開為獨立的行,並將其別名為observation。隨後,玄貓按id對這些展開的資料進行groupBy,並計算max、min和avg等聚合指標,最終生成了dfAggregatedXml DataFrame。這個流程證明了Spark在處理從XML中提取的複雜巢狀資料時的強大能力。
進入資料品質管理部分,玄貓引入了Deequ函式庫來確保資料的可靠性。這個流程從原始客戶資料開始,透過Deequ的各個組成部分逐步進行。玄貓首先需要了解Deequ的組成部分,然後執行資料分析以洞察資料特性。接著,玄貓可以利用自動約束建議來自動生成潛在的資料品質規則,然後定義約束以明確資料應滿足的標準。這些約束的執行結果會使用MetricsRepository儲存指標,以便長期追蹤和監控。最終,透過檢測異常,玄貓可以及時發現資料中不符合預期模式或規則的偏差,所有這些都匯總到資料品質報告與監控中,以確保資料的完整性、準確性、一致性、有效性、唯一性和完整性。這個綜合流程對於建立穩健的資料管道至關重要。
第七章:資料剖析與資料品質
玄貓將使用在第四章中載入到MySQL資料庫的表格。如果玄貓尚未完成,玄貓可能需要回顧那裡的步驟。
了解 Deequ 的組成部分
Deequ提供了許多功能,使資料品質檢查變得容易。以下圖表顯示了其主要組成部分:
圖7.1 – Deequ 的組成部分
玄貓可以觀察到以下組成部分:
- 指標計算(Metrics computation):Deequ計算資料品質指標,例如完整性、最大值等。玄貓可以直接訪問在資料上計算的原始指標。
- 約束驗證(Constraint verification):Deequ會自動推導需要在資料上計算的必要指標,確保約束驗證。
- 約束建議(Constraint suggestion):玄貓可以選擇利用Deequ的自動約束建議方法來推斷有價值的約束,或定義玄貓自己的客製化資料品質約束。
在後台,Deequ使用Apache Spark進行指標計算,因此它快速高效。在接下來的章節中,玄貓將介紹這些功能。
執行資料分析
Deequ提供了生成資料統計資訊(稱為指標)的功能。例如,玄貓可以使用Deequ為玄貓提供資料集中的記錄數,告訴玄貓特定欄位是否唯一,給玄貓欄位之間的相關程度等等。Deequ透過在com.amazon.deequ.analyzers套件中定義的ApproxCountDistinct、Completeness、Correlation等**案例類別(case classes)**提供此功能。有關指標的完整列表及其說明,請參閱相關文獻。
在下面的範例中,玄貓將使用玄貓載入到名為flights的MySQL表格中的航班資料。玄貓分析flights資料以檢查記錄數、airline欄位是否包含任何NULL值、origin_airport的近似不同計數等等。結果集隨後轉換為DataFrame並最終顯示在螢幕上:
package com.packt.dewithscala.chapter7
import com.packt.dewithscala.utils._
import com.amazon.deequ.analyzers.runners.AnalyzerContext.
successMetricsAsDataFrame
import com.amazon.deequ.analyzers._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
object DataAnalysis extends App {
val session: SparkSession = Spark.initSparkSession("de-with-scala")
val db: Database = Database("my_db")
val df: DataFrame = db
.multiPartitionRead(
session = session,
dbTable = "my_db.flights",
partitionCol = "day_of_week",
upperBound = "7",
lowerBound = "1",
7
)
.filter(col("airline") === lit("US")) // 過濾出航空公司為 "US" 的航班
val analysisResult: AnalyzerContext = AnalysisRunner
.onData(df)
.addAnalyzer(Size()) // 添加 Size 分析器,計算記錄數
.addAnalyzer(Completeness("airline")) // 添加 Completeness 分析器,檢查 airline 欄位的完整性
.addAnalyzer(ApproxCountDistinct("origin_airport")) // 添加 ApproxCountDistinct 分析器,計算 origin_airport 的近似不同計數
.addAnalyzer(Correlation("departure_delay", "arrival_delay")) // 添加 Correlation 分析器,計算出發延遲和到達延遲的相關性
.addAnalyzer(Compliance("no arrival delay", "arrival_delay <= 0")) // 添加 Compliance 分析器,檢查沒有到達延遲的合規性
.run()
successMetricsAsDataFrame(session, analysisResult).show(false)
}
範例 7.1
這是輸出:
圖7.2 – 分析結果
從前面的輸出中,玄貓可以理解以下內容:
- DataFrame有198715條記錄
airline欄位沒有任何NULL值(完整性為1.0)departure_delay和arrival_delay之間的相關性為89%- 61.6%的航班準時或提前到達
origin_airport欄位大約有74個不同的值
正如玄貓所看到的,獲得這些指標非常簡單。除了玄貓剛才看到的,Deequ還提供了相當多的分析器。
在本節中,玄貓研究了如何利用Deequ對玄貓的資料進行分析。在下一節中,玄貓將繼續深入。
此圖示:Deequ 資料品質分析流程
@startuml
!define DISABLE_LINK
!define PLANTUML_FORMAT svg
!theme _none_
skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100
package "Deequ 核心概念" as DeequCore {
component "Metrics Computation" as MetricsComp
component "Constraint Verification" as ConstraintVer
component "Constraint Suggestion" as ConstraintSug
}
package "資料分析流程" as DataAnalysisFlow {
rectangle "原始航班資料 (MySQL)" as RawFlightData
component "SparkSession 初始化" as SparkInit
component "資料讀取與過濾" as DataReadFilter
rectangle "df: DataFrame (US 航空公司航班)" as FilteredDF
component "AnalysisRunner.onData(df)" as AnalysisSetup
component "addAnalyzer(Size())" as AddSize
component "addAnalyzer(Completeness(\"airline\"))" as AddCompleteness
component "addAnalyzer(ApproxCountDistinct(\"origin_airport\"))" as AddApproxCount
component "addAnalyzer(Correlation(\"departure_delay\", \"arrival_delay\"))" as AddCorrelation
component "addAnalyzer(Compliance(\"no arrival delay\", \"arrival_delay <= 0\"))" as AddCompliance
component "run() -> AnalyzerContext" as RunAnalysis
component "successMetricsAsDataFrame(session, analysisResult)" as MetricsToDF
rectangle "分析結果 DataFrame" as AnalysisResultDF
}
DeequCore -[hidden]-> DataAnalysisFlow
MetricsComp <|-- AddSize
MetricsComp <|-- AddCompleteness
MetricsComp <|-- AddApproxCount
MetricsComp <|-- AddCorrelation
MetricsComp <|-- AddCompliance
RawFlightData --> SparkInit
SparkInit --> DataReadFilter
DataReadFilter --> FilteredDF
FilteredDF --> AnalysisSetup
AnalysisSetup --> AddSize
AddSize --> AddCompleteness
AddCompleteness --> AddApproxCount
AddApproxCount --> AddCorrelation
AddCorrelation --> AddCompliance
AddCompliance --> RunAnalysis
RunAnalysis --> MetricsToDF
MetricsToDF --> AnalysisResultDF : 顯示詳細分析指標
note right of AnalysisResultDF
- 包含記錄數、完整性、相關性、合規性等關鍵指標
- 為資料品質評估提供量化依據
end note
@enduml
看圖說話:
此圖示清晰地展示了Deequ在資料品質分析中的應用流程。首先,圖示左側概述了Deequ的三個核心概念:指標計算、約束驗證和約束建議,這些是Deequ強大功能的基礎。
接著,圖示右側詳細描繪了資料分析流程。這個流程從原始航班資料開始,首先透過SparkSession初始化和資料讀取與過濾步驟,將資料載入並篩選出美國航空公司的航班,形成FilteredDF。隨後,FilteredDF被傳遞給AnalysisRunner.onData()進行分析設定。在這個階段,玄貓透過一系列addAnalyzer方法添加了多個Deequ分析器:包括Size()(計算記錄數)、Completeness("airline")(檢查airline欄位的完整性)、ApproxCountDistinct("origin_airport")(計算origin_airport的近似不同計數)、Correlation("departure_delay", "arrival_delay")(計算延遲的相關性)以及Compliance("no arrival delay", "arrival_delay <= 0")(檢查航班準點合規性)。
這些分析器被run()方法執行後,會產生一個AnalyzerContext結果。最後,successMetricsAsDataFrame()函數將這個結果轉換為一個易於閱讀的AnalysisResultDF,並顯示在螢幕上。這個DataFrame包含了所有計算出的資料品質指標,為玄貓評估資料的健康狀況提供了量化依據,例如記錄總數、欄位完整性、資料相關性等。
結論
縱觀現代資料工程的實踐演進,從單純的資料轉換能力,進階到對資料品質的系統性保障,標誌著工程成熟度的關鍵躍升。這不僅是技術棧的擴展,更是資料價值觀的深刻轉變。
先前章節所展示的複雜資料處理能力,例如從巢狀XML中萃取數據,僅是資料工程的基礎建設。若缺乏後續的品質驗證,此等強大能力反而可能放大「垃圾進,垃圾出」的風險,侵蝕決策品質的根基。Deequ的引入正是為了解決此一核心挑戰,它將完整性、準確性等抽象維度,轉化為可量化、可自動執行的分析器與約束,為資料管道建立了關鍵的第一道防線。
展望未來,資料品質管理將從被動補救轉為前瞻性策略,無縫整合至開發流程中。以Deequ為代表的工具將催生「品質即程式碼」(Quality as Code)文化,使資料可靠性成為可追蹤、可迭代的工程資產,而非難以捉摸的運氣。
玄貓認為,掌握資料轉換技術僅是入場券,建立起一套自動化的資料品質監控體系,才是實現資料價值最大化的關鍵。對於追求卓越的資料團隊而言,這已非選項,而是必經之路。