返回文章列表

深度學習工作流程協調系統設計與實踐

本文探討深度學習工作流程協調系統的設計與應用,旨在解決原型設計程式碼到生產環境佈署的挑戰。文中分析了工作流程構建的挑戰與頻率,並提出了簡化流程、統一體驗的設計原則。同時,以 Metaflow 為例展示了優秀實踐,並深入解析了工作流程協調的運作機制、設計原則以及 Airflow 和 Argo Workflows

深度學習 系統設計

深度學習模型開發過程中,從實驗到佈署的轉換通常耗時費力。構建穩健高效的工作流程協調系統能有效橋接研發與生產環境的差距,提升模型迭代效率。系統設計需考量資料科學家的使用習慣,降低學習成本,並提供統一的開發與執行體驗。理想的協調系統應具備簡潔易用的工作流程定義方式,同時支援本地除錯和雲端佈署,以滿足快速迭代的需求。

工作流程協調系統的設計與應用

在深度學習系統中,將原型設計程式碼平滑地轉移到生產環境是一個重要的挑戰。資料科學家需要在本地測試演算法,然後將其佈署到生產環境中,但這兩個階段之間的轉換並不順暢。本章將探討工作流程協調系統的設計和應用,以解決這個問題。

工作流程協調的重要性

資料科學家在本地測試和評估演算法後,需要將其佈署到生產環境中。但是,將原型設計程式碼轉換為生產環境中的工作流程並不容易。這不僅需要資料科學家學習新的技能,還需要他們花費大量時間進行除錯和測試。

工作流程構建的挑戰

  1. 學習曲線陡峭:資料科學家需要學習工作流程 DAG 語法、工作流程函式庫、程式設計正規化和故障排除等知識。
  2. 除錯困難:大多數協調系統不支援本地執行,資料科學家需要將工作流程提交到遠端協調系統進行測試,這使得除錯變得困難。

工作流程構建的頻率

  1. 迭代開發:深度學習開發是一個迭代過程,資料科學家需要不斷更新工作流程以測試新的改進。
  2. 頻繁的工作流程構建:由於工作流程構建是一個耗時且繁瑣的過程,因此頻繁的工作流程構建會阻礙開發速度。

工作流程協調系統的設計

本文將從三個步驟來探討工作流程協調系統的設計:首先,使用典型的資料科學家使用者場景來展示協調系統的工作原理;其次,學習通用的協調系統設計;最後,總結構建或評估協調系統的關鍵設計原則。

使用者場景

資料科學家的使用者場景可以分為兩個階段:開發階段和執行階段。

  1. 開發階段:資料科學家將訓練程式碼轉換為工作流程,使用協調系統提供的語法重建工作流程,並設定輸入/輸出引數和操作。
  2. 執行階段:資料科學家將工作流程提交到協調服務,並設定執行排程。

設計原則

  1. 簡化工作流程構建:協調系統應該提供工具幫助資料科學家輕鬆構建工作流程。
  2. 統一使用者經驗:協調系統應該提供統一的使用者經驗,無論是在本地還是在雲端生產環境中。

Metaflow:一個優秀的實踐案例

Metaflow是一個開源函式庫,允許資料科學家透過編寫Python程式碼和使用Python註解來授權工作流程。Metaflow還提供了一個統一的使用者經驗,用於在本地和雲端生產環境中執行模型。

# 定義工作流程 DAG
with DAG(
    description='Vena的樣本訓練工作流程',
    schedule_interval=timedelta(months=1),
    start_date=datetime(2022, 1, 1),
) as dag:
    # 定義每個步驟的執行邏輯
    data_parse_step = BashOperator(......)
    data_augment_step = BashOperator(......)
    dataset_building_step = BashOperator(......)
    training_step = BashOperator(......)
    
    # 宣告步驟依賴關係
    data_parse_step >> data_augment_step >> dataset_building_step >> training_step

內容解密:

