返回文章列表

Ray 分散式運算與機器學習應用

Ray 是一個功能強大的分散式運算框架,適用於各種機器學習任務,包括資料處理、模型訓練、強化學習和超引數調優。本文將介紹 Ray 的核心概念,包括 Datasets、RLlib、Train、Tune 和 Serve,並示範如何使用這些工具來構建和佈署高效能的機器學習應用程式。此外,文章也將探討 Ray

機器學習 分散式系統

Ray 提供了簡潔易用的 API,讓開發者能輕鬆地將 Python 應用程式擴充套件到分散式環境。Ray Datasets 可有效處理大規模資料,而 Ray RLlib 則專注於強化學習模型的訓練與評估。此外,Ray Tune 提供了超引數調優功能,可自動搜尋最佳引陣列合,提升模型效能。Ray Serve 則簡化了模型佈署流程,讓開發者能快速將訓練好的模型上線提供服務。這些功能都建構在 Ray Core 的分散式運算引擎之上,使其具備高效能和可擴充套件性。

Ray 資料集與分散式訓練概述

Ray 提供了一套完整的資料科學函式庫,涵蓋了資料處理、模型訓練等眾多功能。本篇文章將重點介紹 Ray Datasets 和 Ray RLlib,分別用於分散式資料處理和強化學習模型的訓練。

使用 Ray Datasets 進行分散式資料處理

Ray Datasets 是一個高效能的分散式資料處理函式庫,設計用於處理大規模的資料集。以下是一個簡單的範例,展示如何使用 Ray Datasets 進行資料處理:

import ray

# 初始化 Ray
ray.init()

# 建立一個包含數值 0 到 9999 的資料集
ds = ray.data.range(10000)

# 對資料集中的每個元素進行平方運算
ds = ds.map(lambda x: {"data": x ** 2})

# 篩選出偶數
ds = ds.filter(lambda x: x["data"] % 2 == 0)

# 對剩餘的元素進行擴增,加入它們的立方
pipe = ds.window().map(lambda x: x["data"]).filter(lambda x: x % 2 == 0).flat_map(lambda x: [x, x ** 3])

# 顯示前 10 個結果
result = pipe.take(10)
print(result)

內容解密:

  1. 初始化 Ray:首先,我們需要初始化 Ray,以便使用其分散式運算功能。
  2. 建立資料集:使用 ray.data.range(10000) 建立一個包含數值 0 到 9999 的資料集。
  3. 平方運算:透過 map 方法對資料集中的每個元素進行平方運算。
  4. 篩選偶數:使用 filter 方法篩選出偶數。
  5. 擴增資料:利用 flat_map 方法對剩餘的元素進行擴增,加入它們的立方。
  6. 顯示結果:最後,使用 take 方法取出前 10 個結果並列印出來。

將 Dataset 轉換為 Pipeline

Dataset 的轉換是同步執行的,這在某些複雜任務中可能不是最佳選擇。Ray 提供了 DatasetPipeline,可以將 Dataset 轉換為 pipeline,以實作非同步執行:

pipe = ds.window()
result = pipe.map(lambda x: x["data"] ** 2).filter(lambda x: x % 2 == 0).flat_map(lambda x: [x, x ** 3])
result.show(10)

內容解密:

  1. 轉換為 Pipeline:透過呼叫 window() 方法將 Dataset 轉換為 pipeline。
  2. 鏈式操作:pipeline 的步驟可以鏈式呼叫,與 Dataset 的操作類別似。

使用 Ray RLlib 進行強化學習

Ray RLlib 是 Ray 中用於強化學習的函式庫,支援 TensorFlow 和 PyTorch 等現代機器學習框架。以下是一個使用 RLlib 的範例,示範如何訓練一個強化學習代理來解決 CartPole-v1 問題:

rllib example run cartpole-ppo

這個命令會執行一個預先組態好的 PPO 演算法範例,用於解決 CartPole-v1 環境中的控制問題。

CartPole-v1 環境與 PPO 演算法組態

