返回文章列表

Fluentd Redis 列表輸出外掛開發實戰

本文探討如何開發自訂 Fluentd 輸出外掛,將日誌事件推播到 Redis 列表。涵蓋外掛開發流程、Redis 列表操作、Fluentd 外掛生命週期管理、組態設定、連線管理、錯誤處理及效能最佳化等關鍵技術,並提供實際程式碼範例與操作步驟,引導讀者掌握 Fluentd 外掛開發的實務技巧。

Web 開發 系統設計

Fluentd 作為一款高效能的開源日誌收集器,其靈活性與擴充性 largely built upon its plugin ecosystem. 本文將引導讀者逐步開發一個自訂 Fluentd 輸出外掛,將日誌資料推播到 Redis 列表中。此方法利用 Redis 列表的特性,確保日誌事件的順序性,並提供比嵌入式快取更具彈性的儲存方案,同時也方便與其他根據 Redis 列表的服務整合。文章將詳細說明外掛開發的各個環節,包含 Redis 連線管理、組態引數設定、日誌事件處理、錯誤處理機制以及效能調校策略,並提供完整的程式碼範例與測試步驟,協助讀者快速上手 Fluentd 外掛開發。

建立自訂外掛的技術深度解析

外掛原始碼的重要性與應用

本章節所開發的外掛原始碼已包含在書籍的下載內容中,或可從我們的 GitHub 存放函式庫(http://mng.bz/M20W)中檢索。鼓勵讀者根據提供的原始碼進行擴充套件開發,例如:

  • 遷移到 RedisTimeSeries 功能以提升效能
  • 開發支援 Fluentd 的區塊大小為基礎的緩衝區
  • 增強與 Redis 連線的安全性,例如使用 SSL/TLS 連線和憑證(使用者名稱和密碼)

所有擴充套件開發均需註明本文為原始碼的起始點。

為何選擇 Redis 及其列表功能進行外掛開發?

Redis 是一種開源、可擴充套件、根據記憶體的儲存解決方案,以名稱-值對為核心。其支援定義資料元素的生命週期(TTL),使其成為優秀的快取工具。利用 Redis 列表功能開發外掛具有多項實務效益:

  • 提供比小型嵌入式記憶體快取更具彈性的開源替代方案,能有效擴充套件和複製快取資料於多台伺服器之間
  • 促進 Fluentd 節點之間的協作效率
  • 與 Redis 列表(如 Ruby Resque)整合,提供支援或使用其他服務的機會
  • 支援事件按時間序列排序,保持日誌事件的順序性

Redis 列表 vs. RedisTimeSeries

儘管 Redis Labs 已開發出高效能的 RedisTimeSeries,用於處理時間序列資料,但仍存在一些挑戰,例如:

  • 無原生 Windows 支援,需要額外的 Linux 虛擬化處理
  • 資料結構限制,需使用「外部索引鍵」關聯其他資料結構,增加複雜度

因此,本章節選擇使用原生的 Redis 功能進行開發。

使用 Redis CLI 示範目標實作

在進行開發之前,先使用 Redis 命令列介面(CLI)模擬外掛的預期行為。首先需要安裝 Redis(詳見附錄 A),然後啟動 Redis 伺服器:

redis-server

待 Redis 伺服器啟動後,在新的終端機視窗中使用以下命令啟動 Redis CLI:

redis-cli

內容解密:

  1. redis-server 命令用於啟動 Redis 伺服器,使其在背景執行並接受客戶端連線。
  2. redis-cli 命令則是用於與 Redis 伺服器互動的命令列工具,可以用來執行各種 Redis 命令,例如設定鍵值對、操作列表等。
  3. 在模擬外掛行為時,使用 Redis CLI 可以直觀地驗證外掛的功能是否正確,以及是否符合預期。

程式碼實作與技術考量

在實際開發過程中,需考慮以下技術細節:

  • 如何有效利用 Redis 的列表功能來儲存和處理日誌事件
  • 如何確保資料的安全性和完整性,例如使用適當的錯誤處理機制
  • 如何最佳化外掛的效能,以滿足實際應用的需求

範例程式碼:

# Ruby 程式碼範例,用於示範如何使用 Redis 列表功能
require 'redis'

redis = Redis.new

# 將日誌事件推入 Redis 列表
redis.rpush('log_events', 'event1')
redis.rpush('log_events', 'event2')

# 從 Redis 列表中取出日誌事件
event = redis.lpop('log_events')
puts event

內容解密:

  1. 首先引入 redis 函式庫,以便在 Ruby 程式中使用 Redis 的相關功能。
  2. 建立一個新的 Redis 連線例項,用於與 Redis 伺服器互動。
  3. 使用 rpush 方法將日誌事件推入指定的 Redis 列表中。
  4. 使用 lpop 方法從 Redis 列表中取出最左邊的元素(即最早推入的元素),並將其輸出。

開發自訂 Fluentd 外掛程式:Redis 列表輸出外掛實作

前言

Fluentd 是一款強大的日誌收集與處理工具,其高度可擴充套件性得益於豐富的外掛生態系統。本文將引導讀者從零開始建立一個自訂的 Fluentd 輸出外掛,用於將日誌事件推播至 Redis 列表中,藉此深入瞭解 Fluentd 外掛開發的核心概念與實踐技巧。

模擬 Fluentd 輸入輸出外掛互動

在開始開發之前,我們先使用 Redis CLI 模擬 Fluentd 輸入輸出外掛的互動過程,以加深對外掛功能的理解。

  1. 啟動 Redis 服務:確保 Redis 服務已啟動並可連線。
  2. 連線 Redis CLI:開啟兩個終端視窗,分別執行 redis-cli 連線到 Redis 服務。
    • 在第一個 CLI(CLI 1)中,執行 lpush 命令將 JSON 格式的日誌事件推播到 Redis 列表中。
      lpush fluentd '{"tag":"demo", "timestamp" : 1606076261, "record" : {"blah" : "blah 1"}}'
      lpush fluentd '{"tag":"demo", "timestamp" : 1606076263, "record" : {"blah" : "blah 2"}}'
      lpush fluentd '{"tag":"demo", "timestamp" : 1606076267, "record" : {"blah" : "blah 3"}}'
      
    • 使用 llen fluentd 命令檢查列表長度。
    • 在第二個 CLI(CLI 2)中,執行 lpop fluentd 命令彈出列表中的元素,模擬 Fluentd 輸入外掛讀取日誌事件。

程式碼解析:

# 在 CLI 1 中執行
lpush fluentd '{"tag":"demo", "timestamp" : 1606076261, "record" : {"blah" : "blah 1"}}'

#### 內容解密:
- `lpush`:將指定的元素插入到列表的左側(頭部)。
- `fluentd`:Redis 中的列表鍵名。
- `JSON 字串`:代表日誌事件的 JSON 格式資料。

# 在 CLI 2 中執行
lpop fluentd

#### 內容解密:
- `lpop`:從列表左側彈出第一個元素。
- 當列表為空時,傳回 `nil`

開發準備

在開始開發自訂外掛之前,需完成以下準備工作:

  • 安裝 Fluentd 和 Ruby 環境。
  • 安裝 Redis 並確保可正常連線。
  • 選擇合適的 IDE(例如 Visual Studio Code)並安裝必要的 Ruby 擴充套件。
  • 安裝 Redis Gem 以便在 Ruby 中操作 Redis。
  • 建立用於開發外掛的目錄結構。

Fluentd 外掛框架

Fluentd 的外掛架構提供了豐富的類別層次結構,包括輸入、輸出、過濾器、解析器和格式化器等外掛型別。開發者可根據這些基礎類別建立自訂外掛。

圖示:Fluentd 外掛類別層次結構

此圖示說明瞭 Fluentd 外掛的主要類別及其繼承關係,所有這些類別均位於 lib/fluent/plugin 目錄下。

建立外掛骨架

Fluentd 提供了一個名為 fluent-plugin-generate 的工具,用於自動生成外掛的基本框架。執行以下命令以建立一個名為 redislist 的輸出外掛:

fluent-plugin-generate output redislist

該命令將在當前目錄下生成必要的檔案和目錄結構,如 lib/fluent/plugintest 等。

圖示:生成的目錄結構

@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle

title Fluentd Redis 列表輸出外掛開發實戰

package "資料庫架構" {
    package "應用層" {
        component [連線池] as pool
        component [ORM 框架] as orm
    }

    package "資料庫引擎" {
        component [查詢解析器] as parser
        component [優化器] as optimizer
        component [執行引擎] as executor
    }

    package "儲存層" {
        database [主資料庫] as master
        database [讀取副本] as replica
        database [快取層] as cache
    }
}

pool --> orm : 管理連線
orm --> parser : SQL 查詢
parser --> optimizer : 解析樹
optimizer --> executor : 執行計畫
executor --> master : 寫入操作
executor --> replica : 讀取操作
cache --> executor : 快取命中

master --> replica : 資料同步

note right of cache
  Redis/Memcached
  減少資料庫負載
end note

@enduml

內容解密:

  • lib/fluent/plugin/<plugin name>.rb:外掛的主要實作檔案。
  • test/test_<plugin name>.rb:用於測試外掛的單元測試檔案。
  • <plugin name>.gemspec:定義 Gem 包的後設資料,用於發布外掛。

實作外掛核心

在產生了外掛的基本結構並瞭解了生命週期的各個階段之後,我們現在可以開始注入外掛所需的特定行為。接下來,我們將撰寫處理組態和執行日誌事件處理的外掛程式碼,並介紹如何與Redis建立和斷開連線。

組態屬性的運作方式

有了外掛結構後,第一步是定義外掛將使用的組態屬性。這可以透過使用config_param物件來實作,該物件接受以下值:

  • 組態檔案中使用的屬性名稱。
  • 屬性的資料型別,可以是字串、整數等。
  • 如果可以指定預設值,則為預設值。
  • 當Fluentd執行乾跑或輸出其組態的啟動資訊時,是否應該保密該值。預設情況下,這是false,因此不需要指定。但為了說明其行為,我們在程式碼中包含了它的使用。
  • 定義屬性名稱的別名。當外掛隨著時間而改變時,別名可以很有幫助。

我們需要捕捉Redis的連線埠號、主機位址(DNS名稱或IP)以及列表名稱。為此,我們需要在組態檔案中定義幾個組態屬性。程式碼是一系列descconfig_param陳述式,desc提供了對應config_param的描述,如下所示。

desc "指定連線Redis的埠號,若未指定則預設為6379"
config_param :port, :integer, default: 6379, secret: false, alias: :portNo

desc "定義Redis的主機位址,若未定義則預設為127.0.0.1"
config_param :hostaddr, :string, default: "127.0.0.1", secret: false

desc "定義在Redis中使用的列表名稱,預設為Fluentd"
config_param :listname, :string, default: "fluentd"

desc "定義在放棄連線Redis伺服器之前的重連次數"
config_param :reconnect_attempts, :integer, default: 2

desc "定義連線Redis伺服器的逾時秒數"
config_param :connection_timeout, :integer, default: 5

desc "定義一個chunk中的日誌事件數量"
config_param :chunksize, :integer, default: 20

內容解密:

  1. config_param的使用:此方法用於定義外掛的組態屬性。它接受多個引數,包括屬性名稱、資料型別、預設值等。
  2. desc的使用:在每個config_param之前使用desc來提供對該組態屬性的描述,這些描述可以用於生成外掛組態的檔案。
  3. 屬性值的參照:一旦定義了組態屬性,就可以在程式碼的其他部分使用類別層級的元素和Ruby的@符號(例如@port)來參照這些值。
  4. Redis連線的管理:將Redis連線作為類別成員變數來儲存,這樣就不必每次與Redis互動時都重新建立連線。同時,在檔案頂部加入require "redis"來引入Redis的依賴。

組態函式的實作

框架提供了configure函式,如生命週期所示(見圖9.3)。這讓我們有機會實作任何額外的自定義驗證。我們也可以使用這個方法來定義自己的類別層級變數。為了說明這一點,我們將實作檢查網路埠是否為預設的Redis埠的程式碼。如果不是預設埠,則記錄警告訊息,以提醒開發人員需要確保埠號不衝突。

def check_port(conf)
  log.trace "checkport invoked"
  port = conf['port']
  # 在這裡實作檢查埠號的邏輯
end

def configure(conf)
  super # 繼承父類別的行為
  check_port(conf) # 呼叫自定義的檢查埠號函式
end

內容解密:

  1. check_port函式:此函式用於檢查組態中的埠號設定。它檢索組態值並進行必要的檢查。
  2. configure函式的重寫:此函式在外掛生命週期中被呼叫,用於組態外掛。它首先呼叫父類別的configure方法,然後執行自定義的組態檢查邏輯。
  3. 日誌記錄:使用log.trace來記錄日誌事件,這有助於除錯和理解程式碼的執行流程。

透過這些步驟,我們可以有效地實作和組態Fluentd外掛,以滿足特定的需求和條件。

實作 RedisList 輸出外掛的核心功能

在開發 Fluentd 的自定義外掛時,瞭解其生命週期中的關鍵函式至關重要,例如 startshutdown。這些函式是建立或關閉與外部儲存系統(如 Redis)連線的理想時機。

9.6.2 啟動與關閉外掛

在外掛的生命週期中,startshutdown 是兩個最關鍵的函式。大多數外掛都會在這兩個函式中建立或關閉與外部系統的連線。由於建立連線通常較為耗時,我們希望在開始與遠端系統通訊之前完成這項任務。

連線 Redis

def connect_redis()
  log.trace "connect_redis - Create connection if non existant"
  if !@redis
    begin
      @redis = Redis.new(host: @hostaddr, port: @port, connect_timeout: @connection_timeout, reconnect_attempts: @reconnect_attempts)
      log.debug "Connected to Redis " + @redis.connected?.to_s
    rescue Redis::BaseConnectionError, Redis::CannotConnectError => conn_err
      log.error "Connection error - ", conn_err.message, "\n connection timeout=", @connection_timeout, "\n connection attempts=", @reconnect_attempts
      @redis = nil
      return nil
    rescue => err
      log.error "Error connecting to redis - ", err.message, "|", err.class.to_s
      @redis = nil
      return nil
    end
  end
end

啟動外掛

def start
  super
  log.trace "starting redis plugin\n"
  connect_redis()
end

關閉外掛

def shutdown
  super
  log.trace "shutdown"
  if @redis
    begin
      @redis.disconnect!
      log.debug "disconnecting from redis\n"
      @redis = nil
    rescue
      log.error "Error closing Redis connection"
    end
  end
end

程式碼解析:

  1. connect_redis 方法:檢查是否已存在 Redis 連線,若不存在,則嘗試建立連線,並根據組態設定連線超時和重連次數。

    • 若連線失敗,將記錄錯誤並將 @redis 設定為 nil
    • 分別處理 Redis 連線錯誤和其他一般錯誤,以提供更具體的錯誤資訊。
  2. start 方法:在外掛啟動時呼叫,繼承父類別的啟動邏輯後,建立與 Redis 的連線。

  3. shutdown 方法:在外掛關閉時呼叫,繼承父類別的關閉邏輯後,斷開與 Redis 的連線,並將 @redis 設定為 nil 以釋放資源。

9.6.3 將外掛與 Fluentd 整合

為了驗證外掛的基本功能,可以使用 Fluentd 提供的工具進行簡單測試。首先,需要準備一個測試組態檔案,例如 dummy-plugin.conf

組態檔案範例:

<match *>
  @type redislist
  portno 6379
  #<buffer>
  # flush_interval 120
  #</buffer>
</match>

Fluentd 啟動命令:

fluentd -c Chapter9/Fluentd/dummy-plugin.conf -p <plugin_absolute_path>/Chapter9/fluent-plugin-out-redislist/lib/fluent/plugin -vv

在啟動 Fluentd 前,需先啟動 Redis 伺服器。透過觀察 Redis 的連線日誌,可以驗證外掛是否正確建立和斷開連線。

9.6.4 新增額外的組態驗證

為了確認組態檢查的正確性,可以修改 Redis 的埠並更新 Fluentd 的組態檔案以連線到新的埠。

修改後的組態檔案:

將埠從 6379 修改為 16379,並重新啟動 Redis 伺服器和 Fluentd。

fluentd -c Chapter9/ExerciseResults/Fluentd/dummy-plugin-Answer.conf -p <plugin_absolute_path>/Chapter9/fluent-plugin-out-redislist/lib/fluent/plugin

若組態正確,Fluentd 的日誌中應顯示使用非標準埠的警告,而 Redis 應能正確記錄連線。

組態驗證結果:

  • 若 Fluentd 正確顯示非標準埠警告,且 Redis 能接收到連線請求,則表示組態驗證功能正常運作。
  • 此驗證過程確保了外掛能夠根據不同的組態進行適當的調整。