返回文章列表

高效能資料管線開發環境建置

本文探討如何建置高效能資料管線開發環境,涵蓋雲端資源使用、混合式開發、設定指令碼、資源清理、容器化等導向,並以程式碼範例說明模擬API回應、模組化設計、單一職責原則等最佳實務,同時討論如何管理不同編碼環境,例如使用小工具、網頁UI和IaC等策略,最終目標是開發易於變更、可擴充套件且穩定的資料管線。

資料工程 軟體開發

隨著資料規模和複雜度的提升,建置一個穩健且高效的資料管線開發環境至關重要。本文將探討如何有效利用雲端資源、容器化技術和軟體開發最佳實務,開發一個易於維護、測試和擴充套件的資料管線開發環境。首先,我們將探討如何控制雲端資源成本,並利用自動化工具進行資源清理。接著,我們將介紹混合式開發方法,結合本地開發和雲端服務的優勢。此外,我們還會探討如何使用設定指令碼和模擬選項來簡化開發流程。最後,我們將討論如何利用容器化技術,例如 Docker 和 Kubernetes,構建一致且可移植的開發環境,並確保程式碼在不同環境中都能穩定執行。

開發環境的最佳實踐

在開發過程中,建立一個有效的開發環境至關重要。一個好的開發環境不僅能提高開發效率,還能降低成本並減少除錯的複雜性。在本章中,我們將探討如何設定一個有效的開發環境,並討論一些常見的問題和解決方案。

雲端資源的使用

在開發過程中,使用雲端資源可以提供很大的便利性。然而,如果不加以控制,雲端資源的使用可能會導致成本迅速增加。圖 5-8 展示了一個極端的例子,其中大量的雲端資源被用來執行一個小型的範例資料集。

問題與挑戰

  • 雲端資源的使用導致成本增加
  • 除錯變得更加複雜,因為日誌檔案難以存取
  • 如果環境建立指令碼未能成功建立所有資源,需要手動刪除個別服務
  • 容量問題可能導致叢集無法執行,從而無法進行管道測試

解決方案

  • 使用容器化和良好的編碼實踐來減少對雲端資源的依賴
  • 設計程式碼時考慮到本地執行和測試的需求
  • 使用存根(stub)和模擬(mock)來測試程式碼

混合式開發方法

在某些情況下,可能需要連線到外部服務作為本地開發的一部分。例如,在一個分析管道中,可能需要從內部攝取流程的結果中取得資料。可以透過連線到源資料函式庫或將資料寫入臨時檔案來解決這個問題。

優點

  • 加快開發速度
  • 減少對雲端資源的依賴
  • 降低成本

實作方法

  • 使用本地容器或連線到外部服務
  • 提供替代的資料函式庫憑據以便在本地開發中使用
  • 使用模擬 API 回應來加速開發和測試

設定指令碼和模擬選項

在微服務環境中,內部 API 和資料來源被用作資料轉換過程的一部分。可以透過設定指令碼來填充本地資料函式庫容器,以便使用 API。

例子

  • 使用設定指令碼填充本地資料函式庫容器
  • 新增模擬 API 回應的選項以加速本地開發

資源清理

在開發和測試過程中,未使用的資源可能會累積,從而導致成本增加。設定自動終止和自動縮放可以幫助清理閒置資源並降低成本。

方法

  • 設定自動終止以清理閒置資源
  • 使用自動縮放在閒置時間減少容量

程式碼範例:使用 Plantuml 圖表展示混合式開發方法

圖表翻譯: 此圖示展示了混合式開發方法的流程。本地開發可以連線到外部服務或使用本地容器。外部服務可以提供替代的資料函式庫憑據,而本地容器可以加速開發和測試。

程式碼範例:模擬 API 回應

import unittest
from unittest.mock import Mock

def fetch_data_from_api():
    # 模擬 API 回應
    api_response = Mock()
    api_response.json.return_value = {'data': 'example'}
    return api_response.json()