CartPole-v1 環境模擬了一個在小車上的倒立擺控制問題。PPO(Proximal Policy Optimization)是一種強大的強化學習演算法,用於解決這類別問題。組態檔案中指定了環境、演算法和相關引數。

cartpole-ppo:
  env: CartPole-v1
  run: PPO
  stop:
    episode_reward_mean: 150
    timesteps_total: 100000
  config:
    framework: tf
    gamma: 0.99
    lr: 0.0003
    num_workers: 1
    observation_filter: MeanStdFilter
    num_sgd_iter: 6
    vf_loss_coeff: 0.01
    model:
      fcnet_hiddens: [32]
      fcnet_activation: linear
      vf_share_layers: true
    enable_connectors: True

圖表翻譯:

此圖示呈現了 CartPole-v1 環境的控制流程。小車可以左右移動,以保持擺桿直立。強化學習代理透過與環境互動,學習最佳的控制策略。

@startuml
skinparam backgroundColor #FEFEFE
skinparam defaultTextAlignment center
skinparam rectangleBackgroundColor #F5F5F5
skinparam rectangleBorderColor #333333
skinparam arrowColor #333333

title 圖表翻譯:

rectangle "狀態" as node1
rectangle "動作" as node2
rectangle "獎勵" as node3

node1 --> node2
node2 --> node3

@enduml

Ray 在機器學習中的應用:從強化學習到超引數調優

Ray 是一個強大的分散式運算框架,不僅能支援強化學習(Reinforcement Learning),還能處理其他型別的機器學習任務,如監督式學習(Supervised Learning)。本篇文章將介紹 Ray 在強化學習和超引數調優(Hyperparameter Tuning)中的應用。

使用 RLlib 進行強化學習

Ray 的 RLlib 是一個專門為強化學習設計的函式庫,提供了簡單易用的介面來訓練和評估強化學習演算法。以下是一個使用 RLlib 訓練 CartPole-v1 環境的範例:

rllib train --algo PPO --env CartPole-v1

訓練完成後,可以使用以下命令評估訓練好的模型:

rllib evaluate <checkpoint-path>/checkpoint_<number> --algo PPO

內容解密:

  • rllib train 命令用於啟動訓練程式,--algo PPO 指定使用 PPO 演算法,--env CartPole-v1 指定訓練環境。
  • rllib evaluate 命令用於評估訓練好的模型,<checkpoint-path>/checkpoint_<number> 是訓練過程中儲存的檢查點路徑。

使用 Ray Train 進行分散式訓練

除了強化學習,Ray 還提供了 Ray Train 函式庫來支援其他型別的機器學習任務的分散式訓練。雖然目前尚未介紹足夠的深度學習框架知識,但可以先跳到第 6 章瞭解更多關於 Ray Train 的內容。

使用 Ray Tune 進行超引數調優

Ray Tune 是 Ray 的另一個重要元件,專門用於超引數調優。它提供了一系列的超引數最佳化演算法,能夠有效地搜尋最佳的超引陣列合。以下是一個使用 Ray Tune 的範例:

from ray import tune
import math
import time

def training_function(config):
    x, y = config["x"], config["y"]
    time.sleep(10)
    score = objective(x, y)
    tune.report(score=score)

def objective(x, y):
    return math.sqrt((x**2 + y**2)/2)

result = tune.run(
    training_function,
    config={
        "x": tune.grid_search([-1, -.5, 0, .5, 1]),
        "y": tune.grid_search([-1, -.5, 0, .5, 1])
    }
)

print(result.get_best_config(metric="score", mode="min"))

內容解密:

  • training_function 模擬了一個耗時的訓練過程,接受一個包含超引數 xy 的組態字典。
  • objective 函式計算了一個常見的目標函式值,即 xy 的平方和的平方根。
  • tune.run 初始化了超引數最佳化過程,並指定了 xy 的搜尋空間。
  • result.get_best_config 取得了使目標函式值最小的最佳超引陣列態。

此範例展示瞭如何使用 Ray Tune 進行網格搜尋(Grid Search),以找到使目標函式最小的最佳超引陣列合。由於 Ray 的平行化能力,這個過程可以大大加速。

