返回文章列表

Fluentd過濾器與解析器日誌處理應用

本文探討 Fluentd 過濾器與解析器的應用,包含修改日誌事件、新增欄位、提取 JSON 屬性、刪除欄位、使用正規表示式解析日誌、動態設定標籤和時間戳記,以及與 Prometheus 和 Elasticsearch 的整合,提供全面的日誌處理方案。

Web 開發 系統設計

Fluentd 的 record_transformerparser 外掛是強大的日誌處理工具。record_transformer 可修改現有欄位、新增欄位、使用 dig 函式安全提取巢狀 JSON 屬性,並移除特定欄位,有效控制日誌內容。parser 外掛則能利用正規表示式從非結構化日誌中提取關鍵資訊,轉換為結構化資料,方便後續分析。結合 extract 功能,更能動態設定日誌事件的標籤和時間戳記,提升日誌管理的靈活性。此外,Fluentd 與 Prometheus 和 Elasticsearch 的整合,提供更全面的監控和分析能力,實作從日誌收集、處理到視覺化展示的完整流程。

過濾器與解析器的應用

在前面的章節中,我們已經瞭解瞭如何使用 Fluentd 來處理日誌事件。在本文中,我們將探討如何使用過濾器和解析器來修改和擴充套件日誌事件。

使用 record_transformer 外掛修改日誌事件

使用過濾器可以控制哪些日誌事件被處理或不被處理,根據日誌事件的內容。這解決了很多之前描述的場景。修改日誌事件以新增額外的上下文資訊和衍生值,或提取和記錄日誌事件值,以更為有意義和可用的方式呈現,有助於其他一些場景。

為了說明這是如何工作的,我們將在標準的日誌事件之外新增新的欄位,具體如下:

  • 一個名為 computer 的欄位,包含執行 Fluentd 的主機名稱。
  • 在標準訊息前面加上 'processed-' 字首,以說明現有值的修改。
  • 日誌訊息包含一個 JSON 結構,其中包括一個 name 屬性,該屬性包含 firstnamesurname。我們將提取 firstname 並建立一個新的日誌事件屬性 from,然後刪除 surname 以解決個人識別資訊(PII)規則的問題。

RECORD DIRECTIVE

過濾器定義的關鍵部分是 record 指令。指令中的每一行代表一個欄位名稱和欄位值。例如,如果我們想要新增一個名為 myNewField 的新欄位,其字面值為 aValue,則組態如下:

<record>
  myNewField aValue
</record>

僅僅加入一個設計時的字面值並不會提供太大的價值。要告訴 Fluentd 需要處理一個衍生值,我們需要將表示式包裝在 ${} 中。我們可以透過將名稱放在括號內(例如 ${msg})來參照日誌事件中的其他欄位。要存取日誌事件訊息,我們使用 record["<field name>"](例如 record["msg"])。record 是提供給我們使用的函式的參考。

${} 字元中,我們可以使用 Ruby 物件、函式和運算元的子集,包括一些由 Fluentd 提供的。要啟用此功能,我們可以在過濾器屬性中包含 enable_ruby 屬性;當其設定為 true 時,將允許使用 Ruby 語言的全部功能。預設情況下,這被設定為 false,因為它會為解析器帶來更多的工作,例如確保它可以解析依賴關係等;為了保持效率,最好不要將其設定為 true,除非必要。

存取巢狀 JSON 元素

要取得 firstname 元素,我們需要導航日誌事件訊息部分的 JSON 結構。這可以透過標準的 record 方法完成,例如 record["name"]["firstname"],它將遍歷到 firstname 作為子屬性,但需要該屬性存在。如果結構的一部分是可選的,那麼路徑中任何缺失的部分都將觸發執行時錯誤。另一種方法是使用 record 運算子提供的名為 dig 的函式。其語法非常相似;但是,如果路徑不存在,則提供 nil 結果而不是錯誤。dig 函式是這樣使用的:

record.dig("name", "firstname")

這樣,即使 namefirstname 不存在,也不會觸發錯誤,而是傳回 nil

設定範例

讓我們來看一個具體的設定範例:

<filter **>
  @type record_transformer
  <record>
    computer ${hostname}
    msg "processed-${record['msg']}"
    from ${record.dig("name", "firstname")}
  </record>
  remove_keys surname
</filter>

