返回文章列表

Ray Core 分散式資料處理效能提升

Ray Core 提供簡便方法將 Python 函式轉換為可遠端執行的任務,提升資料處理效能。文章探討 Ray Core API、系統架構、物件儲存、任務依賴管理及非同步處理等關鍵技術,並深入解析 Raylet、Worker Processes、Head Node 與 GCS 的運作機制,展示如何利用 Ray

分散式系統 資料科學

Ray Core 提供了一種將 Python 函式轉換為分散式任務的機制,藉此提升資料處理效能。它允許開發者使用熟悉的 Python 語法編寫程式碼,並透過 Ray 的分散式框架在多個節點上平行執行這些任務。Ray 的物件儲存機制允許在不同節點之間高效分享資料,避免了資料傳輸的瓶頸。非同步任務執行和依賴管理簡化了複雜工作流程的開發,而 Ray Actors 則提供了管理狀態的有效方法,使得構建複雜的分散式應用程式更加容易。透過整合 Plasma 和 Apache Arrow,Ray 的物件儲存提供了高效的記憶體管理和資料分享能力。Raylet 作為節點上的核心元件,負責任務排程和資源管理,確保任務在合適的節點上執行。

使用 Ray Core 提升資料處理效能

Ray 提供了一個簡便的方法來將 Python 函式轉換為可遠端執行的任務(task),使得資料科學家能夠輕鬆地擴充套件他們的工作流程至多台機器上。在本章中,我們將探討如何使用 Ray Core 來提升資料處理的效能。

建立 Ray 任務

首先,我們需要將一個 Python 函式裝飾為 Ray 任務。這可以透過在函式定義前新增 @ray.remote 裝飾器來實作。以下是一個簡單的例子:

@ray.remote
def retrieve_task(item, db):
    time.sleep(item / 10.)
    return item, db[item]

內容解密:

  • @ray.remote 裝飾器將 retrieve_task 函式轉換為一個 Ray 任務。
  • retrieve_task 函式模擬了一個耗時操作,睡眠時間與輸入 item 相關。
  • 函式傳回 item 和從 db 中檢索到的對應值。

執行 Ray 任務

要執行 Ray 任務,我們需要使用 .remote() 方法。這樣做會立即傳回一個或多個物件參考(ObjectRef),代表任務的結果。

object_references = [retrieve_task.remote(item, db_object_ref) for item in range(8)]
data = ray.get(object_references)

內容解密:

  • .remote() 方法用於遠端執行 retrieve_task
  • ray.get() 方法用於取得 object_references 中的實際資料。
  • 由於 Ray 任務是非同步執行的,ray.get() 會阻塞直到所有任務完成。

使用物件儲存最佳化資料存取

在分散式環境中,多個工作節點需要存取相同的資料。Ray 提供了一個分散式物件儲存,用於在工作節點之間分享資料。

db_object_ref = ray.put(database)

內容解密:

  • ray.put()database 存入 Ray 的物件儲存,並傳回一個物件參考。
  • 物件參考可以傳遞給 Ray 任務,以便它們能夠存取分享資料。

非同步呼叫與超時處理

使用 ray.wait() 可以實作非阻塞的呼叫,並且可以設定合理的超時時間。

while len(object_references) > 0:
    finished, object_references = ray.wait(object_references, num_returns=2, timeout=7.0)
    data = ray.get(finished)
    print_runtime(data, start)
    all_data.extend(data)

內容解密:

  • ray.wait() 非同步等待任務完成,可以設定超時時間。
  • 已完成的任務結果透過 ray.get() 取得並處理。
  • 未完成的任務參考繼續等待處理,避免無限迴圈。

非同步處理與任務依賴管理

在前面的章節中,我們展示瞭如何使用 Ray 的 wait 函式來非同步地處理任務結果。在這個例子中,我們將結果以每兩個為一組的方式印出,並且持續將新的資料附加到 all_data 中,直到所有任務完成。

以區塊為單位處理結果

Ray 的 wait 函式傳回兩個引數:已完成的任務結果和仍在處理中的任務 Future。透過設定 num_returns 引數,我們可以控制 wait 函式何時傳回。在這個例子中,我們設定 num_returns=2,使得每當有兩個新的資料函式庫專案可用時,wait 就會傳回。