class TestDataFetching(unittest.TestCase):
    def test_fetch_data(self):
        data = fetch_data_from_api()
        self.assertEqual(data, {'data': 'example'})

if __name__ == '__main__':
    unittest.main()

內容解密:

  1. fetch_data_from_api 函式模擬了一個 API 回應,使用 Mock 物件傳回一個包含範例資料的 JSON 回應。
  2. TestDataFetching 類別定義了一個單元測試,測試 fetch_data_from_api 函式是否正確傳回了模擬的 API 回應。
  3. unittest.main() 執行了單元測試。

這個範例展示瞭如何使用模擬 API 回應來加速本地開發和測試。

高效率資料管線開發環境建置

在資料管線的開發過程中,建立一個完善的環境架構至關重要。這不僅能提升開發效率,還能有效控制成本,並確保資料處理流程的穩定性和可擴充套件性。

自動化與成本控制

為了降低雲端資源的浪費,開發者可以利用 Kubernetes 的 Pod 自動縮放功能,在閒置時縮減資源使用。此外,善用閒置資源監控工具,可以主動發現並終止不再使用的資源。許多雲端資料函式庫也提供自動擴充套件功能,例如 DynamoDB 和 RDS 的讀寫容量調整,能在非使用期間降低資源消耗。

資源標記與自動清理

為雲端資源新增標籤和標記,有助於識別開發相關資源,並進行自動清理。例如,設定自動化指令碼定期檢查並終止過期的 EMR 叢集。然而,需要注意的是,部分小時計費、啟動和終止時間仍會產生費用,因此需合理規劃終止時機。

本地開發與測試

為了提高開發效率,開發者可以使用 LocalStack 在本地模擬雲端服務,避免頻繁呼叫雲端資源。這不僅能加快開發速度,還能節省雲端費用。在 CI/CD 流程中,也可以先在本地執行單元測試,再提交到雲端執行。

環境規劃與管理

資料管線的開發需要結合軟體開發生命週期與資料規模,逐步測試功能性和效能。建立一個長期執行的預發布環境,可以幫助觀察資源爭用或串流延遲等問題,從而提前發現基礎設施問題。

共用環境與暫時環境

規劃環境時,應考慮如何在滿足測試需求的同時,平衡雲端資源成本和基礎設施工程開銷。可以透過分享環境或使用暫時環境來降低成本和維護負擔。暫時環境尤其適用於開發和測試階段,用完即丟棄,能有效減少資源浪費。

容器化開發環境

使用容器化技術(如 Docker)可以將資料管線拆解為獨立服務,並減少開發與生產環境之間的差異。透過 Docker Compose,可以掛載原生程式碼進行測試,確保環境的一致性。

環境一致性與安全性

為了保持環境的一致性,建議定期清理本地容器並重新提取或重建映像檔。此外,使用 .env 檔案管理敏感資訊,避免將機密資料提交到程式碼倉函式庫,能提高安全性。

共用組態與擴充套件性

使用共用的 Docker Compose 檔案,可以減少團隊間的組態差異,並透過 YAML 錨點和擴充套件欄位分享設定,提升可維護性。同時,考慮在測試環境中使用替代方案,例如使用 Postgres 容器代替 Cloud SQL,或將檔案寫入本地而非雲端儲存,以降低雲端資源使用。

自動終止與資源管理

在開發過程中建立的雲端資源應及時清理,以避免不必要的費用。為資源新增標籤和標記,便於識別和自動清理。同時,利用自動終止和自動擴充套件功能,可以進一步降低成本。

總之,建立一個完善的資料管線開發環境,需要綜合考慮開發、測試和佈署的需求,並透過容器化、自動化和資源管理等技術手段,提高開發效率,降低成本,確保資料處理流程的穩定性和可擴充套件性。接下來的章節將探討如何構建靈活的程式碼函式庫,以應對資料管線開發中的變化。