程式碼解密:

  1. <filter **>:定義一個過濾器,適用於所有標籤的日誌事件。
  2. @type record_transformer:指定使用 record_transformer 外掛。
  3. <record>:定義要新增或修改的欄位。
    • computer ${hostname}:新增一個名為 computer 的欄位,其值為執行 Fluentd 的主機名稱。
    • msg "processed-${record['msg']}":修改 msg 欄位,在其值前面加上 'processed-' 字首。
    • from ${record.dig("name", "firstname")}:新增一個名為 from 的欄位,其值為 name 屬性中的 firstname 值。如果 namefirstname 不存在,則該欄位的值將為 nil
  4. remove_keys surname:刪除日誌事件中的 surname 欄位。

過濾與解析日誌事件

在處理日誌事件時,Fluentd 提供了多種方式來過濾和轉換日誌內容。其中,record_transformer 外掛允許我們對日誌事件進行 JSON 結構的操作,包括刪除或替換特定的元素。

JSON 元素刪除

record_transformer 外掛提供了多個屬性,讓我們能夠控制日誌事件元素的組成。透過設定 remove_keys 屬性,可以刪除特定的元素。例如,若要刪除 $.name.surname 路徑下的元素,可以在設定中加入 remove_keys $.name.surname。如果需要刪除多個元素,可以使用逗號分隔的列表,如 remove_keys $.name.surname, $.name.anotherName, $.somethingElse

值替換

record_transformer 外掛也允許我們替換 JSON 元素中的值。這對於資料遮蔽或修正錯誤訊息層級非常有用。替換值的操作是透過參照元素名稱並呼叫 gsub 函式來實作的。例如,若要將 msg 中的 'I' 替換為 'We',可以使用 ${record["msg"].gsub('I', 'We')}

<filter *>
  @type record_transformer
  enable_ruby true
  <record>
    computer ${hostname}
    from ${record.dig("name", "firstname")}
    msg processed ${record["msg"]}
    msg_gsub ${record["msg"].gsub('I ', 'We ')}
  </record>
  remove_keys $.name.surname
</filter>

內容解密:

  1. enable_ruby true:啟用 Ruby 支援,以便使用 record.dig 方法來定位值。
  2. <record> 區塊:新增屬性並使用已知的上下文值。
    • computer ${hostname}:新增一個名為 computer 的屬性,其值為主機名稱。
    • from ${record.dig("name", "firstname")}:新增一個名為 from 的屬性,其值為 JSON 結構中 name 下的 firstname 值。
    • msg processed ${record["msg"]}:新增一個名為 msg processed 的屬性,其值為原始的 msg 值。
    • msg_gsub ${record["msg"].gsub('I ', 'We ')}:新增一個名為 msg_gsub 的屬性,其值為將 msg 中的 'I ' 替換為 'We ' 後的結果。
  3. remove_keys $.name.surname:刪除 $.name.surname 路徑下的元素,以避免包含個人身份資訊(PII)。

預定義值

