Kafka 安全性維護包含例行安全更新和加密機制設定。安全更新包含憑證輪換、修補程式套用和安全協定升級,多數透過滾動更新逐步重啟 Broker 完成。部分更新,例如 SSL 金鑰儲存更新,可透過動態組態更新完成,無需重啟。在新增安全協定時,可新增監聽器並引導客戶端應用程式逐步切換,確保服務不中斷。例如,從 PLAINTEXT 遷移至 SASL_SSL 時,可先新增 SASL_SSL 監聽器,待客戶端應用程式完成切換後再移除舊監聽器。SASL 機制變更也採用滾動更新,例如從 PLAIN 切換至 SCRAM-SHA-256,可先啟用兩種機制,待客戶端更新後再移除 PLAIN。Kafka 使用 SSL 和 SASL_SSL 協定建立 TLS 加密通道,保護資料傳輸安全。除了傳輸加密,靜態資料加密也至關重要,可使用全磁碟加密或卷加密。對於高度敏感資料,端對端加密是必要的,透過在客戶端嵌入加密提供者,確保資料全程加密。訊息加密通常使用對稱加密演算法,例如 AES,金鑰儲存在金鑰管理系統(KMS)。Broker 無需存取金鑰,確保雲端環境安全。加密引數可儲存在訊息標頭或負載中,標頭也支援數位簽章驗證訊息完整性。
Kafka 安全更新與加密機制
Kafka 佈署需要定期進行安全更新,包括輪換憑證、應用安全修補程式以及更新至最新的安全協定。許多維護工作透過滾動更新完成,逐步關閉並重啟 Broker,以套用更新的組態。部分任務,如更新 SSL 金鑰儲存和信任儲存,可透過動態組態更新完成,無需重啟 Broker。
無停機安全更新
在現有佈署中新增安全協定時,可以在保留舊監聽器的同時新增新的監聽器,確保客戶端應用程式能夠在更新期間繼續使用舊監聽器。例如,從 PLAINTEXT 切換至 SASL_SSL 的步驟如下:
- 使用 Kafka 組態工具為每個 Broker 新增一個新監聽器,並更新
listeners和advertised.listeners以包含新舊監聽器。 - 修改所有客戶端應用程式以使用新的 SASL_SSL 監聽器。
- 如果需要更新 Broker 間的通訊以使用新的 SASL_SSL 監聽器,則執行 Broker 的滾動更新,並設定
inter.broker.listener.name。 - 使用組態工具從
listeners和advertised.listeners中移除舊監聽器,並刪除任何未使用的舊監聽器組態選項。
SASL 機制變更
可以在相同的監聽器埠上使用滾動更新來新增或移除 SASL 機制,而無需停機。例如,將機制從 PLAIN 切換至 SCRAM-SHA-256 的步驟如下:
- 使用 Kafka 組態工具將所有現有使用者新增至 SCRAM 儲存。
- 設定
sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,並為監聽器組態listener.name.<listener-name>.scram-sha-256.sasl.jaas.config,然後執行 Broker 的滾動更新。 - 修改所有客戶端應用程式以使用
sasl.mechanism=SCRAM-SHA-256,並更新sasl.jaas.config以使用 SCRAM。 - 如果監聽器用於 Broker 間通訊,則執行 Broker 的滾動更新以設定
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256。 - 再次執行 Broker 的滾動更新以移除 PLAIN 機制,設定
sasl.enabled.mechanisms=SCRAM-SHA-256,並移除與 PLAIN 相關的組態選項。
加密
加密用於保護資料的隱私和完整性。Kafka 使用 SSL 和 SASL_SSL 安全協定的監聽器透過 TLS 提供加密通道,保護透過不安全網路傳輸的資料。可以限制 TLS 加密套件以加強安全性,並遵守聯邦資訊處理標準(FIPS)等安全要求。
資料靜態加密
除了傳輸層加密外,還需要採取額外措施來保護靜態資料,以確保即使物理存取磁碟,使用者也無法檢索敏感資料。可以使用全磁碟加密或卷加密來加密物理儲存。
端對端加密
在處理高度敏感資料或個人識別資訊(PII)的佈署中,需要額外的措施來保護資料隱私。為了符合監管要求,特別是在雲端佈署中,必須保證機密資料無法被平台管理員或雲端提供者存取。可以在 Kafka 客戶端中插入自訂加密提供者,以實作端對端加密,確保整個資料流程都是加密的。
實作端對端加密
序列化器和反序列化器可以與加密函式庫整合,在序列化期間對訊息進行加密,並在反序列化期間進行解密。訊息加密通常使用對稱加密演算法(如 AES)進行。儲存在金鑰管理系統(KMS)中的共用加密金鑰使生產者能夠加密訊息,而消費者能夠解密訊息。Broker 不需要存取加密金鑰,也不會看到訊息的未加密內容,因此這種方法在雲端環境中是安全的。解密訊息所需的加密引數可以儲存在訊息標頭或訊息負載中。如果需要支援沒有標頭支援的舊消費者,可以將這些引數儲存在訊息負載中。也可以在訊息標頭中包含數位簽章,以驗證訊息的完整性。
// 示例:使用 AES 對稱加密演算法進行訊息加密和解密
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
public class MessageEncryption {
public static void main(String[] args) throws Exception {
String message = "Sensitive Data";
SecretKey secretKey = generateKey();
String encryptedMessage = encrypt(message, secretKey);
String decryptedMessage = decrypt(encryptedMessage, secretKey);
System.out.println("Original Message: " + message);
System.out.println("Encrypted Message: " + encryptedMessage);
System.out.println("Decrypted Message: " + decryptedMessage);
}
private static SecretKey generateKey() throws Exception {
KeyGenerator keyGen = KeyGenerator.getInstance("AES");
keyGen.init(128);
return keyGen.generateKey();
}
private static String encrypt(String message, SecretKey secretKey) throws Exception {
Cipher cipher = Cipher.getInstance("AES");
cipher.init(Cipher.ENCRYPT_MODE, secretKey);
byte[] encryptedBytes = cipher.doFinal(message.getBytes(StandardCharsets.UTF_8));
return Base64.getEncoder().encodeToString(encryptedBytes);
}
private static String decrypt(String encryptedMessage, SecretKey secretKey) throws Exception {
Cipher cipher = Cipher.getInstance("AES");
cipher.init(Cipher.DECRYPT_MODE, secretKey);
byte[] decodedBytes = Base64.getDecoder().decode(encryptedMessage);
byte[] decryptedBytes = cipher.doFinal(decodedBytes);
return new String(decryptedBytes, StandardCharsets.UTF_8);
}
}
內容解密:
- 上述範例展示瞭如何使用 AES 對稱加密演算法對訊息進行加密和解密。
generateKey方法用於生成 AES 加密金鑰。encrypt方法使用 AES 演算法對輸入訊息進行加密,並將結果以 Base64 編碼字串傳回。decrypt方法對 Base64 編碼的加密訊息進行解碼,使用 AES 演算法解密,並傳回原始訊息。- 在 Kafka 中,可以在序列化器中使用
encrypt方法對訊息進行加密,在反序列化器中使用decrypt方法對訊息進行解密,從而實作端對端加密。
Kafka 安全機制:端對端加密與授權管理
Kafka 的安全機制包含多個層面,其中最關鍵的兩個部分是端對端加密和授權管理。本文將探討這兩大主題,並提供具體的實作建議和技術細節。
端對端加密(End-to-End Encryption)
Kafka 的端對端加密確保了訊息在傳輸過程中的安全性。以下是一個典型的 Kafka 端對端加密流程:
- 生產者(Producer)使用金鑰管理系統(KMS)中的加密金鑰對訊息進行加密。
- 加密後的訊息被傳送到 Kafka Broker,並儲存在分割槽日誌(Partition Logs)中。
- Broker 將加密後的訊息傳送給消費者(Consumer)。
- 消費者使用相同的 KMS 中的加密金鑰對訊息進行解密。
金鑰輪換(Key Rotation)
為了增強安全性,建議定期進行金鑰輪換。這可以限制在金鑰洩露的情況下受影響的訊息數量,並防止暴力破解攻擊。在金鑰輪換期間,消費者需要同時支援新舊金鑰,以確保能夠解密舊的加密訊息。
@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle
title Kafka 安全更新與加密機制實務
package "安全架構" {
package "網路安全" {
component [防火牆] as firewall
component [WAF] as waf
component [DDoS 防護] as ddos
}
package "身份認證" {
component [OAuth 2.0] as oauth
component [JWT Token] as jwt
component [MFA] as mfa
}
package "資料安全" {
component [加密傳輸 TLS] as tls
component [資料加密] as encrypt
component [金鑰管理] as kms
}
package "監控審計" {
component [日誌收集] as log
component [威脅偵測] as threat
component [合規審計] as audit
}
}
firewall --> waf : 過濾流量
waf --> oauth : 驗證身份
oauth --> jwt : 簽發憑證
jwt --> tls : 加密傳輸
tls --> encrypt : 資料保護
log --> threat : 異常分析
threat --> audit : 報告生成
@enduml
此圖示展示了 Kafka 端對端加密的基本流程。
壓縮加密訊息(Compression of Encrypted Messages)
在加密後對訊息進行壓縮不太可能帶來空間上的顯著減少,因此建議在加密前進行壓縮。生產者可以在序列化過程中進行壓縮,或者應用程式可以在傳送訊息前進行壓縮。同時,應在 Kafka 中停用壓縮,以避免不必要的效能開銷。
訊息鍵的處理(Message Key Handling)
在某些情況下,訊息鍵可能需要被加密,以符合相關的法規要求。由於訊息鍵用於分割槽和壓縮,因此在轉換鍵時必須保留其雜湊等價性。一個可行的方法是將原始鍵的雜湊值作為訊息鍵,並將加密後的訊息鍵儲存在訊息內容或標頭中。
授權管理(Authorization)
Kafka 的授權管理決定了使用者對特定資源的操作許可權。Kafka Broker 使用可自定義的授權器(Authorizer)來管理存取控制。
ACL 授權器(AclAuthorizer)
Kafka 內建的 AclAuthorizer 使用存取控制列表(ACLs)來實作細粒度的存取控制。ACLs 儲存在 ZooKeeper 中,並在每個 Broker 中進行快取,以實作高效能的授權查詢。
每個 ACL 繫結包含以下元素:
- 資源型別(Resource Type)
- 模式型別(Pattern Type)
- 資源名稱(Resource Name)
- 操作型別(Operation)
- 許可權型別(Permission Type)
- 主體(Principal)
- 主機(Host)
例如,一個 ACL 可以指定 User:Alice 對 Prefixed Topic:customer 具有 Write 許可權,且來源 IP 地址為 192.168.0.1。
ACL 評估邏輯
AclAuthorizer 根據以下邏輯進行授權:
- 如果存在比對操作的 Deny ACL,則拒絕操作。
- 如果存在比對操作的 Allow ACL,則允許操作。
某些許可權具有隱含關係,例如,擁有 Read 或 Write 許可權隱含擁有 Describe 許可權。
Kafka ACL 設定與自定義授權詳解
Kafka 提供了靈活的存取控制列表(ACL)機制來管理叢集中的資源存取。本文將探討 Kafka ACL 的設定、內建授權機制以及如何進行自定義授權開發。
Kafka ACL 設定基礎
Kafka ACL 主要用於控制不同主體(Principal)對叢集資源的存取許可權。資源型別包括叢集、主題、群組和交易 ID 等。ACL 設定可透過 kafka-acls.sh 工具進行管理。
常用 ACL 操作指令
# 在 ZooKeeper 中直接建立 broker ACL
$ bin/kafka-acls.sh --add --cluster --operation ClusterAction \
--authorizer-properties zookeeper.connect=localhost:2181 \
--allow-principal User:kafka
# 使用 bootstrap-server 建立主題 ACL
$ bin/kafka-acls.sh --bootstrap-server localhost:9092 \
--command-config admin.props --add --topic customerOrders \
--producer --allow-principal User:Alice
# 設定 prefix ACL
$ bin/kafka-acls.sh --bootstrap-server localhost:9092 \
--command-config admin.props --add --resource-pattern-type PREFIXED \
--topic customer --operation Read --allow-principal User:Bob
ACL 許可權型別詳解
| 資源型別 | 操作許可權 | 描述 | 適用場景 | |
–|
–|
|
–| | Cluster | ClusterAction | 叢集內部操作許可權 | broker 間通訊 | | Topic | Write | 生產者寫入許可權 | 需要寫入主題的應用程式 | | Topic | Read | 消費者讀取許可權 | 需要讀取主題的消費者 | | Group | Read | 消費群組操作許可權 | 使用消費群組的消費者 |
自定義授權機制開發
Kafka 允許開發自定義授權器(Authorizer)來實作更複雜的存取控制邏輯。以下是一個限制特定請求只能透過內部監聽器存取的範例:
自定義授權器實作
public class CustomAuthorizer extends AclAuthorizer {
// 定義內部操作型別
private static final Set<Short> internalOps =
Utils.mkSet(ApiKeys.CREATE_ACLS.id, ApiKeys.DELETE_ACLS.id);
// 定義內部監聽器名稱
private static final String internalListener = "INTERNAL";
@Override
public List<AuthorizationResult> authorize(
AuthorizableRequestContext context, List<Action> actions) {
// 檢查請求是否來自內部監聽器
if (!context.listenerName().equals(internalListener) &&
internalOps.contains((short) context.requestType())) {
// 若非內部監聽器且為內部操作,則拒絕存取
return Collections.nCopies(actions.size(), AuthorizationResult.DENIED);
} else {
// 否則交由父類別處理
return super.authorize(context, actions);
}
}
}
自定義授權器設計考量
請求上下文分析
- 監聽器名稱
- 安全協定型別
- 請求型別
角色基礎存取控制(RBAC)實作
- 與外部系統(如 LDAP)整合
- 動態載入角色與群組資訊
- 多層級存取控制支援
RBAC 授權器範例
class RbacAuthorizer extends AclAuthorizer {
// 群組快取
@volatile private var groups = Map.empty[KafkaPrincipal, Set[KafkaPrincipal]]
.withDefaultValue(Set.empty)
// 角色快取
@volatile private var roles = Map.empty[KafkaPrincipal, Set[KafkaPrincipal]]
.withDefaultValue(Set.empty)
override def authorize(context: AuthorizableRequestContext,
actions: util.List[Action]): util.List[AuthorizationResult] = {
// 取得相關主體資訊
val principals = groups(context.principal) + context.principal
val allPrincipals = principals.flatMap(roles) ++ principals
// 對每個主體進行授權檢查
actions.asScala.map { action =>
// 實作授權邏輯
}.asJava
}
}
最佳實踐與安全建議
最小許可權原則
- 僅授予必要的操作許可權
- 定期檢視和清理過期的 ACL 設定
Super Users 管理
- 謹慎使用超級使用者許可權
- 避免在生產環境中使用
allow.everyone.if.no.acl.found=true
監控與稽核
- 記錄 ACL 相關的操作日誌
- 監控異常的存取行為
透過適當的 ACL 設定和自定義授權機制,可以有效提升 Kafka 叢集的安全性和存取控制的靈活性。開發人員應根據實際業務需求,選擇合適的授權方案並實施相應的安全措施。