網路管理中,IP 位址的唯一性與裝置日誌的收集至關重要。本文介紹如何結合 Python 與 Netmiko、Paramiko 等函式庫,有效偵測網路中的重複 IP 位址,並利用多執行緒技術同時從多台裝置收集日誌,提升管理效率。程式碼範例中,使用 show ip interface brief 命令取得裝置介面資訊,並透過正規表示式比對目標 IP。多執行緒部分則示範如何利用 threading 模組建立多個執行緒,平行連線裝置、執行命令及收集日誌,最後將結果輸出或儲存至檔案。
網路裝置中重複IP位址的偵測與多執行緒日誌收集
在網路管理中,確保IP位址的唯一性至關重要。本文將介紹如何使用Python腳原本偵測網路中的重複IP位址,並探討如何利用多執行緒技術高效地收集裝置日誌。
重複IP位址偵測
程式碼解析
以下是一個使用Netmiko和正規表示式(re)模組來偵測重複IP位址的範例程式碼:
from netmiko import Netmiko
import re
# 定義網路裝置資訊
device1 = {"host": "10.10.10.1", "username": "admin", "password": "cisco", "device_type": "cisco_ios", "global_delay_factor": 0.1}
device2 = {"host": "10.10.10.2", "username": "admin", "password": "cisco", "device_type": "cisco_ios", "global_delay_factor": 0.1}
device3 = {"host": "10.10.10.3", "username": "admin", "password": "cisco", "device_type": "cisco_ios", "global_delay_factor": 0.1}
host = [device1, device2, device3]
check_ip = "10.10.10.2" # 待檢查的IP位址
duplicated_list = [] # 用於儲存重複IP位址的列表
command = "show ip interface brief" # 要執行的命令
for ip in host:
print(f"\n
---
嘗試登入:{ip['host']}
---
\n")
try:
net_connect = Netmiko(**ip)
output = net_connect.send_command(command)
duplicate_ip = re.findall(check_ip, output)
while duplicate_ip:
interface = re.findall(f"(.*){check_ip}", output)
duplicated_list.append(check_ip)
duplicate_device = ip["host"]
break
except:
print(f"***無法登入到 {ip['host']}")
if duplicated_list:
print(f"
---
-
---
--\n重複IP:{check_ip} \n重複裝置IP位址:{duplicate_device} \n介面:{interface[0]} \n
---
-
---
-
---
")
else:
print(f"{check_ip} IP位址可使用")
內容解密:
- 裝置連線與命令執行:使用Netmiko建立與網路裝置的連線,並執行
show ip interface brief命令來收集介面資訊。 - 重複IP偵測:利用正規表示式在命令輸出中搜尋待檢查的IP位址。若找到,則將其相關資訊(如介面資訊和裝置IP)儲存起來。
- 結果輸出:若發現重複IP位址,則輸出相關資訊;否則,提示該IP位址可供使用。
多執行緒日誌收集
傳統的循序執行方式在處理大量裝置時效率低下。以下介紹如何使用Python的多執行緒技術來加速日誌收集過程。
多執行緒技術簡介
多執行緒允許多個任務平行執行,顯著提高了程式的執行效率。在網路管理中,這意味著可以同時連線到多台裝置並收集日誌,大大縮短了整體處理時間。
範例程式碼
import paramiko
import threading
from time import sleep
# 假設我們有一個函式用於連線裝置並收集日誌
def collect_logs(device_ip, command):
# 建立SSH連線
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(device_ip, username='admin', password='cisco')
# 執行命令並收集輸出
stdin, stdout, stderr = ssh.exec_command(command)
output = stdout.read().decode()
# 處理輸出(例如,儲存到檔案或資料函式庫)
print(f"從 {device_ip} 收集的日誌:{output}")
ssh.close()
# 定義裝置IP列表和命令
devices = ["10.10.10.1", "10.10.10.2", "10.10.10.3"]
command = "show ip interface brief"
# 使用多執行緒收集日誌
threads = []
for device in devices:
thread = threading.Thread(target=collect_logs, args=(device, command))
threads.append(thread)
thread.start()
# 等待所有執行緒完成
for thread in threads:
thread.join()
內容解密:
- 多執行緒建立:為每個裝置建立一個執行緒,負責連線到該裝置並執行指定的命令。
- 平行執行:所有執行緒同時啟動,實作平行處理。
- 結果處理:每個執行緒將收集到的日誌輸出或儲存到指定位置。
透過上述方法,可以大幅提升網路管理和維護的效率。特別是在需要處理大量網路裝置的場景中,多執行緒技術能夠顯著減少整體作業時間,提高工作效率。
多執行緒網路裝置日誌收集工具實作
在現代網路管理中,自動化收集網路裝置的日誌資訊是一項重要的任務。本文將介紹如何使用Python的Paramiko函式庫和多執行緒技術,實作同時對多台網路裝置執行命令並收集日誌的功能。
實作步驟解析
1. 多執行緒函式設計
首先,我們定義了一個名為SSH_Thread的函式,用於讀取包含裝置IP清單的檔案,並為每個IP建立一個執行緒來執行ssh_conn函式。
def SSH_Thread():
with open("ip_list.txt") as r:
host = r.read()
host = re.split("\n", host)
for ip in host:
trd = threading.Thread(target=ssh_conn, args=(ip,))
trd.start()
2. SSH連線與命令執行
在ssh_conn函式中,我們實作了與網路裝置的SSH連線、命令執行以及日誌收集。
def ssh_conn(ip):
with open("command_list.txt") as c:
command_list = c.read()
command_list = re.split("\n", command_list)
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(ip, 22, "admin", "cisco")
commands = client.invoke_shell()
result = ""
for comm in command_list:
commands.send(f"{comm} \n")
sleep(1.5)
output = commands.recv(1000000).decode("utf-8").replace("\r", "")
result += str(output)
print(output)
with open(f"{ip}.log", "a") as wr:
wr.write(result)
#### 內容解密:
- 開啟IP清單檔案:使用
with open()陳述式開啟ip_list.txt檔案,讀取所有IP並以換行符\n分割成清單。 - 建立多執行緒:遍歷IP清單,為每個IP建立一個執行緒,目標函式為
ssh_conn,並傳入IP作為引數。 - SSH連線:使用Paramiko函式庫建立與網路裝置的SSH連線,設定主機金鑰政策並連線到裝置。
- 命令執行:讀取
command_list.txt中的命令清單,遍歷執行每條命令,並將輸出結果儲存在result變數中。 - 日誌儲存:將收集到的日誌資訊寫入以對應IP命名的檔案中。
自定義工具開發
除了日誌收集工具,我們還可以使用Python開發其他自定義工具,如IP位址驗證器和子網計算器。
IP位址驗證器實作
enter_ip = input("\nEnter an IP address: ")
ip = enter_ip.split(".")
valid = 0
if len(ip) == 4:
try:
for x in ip:
if 0 <= int(x) <= 255:
valid += 1
else:
print("This is NOT a VALID IP Address")
break
if valid == 4:
print("This is a VALID IP Address")
except ValueError:
print("This is NOT a VALID IP Address")
else:
print("This is NOT a VALID IP Address")
#### 內容解密:
- 使用者輸入:提示使用者輸入一個IP位址。
- IP分割:將輸入的IP字串以
.分割成四個部分。 - 驗證邏輯:檢查分割後的每個部分是否為數字且在0到255之間,若全部符合則判定為有效IP位址。
- 錯誤處理:使用
try-except捕捉非數字輸入的情況,並給出相應的錯誤提示。
子網路計算器實作與IP位址驗證
IP 位址驗證
在網路程式設計中,驗證IP位址的有效性是一個基本且重要的任務。以下是一個簡單的Python指令碼,用於檢查輸入的IP位址是否有效。
enter_ip = input("\n輸入一個IP位址: ")
ip = enter_ip.split(".")
valid = 0
if len(ip) == 4:
try:
for x in ip:
if 0 <= int(x) < 256:
valid = valid + 1
else:
print("這不是一個有效的IP位址")
break
if valid == 4:
print(f"{enter_ip} 是一個有效的IP位址")
except:
print ("這不是一個有效的IP位址")
else:
print ("這不是一個有效的IP位址")
程式碼解析:
- 首先,程式要求使用者輸入一個IP位址,並將其以點號(
.)分割成四個部分。 - 檢查分割後的IP位址部分是否恰好為四個,若不是,則判定為無效IP位址。
- 對每個部分進行檢查,確保其為數字且介於0至255之間。若任何部分不符合此條件,則判定為無效IP位址。
- 若所有部分均有效,則輸出該IP位址有效;否則,輸出無效。
子網路計算器實作
進一步地,我們可以實作一個子網路計算器,不僅能夠驗證IP位址,還能根據輸入的子網路遮罩計算出相關網路引數。
enter_ip = input("\n輸入一個IP位址: ")
octet_list = []
ip = enter_ip.split(".")
for octet in ip:
try:
octet_list.append(int(octet))
except:
continue
if len(octet_list) == 4 and all(0 <= octet <= 255 for octet in octet_list):
mask = input("\n輸入子網路遮罩(1至32): ")
try:
number = int(mask)
if 0 < number <= 32:
a = int(number / 8)
b = number % 8
octet1 = 2 ** 8 - 2 ** (8 - b)
z = octet_list[a]
k = int(z / 2 ** (8 - b))
net = ((2 ** (8 - b)) * k)
brod = ((2 ** (8 - b)) * (k + 1)) - 1
if a == 0:
subnet = f"{octet1}.0.0.0"
wildcard = f"{255-octet1}.255.255.255"
total_host = ((256-octet1)*(256**3))-2
network = f"{net}.0.0.1"
broadcast = f"{brod}.255.255.255"
min_host = f"{net}.0.0.2"
max_host = f"{brod}.255.255.254"
elif a == 1:
subnet = f"255.{octet1}.0.0"
wildcard = f"0.{255-octet1}.255.255"
total_host = ((256-octet1)*(256**2))-2
network = f"{octet_list[0]}.{net}.0.1"
broadcast = f"{octet_list[0]}.{brod}.255.255"
min_host = f"{octet_list[0]}.{net}.0.2"
max_host = f"{octet_list[0]}.{brod}.255.254"
elif a == 2:
subnet = f"255.255.{octet1}.0"
wildcard = f"0.0.{255-octet1}.255"
total_host = ((256-octet1)*256)-2
network = f"{octet_list[0]}.{octet_list[1]}.{net}.1"
broadcast = f"{octet_list[0]}.{octet_list[1]}.{brod}.255"
min_host = f"{octet_list[0]}.{octet_list[1]}.{net}.2"
max_host = f"{octet_list[0]}.{octet_list[1]}.{brod}.254"
elif a == 3:
subnet = f"255.255.255.{octet1}"
wildcard = f"0.0.0.{255-octet1}"
total_host = (256-octet1)-2
network = f"{octet_list[0]}.{octet_list[1]}.{octet_list[2]}.{net}"
broadcast = f"{octet_list[0]}.{octet_list[1]}.{octet_list[2]}.{brod}"
min_host = f"{octet_list[0]}.{octet_list[1]}.{octet_list[2]}.{net+1}"
max_host = f"{octet_list[0]}.{octet_list[1]}.{octet_list[2]}.{brod-1}"
print("
---
-
---
-
---
--\nIP 位址: {}".format(enter_ip))
print("子網路遮罩: {}".format(mask))
print("子網路: {}".format(subnet))
print("萬用字元: {}".format(wildcard))
print("總主機數: {}".format(total_host))
print("網路位址: {}".format(network))
print("廣播位址: {}".format(broadcast))
print("IP 位址範圍: {} - {}".format(min_host, max_host))
else:
print("錯誤:無效的子網路遮罩")
except:
print("錯誤:無效的子網路遮罩")
else:
print("錯誤:無效的IP位址")
程式碼解析:
此指令碼首先檢查輸入的IP位址是否有效,然後要求輸入子網路遮罩。根據子網路遮罩,計算出子網路、萬用字元、總主機數、網路位址、廣播位址以及可用IP位址範圍等引數。