record_transformer 外掛提供了一些預定義的值,包括主機名稱、當前時間和日誌事件標籤。啟用 Ruby 支援後,可以使用 ${} 語法來存取任何公共類別方法。例如,${#Dir.getwd} 可以取得當前工作目錄。

Filter Parser vs. Record Transformer

如果日誌事件只是一個單一的文字區塊,我們可能需要使用解析器來提取有意義的值。解析器(如 regexp)可以與 filter 指令一起使用,將命名的元素轉換為頂級的日誌事件屬性。

<filter *>
  @type parser
  format regexp
  key_name log
  expression /^(?<time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[(?<level>\w+)\] (?<msg>.*)$/
  reserve_data true
</filter>

內容解密:

  1. @type parser:指定使用解析器外掛。
  2. format regexp:指定解析格式為正規表示式。
  3. key_name log:指定要解析的日誌事件屬性名稱。
  4. expression:定義正規表示式來解析日誌內容。
  5. reserve_data true:保留原始的日誌事件屬性。

Fluentd 過濾與擷取技術解析

動態設定日誌事件關鍵值

在處理日誌事件時,我們經常需要動態設定日誌事件的主要屬性,例如時間(time)和標籤(tag)。這是因為靜態設定的值可能無法滿足實際需求,特別是在收集來自多個微服務的日誌事件時。Fluentd 的 extract 功能允許我們從日誌事件記錄中擷取所需的值,並將其用於設定標籤或調整時間戳。

使用 Extract 功能

extract 指令可以整合到 sourcefiltermatch(輸出)外掛中,例如 exec 外掛。雖然 extract 機制非常靈活,但它僅限於操作 tagtime 日誌事件屬性。

設定標籤和時間

我們可以宣告如何解釋時間值。時間值可以是自 Epoch(1970 年 1 月 1 日午夜 UTC)以來的秒數,例如 1601495341 對應到 2020 年 9 月 30 日 19:49:01。另一種可能的格式是 ISO 8601 標準。

示範:使用 Exec Source Plugin 動態設定標籤

考慮一個簡單的例子,我們需要根據 exec source plugin 獲得的值來設定標籤。與之前使用 exec 一樣,我們選擇了一個簡單的命令,便於在不同的作業系統上使用或調整。同時,我們獲得了一個結構化的物件,因此在檢索值之前不需要解析負載。

<source>
  @type exec
  command more TestData\exe-src.json
  run_interval 10s
  tag exec
  <parse>
    @type json
  </parse>
  <extract>
    tag_key service_name
    keep_tag_key true
  </extract>
</source>

設定解說:

  • @type exec:指定使用 exec 外掛作為輸入源。
  • command more TestData\exe-src.json:執行的命令,用於產生輸入資料。
  • run_interval 10s:設定命令執行的間隔為 10 秒。
  • tag exec:初始設定的標籤。
  • <parse>@type json</parse>:將命令輸出的資料解析為 JSON 格式。
  • <extract>...</extract>:定義擷取規則。
    • tag_key service_name:指定用於設定標籤的日誌事件元素鍵值。
    • keep_tag_key true:保留原始的標籤鍵值。

程式碼作用與邏輯:

  1. @type exec:選擇合適的輸入外掛來執行外部命令並捕捉其輸出。
  2. commandrun_interval:定義執行的命令和執行頻率,用於模擬動態資料來源。
  3. <parse>:將捕捉的資料解析為結構化格式(JSON),便於進一步處理。
  4. <extract>:從解析後的資料中擷取特定欄位(service_name),並用於動態設定日誌事件的標籤,同時保留原始欄位。

使用 record_transformer 衍生新資料值

利用排除日誌事件和提取特定值的能力,我們現在可以考慮生成衍生值和指標。例如,我們可能想要了解錯誤發生的頻率,或是哪些元件或程式碼部分是大多數錯誤的來源。雖然使用 Elasticsearch 或 Splunk 生成這樣的指標是可能的,但它們通常是在事件分析之後使用的。如果我們想要更主動地進行監控,就需要在事件發生時更動態地計算指標。

實作計算的日誌事件轉換

有些人認為採用出生年份比年齡更不具個人特徵。這意味著您被要求修改儲存的已記錄資料。有人認為應該新增出生年份,並移除年齡屬性。Chapter6/Fluentd/file-source-transformed-elastic-search-out.conf 設定檔已被確定為整合必要變更的起點。同樣的測試來源(./Chapter6/SimulatorConfig/log-source-1.properties)可以用於執行設定。為了便於識別此場景的輸出,請變更 match 設定中的 index_name

我們的實作設定可以在 Chapter6/Exercise-Results/file-source-transformed-elastic-search-out-Answer2.conf 中看到。主要的變更是在 record 指令中加入 birthYr ${Date.today.year - record['age']},以及在之後加入 remove_keys $.age 指令。結果可以使用先前描述的 UI 在 Elasticsearch 的內容中進行檢查。

程式碼範例

<record>
  birthYr ${Date.today.year - record['age']}
</record>

內容解密:

  1. ${Date.today.year - record['age']}:此表示式計算目前年份與記錄中的年齡之間的差值,從而得出出生年份。
  2. record['age']:這裡使用了單引號來參照記錄中的 age 屬性,因為在 record_transformer 中,屬性需要用單引號括起來。
  3. remove_keys $.age:此指令用於移除記錄中的 age 屬性,以保持資料的一致性和相關性。

生成簡單的 Fluentd 指標

Fluentd 有一個由 CNCF 控制的優秀合作專案 Prometheus,其作用是處理和建立根據指標的資料。Prometheus 通常也與 Grafana 相關聯,用於視覺化這些資料。Prometheus 和 Grafana 與微服務相關聯,但與 Fluentd 一樣,沒有真正的限制或理由不將這些工具用於微服務生態系統之外。

Fluentd 與 Prometheus 的合作

Fluentd 可以在多個方面與 Prometheus 相關聯,包括:

  • 作為資料來源,將資料推播到 Push Gateway,Prometheus 可以從中計算指標。
  • 使用 monitor_agent 外掛,將 Fluentd 內部指標以 Prometheus 格式提供給 Prometheus 伺服器。
  • 作為記錄指標警示的通道,透過 Alert Mgr。
@startuml
skinparam backgroundColor #FEFEFE
skinparam defaultTextAlignment center
skinparam rectangleBackgroundColor #F5F5F5
skinparam rectangleBorderColor #333333
skinparam arrowColor #333333

title Fluentd 與 Prometheus 的合作

rectangle "資料來源" as node1
rectangle "內部指標" as node2
rectangle "警示" as node3

node1 --> node2
node2 --> node3

@enduml

此圖示描述了 Fluentd 如何與 Prometheus 生態系統互動,提供了多種整合方式。

詳細解說:
  1. 資料來源:Fluentd 可以將資料推播到 Push Gateway,作為 Prometheus 計算指標的來源。
  2. 內部指標:透過 monitor_agent 外掛,Fluentd 可以將其內部指標以 Prometheus 格式提供給 Prometheus 伺服器,無需額外的準備步驟。
  3. 警示通道:Fluentd 可以透過 Alert Mgr 記錄指標警示,這是監控和預警系統的重要組成部分。

Fluentd 在日誌事件過濾和推斷中的應用

簡介

Fluentd 是一款強大的日誌收集和處理工具,它能夠對日誌事件進行過濾和推斷,從而更好地滿足各種應用場景的需求。在本章中,我們將探討 Fluentd 在日誌事件過濾和推斷中的應用,包括如何使用 Fluentd 外掛來實作日誌事件的計數、轉換和輸出。

使用 Fluentd 外掛進行日誌事件計數

Fluentd 提供了多種外掛來實作日誌事件的計數,其中包括 fluent-plugin-datacounterfluent-plugin-numeric-counter。這些外掛可以根據特定的條件對日誌事件進行計數,並輸出計數結果。

fluent-plugin-datacounter 外掛的使用

fluent-plugin-datacounter 外掛可以根據正規表示式匹配日誌事件中的特定欄位,並對匹配到的事件進行計數。下面是一個使用 fluent-plugin-datacounter 外掛的範例組態:

<match *>
  @type datacounter
  @id counted
  tag counted
  count_key msg
  count_interval 1m
  aggregate all
  output_messages yes
  pattern1 p1 computer
</match>

在這個組態中,我們使用 datacounter 外掛對 msg 欄位中包含 computer 的日誌事件進行計數,計數間隔為 1 分鐘。

程式碼解說

  • @type datacounter:指定使用的外掛型別為 datacounter
  • @id counted:為該外掛例項指定一個唯一的 ID。
  • tag counted:指定輸出事件的標籤為 counted
  • count_key msg:指定要計數的日誌事件欄位為 msg
  • count_interval 1m:指定計數間隔為 1 分鐘。
  • pattern1 p1 computer:定義一個正規表示式模式,用於匹配 msg 欄位中包含 computer 的事件。

將日誌事件計數結果輸出到 Elasticsearch

我們可以將日誌事件計數結果輸出到 Elasticsearch 中,以便進行進一步的分析和視覺化。下面是一個將計數結果輸出到 Elasticsearch 的範例組態:

<match *>
  @type elasticsearch
  host localhost
  port 9200
  index_name fluentd-book-counted
  scheme http
  logstash_format true
  reload_on_failure true
  include_tag_key true
  tag_key tag
  <buffer>
    flush_interval 5s
  </buffer>
</match>

程式碼解說

  • @type elasticsearch:指定輸出的型別為 Elasticsearch。
  • hostport:指定 Elasticsearch 的主機和埠。
  • index_name:指定輸出到 Elasticsearch 的索引名稱。
  • logstash_format true:啟用 Logstash 格式的輸出。
  • <buffer>:定義緩衝區的組態,包括重新整理間隔等。

重點回顧

 Fluentd 可以透過外掛實作日誌事件的計數和轉換。  使用 fluent-plugin-datacounter 外掛可以根據正規表示式匹配日誌事件中的特定欄位,並對匹配到的事件進行計數。  可以將日誌事件計數結果輸出到 Elasticsearch 中,以便進行進一步的分析和視覺化。  Fluentd 的外掛機制提供了豐富的功能擴充套件,可以滿足不同的應用場景需求。