在資料湖架構中,利用 S3 作為儲存層,搭配 Lambda 函式進行資料轉換,並整合 Glue 資料目錄管理中繼資料,已是常見的資料處理模式。本文將逐步說明如何設定 S3 觸發 Lambda 函式,自動將上傳的 CSV 檔案轉換為 Parquet 格式,並註冊至 Glue 資料目錄,提升資料處理效率與資料管理能力。此流程包含建立 S3 儲存桶、設定 IAM 角色與策略、撰寫 Lambda 函式程式碼,以及設定 S3 事件觸發器等步驟,確保資料能被有效地轉換與管理。
在 S3 儲存桶接收新檔案時觸發 AWS Lambda 函式的實作
建立新的 Amazon S3 儲存桶
首先,我們需要建立兩個 Amazon S3 儲存桶,分別作為原始資料的存放區(landing zone)以及經過處理後的資料存放區(clean zone)。這種多層儲存架構在資料湖(Data Lake)設計中相當常見,用於管理資料的不同處理階段。
- 登入 AWS 管理控制檯,確保位於所選擇的區域。
- 在頂部搜尋欄中搜尋並選擇 S3 服務,然後點選「建立儲存桶」。
- 為來源儲存桶命名,例如
dataeng-landing-zone-<initials>,此儲存桶將用於上傳檔案並觸發 Lambda 函式。- 注意:儲存桶名稱必須全域唯一,而非僅在您的帳戶中唯一。
- 確保儲存桶建立在所選區域,並接受其他預設設定,然後點選「建立儲存桶」。
- 重複上述步驟,建立另一個 S3 儲存桶作為目標儲存桶,例如
dataeng-clean-zone-<initials>,用於存放經過轉換的檔案。
為 Lambda 函式建立 IAM 策略和角色
為了使 Lambda 函式能夠在新檔案上傳至特定 S3 儲存桶時被觸發,我們需要為其組態適當的許可權,包括讀取來源 S3 儲存桶、寫入目標 S3 儲存桶、寫入 CloudWatch 日誌以及存取 Glue API 的許可權。
在 IAM 服務中建立新的策略,選擇 JSON 編輯模式,並輸入以下策略內容:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "logs:PutLogEvents", "logs:CreateLogGroup", "logs:CreateLogStream" ], "Resource": "arn:aws:logs:*:*:*" }, { "Effect": "Allow", "Action": [ "s3:*" ], "Resource": [ "arn:aws:s3:::dataeng-landing-zone-<initials>/*", "arn:aws:s3:::dataeng-landing-zone-<initials>", "arn:aws:s3:::dataeng-clean-zone-<initials>/*", "arn:aws:s3:::dataeng-clean-zone-<initials>" ] }, { "Effect": "Allow", "Action": [ "glue:*" ], "Resource": "*" } ] }- 注意替換
dataeng-landing-zone-<initials>和dataeng-clean-zone-<initials>為實際的儲存桶名稱。
- 注意替換
將策略命名為
DataEngLambdaS3CWGluePolicy並建立。建立新的 IAM 角色,選擇 Lambda 作為信任實體,並附加剛才建立的策略。
將角色命名為
DataEngLambdaS3CWGlueRole。
建立 Lambda 函式
現在,我們將建立一個 Lambda 函式,當 CSV 檔案上傳至來源 S3 儲存桶時,該函式將被觸發,將 CSV 檔案轉換為 Parquet 格式,並寫入目標儲存桶,同時在 Glue 目錄中註冊。
- 在 Lambda 服務中建立新的函式,選擇「從頭開始建立」,並命名函式,例如
CSVtoParquetLambda。 - 選擇 Python 3.8 作為執行環境。
- 在「變更預設執行角色」下,選擇「使用現有的角色」,並選取之前建立的角色
DataEngLambdaS3CWGlueRole。 - 新增 AWS Data Wrangler 層至 Lambda 函式。
- 在程式碼來源區段中,輸入以下程式碼:
import boto3 import awswrangler as wr from urllib.parse import unquote_plus def lambda_handler(event, context): for record in event['Records']: bucket = record['s3']['bucket']['name'] key = unquote_plus(record['s3']['object']['key']) key_list = key.split("/") print(f'key_list: {key_list}') db_name = key_list[len(key_list)-3] table_name = key_list[len(key_list)-2] # 後續程式碼邏輯內容解密:
- 程式碼首先匯入必要的函式庫,包括
boto3(AWS Python SDK)、awswrangler(AWS Data Wrangler)以及unquote_plus(用於處理 URL 編碼)。 lambda_handler是 Lambda 函式的入口點,當函式被觸發時呼叫。該函式從事件資料中提取 S3 儲存桶名稱和物件鍵,並根據物件路徑設定 Glue 目錄的資料函式庫名稱和表名稱。
- 程式碼首先匯入必要的函式庫,包括
設定與測試
完成上述步驟後,您的 Lambda 函式已經設定完成,可以測試其功能。上傳 CSV 檔案至來源 S3 儲存桶,應觸發 Lambda 函式,將檔案轉換為 Parquet 格式並寫入目標儲存桶,同時在 Glue 目錄中註冊。檢查目標儲存桶和 Glue 目錄以驗證結果。
此實作展示瞭如何在 AWS 上利用 S3、Lambda 和 Glue 等服務建立自動化的資料處理流程,為資料工程師提供了強大的工具和靈活性。
AWS 資料工程師工具包:觸發 AWS Lambda 函式處理 S3 上傳檔案
在前面的章節中,我們介紹了多項 AWS 服務,包括資料擷取、轉換和消費服務。現在,我們將動手建立一個解決方案,將 CSV 檔案轉換為 Parquet 格式,並將資料登入到 AWS Glue 資料目錄中。
設定 AWS Lambda 函式
首先,我們需要建立一個 AWS Lambda 函式,該函式將在 S3 儲存桶中上傳新檔案時觸發。以下程式碼顯示了 Lambda 函式的實作:
import boto3
import awswrangler as wr
import pandas as pd
def lambda_handler(event, context):
# 取得 S3 儲存桶和金鑰
bucket = event['Records'][0]['s3']['bucket']['name']
key = event['Records'][0]['s3']['object']['key']
db_name = 'testdb'
table_name = 'csvparquet'
# 輸出偵錯資訊
print(f'Bucket: {bucket}')
print(f'Key: {key}')
print(f'DB Name: {db_name}')
print(f'Table Name: {table_name}')
input_path = f"s3://{bucket}/{key}"
print(f'Input_Path: {input_path}')
output_path = f"s3://dataeng-clean-zone-INITIALS/{db_name}/{table_name}"
print(f'Output_Path: {output_path}')
# 使用 AWS Data Wrangler 讀取 CSV 檔案
input_df = wr.s3.read_csv([input_path])
# 建立 Glue 資料函式庫(如果不存在)
current_databases = wr.catalog.databases()
if db_name not in current_databases.values:
print(f'- Database {db_name} does not exist ... creating')
wr.catalog.create_database(db_name)
else:
print(f'- Database {db_name} already exists')
# 將資料寫入 Parquet 檔案
result = wr.s3.to_parquet(
df=input_df,
path=output_path,
dataset=True,
database=db_name,
table=table_name,
mode="append")
print("RESULT: ")
print(f'{result}')
return result
內容解密:
- 取得 S3 儲存桶和金鑰:從事件記錄中擷取 S3 儲存桶名稱和物件金鑰。
- 輸出偵錯資訊:輸出相關的偵錯資訊,以便在 Lambda 函式日誌中捕捉。
- 使用 AWS Data Wrangler 讀取 CSV 檔案:使用
wr.s3.read_csv方法讀取 CSV 檔案內容,並將其儲存在 Pandas DataFrame 中。 - 建立 Glue 資料函式庫(如果不存在):檢查 Glue 資料函式庫是否存在,如果不存在,則建立它。
- 將資料寫入 Parquet 檔案:使用
wr.s3.to_parquet方法將 DataFrame 中的資料寫入 Parquet 檔案。
設定 Lambda 函式觸發器
- 在 Lambda 函式概覽框中,點選「新增觸發器」。
- 選擇 Amazon S3 服務作為觸發器組態。
- 選擇您的 landing zone 儲存桶。
- 選擇「所有物件建立事件」作為觸發條件。
- 設定字尾為
.csv,以便僅在上傳 CSV 檔案時觸發 Lambda 函式。
測試 Lambda 函式
- 建立一個名為
test.csv的簡單 CSV 檔案。 - 將
test.csv上傳到您的來源 S3 儲存桶。 - 如果一切組態正確,Lambda 函式將被觸發,並將 Parquet 格式的檔案寫入目標 S3 儲存桶。
資料目錄、 安全性和治理
資料安全性是指組織如何保護資料,以確保資料安全儲存(例如加密狀態)並防止未經授權的存取。資料治理則是指確保只有需要存取特定資料集的人員具有存取許可權,並確保組織僅以核准的方式處理個人資料。
AWS Lake Formation 和資料安全性
AWS Lake Formation 提供了一個安全且可擴充套件的資料湖解決方案,可以幫助組織保護和管理其資料。AWS Lake Formation 提供了一系列功能,包括資料加密、存取控制和稽核功能,可以幫助組織滿足其安全性和合規性需求。
設定 Lake Formation 許可權
在本文中,我們將動手設定 Lake Formation 的許可權,以確保只有授權的使用者可以存取特定的資料集。
設定步驟:
- 登入 AWS 管理主控台並導航到 Lake Formation 主控台。
- 建立一個新的資料函式庫和資料表。
- 設定資料表的許可權,以允許特定的使用者或角色存取。
程式碼範例:
import boto3
lakeformation = boto3.client('lakeformation')
# 建立資料函式庫
lakeformation.create_database(
DatabaseInput={
'Name': 'testdb'
}
)
# 建立資料表
lakeformation.create_table(
DatabaseName='testdb',
TableInput={
'Name': 'csvparquet',
'StorageDescriptor': {
'Columns': [
{'Name': 'Name', 'Type': 'string'},
{'Name': 'favorite_num', 'Type': 'int'}
],
'Location': 's3://dataeng-clean-zone-INITIALS/testdb/csvparquet/'
}
}
)
# 設定資料表許可權
lakeformation.grant_permissions(
Principal={
'DataLakePrincipalIdentifier': 'arn:aws:iam::123456789012:user/testuser'
},
Resource={
'Table': {
'DatabaseName': 'testdb',
'Name': 'csvparquet'
}
},
Permissions=['SELECT']
)
內容解密:
- 建立資料函式庫和資料表:使用 Lake Formation 的
create_database和create_table方法建立新的資料函式庫和資料表。 - 設定資料表許可權:使用
grant_permissions方法設定資料表的許可權,以允許特定的使用者或角色存取。
資料安全與治理的重要性
資料安全和治理是現代企業面臨的一大挑戰。資料洩露或未遵循當地法規可能導致巨額罰款和聲譽受損。以2017年Equifax的資料洩露事件為例,該信用機構的資料洩露導致近1.5億人的個人和財務資訊被暴露。最終,Equifax同意支付至少5.75億美元與多個美國政府機構和州政府達成和解。
資料洩露的後果
資料洩露不僅會導致財務損失,還會對企業的聲譽和品牌造成無法估量的損害。一旦失去客戶的信任,將很難重新獲得。
法規遵循的重要性
除了資料洩露外,未能遵守當地法規也可能導致巨額罰款。越來越多的法律規定了公司在收集、儲存和處理個人資訊時的條件。例如,Google因未充分遵守歐盟的《通用資料保護條例》(GDPR)而被罰款超過5000萬歐元。即使Google對該決定提出上訴,但2020年法院維持原判,罰款仍然生效。
常見的資料監管要求
無論在哪裡開展業務,都需要了解並遵守相關的資料隱私和保護法規。以下是一些例子:
- 歐盟的《通用資料保護條例》(GDPR)
- 美國加利福尼亞州的《加利福尼亞消費者隱私法》(CCPA)和最近透過的《加利福尼亞隱私權法》(CPRA)
- 印度的《個人資料保護法案》(PDP Bill)
- 南非的《個人資訊保護法》(POPIA)
這些法律涉及個人有權瞭解公司持有關於他們的哪些資料,確保對個人資訊的充分保護,強制對資料處理實施嚴格控制,以及在某些情況下,個人有權要求從公司的系統中刪除其資料。
行業特定法規
除了廣泛的資料保護和隱私法規外,許多法規還對特定行業或功能施加額外的要求。例如:
- 《健康保險可攜性和責任法案》(HIPAA),適用於儲存個人醫療保健和醫療資料的組織
- 《支付卡行業資料安全標準》(PCI DSS),適用於儲存和處理信用卡資料的組織
瞭解這些法規的要求並遵守它們通常是複雜且耗時的。
資料保護核心概念
有幾個與保護資料相關的概念和術語對於資料工程師來說是重要的。以下是一些簡要定義:
個人可識別資訊(PII)
個人可識別資訊(PII)是北美常用的一個術語,用於指代可用於識別個人的任何資訊。這可以指單獨的資訊能夠識別個人,也可以指與其他可連結資訊結合起來識別個人的資訊。它包括諸如全名、社會安全號碼、IP地址、照片或影片等資訊。
PII 內容解密:
PII涵蓋了能夠提供有關個人特定方面的資訊,例如醫療狀況、位置或政治歸屬。確保PII的安全對於遵守法規和保護個人隱私至關重要。
與 DPO 和 CISO 合作
如果您的組織有資料保護官(DPO),請確保與DPO安排時間會面,以充分了解可能適用於您的組織的法規及其對分析資料的影響。或者,與您的首席資訊安全官(CISO)合作,以確保您的組織尋求關於哪些資料法規可能適用的法律建議。
參與合規稽核
如果您必須參與在AWS上執行的分析工作負載的合規稽核,請檢視AWS Artifact服務(https://aws.amazon.com/artifact/),這是一個按需存取AWS合規報告的自助服務門戶。
@startuml
skinparam backgroundColor #FEFEFE
skinparam defaultTextAlignment center
skinparam rectangleBackgroundColor #F5F5F5
skinparam rectangleBorderColor #333333
skinparam arrowColor #333333
title 參與合規稽核
rectangle "瞭解法規要求" as node1
rectangle "保護PII" as node2
rectangle "與DPO或CISO合作" as node3
rectangle "參與合規稽核" as node4
rectangle "降低風險" as node5
rectangle "簡化合規流程" as node6
node1 --> node2
node2 --> node3
node3 --> node4
node4 --> node5
node5 --> node6
@enduml
此圖示內容解密:
此圖示呈現了資料工程師在資料安全與治理中的關鍵角色。首先,資料工程師需要了解並遵守相關法規,如GDPR等,以確保組織的合規性。同時,保護個人可識別資訊(PII)是至關重要的,這需要實施有效的資料保護措施。與DPO或CISO合作能夠確保組織對相關法規的遵循,並降低違規風險。此外,參與合規稽核,如利用AWS Artifact服務,可以簡化合規流程,最終達到避免巨額罰款和聲譽受損的目標。