軟體開發策略:開發易於變更的資料管執行緒式碼

在《The Pragmatic Programmer》(20週年版)這本文中,David Thomas和Andrew Hunt提出了一個重要的觀念:程式碼應該是「易於變更」(Easy To Change, ETC)。這個觀念在資料管線開發中尤其重要,因為資料管線需要支援多方面的變化,包括資料大小、格式、形狀、資料擷取和儲存方式,以及資料轉換和驗證需求的演變。

面對多變的資料管線環境

在開發資料管線時,我們會遇到不同的程式設計環境,包括IDE、筆記本介面(如Jupyter)和雲端服務的網頁UI(如AWS Lambda和AWS Glue)。如何有效地管理這些不同環境中的程式碼,是一個很大的挑戰。從可靠性角度來看,程式碼應該遵循軟體開發的最佳實踐,進行原始碼控制和測試。然而,對於筆記本和網頁UI來說,這些操作可能很困難或根本不可能,從而增加了錯誤和bug的風險。

以多模態管線為例

曾經有一個資料管線專案,同時使用了筆記本、IDE和AWS Lambda網頁UI三種環境。分析師團隊使用Databricks筆記本開發了管線的轉換邏輯,該邏輯執行在資料倉儲上,並將轉換結果儲存到S3,觸發了Lambda函式。Lambda函式啟動了一個EMR叢集,進行第二步資料轉換,然後將資料載入DynamoDB。

筆記本的優缺點

筆記本適合用於協作和原型設計,能夠快速測試查詢並提供即時反饋。然而,筆記本程式碼的測試相對困難,因此建議將關鍵程式碼封裝成可測試的套件。此外,筆記本的原始碼控制仍存在一些不足。

開發模組化且可組態的程式碼函式庫

為了應對資料管線設計中的變化,我們需要採用軟體工程的最佳實踐,開發模組化的程式碼函式庫。可組態的設計是一種強大的技術,可以幫助我們快速佈署新功能,同時降低引入bug的風險。

模組化設計

模組化設計的核心是將程式碼分解成獨立、可重複使用的模組。每個模組負責特定的功能,並透過清晰的介面與其他模組互動。這種設計方式使得我們可以獨立地開發、測試和維護每個模組,從而提高了程式碼的可維護性和可擴充套件性。

可組態設計

可組態設計則是透過引數或組態檔案來控制程式碼的行為,從而實作動態調整功能。這種方式使得我們可以在不修改程式碼的情況下,根據不同的需求或環境進行調整,大大提高了程式碼的靈活性和可重用性。

程式碼結構與範例

# 定義一個簡單的資料轉換模組
def transform_data(data):
    # 資料轉換邏輯
    transformed_data = data.upper()
    return transformed_data

# 使用模組進行資料轉換
data = "example data"
transformed_data = transform_data(data)
print(transformed_data)

內容解密:

  • transform_data函式接受一個data引數,並將其轉換為大寫。
  • 這個函式代表了一個簡單的資料轉換邏輯,可以根據實際需求進行擴充套件。
  • 透過將資料轉換邏輯封裝在獨立的函式中,我們可以輕鬆地重用和測試這段程式碼。

管理不同編碼環境的策略

在軟體開發過程中,管理不同編碼環境是一個重要的課題。隨著雲端運算和無伺服器架構的興起,開發人員需要能夠靈活地在不同環境中佈署和管理程式碼。本文將探討一些用於管理不同編碼環境的策略,包括使用小工具(Widgets)、網頁使用者介面(Web UIs)和基礎設施即程式碼(Infrastructure as Code, IaC)。

使用小工具(Widgets)

小工具提供了一種在筆記本中指定值而不必修改程式碼的方法,使得開發人員能夠在不同輸入下執行筆記本。小工具在 Jupyter 和 Databricks 筆記本中都得到了支援。

小工具範例

