返回文章列表

網路裝置重複IP偵測與多執行緒日誌收集

本文探討如何使用 Python 偵測網路裝置中的重複 IP 位址,並利用多執行緒技術提升日誌收集效率。文章提供使用 Netmiko 和 Paramiko 函式庫的程式碼範例,示範如何連線到多台裝置、執行命令並收集輸出。此外,文章也包含 IP 位址驗證器和子網路計算器的實作範例,展現 Python

網路管理 Python

網路管理中,IP 位址的唯一性與裝置日誌的收集至關重要。本文介紹如何結合 Python 與 Netmiko、Paramiko 等函式庫,有效偵測網路中的重複 IP 位址,並利用多執行緒技術同時從多台裝置收集日誌,提升管理效率。程式碼範例中,使用 show ip interface brief 命令取得裝置介面資訊,並透過正規表示式比對目標 IP。多執行緒部分則示範如何利用 threading 模組建立多個執行緒,平行連線裝置、執行命令及收集日誌,最後將結果輸出或儲存至檔案。

網路裝置中重複IP位址的偵測與多執行緒日誌收集

在網路管理中,確保IP位址的唯一性至關重要。本文將介紹如何使用Python腳原本偵測網路中的重複IP位址,並探討如何利用多執行緒技術高效地收集裝置日誌。

重複IP位址偵測

程式碼解析

以下是一個使用Netmiko和正規表示式(re)模組來偵測重複IP位址的範例程式碼:

from netmiko import Netmiko
import re

# 定義網路裝置資訊
device1 = {"host": "10.10.10.1", "username": "admin", "password": "cisco", "device_type": "cisco_ios", "global_delay_factor": 0.1}
device2 = {"host": "10.10.10.2", "username": "admin", "password": "cisco", "device_type": "cisco_ios", "global_delay_factor": 0.1}
device3 = {"host": "10.10.10.3", "username": "admin", "password": "cisco", "device_type": "cisco_ios", "global_delay_factor": 0.1}

host = [device1, device2, device3]
check_ip = "10.10.10.2"  # 待檢查的IP位址
duplicated_list = []  # 用於儲存重複IP位址的列表
command = "show ip interface brief"  # 要執行的命令

