在資料工程領域中,建構高效能的資料處理管線至關重要。AWS 提供了多種服務,協助資料工程師協調複雜的工作流程並最佳化資料消費。Step Functions 讓開發者能以視覺化方式協調 AWS 服務,簡化除錯流程。MWAA 則提供託管式 Airflow 環境,方便管理和擴充套件資料處理任務。Athena 讓使用者能直接查詢 S3 資料湖,無需管理基礎設施。Redshift 作為雲端原生資料倉儲,搭配 Redshift Spectrum,更能延伸查詢能力至資料湖,滿足不同分析需求。此外,文章也示範如何使用 Lambda 函式自動轉換資料格式,提升資料處理效率。
AWS 資料處理管線協調服務
在現代資料工程中,資料處理管線的協調至關重要。AWS 提供了多種服務來協助資料工程師協調複雜的資料處理工作流程。在本文中,我們將探討 AWS Step Functions 和 Amazon Managed Workflows for Apache Airflow(MWAA)這兩種主要的服務。
使用 AWS Step Functions 協調資料處理工作流程
AWS Step Functions 是一種無伺服器協調服務,可用於協調多個 AWS 服務,以建立複雜的工作流程。Step Functions 提供了一種視覺化的介面來設計和執行工作流程,使得管理和除錯變得更加容易。
以下是一個使用 Step Functions 協調資料處理工作流程的範例:
- 啟動執行: Step Functions 接收到一個啟動執行的請求,例如來自 S3 的新檔案通知。
- Lambda 函式觸發: Step Functions 觸發一個 Lambda 函式,該函式處理輸入檔案並將其轉換為 Parquet 格式。
- 寫入 Parquet 檔案: Lambda 函式將轉換後的資料寫入 S3 中的 Parquet 檔案。
- 執行 AWS Glue Crawler: 由於 Step Functions 目前不支援直接執行 Glue Crawler,因此使用另一個 Lambda 函式來觸發 Crawler 的執行。
- 錯誤處理: 如果工作流程中的任何步驟失敗,Step Functions 將執行錯誤處理步驟,例如傳送通知給資料工程團隊。
程式碼範例:Step Functions 狀態機定義
{
"StartAt": "ProcessFile",
"States": {
"ProcessFile": {
"Type": "Task",
"Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:PROCESS_FILE_LAMBDA",
"Next": "RunGlueCrawler"
},
"RunGlueCrawler": {
"Type": "Task",
"Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:RUN_GLUE_CRAWLER_LAMBDA",
"Next": "JobFailed"
},
"JobFailed": {
"Type": "Fail",
"Cause": "Job failed"
}
}
}
內容解密:
- StartAt:指定狀態機的起始步驟。
- States:定義狀態機中的各個步驟。
- Type 和 Resource:指定每個步驟的型別和對應的 Lambda 函式 ARN。
Next欄位:定義步驟之間的流程控制。
Amazon Managed Workflows for Apache Airflow(MWAA)
Apache Airflow 是一種流行的開源工作流程管理工具,用於協調複雜的資料工程工作流程。MWAA 是 AWS 提供的一種受管服務,簡化了 Airflow 的佈署和管理。
MWAA 環境包含以下元件:
- 排程器:控制任務的執行時間和地點。
- 工作節點:執行任務的節點,可以根據需求自動擴充套件或縮減。
- 元資料函式庫:儲存任務狀態的資料函式庫。
- 網頁伺服器:提供用於監控和執行任務的網頁介面。
使用 MWAA 的優勢
- 自動擴充套件和縮減工作節點,以滿足需求。
- 簡化了 Airflow 的佈署和管理。
- 與 AWS 服務緊密整合。
資料消費者存取最佳化資料集
一旦資料經過轉換和最佳化,各種資料消費者需要透過不同的介面存取資料。AWS 提供了多種服務來滿足不同資料消費者的需求,例如 Amazon Athena 用於 SQL 查詢。
Amazon Athena 簡介
Amazon Athena 是一種無伺服器的 SQL 查詢服務,允許使用者直接在 S3 資料湖中查詢資料,而無需設定或管理任何基礎設施。
使用 Athena 的好處
- 無伺服器架構,無需管理基礎設施。
- 直接在 S3 資料湖中查詢資料,提高了生產力和靈活性。
- 與 AWS Glue Data Catalog 緊密整合,簡化了資料目錄的管理。
程式碼範例:使用 Athena 查詢 S3 資料
SELECT *
FROM "my_database"."my_table"
WHERE year = '2023';
內容解密:
SELECT陳述式:從指定的資料表中選取所有欄位。FROM子句:指定要查詢的資料表。WHERE子句:根據條件篩選結果。
綜上所述,AWS 提供了多種強大的服務來支援資料工程師協調和管理複雜的資料處理工作流程,並滿足不同資料消費者的需求。無論是使用 Step Functions、MWAA 或 Athena,這些服務都能夠提高資料工程團隊的生產力和效率。
AWS 資料工程師工具箱:資料消費與分析
許多工具都設計用於透過 SQL 與資料介面,而這些工具通常使用 JDBC 或 ODBC 資料函式庫連線來連線到 SQL 資料來源。Amazon Athena 使資料消費者能夠透過 AWS 管理主控台介面,或透過 JDBC 或 ODBC 驅動程式,查詢資料湖(或其他已連線的資料來源)中的資料集。
圖形化 SQL 查詢工具與 Athena 的整合
圖形化的 SQL 查詢工具,例如 SQL Workbench,可以透過 JDBC 驅動程式連線到 Amazon Athena。此外,您也可以透過程式設計方式連線到 Amazon Athena,並透過 ODBC 驅動程式在程式碼中執行 SQL 查詢。
Athena Federated Query:擴充套件資料來源查詢能力
Athena 的一項功能——Athena Federated Query,使您能夠建立聯結器,以便 Athena 能夠查詢 S3 資料湖以外的其他資料來源。Amazon 提供了一系列預建的開源聯結器,用於將 Athena 連線到諸如 Amazon DynamoDB(NoSQL 資料函式庫)等資料來源,以及其他 Amazon 管理的關聯式資料函式庫引擎,甚至是 Amazon CloudWatch Logs(一個集中式日誌服務)。利用此功能,資料消費者可以執行一項使用 Athena 的查詢,從 Amazon DynamoDB 取得活躍訂單,將該資料與在 PostgreSQL 上執行的客戶資料函式庫進行參考,然後從 S3 資料湖中匯入該客戶的歷史訂單資料——所有這些都在一條 SQL 陳述式中完成。
Amazon Redshift 與 Redshift Spectrum:資料倉儲與資料湖倉架構
資料倉儲技術概述
資料倉儲並不是一個新概念或新技術(如我們在第 2 章《分析的資料管理架構》中所討論的),但 Amazon Redshift 是第一個雲端原生資料倉儲。Redshift 於 2012 年推出,到 2015 年時已成為 AWS 增長最快的服務,如今已有成千上萬的客戶在使用它。
Redshift 資料倉儲設計與 OLAP 工作負載
Redshift 資料倉儲專為報告和分析工作負載(通常稱為線上分析處理(OLAP)工作負載)而設計。Redshift 提供了一個叢集環境,使叢集中的所有計算節點能夠協同處理 SQL 查詢所涉及的資料部分,從而為您提供最佳的效能,尤其是在處理高度結構化的資料並需要定期在多個大型表格之間進行複雜連線的場景中。因此,Redshift 是報告和視覺化服務的理想查詢引擎,這些服務需要處理大型資料集。
Redshift 的典型應用場景
針對 Redshift 叢集執行的典型 SQL 查詢可能會檢索資料函式庫中的數百、數千甚至數百萬行資料,通常會在不同的表格之間執行複雜的連線操作,並可能對某些資料欄進行聚合或平均值等計算。對資料倉儲執行的查詢通常用於回答諸如「去年我們商店銷售額在各個美國郵遞區號中的平均銷售金額是多少?」或「在我們所有的商店中,哪些產品在去年第四季度和今年第一季度之間銷售額增長了 20%?」等問題。
現代分析環境中的 Redshift 與 Redshift Spectrum
在現代分析環境中,資料倉儲的一個常見應用場景是根據需要最頻繁查詢的資料和需要最佳效能的查詢,將資料湖中的部分資料載入到倉儲中。在這種情況下,資料工程師可能會建立一個管道,每天將客戶、產品、銷售和庫存資料載入到資料倉儲中。知道 80% 的報告和查詢將根據過去 12 個月的銷售資料,資料工程師還可以設計一個流程,從資料倉儲中移除所有超過 12 個月歷史的資料。
Redshift Spectrum:擴充套件查詢能力至資料湖
但是,那些需要包含超過 12 個月的歷史資料的 20% 查詢怎麼辦?這就是 Redshift Spectrum 的用武之地。Redshift Spectrum 是 Amazon Redshift 的一項功能,讓使用者能夠編寫單一查詢,同時查詢已載入到資料倉儲中的資料以及存在於資料倉儲外部、在資料湖中的資料。為了實作這一點,資料工程師可以組態 Redshift 叢集以連線到 AWS Glue Data Catalog,其中定義了我們資料湖的所有資料函式庫和表格。一旦組態完成,使用者就可以同時參考內部的 Redshift 表格和在 Glue 資料目錄中註冊的表格。
Redshift 與 Redshift Spectrum 架構圖解
以下圖表展示了 Redshift 和 Redshift Spectrum 的架構:
@startuml
skinparam backgroundColor #FEFEFE
skinparam defaultTextAlignment center
skinparam rectangleBackgroundColor #F5F5F5
skinparam rectangleBorderColor #333333
skinparam arrowColor #333333
title Redshift 與 Redshift Spectrum 架構圖解
rectangle "JDBC/ODBC" as node1
rectangle "查詢計畫" as node2
rectangle "查詢" as node3
rectangle "歷史銷售資料" as node4
rectangle "當前銷售資料" as node5
rectangle "結果" as node6
rectangle "最終結果" as node7
node1 --> node2
node2 --> node3
node3 --> node4
node4 --> node5
node5 --> node6
node6 --> node7
@enduml
此圖示說明瞭使用者如何透過 JDBC 或 ODBC 連線到 Redshift Leader Node,並執行跨越 Redshift 表格和 S3 資料湖的查詢。
圖解說明:
使用者透過 SQL 使用者端連線到 Redshift Leader Node,並傳送一條 SQL 陳述式,該陳述式同時查詢
current_sales表格(存在於 Redshift 叢集中的表格,包含過去 12 個月的銷售資料)和historical_sales表格(在 Glue 資料目錄中註冊的表格,其資料檔案位於 Amazon S3 資料湖中,包含過去 10 年的歷史銷售資料)。Leader Node 分析並最佳化查詢,編譯查詢計畫,並將個別的查詢執行計畫推播到叢集中的計算節點。
#### 內容解密:
- 圖表展示了 Redshift 的分散式架構,其中 Leader Node 負責管理和分配查詢任務給各個 Compute Node。
- Compute Node 負責實際執行查詢任務,無論是查詢 Redshift 內部的表格還是 S3 資料湖中的資料。
- 這種架構允許 Redshift 同時處理結構化和非結構化資料,提供高效能的分析能力。
使用AWS服務進行資料消費與視覺化
在資料工程領域,高效的資料消費與視覺化是至關重要的。AWS提供了多種服務來滿足這些需求,包括Amazon Redshift、Amazon QuickSight等。
Amazon Redshift與Redshift Spectrum
Amazon Redshift是一種完全託管的資料倉儲服務,能夠處理PB級別的資料。它不僅能夠高效地儲存和查詢大量資料,還能透過Redshift Spectrum擴充套件其查詢能力至Amazon S3中的資料。
Redshift Spectrum的工作原理
- 當使用者提交查詢請求時,Redshift叢集首先分析查詢並將相關部分推播到Redshift Spectrum進行處理。
- Redshift Spectrum利用其數千個工作節點(根據Amazon EC2例項)掃描、過濾和聚合S3中的資料,並將結果傳回給Redshift叢集。
- Redshift叢集對傳回的結果進行最終處理,如連線和合併資料,最終將結果傳回給使用者的SQL客戶端。
優勢
- 高效查詢:能夠跨Redshift叢集和S3資料湖進行高效查詢。
- 無需管理基礎設施:Redshift Spectrum自動擴充套件,無需使用者管理底層資源。
Amazon QuickSight:資料視覺化的利器
資料視覺化是商業智慧的重要組成部分。Amazon QuickSight是一種快速、雲端原生、無伺服器的BI服務,能夠輕鬆建立和發布互動式儀錶板。
QuickSight的特點
- 互動式視覺化:使用者可以過濾資料、深入分析,獲得更詳細的洞察。
- 無伺服器架構:無需設定或管理伺服器,按使用者型別(創作者或讀者)收費。
- 多源資料接入:支援從多種資料來源接入資料,包括Amazon S3資料湖透過Amazon Athena。
實際應用場景
假設一位銷售經理需要在週一早上9點的會議前快速瞭解上個季度的銷售業績。相比於檢視詳細的銷售資料包表,透過QuickSight生成的圖形化報表能夠讓經理一目瞭然地比較不同地區、不同細分市場的銷售表現,並能深入分析具體資料。
實踐:使用AWS Lambda在S3上傳新檔案時觸發資料轉換
在這一章的實踐環節中,我們將組態一個S3儲存桶,使其在有新檔案上傳時自動觸發一個Lambda函式。該Lambda函式將利用AWS Data Wrangler函式庫將CSV檔案轉換為Parquet格式,並更新AWS Glue Data Catalog。
建立包含AWS Data Wrangler函式庫的Lambda層
- 從GitHub下載特定版本的AWS Data Wrangler函式庫(例如
awswrangler-layer-2.10.0-py3.8.zip)。 - 登入AWS管理控制檯,導航至Lambda服務。
- 在左側選單中選擇“層”,然後點選“建立層”。
- 上傳下載的
.zip檔案,選擇相容的執行時(Python 3.8),並建立層。
組態Lambda函式
透過建立包含AWS Data Wrangler函式庫的Lambda層,我們可以在任何Lambda函式中使用該函式庫,只需將該層附加到函式即可。這樣,我們就能簡化ETL任務,提高資料處理效率。
內容解密:
此步驟中建立的Lambda層使得我們可以在多個Lambda函式中重用AWS Data Wrangler函式庫,無需在每個函式中單獨封裝。該層包含特定版本的Python函式庫,能夠與Python 3.8執行時相容,從而簡化了依賴管理,提高了程式碼復用率。
import awswrangler as wr
def lambda_handler(event, context):
# 從事件中取得S3物件資訊
bucket = event['Records'][0]['s3']['bucket']['name']
key = event['Records'][0]['s3']['object']['key']
# 讀取CSV檔案
df = wr.s3.read_csv(f's3://{bucket}/{key}')
# 轉換為Parquet格式並寫入S3
wr.s3.to_parquet(
df=df,
path=f's3://{bucket}/parquet/{key}.parquet',
dataset=True,
mode='overwrite'
)
return {
'statusCode': 200,
'statusMessage': 'OK'
}
內容解密:
此Lambda函式程式碼實作了當S3儲存桶接收到新檔案時自動觸發CSV到Parquet格式的轉換。首先,它從事件記錄中提取S3儲存桶和物件的鍵值。然後,使用AWS Data Wrangler函式庫讀取CSV檔案並將其轉換為Parquet格式,最後將轉換後的資料寫入S3。該過程自動化了資料格式轉換,提高了資料處理效率,並簡化了後續的分析工作。