Delta Live Tables(DLT)根據 PySpark 構建,簡化了資料引入和轉換流程。利用 Spark DataFrames,開發者可以輕鬆定義資料讀取和轉換邏輯。Auto Loader 功能則能有效處理雲端儲存中的大量資料,自動推斷 Schema 並支援通知模式,提升了資料擷取的效率和可擴充套件性。此外,DLT 提供了 APPLY CHANGES 和 apply_changes() API,能自動處理亂序資料並應用變更至下游資料集,簡化了資料管線的構建和維護。最後,DLT 支援將資料集釋出至 Unity Catalog,提供集中式資料治理和安全控管,方便後續分析和處理。
使用 Delta Live Tables 進行資料轉換的高階應用
資料引入與轉換的簡化
Delta Live Tables(DLT)建構於 PySpark 之上,使我們能夠利用 Spark DataFrames 來定義如何從雲端儲存引入資料以及如何應用資料轉換。首先,我們定義一個函式,該函式使用 Spark 從 /databricks-datasets 目錄讀取 NYC taxi 樣本資料集:
def yellow_taxi_raw():
path = "/databricks-datasets/nyctaxi/tripdata/yellow"
return (spark.readStream
.schema(schema)
.format("csv")
.option("header", True)
.load(path))
內容解密:
此函式 yellow_taxi_raw 使用 Spark 的 readStream 方法從指定的路徑讀取 CSV 格式的資料,並傳回一個串流 DataFrame。其中,.schema(schema) 指定了資料的結構描述,.format("csv") 指定了資料的格式,.option("header", True) 表示 CSV 檔案包含標頭。
透過在函式上新增 @dlt.table() 裝飾器,我們將此函式宣告為 DLT 的一部分,用於建立 Delta Live Table 並將其新增到資料管線的資料流圖中:
@dlt.table(
comment="The raw NYC taxi cab trip dataset located in `/databricks-datasets/`"
)
def yellow_taxi_raw():
path = "/databricks-datasets/nyctaxi/tripdata/yellow"
return (spark.readStream
.schema(schema)
.format("csv")
.option("header", True)
.load(path))
內容解密:
@dlt.table() 裝飾器用於宣告 DLT 表格,comment 引數提供了對該表格的描述。這樣,當執行 notebook cell 時,Databricks Data Intelligence Platform 將檢測到 DLT 表格,並提示建立新的 DLT 管線。
建立第一個 Delta Live Tables 管線
點選 Create Pipeline 按鈕以生成新的 DLT 管線。為資料管線指定一個有意義的名稱,例如 Yellow Taxi Cab Pipeline。選擇 Core 作為產品版本,並將管線執行模式設定為 Triggered。
在 Target Location 設定下,選擇 Unity Catalog 單選按鈕,並指定目標目錄和結構描述以儲存資料集。在 Compute 設定下,將 Min workers 和 Max workers 都設定為 1。然後,點選 Create 按鈕接受預設設定。最後,點選 Start 按鈕執行資料管線。
資料轉換與應用變更
本章節將探討如何使用 DLT 從多種輸入來源引入資料,並將變更應用於下游資料集。我們將使用 APPLY CHANGES 命令來高效、準確地應用變更。
從輸入來源引入資料
DLT 簡化了從各種輸入來源引入資料的過程,無論是存放在雲端儲存中的檔案還是連線到外部儲存系統(如關聯式資料函式倉管理系統(RDBMS))。
將變更應用於下游表格
使用 APPLY CHANGES 命令,可以高效地將輸入資料來源的變更應用於下游資料集。
釋出資料集到 Unity Catalog
DLT 支援將資料集釋出到 Unity Catalog,以便進行進一步的分析和處理。
資料管線設定
本章節還將探討 DLT 管線的高階設定,包括如何最佳化底層資料集。
實作練習:應用 SCD Type 2 變更
本章節提供了一個實作練習,用於演示如何使用 DLT 應用 SCD Type 2 變更。
使用Delta Live Tables進行資料轉換
從輸入來源擷取資料
Delta Live Tables(DLT)使得從多種輸入來源擷取資料變得簡單。例如,DLT可以有效地處理一天中陸續抵達雲端儲存位置的新檔案、透過連線外部儲存系統(如關聯式資料函式庫)擷取結構化資料,或讀取可儲存在記憶體中的靜態參考表。讓我們來看看如何使用DLT增量式地擷取抵達雲端儲存位置的新資料。
使用Databricks Auto Loader擷取資料
Databricks Data Intelligence Platform的一個關鍵功能是Auto Loader,它是一種簡單而強大的擷取機制,用於有效地從雲端儲存讀取輸入檔案。可以在DataFrame定義中透過使用cloudFiles資料來源來參照Auto Loader。例如,以下程式碼片段將使用Databricks Auto Loader功能來擷取從儲存容器中新抵達的JSON檔案:
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", schema_path)
.load(raw_data_path))
內容解密:
.format("cloudFiles"):指定使用Auto Loader來讀取雲端檔案。.option("cloudFiles.format", "json"):設定輸入檔案的格式為JSON。.option("cloudFiles.schemaLocation", schema_path):指定儲存推斷出的schema位置。.load(raw_data_path):從指定的路徑載入資料。
Auto Loader可以擴充套件以有效地處理雲端儲存中的數十億個檔案。Databricks Auto Loader支援擷取儲存在CSV、JSON、XML、Apache Parquet、Apache Avro和Apache Orc檔案格式中的檔案,以及文字和二進位檔案。此外,在前面的程式碼片段中,您可能注意到沒有為輸入串流指定schema定義,而是指定了目標schema位置。這是因為Auto Loader將自動推斷資料來源schema,並在單獨的儲存位置跟蹤schema定義的變更。在幕後,Auto Loader將抽樣最多前1,000個雲端檔案物件以推斷雲端檔案來源的schema結構。對於像JSON這樣的半結構化格式,schema可能會隨著時間而改變,這可以大大減輕資料工程團隊維護最新schema定義的負擔。
結構化串流中的可擴充套件性挑戰
傳統上,使用Spark Structured Streaming來擷取新檔案(其中檔案被附加到雲端儲存位置)的資料管線在資料量增長到GB甚至TB時難以擴充套件。隨著新檔案被寫入雲端儲存容器,Structured Streaming將執行目錄清單。對於大型資料集(即由數百萬個或更多檔案組成的資料集),僅目錄清單過程就需要花費大量時間。此外,雲端提供商將評估這些目錄清單呼叫的API費用,增加整體雲端提供商費用。對於已經處理過的檔案,這種目錄清單既昂貴又低效。
使用通知模式提高可擴充套件性
Databricks Auto Loader支援兩種型別的雲端檔案偵測模式 – 通知模式和舊式目錄清單模式。在通知模式下,Auto Loader透過在幕後自動佈署更可擴充套件的架構,完全繞過了昂貴的目錄清單過程。只需幾行Python程式碼,Databricks預先提供後端雲端服務,這些服務將自動跟蹤已登入到雲端儲存的新檔案以及已處理的檔案。
通知模式下的Auto Loader架構
圖表翻譯: 此圖示展示了在通知模式下,Databricks Auto Loader如何使用事件串流來跟蹤雲端儲存中新未處理的檔案。
- 該過程從Databricks Auto Loader監聽特定雲端儲存路徑中的新檔案物件建立事件(也稱為PUT事件,以用於建立物件的HTTP動詞命名)開始。
- 當建立新檔案物件時,有關該新檔案的中繼資料將持久化到鍵值儲存中,如果發生系統故障,則該儲存作為檢查點位置。
- 接下來,與檔案物件相關的資訊將釋出到
cloudFiles資料來源讀取的事件串流中。 - 從事件串流讀取時,Databricks中的Auto Loader程式將僅擷取與雲端儲存中新的、未處理的檔案物件相關的資料。
- 最後,Auto Loader程式將更新鍵值儲存,將新檔案標記為系統中已處理的檔案。
這種根據通知的檔案處理實作避免了昂貴且低效的目錄清單過程,確保該過程可以從故障中還原,並且檔案只被處理一次。
將Auto Loader與DLT結合使用
Databricks Auto Loader可用於在DLT管線中建立串流表格。現在我們知道了背後發生的事情,只需幾行Python程式碼就可以建立一個強壯、可擴充套件的串流表格,可以擴充套件到數十億個檔案。事實上,對於將新檔案附加到雲端儲存的資料來源,始終建議使用Auto Loader來擷取資料。讓我們將前一節中的串流DataFrame定義與DLT資料集註解結合起來,在我們的管線中定義一個新的資料串流:
@dlt.table(
comment="原始雲端檔案串流,完成的計程車行程"
)
def yellow_taxi_events_raw():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.path", schema_path)
.load(raw_landing_zone_path))
內容解密:
@dlt.table:定義一個DLT表格。comment:為表格新增註解。spark.readStream.format("cloudFiles"):使用Auto Loader讀取雲端檔案。.option("cloudFiles.format", "json"):指定輸入檔案格式為JSON。.load(raw_landing_zone_path):從指定的路徑載入資料。
需要注意的是,在前面的程式碼片段中,我們提供了兩個雲端儲存路徑。第一個儲存路徑schem_path指的是將寫入schema資訊和鍵值儲存的雲端儲存路徑。第二個儲存位置raw_landing_zone_path指向外部資料來源將寫入新的、未處理的檔案的位置。
對下游表格應用變更
傳統上,Delta Lake提供了一個MERGE INTO命令,允許變更資料擷取透過匹配特定條件合併到目標表格中。但是,如果新資料碰巧是亂序的,則合併的變更將導致錯誤的結果,從而導致輸出不準確和誤導。為瞭解決這個問題,資料工程團隊需要建立複雜的對帳流程來處理亂序資料,從而在資料管線中新增另一層來管理和維護。
Delta Live Tables 資料轉換與治理
Delta Live Tables(DLT)提供了一種自動化處理資料變更的方法,能夠根據一或多個序列欄位處理亂序資料。DLT 允許資料工程團隊在資料管道中更新下游資料集,以反映上游資料來源的變更。
應用變更資料擷取
DLT 提供了 Python API 和 SQL 語法來應用變更資料擷取:
- APPLY CHANGES:用於使用 SQL 語法撰寫的管道
- apply_changes():用於使用 Python 撰寫的管道
程式碼範例:應用 SCD Type 2 變更
import dlt
import pyspark.sql.functions as F
dlt.create_streaming_table("iot_device_temperatures")
dlt.apply_changes(
target = "iot_device_temperatures",
source = "smart_thermostats",
keys = ["device_id"],
sequence_by = F.col("sequence_num"),
apply_as_deletes = F.expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequence_num"],
stored_as_scd_type = "2"
)
內容解密:
- dlt.create_streaming_table(“iot_device_temperatures”):建立一個名為
iot_device_temperatures的串流表格,用於儲存物聯網裝置的溫度資料。 - dlt.apply_changes():應用變更資料擷取至
iot_device_temperatures表格。- target:指定目標表格名稱。
- source:指定來源資料的表格名稱。
- keys:定義用於識別唯一列的鍵值欄位。
- sequence_by:指定用於排序變更的欄位。
- apply_as_deletes:定義如何應用刪除操作。
- except_column_list:指定在結果表格中排除的欄位。
- stored_as_scd_type:指定儲存為 SCD Type 2,表示保留歷史版本。
DLT 調解流程
DLT 在後台建立兩個資料集物件來準確應用表格變更:
- 隱藏的後端 Delta 表格:包含完整的變更歷史,用於執行調解流程,能夠處理亂序的列更新。
- 檢視(View):包含應用所有變更後的最新表格快照,使用指定的鍵值欄位來唯一識別每一列,並根據
sequence_by引數排序變更。
釋出資料集至 Unity Catalog
DLT 提供兩種方法來儲存資料集:舊版的 Hive Metastore 和 Unity Catalog。Unity Catalog 是集中式的治理儲存,跨越特定全球區域內的所有 Databricks 工作區。
為何選擇 Unity Catalog?
- 資料預設安全。
- 跨群組和使用者的一致存取策略定義。
- 開源技術,無供應商鎖定風險。
Unity Catalog 提供 Hive 相容的 API,允許第三方工具與其整合。
建立新目錄
Unity Catalog 引入了三層名稱空間:目錄(Catalog)、結構(Schema 或 Database)和表格。目錄是一個邏輯容器,可包含一到多個結構或資料函式庫。
@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle
title Delta Live Tables 資料轉換應用
package "系統架構" {
package "前端層" {
component [使用者介面] as ui
component [API 客戶端] as client
}
package "後端層" {
component [API 服務] as api
component [業務邏輯] as logic
component [資料存取] as dao
}
package "資料層" {
database [主資料庫] as db
database [快取] as cache
}
}
ui --> client : 使用者操作
client --> api : HTTP 請求
api --> logic : 處理邏輯
logic --> dao : 資料操作
dao --> db : 持久化
dao --> cache : 快取
note right of api
RESTful API
或 GraphQL
end note
@enduml
圖表翻譯: 此圖示展示了 Unity Catalog 的三層名稱空間結構,從 Unity Catalog 到 Catalog,再到 Schema/Database,最後到 Table。這種結構使得資料的管理和治理更加清晰和有組織。