在建構分散式監控系統時,跨行程事件的協調與任務排程至關重要。本文利用 Python 的 multiprocessing 模組,實作了跨行程事件的產生與監聽,並以此為基礎,構建了一個類別似 Cron 的排程器,用於定時觸發監控任務。事件產生器 Oscillator 負責週期性地設定事件,而事件監聽器 Scheduler 則會在事件被設定時執行對應的操作。透過分享的事件物件,不同行程得以協同工作。此外,文章也說明瞭如何透過票務佇列管理待處理的監控任務,並利用 XML-RPC 實作伺服器與遠端監控代理程式之間的通訊。最後,文章還討論了遠端監控代理程式的設計,包括感測器互動、自我組態、自我更新以及系統的安全性考量。
管理與監控子系統:跨行程事件管理與排程實作
在分散式監控系統中,事件管理與排程是核心功能之一。本章節將探討如何使用Python的multiprocessing模組實作跨行程事件管理,以及如何建立一個類別似Cron的排程器。
跨行程事件管理機制
為了實作跨行程的事件管理,我們需要使用multiprocessing.Manager類別來建立分享的事件物件。這個事件物件可以被多個行程分享,並且可以用來觸發特定的動作。
Oscillator類別:事件產生器
首先,我們定義了一個Oscillator類別,負責週期性地產生事件。這個類別繼承自multiprocessing.Process,並且使用time.sleep()函式來控制事件產生的間隔。
class Oscillator(multiprocessing.Process):
def __init__(self, event, period):
self.period = period
self.event = event
super(Oscillator, self).__init__()
def run(self):
try:
while True:
self.event.clear()
time.sleep(self.period)
self.event.set()
except KeyboardInterrupt:
pass
內容解密:
Oscillator類別初始化時接收一個事件物件和一個週期引數。- 在
run()方法中,進入無限迴圈,週期性地清除和設定事件狀態。 - 使用
time.sleep()來控制事件產生的間隔。
Scheduler類別:事件監聽器
接下來,我們定義了一個Scheduler類別,用於監聽事件並執行特定的動作。在這個例子中,當事件被觸發時,Scheduler會列印出目前的時間。
class Scheduler(multiprocessing.Process):
def __init__(self, event):
self.event = event
super(Scheduler, self).__init__()
def run(self):
try:
while True:
self.event.wait()
print(datetime.now())
except KeyboardInterrupt:
pass
內容解密:
Scheduler類別初始化時接收一個事件物件。- 在
run()方法中,進入無限迴圈,等待事件被觸發。 - 當事件被觸發時,列印出目前的時間。
跨行程事件管理的實作
現在,讓我們看看如何將這些元件組合起來。在主行程中,我們建立了一個Manager例項,並使用它來建立一個分享的事件物件。然後,我們建立了Oscillator和Scheduler例項,並將分享的事件物件傳遞給它們。
mgr = multiprocessing.Manager()
e = mgr.Event()
o = Oscillator(e, 60)
s = Scheduler(e)
o.start()
s.start()
try:
while len(multiprocessing.active_children()) != 0:
time.sleep(1)
except KeyboardInterrupt:
o.terminate()
s.terminate()
o.join()
s.join()
內容解密:
- 建立一個
Manager例項,並使用它來建立一個分享的事件物件。 - 建立
Oscillator和Scheduler例項,並將分享的事件物件傳遞給它們。 - 啟動
Oscillator和Scheduler行程。 - 在主行程中,等待所有子行程結束。
Cron-like排程器實作
在Ticket Scheduler的實作中,我們使用了一個類別似Cron的排程機制。這個機制根據探測間隔(probing interval)來決定何時執行特定的動作。
TicketScheduler類別:排程器實作
class TicketScheduler(multiprocessing.Process):
def __init__(self, event):
self.event = event
self.con = sqlite3.connect('monitor.db')
super(TicketScheduler, self).__init__()
def run(self):
try:
from datetime import datetime
while True:
self.event.wait()
res = [r[0] for r in self.con.execute("""SELECT hostprobe_id
FROM probingschedule
WHERE (strftime('%s', 'now')/60) %
probingschedule.probeinterval = 0;""")]
for probe_id in res:
self.con.execute("INSERT INTO ticketqueue VALUES (NULL, ?, datetime('now'), 0)", (probe_id,))
self.con.commit()
except KeyboardInterrupt:
pass
內容解密:
TicketScheduler類別初始化時接收一個事件物件,並建立一個到SQLite資料函式庫的連線。- 在
run()方法中,進入無限迴圈,等待事件被觸發。 - 當事件被觸發時,查詢資料函式庫中需要執行的探測任務,並將對應的記錄插入到票券佇列(ticket queue)中。
管理與監控子系統的實作細節
在設計一個分散式監控系統時,管理與監控子系統扮演著至關重要的角色。本章節將探討該子系統的關鍵組成部分,包括票務處理流程、資料函式庫設計以及XML-RPC通訊機制。
票務分派器的運作機制
當票務被置於待處理票務佇列中後,系統會啟動一個名為票務分派器(Ticket Dispatcher)的程式。該程式負責搜尋待處理的票務,並向客戶端主機傳送請求。客戶端需要實作cmd_submit_reading XMLRPC呼叫,該呼叫預期接收以下資訊:
- 票號
- 感測器名稱
- 感測器引數
此外,系統還需要知道XML-RPC伺服器的主機名稱和埠號。這些資訊分散在多個資料表中,因此需要透過SQL查詢將其整合起來。
資料函式庫設計與ER圖
圖9-3展示了包含票務分派器元件所需資訊的資料表之間的關聯。該ER圖有助於定義SQL查詢陳述式,以檢索必要的資料。
實作票務分派器
首先,系統需要從ticketqueue表中檢索尚未分派的票務ID和hostprobe行ID:
pending_tickets = [r for r in self.con.execute("""SELECT id, hostprobe_id
FROM ticketqueue
WHERE dispatched = 0""")]
接著,系統需要根據hostprobe行ID檢索相關的感測器名稱、主機資訊和引數。這些資訊分散在多個表中,需要透過JOIN操作將其合併為單一SQL陳述式。
檢索多表資訊的SQL陳述式
for (ticket_id, hostprobe_id) in pending_tickets:
res = [r for r in self.con.execute("""SELECT host.address,
host.port,
sensor.name,
probe.parameter
FROM hostprobe, host, probe, sensor
WHERE hostprobe.id=?
AND hostprobe.host_id = host.id
AND hostprobe.probe_id = probe.id
AND probe.sensor_id = sensor.id""",
(hostprobe_id,) )][0]
self._send_request(ticket_id, res[0], res[1], res[2], res[3])
self.con.execute("UPDATE ticketqueue SET dispatched=1 WHERE id=?", (ticket_id,))
self.con.commit()
XML-RPC呼叫實作
在取得必要資訊後,系統會呼叫self._send_request函式,向遠端系統傳送XML-RPC請求。該函式的實作如清單9-12所示:
def _send_request(self, ticket, address, port, sensor, parameter_string=None):
url = "http://%s:%s/xmlrpc/" % (address, port)
proxy = xmlrpclib.ServerProxy(url, allow_none=True)
if parameter_string:
parameter = parameter_string.split(',')
else:
parameter = None
res = proxy.cmd_submit_reading(ticket, sensor, parameter)
return
內容解密:
url變數的構建:使用address和port組合出XML-RPC伺服器的URL。xmlrpclib.ServerProxy的使用:建立一個XML-RPC代理物件,用於與遠端伺服器進行通訊。- 引數處理:如果
parameter_string存在,則將其分割成列表;否則,設為None。 - XML-RPC呼叫:呼叫遠端伺服器的
cmd_submit_reading方法,傳遞票號、感測器名稱和引數。
遠端監控代理程式(Remote Monitoring Agents)設計與實作
本章是關於簡單分散式監控系統實作細節的四個章節中的第二個章節。在前一章中,我們已經概述了系統的整體設計並詳細描述了伺服器的實作。本章將重點介紹監控代理程式的實作、與感測器應用的互動以及安全模型。
設計
我們將進一步擴充套件前一章簡要提到的客戶端或監控代理程式的設計。如您所知,監控代理程式負責接收感測器讀取命令並回傳結果。它依賴外部工具來執行測量。
被動元件
監控代理程式元件將是一個被動元件,這意味著它只會對接收到的命令做出反應以執行操作。這種架構使我們能夠對整個系統的操作和通訊流程進行精細控制。監控伺服器可以決定何時查詢以及查詢什麼,並且這種行為可能會根據代理程式之前的回應而改變。
架構
監控代理程式的架構分為兩個不同的元件:作為守護程式執行並接受來自監控伺服器命令的代理程式碼;以及負責檢查系統狀態的感測器。
感測器程式碼可以是任何應用程式,並在代理程式接收到執行檢查的命令時被呼叫。由於感測器可以用任何程式設計或指令碼語言編寫,這為用於監控系統的工具提供了更大的靈活性。
動作
監控代理程式的主要目的是呼叫感測器程式碼、讀取結果並將其提交給監控伺服器。除此之外,它還執行自我組態和自我更新任務。
接受新組態
安全模型意味著每個監控代理程式必須知道其監控伺服器的地址並將其用於通訊。換句話說,代理程式在被查詢時,不會回應請求者,而是啟動與已知伺服器的連線並提交請求的資料。
這種方法需要將伺服器URL(用於XML-RPC通訊)儲存在每個代理程式的本地。儘管伺服器地址不太可能更改,但我們仍然需要處理它更改的情況。更改組態的一種方法是使用某種組態管理系統,如Puppet、Chef或CFEngine來維護組態;但我們將實作一個機制,使客戶端接受更新其組態的請求。
當客戶端接收到更新組態的命令時,它將啟動與當前註冊的伺服器的連線並請求新的URL。一旦檢索到新的URL,它將嘗試連線到新的伺服器。如果連線失敗,組態將不會被更新;否則,新資料將覆寫現有的設定,並且未來將使用新的URL進行通訊。
import xmlrpc.client
def update_configuration(server_url, new_url):
try:
# 建立與伺服器的連線
server = xmlrpc.client.ServerProxy(server_url)
# 請求新的URL
new_server_url = server.get_new_url()
# 更新本地組態
with open('config.txt', 'w') as f:
f.write(new_server_url)
return True
except Exception as e:
print(f"更新組態失敗:{e}")
return False
#### 內容解密:
1. **建立與伺服器的連線**:使用`xmlrpc.client.ServerProxy`建立與指定伺服器URL的連線。
2. **請求新的URL**:呼叫伺服器的`get_new_url`方法來取得新的伺服器URL。
3. **更新本地組態**:將取得的新URL寫入本地組態檔案中。
4. **錯誤處理**:捕捉並處理可能發生的異常,例如網路錯誤或伺服器回應錯誤。
升級感測器
感測器程式碼可能會在引入新功能(如新增新引數或改進現有檢查)時發生變化。因此,代理程式必須能夠使用新程式碼更新其感測器的基礎。
此功能與組態更新類別似——代理程式在接收到更新其感測器應用的命令後,會啟動與伺服器的連線並請求新程式碼。伺服器從其儲存函式庫傳送包含新程式碼的存檔。
當程式碼傳輸完成後,代理程式將程式碼解封裝到臨時位置,執行簡單的檢查命令以確保可執行檔未損壞,如果此操作成功,則用新應用程式替換現有的程式碼。
#!/bin/bash
# 更新感測器程式碼
update_sensors() {
# 從伺服器下載新的感測器程式碼存檔
wget http://server/sensors.tar.gz -O /tmp/sensors.tar.gz
# 解壓縮存檔到臨時位置
tar -xvf /tmp/sensors.tar.gz -C /tmp/sensors/
# 檢查新程式碼是否正確
if /tmp/sensors/check.sh; then
# 替換現有的感測器程式碼
cp -r /tmp/sensors/* /path/to/sensors/
else
echo "感測器更新失敗:檢查失敗"
fi
}
#### 內容解密:
1. **下載新的感測器程式碼存檔**:使用`wget`從指定的伺服器URL下載新的感測器程式碼存檔到臨時位置。
2. **解壓縮存檔**:使用`tar`命令解壓縮下載的存檔到臨時目錄。
3. **檢查新程式碼**:執行一個簡單的檢查指令碼來驗證新感測器程式碼的正確性。
4. **替換現有的感測器程式碼**:如果檢查成功,將新的感測器程式碼複製到指定的目錄,替換現有的程式碼。
安全模型
這種方法可能會引起一些安全問題,因為理論上任何人都可以向代理程式程式傳送查詢並取得讀數。有幾種可能的解決方案可以解決這個問題。一種方法是使用某種身份驗證機制,使請求者識別自己,而代理程式只回應授權方。另一種更簡單的方法是將請求-回應對話解耦為兩個不同的部分:請求或命令階段和回應(實際上是由代理元件啟動的操作)。
因此,我們不會對誰可以連線並向代理程式傳送請求實施任何限制。這樣做會增加另一層安全性,但也會帶來一些複雜性。如果您有興趣改進安全模型,您可能需要考慮新增雙向SSL證書,以便只有擁有SSL金鑰並且其金鑰已佈署在代理上的應用才能連線。