此程式碼定義了一個工作流程 DAG,包括四個步驟:資料解析、資料增強、資料集構建和訓練。每個步驟使用 BashOperator 執行 bash 命令。步驟之間的依賴關係使用 >> 運算子定義,表示前一步驟完成後,才會執行下一步驟。

工作流程協調系統的執行階段與設計

執行階段的工作流程管理

在執行階段,工作流程協調服務會執行模型訓練工作流程,如 Vena 的範例所示:

  1. 當 Vena 提交工作流程後,協調服務會將工作流程的有向無環圖(DAG)儲存到資料函式庫中。
  2. 協調服務的排程器元件會檢測到 Vena 的工作流程,並將工作流程中的任務分派給後端工作節點。排程器會確保任務按照工作流程 DAG 中定義的順序執行。
  3. Vena 使用協調服務的網頁介面即時檢查工作流程的執行進度和結果。
  4. 如果工作流程產生了一個良好的模型,Vena 可以將其推廣到預發布和生產環境。如果沒有,Vena 會開始另一個原型設計的迭代。

評估一個協調系統是否適合深度學習的關鍵指標是將原型設計程式碼轉換為工作流程的難易程度。在圖 9.4 中,我們看到 Vena 每次原型設計新想法時,都需要將她的訓練程式碼轉換為工作流程。如果我們能夠減少將深度學習程式碼轉換為工作流程的摩擦,那麼將會節省大量人力成本。

注意: 工作流程應該始終保持輕量化。工作流程用於自動化流程,其目標是將一系列任務分組並連線起來,並按照定義的順序執行。使用工作流程的最大好處是,人們可以分享和重用任務,從而更快地自動化他們的流程。因此,工作流程本身不應該進行任何繁重的計算,真正的工作應該由工作流程中的任務來完成。

一般協調系統的設計

現在讓我們來看看通用的工作流程協調系統。為了幫助您瞭解協調系統的工作原理以及如何研究開源協調系統,我們準備了一個高層級的系統設計。透過忽略詳細實作並只保留核心元件,這個設計適用於大多數協調系統,包括開源系統,這些將在 9.3 節中討論。請參閱圖 9.5 中的設計提案。

一個工作流程協調系統通常由以下五個元件組成:

  • 網頁伺服器:網頁伺服器提供了一個網頁使用者介面和一套網頁 API,讓使用者能夠建立、檢查、觸發和除錯工作流程的行為。
  • 排程器和控制器:排程器和控制器元件做兩件事情。首先,排程器監控系統中的每個活動工作流程,並在合適的時間安排工作流程執行。其次,控制器將工作流程任務分派給工作節點。儘管排程器和控制器是兩個不同的功能單元,但它們通常被一起實作,因為它們都與工作流程執行相關。
  • 元資料資料函式庫:元資料資料函式庫儲存了工作流程的組態、DAG、編輯和執行歷史,以及任務的執行狀態。
  • 工作節點群組:工作節點群組提供了執行工作流程任務的計算資源。工作節點抽象了基礎設施,並且與執行的任務無關。例如,我們可能有不同型別的工作節點,例如 Kubernetes 工作節點和 Amazon Elastic Compute Cloud(EC2)工作節點,但它們都可以執行相同的任務,儘管是在不同的基礎設施上。
  • 物件儲存:物件儲存是所有其他元件分享的檔案儲存,通常建立在雲端物件儲存之上,例如 Amazon Simple Storage Service(S3)。物件儲存的一個用途是任務輸出分享。當一個工作節點執行一個任務時,它從物件儲存中讀取前一個任務的輸出值作為任務輸入;該工作節點還將任務輸出儲存到物件儲存中,以供其後續任務使用。

物件儲存和元資料資料函式庫都可以被協調系統的所有元件存取,包括排程器、網頁伺服器和工作節點元件。具有集中式資料儲存可以解耦核心元件,因此網頁伺服器、排程器和工作節點可以獨立運作。

工作流程是如何執行的?

