返回文章列表

Python 建立多功能網路代理程式與伺服器架設

本文介紹如何使用 Python 從零開始建立一個多功能的網路代理程式,並探討代理伺服器的架設細節。文中涵蓋了開發環境設定、HEX 轉儲函式、資料接收函式、請求與回應處理、主代理處理邏輯以及伺服器主迴圈的程式碼範例與說明。此外,還探討了代理伺服器在網路安全和監控方面的應用,以及未來發展趨勢。

網路安全 Web 開發

Python 的 socket 模組提供強大的網路程式設計能力,讓開發者能輕易建立網路應用程式。本文的網路代理程式利用此模組,實作了資料包的攔截、分析與修改,為網路流量監控和安全稽核提供了基礎。代理程式透過建立本地監聽埠,將客戶端請求轉發至遠端伺服器,並將伺服器回應傳回給客戶端,同時可對資料流進行分析和修改。此機制讓開發者能深入瞭解網路通訊流程,並根據需求調整代理行為。

import socket
import sys
import threading

HEX_FILTER = ''.join(
    [(len(repr(chr(i))) == 3) and chr(i) or '.' for i in range(256)])

def hexdump(src, length=16, show=True):
    if isinstance(src, bytes):
        src = src.decode()

    results = list()
    for i in range(0, len(src), length):
        word = str(src[i:i+length])
        printable = word.translate(HEX_FILTER)
        hexa = ' '.join([f'{ord(c):02X}' for c in word])
        hexwidth = length*3
        results.append(f'{i:04x}  {hexa:<{hexwidth}}  {printable}')
    if show:
        for line in results:
            print(line)
    else:
        return results

def receive_from(connection):
    buffer = b""
    connection.settimeout(5)
    try:
        while True:
            data = connection.recv(4096)
            if not data:
                break
            buffer += data
    except:
        pass
    return buffer

def request_handler(buffer):
    return buffer

def response_handler(buffer):
    return buffer

def proxy_handler(client_socket, remote_host, remote_port, receive_first):
    remote_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    remote_socket.connect((remote_host, remote_port))

    if receive_first:
        remote_buffer = receive_from(remote_socket)
        hexdump(remote_buffer)
        remote_buffer = response_handler(remote_buffer)

        if len(remote_buffer):
            print(f"[<==] Sending {len(remote_buffer)} bytes to localhost.")
            client_socket.send(remote_buffer)

    while True:
        local_buffer = receive_from(client_socket)

        if len(local_buffer):
            print(f"[==>] Received {len(local_buffer)} bytes from localhost.")
            hexdump(local_buffer)

            local_buffer = request_handler(local_buffer)
            remote_socket.send(local_buffer)
            print("[==>] Sent to remote.")

        remote_buffer = receive_from(remote_socket)

        if len(remote_buffer):
            print(f"[<==] Received {len(remote_buffer)} bytes from remote.")
            hexdump(remote_buffer)

            remote_buffer = response_handler(remote_buffer)
            client_socket.send(remote_buffer)
            print("[<==] Sent to localhost.")

        if not len(local_buffer) or not len(remote_buffer):
            client_socket.close()
            remote_socket.close()
            print("[*] No more data. Closing connections.")
            break

def server_loop(local_host, local_port, remote_host, remote_port, receive_first):
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        server.bind((local_host, local_port))
    except Exception as e:
        print(f'problem on bind: {e}')
        print(f"[!!] Failed to listen on {local_host}:{local_port}")
        print("[!!] Check for other listening sockets or correct permissions.")
        sys.exit(0)

    print(f"[*] Listening on {local_host}:{local_port}")
    server.listen(5)

    while True:
        client_socket, addr = server.accept()
        print(f"> Received incoming connection from {addr[0]}:{addr[1]}")

        proxy_thread = threading.Thread(
            target=proxy_handler,
            args=(client_socket, remote_host, remote_port, receive_first))
        proxy_thread.start()

def main():
    if len(sys.argv[1:]) != 5:
        print("Usage: ./proxy.py [localhost] [localport] [remotehost] [remoteport] [receive_first]")
        print("Example: ./proxy.py 127.0.0.1 9000 10.12.132.1 9000 True")
        sys.exit(0)

    local_host = sys.argv[1]
    local_port = int(sys.argv[2])
    remote_host = sys.argv[3]
    remote_port = int(sys.argv[4])

    receive_first = sys.argv[5]

    if "True" in receive_first:
        receive_first = True
    else:
        receive_first = False

    server_loop(local_host, local_port, remote_host, remote_port, receive_first)

if __name__ == '__main__':
    main()

從零開始:建立多功能的網路代理程式