Ray 的模型服務與生態系統整合

Ray 提供了一系列高階函式庫來簡化機器學習工作流程,其中 Ray Serve 專門用於模型服務。為了展示其功能,我們將使用一個預先訓練好的 GPT-2 語言模型,該模型能夠根據輸入的文字生成延續或完成的文字。

佈署 GPT-2 模型

首先,我們需要安裝 Hugging Face 的 transformers 函式庫,該函式庫包含了我們所需的 GPT-2 模型。執行以下命令進行安裝:

pip install transformers

接下來,我們將使用 Ray Serve 佈署 GPT-2 模型。以下是範例程式碼:

from ray import serve
from transformers import pipeline
import requests

# 啟動 Ray Serve
serve.start()

# 定義模型服務
@serve.deployment
def model(request):
    # 載入語言模型
    language_model = pipeline("text-generation", model="gpt2")
    # 取得查詢引數
    query = request.query_params["query"]
    # 使用模型生成文字
    return language_model(query, max_length=100)

# 佈署模型
model.deploy()

# 傳送查詢請求
query = "What's the meaning of life?"
response = requests.get(f"http://localhost:8000/model?query={query}")
print(response.text)

內容解密:

  1. serve.start():在本機啟動 Ray Serve。
  2. @serve.deployment:將 model 函式轉換為 Ray Serve 的佈署單元。
  3. language_model = pipeline("text-generation", model="gpt2"):載入 GPT-2 語言模型。
  4. query = request.query_params["query"]:從請求中取得查詢引數。
  5. language_model(query, max_length=100):使用 GPT-2 模型生成最多 100 個字元的文字。
  6. model.deploy():正式佈署模型,使其能夠接收 HTTP 請求。
  7. requests.get(f"http://localhost:8000/model?query={query}"):使用 requests 函式庫傳送查詢請求。

Ray 的生態系統整合

Ray 的高階函式庫功能強大,但並不意味著 Ray 是您所需的全部。事實上,最成功和最強大的框架通常能夠與現有的解決方案和想法良好整合。Ray 的生態系統中有許多與現有工具整合的例子,例如:

  • Dask-on-Ray:允許在 Ray 叢集上執行整個 Dask 生態系統。
  • Spark on Ray:將 Spark 工作負載與 Ray 整合。
  • Modin:一個分散式的 Pandas DataFrame 替代品,使用 Ray(或 Dask)作為分散式執行引擎。

Ray 的許多函式庫無縫整合了常見工具作為後端,創造了通用的介面,而不是試圖建立新的標準。這些介面允許以分散式方式執行任務,這是大多數相關後端所不具備的特性。

Ray 核心分散式運算引擎介紹

Ray 提供了一個強大且靈活的分散式運算框架,其核心 API 為開發者提供了建構分散式應用的基礎。在本章中,我們將探討 Ray Core 的基本概念,並透過實際範例來展示如何使用 Ray 進行分散式運算。

Ray 的三層架構概述

Ray 的架構可以分為三層:

  1. Ray Core API:作為 Ray 的核心,提供基本的分散式運算功能。
  2. Ray ML Libraries:包括 Ray Datasets、Ray RLlib、Ray Train、Ray Tune 和 Ray Serve 等,專為機器學習工作流程設計。
  3. 擴充套件與整合:Ray 的生態系統中有許多第三方整合和後端擴充套件。

為何需要 Ray Core?

Python 本身在分散式運算方面存在侷限性,例如其直譯器本質上是單執行緒的,這使得利用多核 CPU 或整個叢集變得困難。Ray Core API 提供了一個通用的分散式程式設計解決方案,讓 Python 社群能夠更容易地進行分散式運算。

Ray Core 基本概念

本章將重點介紹 Ray Core 的基本概念,包括:

  • 任務(Tasks):分散式版本的函式。
  • 角色(Actors):分散式版本的類別。
  • 物件(Objects):儲存在 Ray 的物件儲存中,可以被取回使用。