程式碼範例

import ray
import time

# 初始化 Ray
ray.init()

# 定義一個遠端任務
@ray.remote
def retrieve_task(item, db_object_ref):
    # 模擬資料函式庫查詢
    time.sleep(0.1)
    return item, f"Data {item}"

# 建立任務參考
refs = [retrieve_task.remote(i, None) for i in range(8)]

# 處理結果
while refs:
    finished, refs = ray.wait(refs, num_returns=2)
    results = ray.get(finished)
    print(f"Runtime: {time.time() - start_time:.2f} seconds, data:")
    for result in results:
        print(result)

# 結束 Ray
ray.shutdown()

內容解密:

  1. 我們使用 ray.wait 函式來等待任務完成,並且設定 num_returns=2 以批次處理結果。
  2. while 迴圈中,我們持續呼叫 ray.wait 直到所有任務都完成。
  3. 對於每個批次完成的任務,我們使用 ray.get 來取得實際的結果。
  4. 最後,我們印出每個批次的結果。

處理任務依賴

在實際應用中,我們經常需要處理具有依賴關係的任務。也就是說,某些任務需要等待其他任務完成後才能開始執行。Ray 提供了一種簡單的方式來處理這種依賴關係。

程式碼範例

import ray

# 初始化 Ray
ray.init()

# 定義兩個遠端任務,第一個任務檢索資料,第二個任務根據第一個任務的結果進行後續處理
@ray.remote
def retrieve_task(item, db_object_ref):
    # 模擬資料函式庫查詢
    return item, f"Data {item}"

@ray.remote
def follow_up_task(retrieve_result):
    original_item, _ = retrieve_result
    follow_up_result = retrieve_task.remote(original_item + 1, None)
    return retrieve_result, ray.get(follow_up_result)

# 建立第一個任務的參考
retrieve_refs = [retrieve_task.remote(item, None) for item in [0, 2, 4, 6]]

# 建立第二個任務的參考,依賴於第一個任務的結果
follow_up_refs = [follow_up_task.remote(ref) for ref in retrieve_refs]

# 取得最終結果
results = ray.get(follow_up_refs)
for result in results:
    print(result)

# 結束 Ray
ray.shutdown()

內容解密:

  1. 我們定義了兩個遠端任務:retrieve_taskfollow_up_task
  2. follow_up_task 依賴於 retrieve_task 的結果。
  3. 我們使用 ray.getfollow_up_task 中取得 retrieve_task 的實際結果。
  4. Ray 自動處理了任務之間的依賴關係,無需顯式地呼叫 ray.getray.wait

從類別到 Actor

在 Ray 中,Actor 是一種特殊的遠端類別,可以用來執行有狀態的計算。Actor 可以跨多個任務保持其狀態,並且可以與其他 Actor 進行通訊。

程式碼範例(概念展示)

import ray

# 初始化 Ray
ray.init()

# 定義一個 Actor 類別
@ray.remote
class DataTracker:
    def __init__(self):
        self.count = 0

    def increment(self):
        self.count += 1

    def get_count(self):
        return self.count

# 建立一個 Actor 例項
tracker = DataTracker.remote()

# 在多個任務中更新 Actor 的狀態
@ray.remote
def update_tracker(tracker):
    tracker.increment.remote()

refs = [update_tracker.remote(tracker) for _ in range(10)]

# 等待所有任務完成
ray.get(refs)

# 取得最終的計數結果
final_count = ray.get(tracker.get_count.remote())
print(f"Final count: {final_count}")

# 結束 Ray
ray.shutdown()

內容解密:

  1. 我們定義了一個 DataTracker Actor 類別,用於跟蹤某個計數器的值。
  2. 在多個任務中,我們呼叫 update_tracker 來更新 DataTracker 的狀態。
  3. 最後,我們取得 DataTracker 的最終計數結果。

Ray Core API 簡介與系統架構解析

在前面的範例中,我們探討了 Ray 任務和演員(actors)作為分散式版本的 Python 函式和類別的用法。不僅如此,物件也是 Ray Core 中的一等公民,與任務和演員具有同等地位。物件儲存是 Ray 的核心元件之一。

