在網路安全與協定分析領域,深入理解網路封包的結構與行為是基礎且關鍵的技能。無論是進行網路除錯、安全稽核或滲透測試,能夠精確地擷取、解析與操控網路封包都至關重要。傳統的網路分析工具如 Wireshark 提供了豐富的視覺化介面與強大的過濾功能,但在自動化分析、批次處理與客製化測試場景中,往往需要更靈活的程式化方案。
Scapy 是一個基於 Python 的互動式封包操作框架,它將網路協定的複雜性抽象化為直觀的 Python 物件模型。這種設計理念讓研究人員與工程師能以程式化的方式建構、解析與傳送任意協定層級的網路封包。相較於傳統的 socket 程式設計,Scapy 提供了更高層次的抽象,開發者不需要處理位元組序與位元操作等細節,就能快速實現複雜的網路協定互動。
在台灣的資安產業與網路工程領域,Scapy 已成為專業人士工具箱中的標準配備。從無線網路安全評估到 IoT 裝置分析,從協定逆向工程到網路攻防演練,Scapy 的應用場景極為廣泛。本文將從實務角度深入探討 Scapy 的核心功能與應用技術,涵蓋多種網路協定的封包分析方法,並透過實際案例展示如何運用 Scapy 解決真實的網路問題。
Scapy 架構原理與核心概念
Scapy 的設計哲學是將網路協定視為可組合的物件層級結構。在 OSI 模型中,網路通訊由多個協定層組成,每一層負責特定的功能。Scapy 將這種層級結構直接映射為 Python 類別的繼承與組合關係,讓開發者能以直觀的方式建構完整的封包堆疊。
一個典型的網路封包包含多個協定層,例如乙太網路框架包裹 IP 封包,IP 封包又包含 TCP 或 UDP 區段。在 Scapy 中,這種結構透過斜線運算子表達。當我們撰寫 Ether()/IP()/TCP() 時,實際上是在建構一個三層的協定堆疊,每一層都是獨立的 Python 物件,擁有該協定定義的所有欄位與方法。
@startuml
!define PLANTUML_FORMAT svg
!theme _none_
skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100
package "Scapy 封包處理架構" {
component "封包建構器\n(Packet Builder)" as builder
component "協定解析器\n(Protocol Parser)" as parser
component "封包傳送器\n(Packet Sender)" as sender
component "封包擷取器\n(Packet Sniffer)" as sniffer
package "協定層物件" {
component "第二層\n(Ethernet/802.11)" as layer2
component "第三層\n(IP/IPv6/ARP)" as layer3
component "第四層\n(TCP/UDP/ICMP)" as layer4
component "應用層\n(DNS/HTTP/TLS)" as layer7
}
database "封包資料\n(Raw Bytes)" as raw
}
[使用者腳本] --> builder : 定義封包結構
builder --> layer2 : 建構各層
builder --> layer3 : 建構各層
builder --> layer4 : 建構各層
builder --> layer7 : 建構各層
layer2 --> raw : 序列化
layer3 --> raw : 序列化
layer4 --> raw : 序列化
layer7 --> raw : 序列化
sender --> raw : 傳送至網路
sniffer --> raw : 擷取網路封包
raw --> parser : 解析封包
parser --> layer2 : 還原物件
parser --> layer3 : 還原物件
parser --> layer4 : 還原物件
note right of builder
支援協定組合
自動欄位計算
檢查碼處理
end note
note bottom of sniffer
即時封包擷取
BPF 過濾支援
多介面監聽
end note
@enduml
Scapy 的另一個核心特性是其互動式命令列介面。研究人員可以在 Python 直譯器中直接操作封包物件,即時查看欄位值、修改內容並傳送到網路。這種即時反饋的開發體驗大幅縮短了測試週期,特別適合協定研究與安全測試場景。當需要快速驗證某個假設或測試特定的封包變異時,不需要撰寫完整的程式碼,只需在互動式介面中執行幾行指令即可。
在自動化場景中,Scapy 可以作為 Python 模組匯入到腳本中。這讓複雜的網路測試流程能夠被封裝為可重複執行的程式,並與其他 Python 生態系統工具整合。例如,可以結合 unittest 框架編寫網路協定的自動化測試,或是與機器學習函式庫結合進行異常流量分析。
無線網路 802.11 封包分析
無線網路的安全評估是資訊安全領域的重要課題。IEEE 802.11 協定族定義了無線區域網路的實體層與資料鏈結層規範,包含認證、加密與數據傳輸等機制。Scapy 提供了完整的 802.11 協定支援,能夠擷取與建構各種類型的無線網路封包。
在分析無線網路流量時,首先需要將無線網路介面卡設定為監控模式。不同於一般的管理模式只能接收發送給本機的封包,監控模式允許網路卡擷取周遭空中傳輸的所有無線訊框,包含管理訊框、控制訊框與資料訊框。這種能力是進行無線網路分析的基礎。
from scapy.all import *
from scapy.layers.dot11 import *
import sys
class WiFiAnalyzer:
"""
無線網路封包分析器
提供 802.11 訊框的擷取、解析與統計功能
"""
def __init__(self, interface="wlan0"):
self.interface = interface
self.ap_list = {}
self.client_list = {}
self.packet_count = 0
def packet_handler(self, packet):
"""
封包處理回呼函式
解析並統計 802.11 訊框資訊
"""
self.packet_count += 1
# 檢查是否為 802.11 訊框
if not packet.haslayer(Dot11):
return
# 處理 Beacon 訊框(AP 廣播)
if packet.haslayer(Dot11Beacon):
self._process_beacon(packet)
# 處理 Probe Request(客戶端搜尋)
elif packet.haslayer(Dot11ProbeReq):
self._process_probe_request(packet)
# 處理 Deauthentication 訊框
elif packet.haslayer(Dot11Deauth):
self._process_deauth(packet)
# 處理資料訊框
elif packet.type == 2: # Data frame
self._process_data(packet)
def _process_beacon(self, packet):
"""處理 Beacon 訊框,發現 AP"""
bssid = packet[Dot11].addr2
try:
# 解析 SSID
ssid = packet[Dot11Elt].info.decode('utf-8', errors='ignore')
except:
ssid = "<Hidden SSID>"
# 解析加密類型
crypto = set()
if packet.haslayer(Dot11EltRSN):
crypto.add("WPA2")
# 從 Dot11Elt 中尋找加密資訊
elt = packet[Dot11Elt]
while isinstance(elt, Dot11Elt):
if elt.ID == 48: # RSN Information
crypto.add("WPA2")
elif elt.ID == 221: # Vendor Specific (WPA)
if b'\x00\x50\xf2\x01\x01\x00' in bytes(elt):
crypto.add("WPA")
elt = elt.payload
if not crypto:
crypto.add("Open")
# 儲存 AP 資訊
if bssid not in self.ap_list:
channel = self._get_channel(packet)
self.ap_list[bssid] = {
'ssid': ssid,
'crypto': crypto,
'channel': channel,
'seen_count': 1
}
print(f"\n[發現 AP] BSSID: {bssid}")
print(f" SSID: {ssid}")
print(f" 加密: {', '.join(crypto)}")
print(f" 頻道: {channel}")
else:
self.ap_list[bssid]['seen_count'] += 1
def _process_probe_request(self, packet):
"""處理 Probe Request,發現客戶端"""
client_mac = packet[Dot11].addr2
try:
ssid = packet[Dot11Elt].info.decode('utf-8', errors='ignore')
except:
ssid = "<Broadcast>"
if client_mac not in self.client_list:
self.client_list[client_mac] = {
'probed_ssids': set([ssid]) if ssid else set(),
'seen_count': 1
}
else:
if ssid:
self.client_list[client_mac]['probed_ssids'].add(ssid)
self.client_list[client_mac]['seen_count'] += 1
def _process_deauth(self, packet):
"""處理 Deauthentication 攻擊偵測"""
src = packet[Dot11].addr2
dst = packet[Dot11].addr1
reason = packet[Dot11Deauth].reason
print(f"\n[警告] 偵測到 Deauth 封包")
print(f" 來源: {src}")
print(f" 目標: {dst}")
print(f" 原因碼: {reason}")
def _process_data(self, packet):
"""處理資料訊框"""
# 檢查是否為加密資料
if packet.FCfield & 0x40: # Protected bit
# 識別加密類型
if packet.haslayer(Dot11CCMP):
crypto = "CCMP (AES)"
elif packet.haslayer(Dot11TKIP):
crypto = "TKIP"
else:
crypto = "Unknown"
def _get_channel(self, packet):
"""從 Beacon 訊框中提取頻道資訊"""
try:
elt = packet[Dot11Elt]
while isinstance(elt, Dot11Elt):
if elt.ID == 3: # DS Parameter Set
return elt.info[0]
elt = elt.payload
except:
pass
return "Unknown"
def start_sniffing(self, count=0, timeout=None):
"""
開始封包擷取
參數:
count: 擷取封包數量限制(0 表示無限制)
timeout: 擷取時間限制(秒)
"""
print(f"開始在 {self.interface} 介面上擷取無線網路封包...")
print("按 Ctrl+C 停止擷取\n")
try:
sniff(
iface=self.interface,
prn=self.packet_handler,
count=count,
timeout=timeout,
store=False
)
except KeyboardInterrupt:
print("\n\n擷取已停止")
self.print_summary()
except Exception as e:
print(f"錯誤: {e}")
def print_summary(self):
"""輸出統計摘要"""
print("\n" + "="*60)
print("擷取統計摘要")
print("="*60)
print(f"總封包數: {self.packet_count}")
print(f"發現 AP 數量: {len(self.ap_list)}")
print(f"發現客戶端數量: {len(self.client_list)}")
if self.ap_list:
print("\n偵測到的接入點:")
print("-" * 60)
for bssid, info in self.ap_list.items():
print(f"BSSID: {bssid}")
print(f" SSID: {info['ssid']}")
print(f" 加密: {', '.join(info['crypto'])}")
print(f" 頻道: {info['channel']}")
print(f" 出現次數: {info['seen_count']}")
print()
# 使用範例
if __name__ == "__main__":
if len(sys.argv) > 1:
interface = sys.argv[1]
else:
interface = "wlan0"
analyzer = WiFiAnalyzer(interface)
analyzer.start_sniffing(timeout=60)
這個完整的無線網路分析器展示了 Scapy 在實務應用中的強大能力。它不僅能擷取封包,還能解析協定細節、統計網路狀態,並即時輸出有用的資訊。在實際的安全評估中,這類工具能幫助研究人員快速了解目標環境的無線網路拓撲與安全配置。
Bluetooth LE 封包分析與 IoT 安全
藍牙低功耗技術在物聯網裝置中的普及帶來了新的安全挑戰。BLE 的設計目標是以極低的功耗實現短距離無線通訊,這使其成為穿戴式裝置、智慧家居與工業感測器的首選技術。然而,BLE 的複雜協定堆疊也為攻擊者提供了多個潛在的攻擊面。
Scapy 支援 BLE 協定的多個層級,從 HCI 命令介面到 ATT/GATT 服務層。HCI 是主機控制器介面,定義了應用處理器與藍牙控制器之間的通訊協定。透過 Scapy 操作 HCI 命令,研究人員能夠精確控制 BLE 裝置的行為,包含掃描、連線建立、服務發現與數據傳輸。
from scapy.all import *
from scapy.layers.bluetooth import *
from scapy.layers.bluetooth4LE import *
import time
class BLEAnalyzer:
"""
Bluetooth LE 封包分析器
提供 BLE 裝置掃描、連線與服務分析功能
"""
def __init__(self, interface="hci0"):
self.interface = interface
self.devices = {}
def scan_devices(self, duration=10):
"""
掃描周遭的 BLE 裝置
參數:
duration: 掃描時間(秒)
"""
print(f"開始掃描 BLE 裝置({duration} 秒)...")
def process_packet(packet):
if packet.haslayer(BTLE_ADV):
addr = packet.addr
# 解析廣告資料
adv_data = {}
if packet.haslayer(BTLE_ADV_IND):
adv_data['type'] = 'ADV_IND'
elif packet.haslayer(BTLE_ADV_DIRECT_IND):
adv_data['type'] = 'ADV_DIRECT_IND'
elif packet.haslayer(BTLE_ADV_NONCONN_IND):
adv_data['type'] = 'ADV_NONCONN_IND'
elif packet.haslayer(BTLE_ADV_SCAN_IND):
adv_data['type'] = 'ADV_SCAN_IND'
# 解析 EIR 資料(Extended Inquiry Response)
if packet.haslayer(EIR_Hdr):
eir = packet[EIR_Hdr]
while eir:
if isinstance(eir, EIR_CompleteLocalName):
adv_data['name'] = eir.local_name.decode('utf-8', errors='ignore')
elif isinstance(eir, EIR_TX_Power_Level):
adv_data['tx_power'] = eir.level
elif isinstance(eir, EIR_Manufacturer_Specific_Data):
adv_data['manufacturer'] = eir.payload.hex()
eir = eir.payload if hasattr(eir, 'payload') else None
if addr not in self.devices:
self.devices[addr] = adv_data
print(f"\n發現裝置: {addr}")
if 'name' in adv_data:
print(f" 名稱: {adv_data['name']}")
print(f" 類型: {adv_data.get('type', 'Unknown')}")
try:
sniff(
iface=self.interface,
prn=process_packet,
timeout=duration,
store=False
)
except Exception as e:
print(f"掃描錯誤: {e}")
print(f"\n總共發現 {len(self.devices)} 個裝置")
return self.devices
def send_connection_request(self, target_addr):
"""
發送 BLE 連線請求
參數:
target_addr: 目標裝置 MAC 地址
"""
print(f"嘗試連線到 {target_addr}...")
# 建構 CONNECT_REQ 封包
connect_req = BTLE() / BTLE_ADV(RxAdd=0, TxAdd=0) / \
BTLE_CONNECT_REQ(
InitA="00:11:22:33:44:55",
AdvA=target_addr,
AA=0x8e89bed6,
CRCInit=0x123456,
WinSize=1,
WinOffset=0,
Interval=6,
Latency=0,
Timeout=500,
ChM=0x1FFFFFFFFF,
Hop=5,
SCA=0
)
# 傳送連線請求
sendp(connect_req, iface=self.interface, verbose=False)
print("連線請求已發送")
def analyze_gatt_services(self, packets):
"""
分析 GATT 服務與特性
參數:
packets: 擷取的封包串列
"""
services = {}
for packet in packets:
if packet.haslayer(ATT_Hdr):
att = packet[ATT_Hdr]
# 讀取服務宣告
if isinstance(att, ATT_Read_By_Group_Type_Response):
for item in att.attributes:
uuid = item.uuid
handle = item.handle
services[handle] = {
'uuid': uuid,
'characteristics': []
}
# 讀取特性宣告
elif isinstance(att, ATT_Read_By_Type_Response):
for item in att.attributes:
char_uuid = item.uuid
char_handle = item.handle
# 將特性加入對應的服務
for svc_handle, svc_data in services.items():
if svc_handle <= char_handle:
svc_data['characteristics'].append({
'uuid': char_uuid,
'handle': char_handle
})
return services
# 使用範例
if __name__ == "__main__":
analyzer = BLEAnalyzer()
devices = analyzer.scan_devices(duration=20)
if devices:
print("\n掃描結果:")
for addr, info in devices.items():
print(f"{addr}: {info}")
BLE 安全分析的一個關鍵面向是理解配對與綁定機制。BLE 支援多種安全模式,從無安全防護的模式到需要數字比對或密碼輸入的配對過程。在滲透測試中,研究人員需要評估目標裝置採用的配對方法是否存在弱點,例如使用固定的配對碼或缺乏中間人攻擊防護。
IPv6 協定深度分析
IPv6 作為下一代網際網路協定,提供了遠超 IPv4 的地址空間與改進的協定特性。然而,IPv6 的複雜性也帶來了新的安全挑戰。Scapy 對 IPv6 及其相關協定提供了全面的支援,包含 ICMPv6、鄰居發現協定與各種擴展標頭。
@startuml
!define PLANTUML_FORMAT svg
!theme _none_
skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100
package "IPv6 封包結構" {
component "IPv6 基本標頭\n(40 bytes)" as ipv6_base
component "擴展標頭鏈" as ext_chain
package "擴展標頭類型" {
component "Hop-by-Hop\n選項" as hop
component "目的地選項" as dest
component "路由標頭" as routing
component "分片標頭" as fragment
component "認證標頭\n(AH)" as ah
component "封裝安全\n(ESP)" as esp
}
component "上層協定\n(TCP/UDP/ICMPv6)" as upper
}
ipv6_base --> ext_chain : 下一標頭
ext_chain --> hop : 可選
ext_chain --> dest : 可選
ext_chain --> routing : 可選
ext_chain --> fragment : 可選
ext_chain --> ah : 可選
ext_chain --> esp : 可選
ext_chain --> upper : 最終指向
note right of ipv6_base
固定 40 位元組
簡化的標頭結構
無選項欄位
end note
note bottom of ext_chain
鏈式結構
靈活擴展
順序有規範
end note
@enduml
ICMPv6 在 IPv6 網路中扮演比 ICMP 在 IPv4 更重要的角色。除了錯誤報告與診斷功能外,ICMPv6 還負責鄰居發現、地址自動配置與路由器通告等關鍵功能。理解並能夠操作 ICMPv6 封包對於 IPv6 網路的分析與測試至關重要。
from scapy.all import *
from scapy.layers.inet6 import *
import time
class IPv6Analyzer:
"""
IPv6 網路分析器
提供 IPv6 鄰居發現、路由器探測與擴展標頭分析
"""
def __init__(self, interface="eth0"):
self.interface = interface
self.neighbors = {}
self.routers = {}
def send_neighbor_solicitation(self, target_ip):
"""
發送鄰居請求(類似 IPv4 的 ARP)
參數:
target_ip: 目標 IPv6 地址
"""
print(f"發送鄰居請求到 {target_ip}...")
# 建構 ICMPv6 鄰居請求
solicited_node_multicast = self._get_solicited_node_multicast(target_ip)
packet = IPv6(dst=solicited_node_multicast) / \
ICMPv6ND_NS(tgt=target_ip) / \
ICMPv6NDOptSrcLLAddr(lladdr=get_if_hwaddr(self.interface))
# 發送並等待回應
response = sr1(packet, timeout=2, verbose=False, iface=self.interface)
if response and response.haslayer(ICMPv6ND_NA):
mac_addr = response[ICMPv6NDOptDstLLAddr].lladdr
print(f"收到鄰居通告: {target_ip} -> {mac_addr}")
return mac_addr
else:
print("未收到回應")
return None
def _get_solicited_node_multicast(self, ipv6_addr):
"""計算請求節點多播地址"""
# 取 IPv6 地址的最後 24 位元
addr_parts = ipv6_addr.split(':')
last_24bits = addr_parts[-1][-6:]
return f"ff02::1:ff{last_24bits[0:2]}:{last_24bits[2:6]}"
def send_router_solicitation(self):
"""
發送路由器請求,發現網路中的 IPv6 路由器
"""
print("發送路由器請求...")
packet = IPv6(dst="ff02::2") / \
ICMPv6ND_RS() / \
ICMPv6NDOptSrcLLAddr(lladdr=get_if_hwaddr(self.interface))
# 發送並監聽路由器通告
responses = sr(packet, timeout=3, verbose=False, iface=self.interface, multi=True)
for sent, received in responses[0]:
if received.haslayer(ICMPv6ND_RA):
router_addr = received[IPv6].src
print(f"\n發現路由器: {router_addr}")
# 解析路由器通告選項
ra = received[ICMPv6ND_RA]
print(f" Hop Limit: {ra.chlim}")
print(f" Managed: {bool(ra.M)}")
print(f" Other: {bool(ra.O)}")
print(f" Router Lifetime: {ra.routerlifetime}s")
# 解析選項
opt = received[ICMPv6NDOptPrefixInfo] if received.haslayer(ICMPv6NDOptPrefixInfo) else None
if opt:
print(f" 前綴: {opt.prefix}/{opt.prefixlen}")
print(f" 有效時間: {opt.validlifetime}s")
def test_extension_headers(self, target):
"""
測試 IPv6 擴展標頭處理
參數:
target: 目標 IPv6 地址
"""
print(f"\n測試擴展標頭處理: {target}")
# 測試 1: Hop-by-Hop 選項
print("\n1. 測試 Hop-by-Hop 選項標頭...")
pkt1 = IPv6(dst=target) / \
IPv6ExtHdrHopByHop(options=[PadN(optdata=b"\x00"*4)]) / \
ICMPv6EchoRequest()
resp1 = sr1(pkt1, timeout=2, verbose=False)
if resp1:
print(" 回應: 支援 Hop-by-Hop 選項")
else:
print(" 無回應: 可能被過濾")
# 測試 2: 目的地選項
print("\n2. 測試目的地選項標頭...")
pkt2 = IPv6(dst=target) / \
IPv6ExtHdrDestOpt(options=[PadN(optdata=b"\x00"*4)]) / \
ICMPv6EchoRequest()
resp2 = sr1(pkt2, timeout=2, verbose=False)
if resp2:
print(" 回應: 支援目的地選項")
else:
print(" 無回應: 可能被過濾")
# 測試 3: 路由標頭
print("\n3. 測試路由標頭...")
pkt3 = IPv6(dst=target) / \
IPv6ExtHdrRouting(addresses=["2001:db8::1"]) / \
ICMPv6EchoRequest()
resp3 = sr1(pkt3, timeout=2, verbose=False)
if resp3:
print(" 回應: 支援路由標頭")
else:
print(" 無回應: 可能被過濾或禁用")
# 測試 4: 分片標頭
print("\n4. 測試分片標頭...")
pkt4 = IPv6(dst=target) / \
IPv6ExtHdrFragment(offset=0, m=0, id=12345) / \
ICMPv6EchoRequest()
resp4 = sr1(pkt4, timeout=2, verbose=False)
if resp4:
print(" 回應: 支援分片")
else:
print(" 無回應: 可能不支援或被過濾")
def demonstrate_fragmentation(self, target):
"""
展示 IPv6 分片機制
參數:
target: 目標 IPv6 地址
"""
print(f"\n展示 IPv6 分片: {target}")
# 建立大型有效載荷
payload = b"A" * 2000
# 計算分片
mtu = 1280 # IPv6 最小 MTU
ipv6_header_size = 40
fragment_header_size = 8
max_payload_per_fragment = mtu - ipv6_header_size - fragment_header_size
fragments = []
offset = 0
frag_id = 54321
while offset < len(payload):
chunk = payload[offset:offset + max_payload_per_fragment]
more_frags = 1 if (offset + len(chunk)) < len(payload) else 0
frag = IPv6(dst=target) / \
IPv6ExtHdrFragment(
offset=offset // 8,
m=more_frags,
id=frag_id
) / \
Raw(load=chunk)
fragments.append(frag)
offset += len(chunk)
print(f" 建立分片 {len(fragments)}: "
f"偏移={offset//8}, 更多分片={more_frags}, "
f"大小={len(chunk)}")
# 發送所有分片
print(f"\n發送 {len(fragments)} 個分片...")
for frag in fragments:
send(frag, verbose=False, iface=self.interface)
time.sleep(0.01)
print("分片發送完成")
# 使用範例
if __name__ == "__main__":
analyzer = IPv6Analyzer()
# 鄰居發現
target = "fe80::1"
analyzer.send_neighbor_solicitation(target)
# 路由器發現
analyzer.send_router_solicitation()
# 擴展標頭測試
analyzer.test_extension_headers("2001:db8::1")
# 分片展示
analyzer.demonstrate_fragmentation("2001:db8::1")
IPv6 擴展標頭的鏈式結構提供了極大的靈活性,但也帶來了安全隱憂。攻擊者可以利用過度複雜的擴展標頭鏈來繞過安全設備的檢測,或是觸發實作缺陷導致拒絕服務。在安全評估中,測試目標系統對各種擴展標頭組合的處理能力是重要的環節。
協定安全測試與封包操控
Scapy 的強大之處不僅在於分析現有流量,更在於能夠建構任意的封包進行主動測試。這種能力讓研究人員能夠測試協定實作的健壯性,驗證安全機制的有效性,或是模擬各種異常情況。
在進行安全測試時,常見的技術包含模糊測試、協定異常注入與中間人攻擊模擬。模糊測試透過產生大量隨機或半隨機的封包變異,尋找可能導致系統崩潰或異常行為的輸入。Scapy 的 fuzz 函式能自動對協定欄位進行隨機變異,大幅簡化了模糊測試工具的開發。
from scapy.all import *
import random
class ProtocolFuzzer:
"""
協定模糊測試工具
對目標協定進行自動化安全測試
"""
def __init__(self, target, port=80):
self.target = target
self.port = port
self.responses = []
def fuzz_tcp_options(self, count=100):
"""
模糊測試 TCP 選項
參數:
count: 測試封包數量
"""
print(f"對 {self.target}:{self.port} 進行 TCP 選項模糊測試...")
for i in range(count):
# 建立基本 TCP SYN 封包
ip = IP(dst=self.target)
tcp = TCP(dport=self.port, flags="S")
# 隨機 TCP 選項組合
options = []
# MSS 選項
if random.choice([True, False]):
mss = random.randint(500, 65535)
options.append(("MSS", mss))
# Window Scale
if random.choice([True, False]):
wscale = random.randint(0, 14)
options.append(("WScale", wscale))
# SACK Permitted
if random.choice([True, False]):
options.append(("SAckOK", ""))
# Timestamp
if random.choice([True, False]):
ts_val = random.randint(0, 2**32 - 1)
ts_ecr = random.randint(0, 2**32 - 1)
options.append(("Timestamp", (ts_val, ts_ecr)))
# 應用選項
if options:
tcp.options = options
# 發送並記錄回應
packet = ip / tcp
response = sr1(packet, timeout=1, verbose=False)
if response:
self.responses.append({
'test_num': i + 1,
'options': options,
'response': response.summary()
})
# 檢查異常回應
if response.haslayer(TCP):
tcp_resp = response[TCP]
if tcp_resp.flags & 0x04: # RST flag
print(f" 測試 {i+1}: 收到 RST(可能的異常)")
elif response.haslayer(ICMP):
print(f" 測試 {i+1}: 收到 ICMP 錯誤")
if (i + 1) % 10 == 0:
print(f" 已完成 {i+1}/{count} 測試")
print(f"\n模糊測試完成,收到 {len(self.responses)} 個回應")
def test_ip_fragmentation(self):
"""
測試 IP 分片處理
"""
print(f"\n測試 IP 分片處理: {self.target}")
# 測試 1: 正常分片
print("\n1. 正常分片測試...")
payload = b"X" * 1500
frags = fragment(IP(dst=self.target)/ICMP()/payload)
for i, frag in enumerate(frags):
print(f" 發送分片 {i+1}/{len(frags)}")
send(frag, verbose=False)
time.sleep(0.1)
# 測試 2: 重疊分片
print("\n2. 重疊分片測試...")
frag1 = IP(dst=self.target, flags="MF", frag=0)/Raw(load=b"A"*100)
frag2 = IP(dst=self.target, flags=0, frag=10)/Raw(load=b"B"*50)
send(frag1, verbose=False)
time.sleep(0.1)
send(frag2, verbose=False)
# 測試 3: 分片炸彈(Tiny Fragment)
print("\n3. Tiny Fragment 測試...")
tiny_frag = IP(dst=self.target, flags="MF", frag=0)/Raw(load=b"X"*8)
send(tiny_frag, verbose=False)
print("分片測試完成")
def test_malformed_packets(self):
"""
測試畸形封包處理
"""
print(f"\n測試畸形封包: {self.target}")
# 測試 1: 無效的 TCP 旗標組合
print("\n1. 無效 TCP 旗標...")
invalid_flags = [
"SF", # SYN+FIN
"SR", # SYN+RST
"FPU", # FIN+PUSH+URG
"FSRPAUEC" # 所有旗標
]
for flags in invalid_flags:
pkt = IP(dst=self.target)/TCP(dport=self.port, flags=flags)
resp = sr1(pkt, timeout=1, verbose=False)
if resp:
print(f" 旗標 {flags}: {resp.summary()}")
else:
print(f" 旗標 {flags}: 無回應")
# 測試 2: 無效的序號
print("\n2. 無效 TCP 序號...")
pkt = IP(dst=self.target)/TCP(dport=self.port, flags="A", seq=0, ack=0)
resp = sr1(pkt, timeout=1, verbose=False)
if resp:
print(f" 回應: {resp.summary()}")
# 測試 3: 無效的 IP 選項
print("\n3. 無效 IP 選項...")
pkt = IP(dst=self.target, options=[IPOption(b"\x00\x00\x00\x00")])/ICMP()
resp = sr1(pkt, timeout=1, verbose=False)
if resp:
print(f" 回應: {resp.summary()}")
# 使用範例
if __name__ == "__main__":
fuzzer = ProtocolFuzzer("192.168.1.1", 80)
# TCP 選項模糊測試
fuzzer.fuzz_tcp_options(count=50)
# 分片測試
fuzzer.test_ip_fragmentation()
# 畸形封包測試
fuzzer.test_malformed_packets()
這類安全測試工具在實務中具有重要價值。它們能幫助開發團隊在產品發布前發現潛在的安全弱點,或是協助資安團隊評估既有系統的安全性。然而,使用這些工具時必須謹慎,確保只在授權的環境中進行測試,避免對生產系統或第三方網路造成影響。
總結與實務建議
Scapy 作為網路封包分析與操控的瑞士刀,為研究人員與工程師提供了無與倫比的靈活性。從無線網路安全評估到 IPv6 協定測試,從 IoT 裝置分析到協定模糊測試,Scapy 的應用場景極為廣泛。本文探討的技術涵蓋了多個網路層級與協定,展示了 Scapy 在實務應用中的強大能力。
在台灣的資安產業環境中,掌握 Scapy 等專業工具已成為安全專家的必備技能。隨著網路技術的持續演進,5G、IoT 與雲原生應用帶來了新的安全挑戰,也為 Scapy 的應用開啟了新的可能性。持續學習與實踐,深入理解網路協定的細節,並熟練運用 Scapy 等工具,將有助於建構更安全可靠的網路環境。
在使用 Scapy 進行任何測試或分析時,務必遵守相關法律法規與道德準則。僅在獲得明確授權的環境中進行安全測試,尊重他人的網路與系統,是每位專業人士應該堅守的原則。透過負責任的使用與持續的技術精進,我們能為網路安全領域貢獻更多價值。