在當今數位化的世界,網路安全與監控是不可或缺的技能。玄貓今天要與各位分享的是如何從零開始建立一個多功能的網路代理程式。這個代理程式不僅可以幫助我們監控網路流量,還可以在需要時修改流經的資料包。

開發環境設定

首先,我們需要一些基本的 Python 函式庫來進行開發。這些函式庫包括 socketsys。以下是一些必要的進口宣告:

import socket
import sys

HEX 轉儲函式

接下來,我們要實作一個 hexdump 函式,這個函式可以將原始的二進位制資料轉換成可讀的十六進位和 ASCII 字元表示。這對於理解未知的協定或找到明文中的使用者憑證非常有用。

原始碼

src = src.decode()
results = list()
for i in range(0, len(src), length):
    word = str(src[i:i+length])
    printable = word.translate(HEX_FILTER)
    hexa = ' '.join([f'{ord(c):02X}' for c in word])
    hexwidth = length*3
    results.append(f'{i:04x} {hexa:<{hexwidth}} {printable}')
if show:
    for line in results:
        print(line)
else:
    return results

內容解密:

在這段程式碼中,我們首先將原始輸入解碼為字串。然後,我們定義了一個 results 列表來儲存最終的轉換結果。

接著,我們使用一個迴圈來處理每一段長度為 length 的字串片段。對於每一段字串,我們進行以下操作:

  1. 提取字串片段:從原始輸入中提取指定長度的字串片段。
  2. 轉換為可列印字元:使用 translate 函式將不可列印的字元轉換為點號(.)。
  3. 生成十六進位表示:將每個字元轉換為對應的十六進位值。
  4. 格式化輸出:將索引、十六進位值和可列印字元組合成一個格式化的字串,並將其新增到 results 列表中。

如果 show 引數為真,則直接列印結果;否則,傳回結果列表。

接收資料函式

接下來,我們需要一個函式來接收來自連線的資料。這個函式會從指定的 socket 讀取資料並將其儲存到緩衝區中。

原始碼

def receive_from(connection):
    buffer = b""
    connection.settimeout(5)
    try:
        while True:
            data = connection.recv(4096)
            if not data:
                break
            buffer += data
    except Exception as e:
        pass
    return buffer

內容解密:

這段程式碼定義了一個 receive_from 函式,用於從指定的 socket 連線中接收資料。具體步驟如下:

  1. 初始化緩衝區:建立一個空的 byte 字串 buffer 用於儲存接收到的資料。
  2. 設定超時:設定 socket 的超時時間為 5 秒。這在處理跨國或不穩定網路時可能需要調整。
  3. 接收資料:進入一個無限迴圈,每次從 socket 接收最多 4096 個 byte 的資料,並將其新增到緩衝區中。如果沒有接收到資料或超時,則離開迴圈。
  4. 處理異常:捕捉並忽略可能發生的異常。
  5. 傳回資料:傳回累積的緩衝區資料。

請求與回應處理

在某些情況下,我們可能需要修改請求或回應包裡面的內容。因此,我們定義了兩個函式:request_handlerresponse_handler

原始碼

def request_handler(buffer):
    # perform packet modifications
    return buffer

def response_handler(buffer):
    # perform packet modifications
    return buffer

內容解密:

這兩個函式目前僅是佔位符號,實際上可以在其中進行各種操作,例如修改包裡面的內容、進行模糊測試(fuzzing)、測試身份驗證問題等。具體實作方式可以根據需求進行擴充套件。

主代理處理邏輯

最終,我們要實作主代理處理邏輯。這個函式負責連線遠端主機、接收和傳送資料、以及在必要時修改資料包。

原始碼

def proxy_handler(client_socket, remote_host, remote_port, receive_first):
    remote_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    remote_socket.connect((remote_host, remote_port))
    if receive_first:
        remote_buffer = receive_from(remote_socket)
        hexdump(remote_buffer)
        remote_buffer = response_handler(remote_buffer)
        if len(remote_buffer):
            print("[<==] Sending %d bytes to localhost." % len(remote_buffer))
            client_socket.send(remote_buffer)
    while True:
        local_buffer = receive_from(client_socket)
        if len(local_buffer):
            line = "[==>] Received %d bytes from localhost." % len(local_buffer)
            print(line)
            hexdump(local_buffer)
            local_buffer = request_handler(local_buffer)
            remote_socket.send(local_buffer)
            print("[==>] Sent to remote.")
        remote_buffer = receive_from(remote_socket)
        if len(remote_buffer):
            print("[<==] Received %d bytes from remote." % len(remote_buffer))
            hexdump(remote_buffer)
            remote_buffer = response_handler(remote_buffer)
            client_socket.send(remote_buffer)
            print("[<==] Sent to localhost.")
        if not len(local_buffer) or not len(remote_buffer):
            client_socket.close()
            remote_socket.close()
            print("[*] No more data. Closing connections.")
            break

