在資料工程領域,處理大量資料的效率至關重要。MapReduce 作為一種經典的分散式運算模型,被廣泛應用於各種大資料處理場景。本文將示範如何利用 Ray 框架,以 Python 語言實作一個簡化的 MapReduce 範例,用於統計多個檔案中的單詞出現頻率。這個範例將涵蓋資料分割槽、對映函式定義、分散式任務執行以及結果彙總等關鍵步驟,展現 Ray 框架在分散式運算方面的優勢。藉由程式碼的逐步講解,讀者可以清晰地理解 MapReduce 的運作原理,並學習如何使用 Ray 進行高效的資料處理。
使用Ray實作簡單的MapReduce範例
在分散式運算領域中,MapReduce是一個非常重要的里程碑。許多成功的Big Data技術,如Hadoop,都是根據這個程式設計模型。現在,我們將在Ray的背景下重新審視MapReduce。為了保持簡單,我們將把MapReduce的實作限制在一個特定的使用案例上,即計算多個檔案中的單詞出現次數。
為何選擇MapReduce?
MapReduce的實作非常經典,值得了解。這個經典的正規化根據三個簡單的步驟:
- 將一組檔案轉換或“對映”成鍵值對。在我們的例子中,當我們遇到一個單詞時,我們的對映函式將發出對(單詞,1)來表示我們找到了它的一個出現。
- 收集和分組對映階段的所有輸出。這可能需要在節點之間進行資料洗牌。
- 聚合或“減少”洗牌步驟中的元素。在我們的例子中,我們簡單地將每個節點上的單詞出現次數相加以獲得最終的計數。
Ray中的MapReduce實作
首先,我們載入範例資料以更好地瞭解我們正在操作的內容:
import subprocess
zen_of_python = subprocess.check_output(["python", "-c", "import this"])
corpus = zen_of_python.split()
num_partitions = 3
chunk = len(corpus) // num_partitions
partitions = [
corpus[i * chunk: (i + 1) * chunk] for i in range(num_partitions)
]
程式碼解密:
- 這段程式碼首先使用
subprocess模組執行Python命令import this來取得“Python之禪”的內容。 - 將內容分割成單詞列表,並將其分成三個分割槽。
對映和洗牌檔案資料
要定義對映階段,我們需要一個對映函式,將其應用於每個檔案。在我們的例子中,我們希望為每個在檔案中找到的單詞發出對(單詞,1)。
def map_function(document):
for word in document:
yield (word, 1)
程式碼解密:
map_function接受一個檔案(實際上是單詞列表),並對每個單詞產生一個鍵值對(單詞,1)。- 使用
yield關鍵字使函式成為生成器,這是一種在Python中建立迭代器的快速方法。
使用Ray平行化MapReduce
現在,我們將使用Ray來平行化MapReduce的計算。首先,我們需要定義一個Ray任務來執行對映階段:
import ray
ray.init()
@ray.remote
def map_task(partition):
result = {}
for word, count in map_function(partition):
if word not in result:
result[word] = 0
result[word] += count
return result
# 對每個分割槽執行對映任務
futures = [map_task.remote(partition) for partition in partitions]
results = ray.get(futures)
程式碼解密:
- 初始化Ray叢集。
- 定義一個遠端任務
map_task,它對給定的分割槽執行對映函式,並將結果聚合在一個字典中。 - 對每個分割槽遠端執行
map_task,並使用ray.get取得結果。
結合結果
最後一步是結合對映階段的結果以獲得最終的單詞計數:
def reduce_task(results):
final_result = {}
for result in results:
for word, count in result.items():
if word not in final_result:
final_result[word] = 0
final_result[word] += count
return final_result
final_counts = reduce_task(results)
print(final_counts)
程式碼解密:
reduce_task函式接受對映階段的結果列表,並將它們合併成一個最終結果字典。- 列印最終的單詞計數結果。
Ray 中的簡單 MapReduce 實作範例
在分散式運算中,MapReduce 是一種常見的程式設計模型,用於處理和生成大型資料集。本篇文章將介紹如何使用 Ray 實作一個簡單的 MapReduce 範例。
Map 階段的實作
首先,我們定義一個 map_function,它將輸入的檔案(document)轉換為單詞(word)及其出現次數(count)的鍵值對(key-value pair)。
def map_function(document):
for word in document.lower().split():
yield word.encode('utf-8'), 1
內容解密:
map_function接受一個檔案(document)作為輸入,並將其轉換為小寫後分割成單詞。- 對每個單詞,使用
yield傳回一個鍵值對,其中鍵是單詞的 UTF-8 編碼位元組串,值是 1,表示該單詞出現了一次。
接下來,我們定義一個 Ray 任務 apply_map,它將 map_function 應用於整個語料函式庫(corpus),並將結果分派到不同的分割區(partition)。
import ray
@ray.remote
def apply_map(corpus, num_partitions=3):
map_results = [list() for _ in range(num_partitions)]
for document in corpus:
for result in map_function(document):
first_letter = result[0].decode("utf-8")[0]
word_index = ord(first_letter) % num_partitions
map_results[word_index].append(result)
return map_results
內容解密:
apply_map是一個 Ray 遠端任務,它接受一個語料函式庫和分割區數量作為引數。- 對語料函式庫中的每個檔案,應用
map_function生成鍵值對。 - 使用單詞的第一個字母計算
word_index,並根據word_index將鍵值對分配到不同的分割區。 - 傳回包含所有分割區結果的列表。
Reduce 階段的實作
在 Reduce 階段,我們定義一個 apply_reduce 任務,它將來自不同分割區的鍵值對匯總,計算每個單詞的總出現次數。
@ray.remote
def apply_reduce(*results):
reduce_results = dict()
for res in results:
for key, value in res:
if key not in reduce_results:
reduce_results[key] = 0
reduce_results[key] += value
return reduce_results
內容解密:
apply_reduce接受多個結果(*results)作為輸入,它們是來自不同分割區的鍵值對列表。- 使用一個字典
reduce_results來匯總每個單詞的出現次數。 - 對每個輸入結果,遍歷其鍵值對,並更新字典中的計數。
執行 MapReduce
最後,我們執行 MapReduce 工作流程,包括呼叫 apply_map 和 apply_reduce 任務,並收集最終結果。
map_results = [apply_map.options(num_returns=num_partitions).remote(data, num_partitions) for data in partitions]
outputs = []
for i in range(num_partitions):
outputs.append(apply_reduce.remote(*[partition[i] for partition in map_results]))
counts = {k: v for output in ray.get(outputs) for k, v in output.items()}
sorted_counts = sorted(counts.items(), key=lambda item: item[1], reverse=True)
for count in sorted_counts:
print(f"{count[0].decode('utf-8')}: {count[1]}")
內容解密:
- 首先,遠端執行
apply_map任務,並指定傳回值的數量等於分割區數量。 - 然後,對每個分割區索引,收集來自所有
apply_map呼叫的相應結果,並傳遞給apply_reduce任務。 - 最後,收集所有
apply_reduce的輸出,將它們合併成一個字典,按單詞出現次數排序,並列印結果。
使用Ray構建分散式強化學習應用
在前面的章節中,我們已經瞭解了Ray API的基本操作,包括如何在物件儲存中存取值,如何使用@ray.remote修飾符宣告Python函式為Ray任務,以及如何使用.remote()呼叫在Ray叢集上執行它們。同樣,我們也熟悉瞭如何從Python類別宣告Ray角色,並利用它們進行有狀態的分散式計算。
本章節將透過建立一個強化學習(RL)問題來展示如何使用Ray構建更真實的分散式應用。我們將從零開始實作一個基本的RL演算法,並使用Ray任務和角色將其平行化到本地叢集,所有這些都將在少於250行程式碼中完成。
強化學習簡介
強化學習是一種機器學習的子領域,主要研究如何讓代理(agent)在環境中透過觀察和採取行動來學習並最大化某種獎勵。一個典型的例子是自動駕駛汽車或遊戲AI。
強化學習的特點
- 在動態環境中運作
- 對環境變化做出反應
- 採取一系列行動
- 實作長期目標
透過觀察環境,代理可以學習探索可能的行動並隨著時間的推移提出更好的解決方案。
設定一個簡單的迷宮問題
在本章節中,我們將建立一個簡單的迷宮問題來示範強化學習的基本概念。我們鼓勵讀者跟隨我們的程式碼一起建立這個應用。
import ray
import numpy as np
# 初始化Ray
ray.init()
# 定義迷宮環境
class Maze:
def __init__(self, width, height):
self.width = width
self.height = height
self.state = (0, 0)
def reset(self):
self.state = (0, 0)
return self.state
def step(self, action):
# 根據行動更新狀態
if action == 'up' and self.state[1] < self.height - 1:
self.state = (self.state[0], self.state[1] + 1)
elif action == 'down' and self.state[1] > 0:
self.state = (self.state[0], self.state[1] - 1)
elif action == 'left' and self.state[0] > 0:
self.state = (self.state[0] - 1, self.state[1])
elif action == 'right' and self.state[0] < self.width - 1:
self.state = (self.state[0] + 1, self.state[1])
# 計算獎勵
reward = -1 # 每一步都給予負獎勵
if self.state == (self.width - 1, self.height - 1):
reward = 10 # 到達終點給予正獎勵
return self.state, reward
# 定義一個Ray任務來模擬迷宮環境
@ray.remote
class MazeActor:
def __init__(self, width, height):
self.maze = Maze(width, height)
def reset(self):
return self.maze.reset()
def step(self, action):
return self.maze.step(action)
# 建立多個MazeActor例項
num_actors = 4
actors = [MazeActor.remote(5, 5) for _ in range(num_actors)]
# 重置所有迷宮環境
states = ray.get([actor.reset.remote() for actor in actors])
print(states)
# 對每個迷宮環境採取行動
actions = ['right'] * num_actors
next_states, rewards = zip(*ray.get([actor.step.remote(action) for actor, action in zip(actors, actions)]))
print(next_states, rewards)
程式碼解密:
- 初始化Ray:首先,我們需要初始化Ray以啟用分散式計算。
- 定義迷宮環境:
Maze類別定義了迷宮的狀態和行為,包括重置和採取行動。 - 定義Ray任務:
MazeActor類別被定義為一個Ray任務,使得我們可以在分散式環境中模擬多個迷宮。 - 建立多個MazeActor例項:我們建立了多個
MazeActor例項,以平行化模擬多個迷宮環境。 - 重置和採取行動:我們重置所有迷宮環境的狀態,並對每個環境採取行動,以展示如何使用Ray進行分散式強化學習。
簡易迷宮遊戲的實作與強化學習演算法的整合
在本文中,我們將實作一個簡單的2D迷宮遊戲,並利用強化學習(RL)演算法訓練一個智慧體(seeker)在迷宮中尋找目標(goal)。首先,我們需要定義迷宮的結構、智慧體的動作空間以及觀察空間。
迷宮環境的定義
首先,我們定義一個名為Discrete的類別,用於表示智慧體的動作空間。這個類別可以表示多個離散的動作,例如上下左右移動。
import random
class Discrete:
def __init__(self, num_actions: int):
"""離散動作空間,用於表示 num_actions 個動作"""
self.n = num_actions
def sample(self):
"""隨機取樣一個動作"""
return random.randint(0, self.n - 1)
# 定義一個具有4個動作的離散動作空間
space = Discrete(4)
print(space.sample())
內容解密:
Discrete類別用於表示離散的動作空間,例如上下左右移動。num_actions引數指定了動作空間的大小。sample方法用於隨機取樣一個動作。
接下來,我們定義一個名為Environment的類別,用於表示迷宮環境。這個類別包含了智慧體的位置、目標位置、動作空間以及觀察空間。
class Environment:
def __init__(self, *args, **kwargs):
"""初始化迷宮環境"""
self.seeker, self.goal = (0, 0), (4, 4)
self.info = {'seeker': self.seeker, 'goal': self.goal}
self.action_space = Discrete(4)
self.observation_space = Discrete(5*5)
內容解密:
Environment類別用於表示迷宮環境。seeker和goal屬性分別表示智慧體和目標的位置。action_space屬性表示智慧體的動作空間,是一個具有4個動作的離散動作空間。observation_space屬性表示觀察空間,是一個具有25個狀態的離散空間,用於表示智慧體的位置。
迷宮遊戲的實作
為了實作迷宮遊戲,我們需要定義幾個輔助方法,包括reset、get_observation、get_reward、is_done以及step。
def reset(self):
"""重置智慧體位置並傳回觀察值"""
self.seeker = (0, 0)
return self.get_observation()
def get_observation(self):
"""將智慧體位置編碼為整數"""
return 5 * self.seeker[0] + self.seeker[1]
def get_reward(self):
"""根據智慧體是否到達目標給予獎勵"""
return 1 if self.seeker == self.goal else 0
def is_done(self):
"""判斷遊戲是否結束"""
return self.seeker == self.goal
def step(self, action):
"""根據動作更新智慧體位置並傳回相關資訊"""
if action == 0: # 向下移動
self.seeker = (min(self.seeker[0] + 1, 4), self.seeker[1])
elif action == 1: # 向左移動
self.seeker = (self.seeker[0], max(self.seeker[1] - 1, 0))
elif action == 2: # 向上移動
self.seeker = (max(self.seeker[0] - 1, 0), self.seeker[1])
elif action == 3: # 向右移動
self.seeker = (self.seeker[0], min(self.seeker[1] + 1, 4))
return self.get_observation(), self.get_reward(), self.is_done(), self.info
內容解密:
reset方法重置智慧體位置並傳回初始觀察值。get_observation方法將智慧體位置編碼為整數。get_reward方法根據智慧體是否到達目標給予獎勵。is_done方法判斷遊戲是否結束。step方法根據動作更新智慧體位置並傳回相關資訊,包括新的觀察值、獎勵、遊戲是否結束以及額外資訊。