初始化 Ray 叢集

要開始使用 Ray,首先需要初始化一個本地叢集:

import ray
ray.init()

執行上述程式碼後,你將看到類別似以下的輸出:

INFO services.py:1263 -- View the Ray dashboard at http://127.0.0.1:8265
{'node_ip_address': '192.168.1.41', 'node_id': '...'}

這表示你的 Ray 叢集已經啟動並執行。

檢視 Ray 叢集資源

你可以使用 ray.cluster_resources() 來檢視叢集的資源利用情況:

print(ray.cluster_resources())

輸出結果可能如下:

{'CPU': 12.0, 'memory': 14203886388.0}

內容解密:

  • ray.cluster_resources() 用於顯示當前叢集的資源,包括 CPU 和記憶體。
  • 資源資訊對於規劃和調優分散式任務至關重要。

使用 Ray 進行分散式運算

透過 Ray,你可以輕鬆地將任務分配到不同的 CPU 核心或機器上。以下是一個簡單的範例,展示如何使用 Ray 進行資料平行處理:

import ray

# 初始化 Ray
ray.init()

# 定義一個 Ray 任務
@ray.remote
def my_task(x):
    # 模擬一些計算工作
    import time
    time.sleep(1)
    return x * x

# 提交多個任務
futures = [my_task.remote(i) for i in range(10)]

# 取得結果
results = ray.get(futures)
print(results)

# 關閉 Ray
ray.shutdown()

內容解密:

  • @ray.remote 修飾符用於將普通函式轉換為 Ray 任務,使其能夠在分散式環境中執行。
  • ray.get() 用於取得任務的執行結果,如果結果尚未準備好,它將阻塞直到結果可用。
  • ray.shutdown() 用於關閉 Ray 環境,釋放資源。

Ray Core 簡介:Python 分散式運算的新選擇

Ray Core 是 Ray 專案中的核心元件,提供了一個通用且直觀的分散式運算介面。對於 Python 開發者來說,Ray Core 的學習曲線相對平緩,因為它使用了熟悉的概念,如裝飾器、函式和類別。

為什麼需要 Ray Core?

Python 的 Global Interpreter Lock(GIL)限制了多執行緒的效能,使得 CPU 密集型任務無法充分利用多核心處理器。Ray Core 透過將任務分散到不同的程式甚至不同的機器上,繞過了 GIL 的限制,從而實作了真正的平行處理。

第一個 Ray Core 例子

假設我們有一個從資料函式庫檢索資料的函式 retrieve,並且這個操作非常耗時。我們可以透過 Ray Core 將這個函式轉換為一個分散式任務。

import time
import ray

# 初始化 Ray
ray.init()

database = [
    "Learning", "Ray",
    "Flexible", "Distributed", "Python", "for", "Machine", "Learning"
]

def retrieve(item):
    time.sleep(item / 10.)
    return item, database[item]

@ray.remote
def retrieve_task(item):
    return retrieve(item)

#### 內容解密:
- `retrieve` 函式模擬了一個耗時的操作透過 `time.sleep` 來模擬真實世界的延遲
- `@ray.remote` 裝飾器將 `retrieve_task` 函式轉換為一個 Ray 任務使其能夠在不同的程式或機器上執行
- 這種方式使得原本依序執行的任務能夠平行化大幅提升整體效能

### 平行化處理的威力

如果我們直接呼叫 `retrieve` 函式 8總執行時間大約是 2.8但是如果我們使用 Ray Core 將這些任務平行化理論上總執行時間不會超過最長的單一任務時間0.7)。

```python
start = time.time()
data = [retrieve_task.remote(item) for item in range(8)]
results = ray.get(data)
print(f'Runtime: {time.time() - start:.2f} seconds, data:')
for result in results:
    print(result)

內容解密:

  • retrieve_task.remote(item) 將任務提交給 Ray 執行。
  • ray.get(data) 等待所有任務完成並取得結果。
  • 這種平行化的方式能夠顯著減少總執行時間,充分利用多核心處理器的優勢。