內容解密:

這段程式碼定義了 proxy_handler 函式,具體步驟如下:

  1. 連線遠端主機:建立一個新的 socket 並連線到指定的遠端主機和埠。
  2. 檢查是否先接收遠端資料:如果 receive_first 為真,則先從遠端 socket 接收資料並進行處理。這對於某些伺服器(例如 FTP)非常重要。
  3. 主迴圈:進入一個無限迴圈,持續接收本地和遠端資料並進行處理:
    • 接收本地 socket 的資料並進行修改。
    • 將修改後的資料傳送到遠端 socket。
    • 接收遠端 socket 的資料並進行修改。
    • 將修改後的資料傳送到本地 socket。
  4. 關閉連線:如果沒有更多資料可供傳輸,則關閉兩個 socket 連線並離開迴圈。

未來趨勢與應用評估

隨著網路安全需求不斷增加,代理程式在監控和分析網路流量方面具有廣泛應用前景。未來可以考慮增加更多功能,例如:

  • 支援更多協定(如 HTTPS、WebSocket)的解析和監控。
  • 提供更強大的資料包修改功能。
  • 整合機器學習模型以自動檢測異常流量。

這些功能都可以幫助提升代理程式在實際應用中的效能和靈活性。

基本網路工具與代理伺服器架設

代理伺服器的基本運作原理

代理伺服器的核心功能在於中繼客戶端與伺服器之間的通訊,這樣可以攔截並檢查資料流。當客戶端連線到代理伺服器時,代理伺服器會轉發資料到遠端伺服器,並將回應轉發回客戶端。這個過程需要細緻的管理,以確保資料能夠正確地在兩端之間傳遞。

以下是一個簡單的代理伺服器範例,它使用 Python 的 socket 套件來建立代理連線。這個範例展示瞭如何建立一個簡單的代理伺服器來攔截並處理 FTP 流量。

import socket
import sys
import threading

def proxy_handler(client_socket, remote_host, remote_port, receive_first):
    # 建立連線到遠端伺服器
    remote_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    remote_socket.connect((remote_host, remote_port))

    # 如果需要,首先從遠端伺服器接收資料
    if receive_first:
        remote_buffer = receive_from(remote_socket)
        hexdump(remote_buffer)

        # 將接收到的資料轉發給客戶端
        client_socket.send(remote_buffer)

    while True:
        # 從本地客戶端接收資料
        local_buffer = receive_from(client_socket)
        if len(local_buffer):
            print("[==>] Sending to remote.")
            hexdump(local_buffer)
            # 將資料轉發給遠端伺服器
            remote_socket.send(local_buffer)
            # 從遠端伺服器接收回應
            remote_buffer = receive_from(remote_socket)
            if len(remote_buffer):
                print("[<==] Received from remote.")
                hexdump(remote_buffer)
                # 將回應轉發給本地客戶端
                client_socket.send(remote_buffer)
        else:
            break

    # 清除並關閉所有連線
    client_socket.close()
    remote_socket.close()

def receive_from(connection):
    buffer = b""
    connection.settimeout(2)
    try:
        while True:
            data = connection.recv(4096)
            if not data:
                break
            buffer += data
    except Exception as e:
        pass
    return buffer

def hexdump(src, length=16):
    result = []
    digits = 4 if isinstance(src, bytes) else 2
    for i in range(0, len(src), length):
        s = src[i:i + length]
        hexa = b' '.join("%0*X" % (digits, b) for b in s)
        text = b''.join(chr(b) if 0x20 <= b <= 0x7E else b'.' for b in s)
        result.append(b"%04X   %-*s   %s" % (i, length * (digits + 1), hexa, text))
    print(b'\n'.join(result))

#### 內容解密:
這段程式碼定義了一個簡單的代理伺服器其核心功能是中繼客戶端和遠端伺服器之間的通訊以下是每個主要部分的詳細解釋

1. **proxy_handler 函式**這是代理處理函式負責處理客戶端和遠端伺服器之間的資料傳遞它首先建立與遠端伺服器的連線然後根據需求從遠端伺服器接收資料並轉發給客戶端之後它進入一個迴圈持續從本地客戶端接收資料將其轉發給遠端伺服器並將回應轉發回本地客戶端

2. **receive_from 函式**這個函式用於從指定的連線中接收資料它設定了一個超時時間並迴圈讀取資料直到沒有更多資料可讀取