在一個執行查詢效能分析的筆記本中,小工具允許使用者指定時間範圍和查詢名稱,以便檢查查詢的效能,而無需修改程式碼。這種方法使得筆記本能夠被用於各種不同的使用案例。

import ipywidgets as widgets

# 建立一個文字輸入小工具
text_widget = widgets.Text(
    value='',
    placeholder='輸入名稱',
    description='名稱:',
    disabled=False
)

# 顯示小工具
display(text_widget)

內容解密:

這段程式碼建立了一個文字輸入小工具,使用者可以在其中輸入名稱,而無需修改筆記本中的程式碼。小工具的值可以透過 text_widget.value 取得。

網頁使用者介面(Web UIs)

像 AWS Lambda 這樣的服務提供了網頁使用者介面,允許開發人員直接在瀏覽器中編寫程式碼。這種方法可以快速啟動和執行,但也可能導致立即佈署錯誤程式碼。

網頁使用者介面的優缺點

網頁使用者介面允許開發人員指定不同服務之間的關係,例如在 Lambda UI 中點選 SNS 觸發器並寫入應觸發 Lambda 函式的 SNS 主題名稱。然而,這種方法可能會建立難以追蹤的依賴關係。

import boto3

# 建立一個 Lambda 使用者端
lambda_client = boto3.client('lambda')

# 建立一個 SNS 使用者端
sns_client = boto3.client('sns')

# 指定 SNS 主題名稱
sns_topic_name = 'my-sns-topic'

# 建立 SNS 主題
response = sns_client.create_topic(Name=sns_topic_name)

# 取得 SNS 主題 ARN
sns_topic_arn = response['TopicArn']

# 建立 Lambda 函式
lambda_function_name = 'my-lambda-function'
lambda_client.create_function(
    FunctionName=lambda_function_name,
    Runtime='python3.8',
    Role='arn:aws:iam::123456789012:role/lambda-execution-role',
    Handler='index.handler',
    Code={'S3Bucket': 'my-bucket', 'S3ObjectKey': 'lambda-code.zip'}
)

# 建立 SNS 觸發器
lambda_client.create_event_source_mapping(
    EventSourceArn=sns_topic_arn,
    FunctionName=lambda_function_name
)

內容解密:

這段程式碼使用 AWS SDK for Python(Boto3)建立了一個 Lambda 函式和一個 SNS 主題,並將 SNS 主題指定為 Lambda 函式的觸發器。這種方法比使用網頁使用者介面更具可控性和可重複性。

基礎設施即程式碼(IaC)

基礎設施即程式碼是一種透過組態檔案來管理和佈署雲端基礎設施的方法,例如儲存桶、無伺服器函式和叢集。這種方法可以提高佈署的一致性和可重複性。

IaC 的優點

IaC 可以快速佈署多個環境,例如開發、測試和生產環境,並且可以輕鬆地回復到先前的版本。

AWSTemplateFormatVersion: '2010-09-09'
Transform: 'AWS::Serverless-2016-10-31'
Resources:
  MyLambdaFunction:
    Type: 'AWS::Serverless::Function'
    Properties:
      FunctionName: !Sub 'my-lambda-function-${AWS::Region}'
      Runtime: python3.8
      Handler: index.handler
      CodeUri: s3://my-bucket/lambda-code.zip
      Role: !GetAtt 'MyLambdaExecutionRole.Arn'
  MyLambdaExecutionRole:
    Type: 'AWS::IAM::Role'
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          -
            Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Policies:
        -
          PolicyName: execution-role-policy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              -
                Effect: Allow
                Action:
                  - 'logs:CreateLogGroup'
                  - 'logs:CreateLogStream'
                  - 'logs:PutLogEvents'
                Resource: !Sub 'arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/*'

內容解密:

這段程式碼使用 AWS Serverless Application Model(SAM)定義了一個 Lambda 函式和其執行角色。SAM 可以將程式碼上傳到 S3,並建立必要的 IAM 角色和策略。