Ray Core API 總覽

回顧前面的範例,我們總共使用了六種 API 方法:

  1. ray.init():初始化 Ray 叢集,可傳入位址以連線現有叢集。
  2. @ray.remote:將函式轉換為任務,將類別轉換為演員。
  3. ray.put():將值放入 Ray 的物件儲存中。
  4. ray.get():從物件儲存中取得值,可取得由任務或演員計算出的值。
  5. .remote():在 Ray 叢集上執行演員方法或任務,用於例項化演員。
  6. ray.wait():傳回兩個物件參考列表,一個包含已完成的任務,另一個包含未完成的任務。

Ray 系統元件解析

節點上的排程與執行

Ray 叢集由多個節點組成。首先,我們來看看單個節點上的運作。工作節點由多個工作程式組成,每個工作程式都有唯一的 ID、IP 位址和連線埠。工作程式被稱為「工作者」,因為它們盲目地執行分配給它們的工作。

每個工作節點都有一個稱為 Raylet 的元件。Raylet 是節點上的智慧元件,負責管理工作程式。Raylet 由兩個部分組成:任務排程器和物件儲存。

物件儲存

在前面的範例中,我們已經鬆散地使用了物件儲存的概念。每個 Ray 叢集的節點都配備了一個物件儲存,用於儲存和管理物件。

使用 Ray Actors 追蹤狀態

Ray actors 是被裝飾的 Python 類別,可以用來追蹤狀態。以下是一個簡單的計數器範例:

@ray.remote
class DataTracker:
    def __init__(self):
        self._counts = 0

    def increment(self):
        self._counts += 1

    def counts(self):
        return self._counts

內容解密:

  1. @ray.remote 裝飾器將 DataTracker 類別轉換為 Ray actor,使其能夠在分散式環境中使用。
  2. DataTracker 類別具有一個簡單的計數器 _counts,用於追蹤狀態。
  3. increment 方法使計數器加一,counts 方法傳回當前計數器的值。

在任務中使用 Ray Actors

我們可以將 DataTracker actor 傳遞給任務,以追蹤任務執行情況:

@ray.remote
def retrieve_tracker_task(item, tracker, db):
    time.sleep(item / 10.)
    tracker.increment.remote()
    return item, db[item]

tracker = DataTracker.remote()
object_references = [
    retrieve_tracker_task.remote(item, tracker, db_object_ref)
    for item in range(8)
]
data = ray.get(object_references)
print(data)
print(ray.get(tracker.counts.remote()))

內容解密:

  1. retrieve_tracker_task 任務接收 DataTracker actor 作為引數,並呼叫其 increment 方法。
  2. tracker.increment.remote() 非同步呼叫 increment 方法,使計數器加一。
  3. 最後,我們使用 ray.get(tracker.counts.remote()) 取得計數器的最終值。

Ray 系統架構深度解析

Ray 是一個強大的分散式運算框架,其核心元件包含了 Raylet、Worker Processes、以及 Head Node 等重要部分。這些元件共同協作,使得 Ray 能夠高效地管理分散式任務的執行、資源分配、以及物件儲存等關鍵功能。

Raylet:分散式物件儲存與任務排程的核心

Raylet 是 Ray 的核心元件之一,它由兩個主要部分組成:物件儲存(Object Store)和排程器(Scheduler)。

物件儲存:Plasma 與 Apache Arrow 的整合應用

物件儲存負責管理節點上的分享記憶體池,確保不同 Worker Processes 之間能夠高效地分享和存取物件。Ray 的物件儲存根據 Plasma 實作,而 Plasma 現在已經成為 Apache Arrow 專案的一部分。這樣的設計使得 Ray 能夠充分利用高效的記憶體管理和物件分享機制,從而提升整體的運算效能。

排程器:資源管理與依賴解析的關鍵角色

排程器是 Raylet 的另一個重要組成部分,它負責資源管理和任務排程。當一個任務需要特定的資源(如 CPU、GPU 或記憶體)時,排程器需要確保能夠找到一個具備足夠資源的 Worker Process 來執行該任務。排程器預設會偵測節點上的 CPU、GPU 和記憶體資源,並根據這些資訊來決定是否能夠排程某個任務。如果資源不足,任務將會被佇列,等待資源可用時再執行。