for ip in host:
    print(f"\n
---
嘗試登入{ip['host']} 
---
\n")
    try:
        net_connect = Netmiko(**ip)
        output = net_connect.send_command(command)
        duplicate_ip = re.findall(check_ip, output)

        while duplicate_ip:
            interface = re.findall(f"(.*){check_ip}", output)
            duplicated_list.append(check_ip)
            duplicate_device = ip["host"]
            break
    except:
        print(f"***無法登入到 {ip['host']}")

if duplicated_list:
    print(f"
---
-
---
--\n重複IP{check_ip} \n重複裝置IP位址{duplicate_device} \n介面{interface[0]} \n 
---
-
---
-
---
")
else:
    print(f"{check_ip} IP位址可使用")

內容解密:

  1. 裝置連線與命令執行:使用Netmiko建立與網路裝置的連線,並執行show ip interface brief命令來收集介面資訊。
  2. 重複IP偵測:利用正規表示式在命令輸出中搜尋待檢查的IP位址。若找到,則將其相關資訊(如介面資訊和裝置IP)儲存起來。
  3. 結果輸出:若發現重複IP位址,則輸出相關資訊;否則,提示該IP位址可供使用。

多執行緒日誌收集

傳統的循序執行方式在處理大量裝置時效率低下。以下介紹如何使用Python的多執行緒技術來加速日誌收集過程。

多執行緒技術簡介

多執行緒允許多個任務平行執行,顯著提高了程式的執行效率。在網路管理中,這意味著可以同時連線到多台裝置並收集日誌,大大縮短了整體處理時間。

範例程式碼

import paramiko
import threading
from time import sleep

# 假設我們有一個函式用於連線裝置並收集日誌
def collect_logs(device_ip, command):
    # 建立SSH連線
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(device_ip, username='admin', password='cisco')
    
    # 執行命令並收集輸出
    stdin, stdout, stderr = ssh.exec_command(command)
    output = stdout.read().decode()
    
    # 處理輸出(例如,儲存到檔案或資料函式庫)
    print(f"從 {device_ip} 收集的日誌:{output}")
    
    ssh.close()

# 定義裝置IP列表和命令
devices = ["10.10.10.1", "10.10.10.2", "10.10.10.3"]
command = "show ip interface brief"

# 使用多執行緒收集日誌
threads = []
for device in devices:
    thread = threading.Thread(target=collect_logs, args=(device, command))
    threads.append(thread)
    thread.start()

# 等待所有執行緒完成
for thread in threads:
    thread.join()

內容解密:

  1. 多執行緒建立:為每個裝置建立一個執行緒,負責連線到該裝置並執行指定的命令。
  2. 平行執行:所有執行緒同時啟動,實作平行處理。
  3. 結果處理:每個執行緒將收集到的日誌輸出或儲存到指定位置。

透過上述方法,可以大幅提升網路管理和維護的效率。特別是在需要處理大量網路裝置的場景中,多執行緒技術能夠顯著減少整體作業時間,提高工作效率。

多執行緒網路裝置日誌收集工具實作

在現代網路管理中,自動化收集網路裝置的日誌資訊是一項重要的任務。本文將介紹如何使用Python的Paramiko函式庫和多執行緒技術,實作同時對多台網路裝置執行命令並收集日誌的功能。

實作步驟解析

1. 多執行緒函式設計

首先,我們定義了一個名為SSH_Thread的函式,用於讀取包含裝置IP清單的檔案,並為每個IP建立一個執行緒來執行ssh_conn函式。

def SSH_Thread():
    with open("ip_list.txt") as r:
        host = r.read()
        host = re.split("\n", host)

    for ip in host:
        trd = threading.Thread(target=ssh_conn, args=(ip,))
        trd.start()

2. SSH連線與命令執行

ssh_conn函式中,我們實作了與網路裝置的SSH連線、命令執行以及日誌收集。

def ssh_conn(ip):
    with open("command_list.txt") as c:
        command_list = c.read()
        command_list = re.split("\n", command_list)

    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    client.connect(ip, 22, "admin", "cisco")
    commands = client.invoke_shell()
    result = ""

    for comm in command_list:
        commands.send(f"{comm} \n")
        sleep(1.5)
        output = commands.recv(1000000).decode("utf-8").replace("\r", "")
        result += str(output)
        print(output)

    with open(f"{ip}.log", "a") as wr:
        wr.write(result)

#### 內容解密:

  1. 開啟IP清單檔案:使用with open()陳述式開啟ip_list.txt檔案,讀取所有IP並以換行符\n分割成清單。
  2. 建立多執行緒:遍歷IP清單,為每個IP建立一個執行緒,目標函式為ssh_conn,並傳入IP作為引數。
  3. SSH連線:使用Paramiko函式庫建立與網路裝置的SSH連線,設定主機金鑰政策並連線到裝置。
  4. 命令執行:讀取command_list.txt中的命令清單,遍歷執行每條命令,並將輸出結果儲存在result變數中。
  5. 日誌儲存:將收集到的日誌資訊寫入以對應IP命名的檔案中。

自定義工具開發

除了日誌收集工具,我們還可以使用Python開發其他自定義工具,如IP位址驗證器和子網計算器。

IP位址驗證器實作

enter_ip = input("\nEnter an IP address: ")
ip = enter_ip.split(".")
valid = 0

if len(ip) == 4:
    try:
        for x in ip:
            if 0 <= int(x) <= 255:
                valid += 1
            else:
                print("This is NOT a VALID IP Address")
                break
        if valid == 4:
            print("This is a VALID IP Address")
    except ValueError:
        print("This is NOT a VALID IP Address")
else:
    print("This is NOT a VALID IP Address")

#### 內容解密:

  1. 使用者輸入:提示使用者輸入一個IP位址。
  2. IP分割:將輸入的IP字串以.分割成四個部分。
  3. 驗證邏輯:檢查分割後的每個部分是否為數字且在0到255之間,若全部符合則判定為有效IP位址。
  4. 錯誤處理:使用try-except捕捉非數字輸入的情況,並給出相應的錯誤提示。

子網路計算器實作與IP位址驗證

IP 位址驗證

在網路程式設計中,驗證IP位址的有效性是一個基本且重要的任務。以下是一個簡單的Python指令碼,用於檢查輸入的IP位址是否有效。

enter_ip = input("\n輸入一個IP位址: ")
ip = enter_ip.split(".")
valid = 0
if len(ip) == 4:
    try:
        for x in ip:
            if 0 <= int(x) < 256:
                valid = valid + 1
            else:
                print("這不是一個有效的IP位址")
                break
        if valid == 4:
            print(f"{enter_ip} 是一個有效的IP位址")
    except:
        print ("這不是一個有效的IP位址")
else:
    print ("這不是一個有效的IP位址")

程式碼解析:

  1. 首先,程式要求使用者輸入一個IP位址,並將其以點號(.)分割成四個部分。
  2. 檢查分割後的IP位址部分是否恰好為四個,若不是,則判定為無效IP位址。
  3. 對每個部分進行檢查,確保其為數字且介於0至255之間。若任何部分不符合此條件,則判定為無效IP位址。
  4. 若所有部分均有效,則輸出該IP位址有效;否則,輸出無效。

子網路計算器實作

進一步地,我們可以實作一個子網路計算器,不僅能夠驗證IP位址,還能根據輸入的子網路遮罩計算出相關網路引數。

enter_ip = input("\n輸入一個IP位址: ")
octet_list = []
ip = enter_ip.split(".")

for octet in ip:
    try:
        octet_list.append(int(octet))
    except:
        continue

if len(octet_list) == 4 and all(0 <= octet <= 255 for octet in octet_list):
    mask = input("\n輸入子網路遮罩(1至32): ")
    try:
        number = int(mask)
        if 0 < number <= 32:
            a = int(number / 8)
            b = number % 8
            octet1 = 2 ** 8 - 2 ** (8 - b)

            z = octet_list[a]
            k = int(z / 2 ** (8 - b))
            net = ((2 ** (8 - b)) * k)
            brod = ((2 ** (8 - b)) * (k + 1)) - 1

            if a == 0:
                subnet = f"{octet1}.0.0.0"
                wildcard = f"{255-octet1}.255.255.255"
                total_host = ((256-octet1)*(256**3))-2
                network = f"{net}.0.0.1"
                broadcast = f"{brod}.255.255.255"
                min_host = f"{net}.0.0.2"
                max_host = f"{brod}.255.255.254"
            elif a == 1:
                subnet = f"255.{octet1}.0.0"
                wildcard = f"0.{255-octet1}.255.255"
                total_host = ((256-octet1)*(256**2))-2
                network = f"{octet_list[0]}.{net}.0.1"
                broadcast = f"{octet_list[0]}.{brod}.255.255"
                min_host = f"{octet_list[0]}.{net}.0.2"
                max_host = f"{octet_list[0]}.{brod}.255.254"
            elif a == 2:
                subnet = f"255.255.{octet1}.0"
                wildcard = f"0.0.{255-octet1}.255"
                total_host = ((256-octet1)*256)-2
                network = f"{octet_list[0]}.{octet_list[1]}.{net}.1"
                broadcast = f"{octet_list[0]}.{octet_list[1]}.{brod}.255"
                min_host = f"{octet_list[0]}.{octet_list[1]}.{net}.2"
                max_host = f"{octet_list[0]}.{octet_list[1]}.{brod}.254"
            elif a == 3:
                subnet = f"255.255.255.{octet1}"
                wildcard = f"0.0.0.{255-octet1}"
                total_host = (256-octet1)-2
                network = f"{octet_list[0]}.{octet_list[1]}.{octet_list[2]}.{net}"
                broadcast = f"{octet_list[0]}.{octet_list[1]}.{octet_list[2]}.{brod}"
                min_host = f"{octet_list[0]}.{octet_list[1]}.{octet_list[2]}.{net+1}"
                max_host = f"{octet_list[0]}.{octet_list[1]}.{octet_list[2]}.{brod-1}"

            print("
---
-
---
-
---
--\nIP 位址: {}".format(enter_ip))
            print("子網路遮罩: {}".format(mask))
            print("子網路: {}".format(subnet))
            print("萬用字元: {}".format(wildcard))
            print("總主機數: {}".format(total_host))
            print("網路位址: {}".format(network))
            print("廣播位址: {}".format(broadcast))
            print("IP 位址範圍: {} - {}".format(min_host, max_host))
        else:
            print("錯誤:無效的子網路遮罩")
    except:
        print("錯誤:無效的子網路遮罩")
else:
    print("錯誤:無效的IP位址")

程式碼解析:

此指令碼首先檢查輸入的IP位址是否有效,然後要求輸入子網路遮罩。根據子網路遮罩,計算出子網路、萬用字元、總主機數、網路位址、廣播位址以及可用IP位址範圍等引數。