軟體開發策略:模組化設計與單一職責原則

在處理資料處理管線時,開發人員經常會遇到程式碼難以變更的問題。就像圖 6-3 中的機器學習(ML)引擎與執行層緊密耦合的例子一樣,當系統需求變化時,緊密耦合的程式碼會導致測試、偵錯和維護變得更加困難。本文將探討如何透過模組化設計和單一職責原則來改善程式碼的可維護性和可擴充套件性。

圖 6-3 與圖 6-4:系統演進過程

在圖 6-3 中,ML 引擎與執行引擎緊密耦合,這使得系統在初期能夠快速上線,但隨著「股票問題」功能變得非常流行,團隊決定將其擴充套件為主要功能。這導致系統架構發生變化,如圖 6-4 所示,執行引擎增加了更多介面和資料來源,從而增加了系統的複雜性和潛在的故障點。

@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle

title 高效能資料管線開發環境建置

package "Docker 架構" {
    actor "開發者" as dev

    package "Docker Engine" {
        component [Docker Daemon] as daemon
        component [Docker CLI] as cli
        component [REST API] as api
    }

    package "容器運行時" {
        component [containerd] as containerd
        component [runc] as runc
    }

    package "儲存" {
        database [Images] as images
        database [Volumes] as volumes
        database [Networks] as networks
    }

    cloud "Registry" as registry
}

dev --> cli : 命令操作
cli --> api : API 呼叫
api --> daemon : 處理請求
daemon --> containerd : 容器管理
containerd --> runc : 執行容器
daemon --> images : 映像檔管理
daemon --> registry : 拉取/推送
daemon --> volumes : 資料持久化
daemon --> networks : 網路配置

@enduml

圖表翻譯: 此圖示展示了 ML 引擎與執行引擎之間的緊密耦合關係,以及執行引擎如何與多個資料來源進行互動。

模組化設計

模組化設計的核心思想是將程式碼視為可重用的建構塊,就像 LEGO 磚塊一樣。LEGO 磚塊具有標準化的介面,可以相互連線,形成穩定的結構。透過模組化設計,程式碼變得更容易維護、測試和擴充套件。

單一職責原則

單一職責原則(SRP)鼓勵開發人員將程式碼的功能限制在一個明確定義的範圍內。這有助於隔離不同的功能,提高模組化、可測試性和可讀性,同時減少錯誤和維護負擔。

程式碼範例:create_aggregate_data 方法

def create_aggregate_data(data):
    bird_agg = aggregate_by_species(data)
    key = f"aggregate_data_{datetime.utcnow().isoformat()}.json"
    s3.put_object(Bucket="bucket_name", Key=key, Body=bytes(json.dumps(bird_agg)))

內容解密:

  1. 聚合資料aggregate_by_species 方法根據物種聚合資料。
  2. 儲存資料:將聚合後的資料儲存到 S3 儲存桶中。
  3. 問題:該方法同時負責資料聚合和儲存,違反了單一職責原則。

重構後的程式碼

def write_to_s3(data, key):
    """將資料寫入 S3 儲存桶"""
    s3 = boto3.client('s3', region_name='us-east-1')
    s3.put_object(Bucket="bucket_name", Key=key, Body=data)

def create_species_agg(data):
    """根據物種聚合資料並寫入 S3"""
    bird_agg = aggregate_by_species(data)
    key = f"aggregate_data_{datetime.utcnow().isoformat()}.json"
    write_to_s3(bytes(json.dumps(bird_agg)), key)

內容解密:

  1. write_to_s3 方法:封裝了將資料寫入 S3 的邏輯,使其獨立於資料聚合邏輯。
  2. create_species_agg 方法:負責根據物種聚合資料,並呼叫 write_to_s3 將結果寫入 S3。
  3. 改進:透過將不同的功能分離到不同的函式中,程式碼變得更加模組化和易於維護。