此外,排程器還負責依賴解析(Dependency Resolution),確保 Worker Processes 擁有執行任務所需的所有物件。這涉及到檢查本地物件儲存中的依賴物件,如果某些依賴物件不在本地,則需要從其他節點提取遠端依賴。

Worker Processes 與所有權模型

Worker Processes 是實際執行任務的程式,每個 Worker Process 都會儲存它所呼叫的任務的元資料和物件參照。Ray 引入了「所有權」(Ownership)的概念,規定產生物件參照的程式也負責該物件的解析。這意味著每個 Worker Process 都「擁有」它提交的任務,包括任務的正確執行和結果的可用性。

為了實作這一機制,Worker Processes 維護了一個所謂的「所有權表」(Ownership Table),記錄了它們所擁有的任務和物件參照。這樣,當某個任務失敗需要重新計算時,擁有該任務的 Worker Process 已經具備了所有必要的資訊來完成這一過程。

程式碼範例:所有權與依賴關係解析

@ray.remote
def task_owned():
    return

@ray.remote
def task(dependency):
    res_owned = task_owned.remote()
    return

val = ray.put("value")
res = task.remote(dependency=val)

內容解密:

  1. 函式定義task_ownedtask 兩個函式被定義為遠端任務(使用 @ray.remote 修飾器)。這意味著它們可以被 Ray 的排程器分配到不同的 Worker Processes 中執行。
  2. task_owned 函式:簡單地傳回一個值,但具體傳回值未指定。在實際應用中,這裡可以是任何計算邏輯。
  3. task 函式:接受一個依賴引數 dependency,並在內部呼叫 task_owned.remote() 來啟動一個新的遠端任務。這裡展現了任務之間的依賴關係和巢狀呼叫。
  4. valresval 被放入物件儲存中,而 restask 函式遠端執行的結果參照。這裡展現瞭如何使用 ray.put 將資料放入物件儲存,以及如何透過 remote 方法啟動遠端任務。
  5. 所有權關係:主程式擁有 taskvalres。當 task 被呼叫時,它擁有 res_owned,而 res_owned 依賴於 task_owned 的執行結果。

Head Node 與 Global Control Store (GCS)

每個 Ray Cluster 都有一個特殊的節點稱為 Head Node。Head Node 不僅可以執行 Driver Process 和 Worker Processes,還負責執行一些叢集管理元件,如 Autoscaler 和 Global Control Store (GCS)。GCS 是一個關鍵值儲存,用於儲存叢集的全域性資訊,如系統級別的元資料、心跳訊號等。Raylets 會定期向 GCS 傳送心跳訊號,以表明它們仍然活躍。

分散式排程與執行

Ray 的分散式排程和執行涉及多個關鍵步驟,包括分散式記憶體管理、節點間通訊、以及資源管理等。以下是相關步驟的概述:

  1. 分散式記憶體管理:各個節點上的 Raylet 管理本地記憶體,並在需要時進行物件傳輸,以解決遠端依賴問題。

@startuml skinparam backgroundColor #FEFEFE skinparam defaultTextAlignment center skinparam rectangleBackgroundColor #F5F5F5 skinparam rectangleBorderColor #333333 skinparam arrowColor #333333

title 分散式排程與執行

rectangle “分散式排程” as n1 rectangle “執行” as n2

n1 –> n2

@enduml

   
   **圖表翻譯:** 此圖示展示了不同節點之間的物件傳輸過程,用於解決遠端依賴問題。

2. **節點間通訊**:大多數通訊,如物件傳輸,是透過 gRPC 進行的。

3. **資源管理與分配**:Raylets 負責授予資源和租賃 Worker Processes 給任務擁有者。跨節點的所有排程器共同構成了分散式排程器,使得節點能夠在其他節點上排程任務。

總而言之,Ray 透過其精心設計的架構,能夠有效地管理和執行分散式任務,充分利用叢集資源,為大規模資料處理和機器學習工作負載提供強大的支援。