首先,Vena 定義了工作流程的 DAG。在 DAG 中,Vena 宣告了一組任務,並定義了任務執行順序的控制流程。對於每個任務,Vena 可以使用系統的預設運算元,例如 Shell 命令運算元或 Python 運算元,也可以建立自己的運算元來執行任務。

其次,Vena 透過網頁介面或命令列將工作流程(包含相關程式碼的 DAG)提交給網頁伺服器。該工作流程被儲存在元資料資料函式庫中。

第三,排程器定期(每隔幾秒或幾分鐘)掃描元資料資料函式庫並檢測到新的工作流程;然後它在預定的時間啟動該工作流程。為了執行一個工作流程,排程器呼叫控制器元件,根據 DAG 中定義的任務順序,將工作流程的任務分派給工作節點佇列。

@startuml
skinparam backgroundColor #FEFEFE
skinparam defaultTextAlignment center
skinparam rectangleBackgroundColor #F5F5F5
skinparam rectangleBorderColor #333333
skinparam arrowColor #333333

title 工作流程是如何執行的?

rectangle "restful API & UI" as node1
rectangle "dispatch tasks" as node2
rectangle "store results" as node3
rectangle "store metadata" as node4
rectangle "read input from" as node5
rectangle "retrieve metadata" as node6

node1 --> node2
node2 --> node3
node3 --> node4
node4 --> node5
node5 --> node6

@enduml

此圖示展示了一個通用的工作流程協調服務的設計概覽,包括網頁伺服器、排程器與控制器、工作節點、雲端物件儲存和元資料資料函式庫之間的互動關係。

內容解密:

  1. 網頁伺服器提供使用者介面和 API 以便與系統互動。
  2. 排程器和控制器負責管理工作流程的執行,包括監控、排程和分派任務。
  3. 元資料資料函式庫儲存了所有與工作流程相關的中繼資料。
  4. 工作節點群組提供了計算資源來執行任務,並且可以支援不同的基礎設施。
  5. 物件儲存用於分享檔案和任務輸出結果。
  6. 系統中的每個元件都可以存取物件儲存和元資料資料函式庫,以實作解耦和獨立運作。

透過這種架構設計,可以實作一個通用且靈活的工作流程協調系統,能夠支援不同的應用場景和基礎設施。

工作流程協調(Workflow Orchestration)深度解析

工作流程協調是深度學習系統中的關鍵組成部分,負責管理和執行複雜的工作流程。本章節將探討工作流程協調的內部運作機制、設計原則以及開源工具的實際應用。

工作流程協調的運作機制

工作流程協調系統的核心任務是管理和執行工作流程(workflow)。以Vena系統為例,其工作流程協調過程如下:

  1. 工作流程定義與儲存:首先,使用者透過Web UI定義工作流程,並將其儲存於中繼資料資料函式庫中。
  2. 排程與執行:排程器/控制器元件定期檢查中繼資料資料函式庫,以確定是否有新的工作流程需要執行。一旦發現新的工作流程,排程器便會將其任務推入共用任務佇列中。
  3. 任務執行:工作節點(worker)從共用任務佇列中提取任務,並根據中繼資料資料函式庫中的任務定義執行相應的操作。在執行過程中,工作節點將任務的輸出結果儲存至物件儲存系統,並將執行狀態回報至中繼資料資料函式庫。
  4. 監控與管理:最後,使用者可以透過Web UI即時監控工作流程的執行狀態。由於排程器/控制器元件和工作節點會即時更新中繼資料資料函式庫,因此Web UI始終能夠顯示最新的工作流程狀態。

工作流程協調的設計原則

一個優秀的工作流程協調系統應遵循以下六大設計原則:

原則1:可靠性(Criticality)

工作流程協調系統必須提供穩定的工作流程執行體驗。有效的工做流程應當被正確、重複且準時地執行。

原則2:可用性(Usability)

可用性的衡量標準在於是否能夠最佳化資料科學家的生產力。一個使用者友善的協調系統應當讓使用者能夠輕鬆地建立、監控和故障排除工作流程。