3. **hexdump 函式**這個函式用於以十六進位制形式顯示二進位制資料它將二進位制資料分成固定長度的塊並以易讀的格式顯示

### 設計與實作細節

在設計這個代理伺服器時我們需要考慮以下幾個關鍵點

1. **連線管理**確保代理伺服器能夠正確地管理多個同時連線這裡使用了 `threading.Thread` 來處理每個新的連線請求
2. **資料轉發**確保資料在兩端之間正確地傳遞這需要考慮網路延遲和錯誤處理
3. **日誌與監控**為了方便除錯和監控我們使用 `hexdump` 函式來顯示每次傳輸的詳細內容

### 測試與實際應用

我們將這個簡單的代理測試在一個 FTP 伺服器上來驗證其功能首先我們啟動這個代理並連線到一個遠端 FTP 伺服器

```shell
sudo python proxy.py 192.168.1.203 21 ftp.sun.ac.za 21 True

啟動後,我們使用 FTP 工具連線到本地代理:

tim@kali:$ ftp 192.168.1.203

透過這種方式我們可以成功地捕捉和檢查 FTP 流量,並驗證代理是否正確工作。

二級標題:代理伺服器架設細節

主要功能:server_loop

此函式負責設定和管理與遠端主機之間的連線。具體實作如下:

def server_loop(local_host, local_port,
remote_host, remote_port, receive_first):
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
server.bind((local_host, local_port))
except Exception as e:
print('problem on bind: %r' % e)
print("[!!] Failed to listen on %s:%d" % (local_host, local_port))
print("[!!] Check for other listening sockets or correct permissions.")
sys.exit(0)
print("[*] Listening on %s:%d" % (local_host, local_port))
server.listen(5)
while True:
client_socket, addr = server.accept()
line = "> Received incoming connection from %s:%d" % (addr[0], addr[1])
print(line)
proxy_thread = threading.Thread(
target=proxy_handler,
args=(client_socket, remote_host,
remote_port, receive_first))
proxy_thread.start()

內容解密:

這段程式碼實作了 server_loop 函式,這是整個代理系統中最核心的一部分。

  1. socket 建立與繫結:首先建立一個 TCP/IP socket(socket.AF_INET 和 socket.SOCK_STREAM),然後繫結到本地主機和埠。
  2. 錯誤處理:如果繫結失敗(例如埠已被佔用),則會列印錯誤資訊並離開程式。
  3. 監聽與接受連線:設定該 socket 停止監聽新連線(最大佇列長度為5),然後進入一個無限迴圈等待新連線請求。當有一個新連線請求時,accept 函式會傳回一個新的通訊端物件和客戶端地址。
  4. 執行緒處理:為每個新連線建立一個新執行緒來處理解決方案要求對應業務邏輯函式 proxy_handler。執行緒執行後自動結束,不會干擾其他執行緒。

主要功能:main

main 函式負責處理解析命令列引數並啟動 server_loop

def main():
if len(sys.argv[1:]) != 5:
print("Usage: ./proxy.py [localhost] [localport]", end='')
print("[remotehost] [remoteport] [receive_first]")
print("Example: ./proxy.py 127.0.0.1 9000 10.12.132.1 9000 True")
sys.exit(0)

local_host = sys.argv[1]
local_port = int(sys.argv[2])
remote_host = sys.argv[3]
remote_port = int(sys.argv[4])
receive_first = sys.argv[5]
if "True" in receive_first:
receive_first = True
else:
receive_first = False

server_loop(local_host, local_port,
remote_host, remote_port, receive_first)

if __name__ == '__main__':
main()

內容解密:

  • 引數解析:首先檢查命令列引數是否正確,如果引數不足或錯誤則列印使用說明並離開。
  • 變數指定:從命令列引數中提取本地主機、本地埠、遠端主機、遠端埠以及是否首先從遠端主機接受資料。
  • 啟動server_loop:呼叫 server_loop 函式開始監聽和處理解析需要啟動時。

其他考慮事項

  • 效能與可擴充套件性:為了支援高併發情況和大規模資料傳輸,需要考慮對現有系統進行擴充套件和改進。
  • 安全性:確保資料在傳輸過程中的安全性是至關重要的。可以考慮加密傳輸或其他安全措施來防止資料洩露或被篡改。
  • 日誌與監控:為便於除錯和維護,應該對關鍵操作進行日誌記錄並監控系統狀態。

此圖示展示了代理伺序列圖:

@startuml
note
  無法自動轉換的 Plantuml 圖表
  請手動檢查和調整
@enduml

此圖示展示了基本網路工具與最佳化替換機台之間的互動流程。我們可以清晰看到各階段操作流程。