原則3:可擴充套件性(Extensibility)

為了適應多樣化的深度學習基礎設施,使用者應能夠輕鬆定義自訂的任務運算元和執行器,而無需擔心佈署環境。

原則4:隔離性(Isolation)

隔離性包括工作流程建立隔離和工作流程執行隔離。前者確保使用者在建立工作流程時不會相互幹擾;後者則確保每個工作流程在獨立的環境中執行,避免資源競爭和故障傳播。

原則5:可擴充套件性(Scaling)

一個良好的協調系統應能夠處理大量平行工作流程和龐大的工作流程。這包括透過增加計算資源來支援無限平行的工作流程執行,以及提供水平平行運算元來最佳化單一龐大工作流程的效能。

原則6:以人為中心的原型開發與生產支援

深度學習專案開發是一個從原型設計到生產的迭代過程。因此,一個適合深度學習的工作流程協調系統應當能夠無縫地將資料科學家的本地原型程式碼轉換為生產就緒的工作流程。

開源工作流程協調工具巡禮

本文將介紹三種廣泛使用的開源工作流程協調工具:Airflow、Argo Workflows和Metaflow。我們將探討它們的基本功能,並從深度學習專案開發的角度進行評估。

為了進行公平比較,我們使用相同的範例工作流程實作了這三種工具的偽程式碼。該範例工作流程的功能是:每天檢查是否有新資料,如果有,則轉換資料並儲存至新的資料函式庫表格中,然後通知資料科學團隊。

# 使用Python定義一個簡單的工作流程
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 3, 20),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# 定義DAG
dag = DAG(
    'data_processing',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
)

# 定義任務
def check_new_data(**kwargs):
    # 檢查是否有新資料
    return True

def transform_data(**kwargs):
    # 轉換資料
    pass

def notify_data_science_team(**kwargs):
    # 通知資料科學團隊
    pass

# 定義任務運算元
check_new_data_task = PythonOperator(
    task_id='check_new_data',
    python_callable=check_new_data,
    dag=dag,
)

transform_data_task = PythonOperator(
    task_id='transform_data',
    python_callable=transform_data,
    dag=dag,
)

notify_data_science_team_task = PythonOperator(
    task_id='notify_data_science_team',
    python_callable=notify_data_science_team,
    dag=dag,
)

# 設定任務依賴關係
check_new_data_task >> transform_data_task >> notify_data_science_team_task

內容解密:

  1. default_args字典:定義了DAG的預設引數,包括擁有者、開始日期、重試次數等。
  2. DAG物件:代表一個有向無環圖,定義了工作流程的結構和排程間隔。
  3. PythonOperator:用於執行Python函式的運算元,分別對應檢查新資料、轉換資料和通知資料科學團隊三個任務。
  4. 任務依賴關係:透過>>運算元設定任務之間的依賴關係,確保工作流程按照預期的順序執行。

9.3 工作流程協調系統導覽

9.3.1 Airflow 工作流程管理系統

Airflow 最初於 2014 年在 Airbnb 開發,目前隸屬於 Apache 基金會。它是一個以程式設計方式編寫、排程和監控工作流程的平台。雖然 Airflow 最初是為了協調日益複雜的 ETL(提取、轉換、載入)資料管線而設計,但由於其良好的擴充性和生產品質,加上支援圖形使用者介面(GUI),因此被廣泛應用於多個領域,包括深度學習。目前,Airflow 是最廣泛採用的工作流程協調系統。

典型使用案例

在 Airflow 中建立工作流程需要兩個步驟:首先,定義工作流程的 DAG(有向無環圖)及任務;其次,在 DAG 中宣告任務之間的依賴關係。Airflow 的 DAG 本質上是 Python 程式碼。以下程式碼展示瞭如何使用 Airflow 實作範例工作流程:

# 宣告工作流程 DAG
with DAG(dag_id="data_process_dag",
         schedule_interval="@daily",
         default_args=default_args,
         template_searchpath=[f"{os.environ['AIRFLOW_HOME']}"],
         catchup=False) as dag:
    # 定義工作流程中的任務,每個程式碼區塊代表一個任務
    is_new_data_available = FileSensor(
        task_id="is_new_data_available",
        fs_conn_id="data_path",
        filepath="data.csv",
        # ...
    )
    
    # 定義資料轉換任務
    transform_data = PythonOperator(
        task_id="transform_data",
        python_callable=transform_data
    )
    
    # 定義資料表建立任務
    create_table = PostgresOperator(
        task_id="create_table",
        sql='''CREATE TABLE IF NOT EXISTS invoices (
            # ...
        );''',
        postgres_conn_id='postgres',
        database='customer_data'
    )
    
    save_into_db = PythonOperator(
        task_id='save_into_db',
        python_callable=store_in_db
    )
    
    notify_data_science_team = SlackWebhookOperator(
        task_id='notify_data_science_team',
        http_conn_id='slack_conn',
        webhook_token=slack_token,
        message="Data Science Notification \n"
        # ...
    )
    
    # 第二步:在工作流程中宣告任務依賴關係
    is_new_data_available >> transform_data
    transform_data >> create_table >> save_into_db
    save_into_db >> notify_data_science_team
    save_into_db >> create_report

# 實際的資料轉換邏輯,在 "transform_data" 任務中被參照
def transform_data(*args, **kwargs):
    # ...

內容解密:

  1. DAG 定義:使用 DAG 上下文管理器定義工作流程,指定 dag_idschedule_interval 等屬性。
  2. 任務定義:使用不同的 Operator(如 FileSensorPythonOperatorPostgresOperator)定義各個任務,例如檢查新資料、轉換資料、建立資料表等。
  3. 任務依賴:使用位移運算元(>>)定義任務之間的依賴關係,確保工作流程按照預期順序執行。
  4. 自定義函式:使用 PythonOperator 執行自定義的 Python 函式,如 transform_datastore_in_db

主要特點

Airflow 提供以下主要特點:

  • DAGs:Airflow 使用 DAGs 對複雜工作流程進行抽象,DAG 是透過 Python 程式函式庫實作的。
  • 程式化工作流程管理:Airflow 支援動態建立任務,並允許建立複雜的動態工作流程。
  • 內建運算元:提供大量預定義的運算元,幫助使用者無需編寫程式碼即可完成任務。
  • 任務依賴和執行管理:具備自動重試政策,並提供不同型別的感測器來處理執行時依賴關係。
  • 擴充性:允許自定義感測器、鉤子和運算元,便於與不同系統整合。
  • 監控和管理介面:提供強大的 UI,方便使用者快速瞭解工作流程和任務的執行狀態和歷史記錄。

限制

雖然 Airflow 是優秀的工作流程協調工具,但在深度學習場景下仍存在一些缺點:

  • 高昂的學習成本:對於未受過訓練的資料科學家來說,Airflow 的學習曲線較陡。
  • 將深度學習原型程式碼遷移到生產環境的摩擦較大:需要將本地模型訓練程式碼轉換為 Airflow DAG,增加了額外的工作量。
  • 在 Kubernetes 上操作複雜度高:在 Kubernetes 上佈署和操作 Airflow 並非易事,若要在 Kubernetes 上執行工作流程,Argo Workflows 是更好的選擇。

9.3.2 Argo Workflows 工作流程管理系統

Argo Workflows 是一個開源的、容器原生的工作流程引擎,用於在 Kubernetes 上協調平行工作流程和任務。Argo Workflows 與 Airflow 解決相同的問題,但採用了 Kubernetes 原生的方式。最大的不同之處在於,Argo Workflows 的工作流程和任務是以 Kubernetes 自定義資源定義(CRD)物件實作的,每個任務(步驟)都作為 Kubernetes Pod 執行。圖 9.6 提供了一個高階系統概覽。