返回文章列表

Spark 結構化串流的多源資料處理與架構

本文深入探討串流資料處理架構的三大核心組件:來源、處理引擎與接收器。文章聚焦於運用 Apache Spark 的結構化串流 API,闡述其微批次處理機制如何有效平衡資料新鮮度與運算成本。內容涵蓋從 Kafka、Kinesis 等訊息佇列,以及 CSV、Delta Lake 等物件儲存格式讀取串流資料的實務方法。同時,也簡要介紹了聯集與連接等基礎資料轉換操作,展示 Spark 在整合多源串流資料時的強大功能與彈性。

資料工程 大數據

在現代資料平台中,即時數據的價值日益凸顯,促使企業架構從傳統的批次處理轉向更具時效性的串流處理模式。此模式的核心在於能夠增量讀取與處理持續產生的資料,確保決策的即時性。本文將深入解析串流架構的組成要素,特別是 Apache Spark 結構化串流如何扮演關鍵的處理引擎角色。我們將探討其如何與 Kafka、Kinesis 等主流訊息服務,以及作為資料湖核心的物件儲存(如 Delta Lake)進行整合。透過分析 Spark 的微批次執行模型與統一的 API 設計,我們將理解其在簡化開發複雜度、實現容錯保證,以及靈活應對從即時警報到分析儀表板等多樣化業務需求方面的理論基礎與實踐價值。

第五章:物件儲存與資料湖

串流資料

為了攝取串流資料,需要三樣東西:一個可以增量讀取的來源、一個從這個來源讀取的處理引擎,以及最後一個處理結果的存放位置

第一個部分是來源——通常是Kafka、Kinesis、Azure Event HubsAWS SQS等訊息服務。它也可以是物件儲存的形式,或者建立在物件儲存之上的湖倉一體格式。所有這些來源的共同點是,可以追蹤已處理的內容,以便每個訊息或物件只處理一次。

第二個部分是某種類型的計算組件,它可以從這些來源讀取,進行某種類型的處理,追蹤已處理的內容,並能夠將結果寫入接收器以供消費。

處理引擎可以是雲端供應商上的無伺服器功能Apache FlinkApache Spark。然而,玄貓將專注於使用Spark作為處理引擎。Spark使用微批次的概念運行,這意味著每次執行程式碼時,它都會從尚未處理的來源讀取資料,根據指定的程式碼進行處理,並將其放入接收器微批次。這裡的關鍵是執行時間的概念。這個執行可以是在上一個微批次完成後立即執行、每秒、每五分鐘、每小時、每天一次等等。

串流架構的好處是只處理新資料。處理這些資料的速度完全取決於手頭的用例。如果串流管線用於新鮮儀表板的分析用例,那麼管線可能每小時運行一次。每次管線運行時,它都會處理新資料,然後停止處理。這是有益的,因為它平衡了新鮮資料和成本。管線不需要持續運行,並為僅需要每小時新鮮資料的用例產生費用。在另一個極端,圍繞詐欺檢測的用例將需要此管線持續處理資料,計算交易是詐欺的可能性,並將這些推論推送到接收器以進行即時警報和行動。

玄貓已經了解了什麼是串流,現在讓玄貓探索如何將串流來源與Spark一起使用。

使用串流來源

Apache Spark使用結構化串流API從各種串流來源讀取。這個API對開發人員來說很容易使用,因為它與Spark的批次API非常互通,因此開發人員可以在兩種用例中重複使用他們的知識。以下是使用SparkKafkaKinesis讀取並在控制台中顯示結果的一些範例。writeStream部分將在本節後面更詳細地介紹:

// KINESIS
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession
.builder()
.appName("Kinesis Streaming Example")
.master("local[*]")
.getOrCreate()

var dfKinesis = spark.readStream.format("kinesis")
.option("streamName","scala-book")
.option("region","us-east-1")
.option("initialPosition","TRIM_HORIZON")
.option("awsAccessKeyId",sys.env.getOrElse("AWS_ACCESS_KEY_ID",""))
.option("awsSecretKey",sys.env.getOrElse("AWS_SECRET_ACCESS_KEY",""))
.load()

dfKinesis = dfKinesis.withColumn("data",col("data").cast("string"))
.selectExpr("from_json(data,'id int, name string') as data")
.select("data.*")

val queryKinesis = dfKinesis.writeStream
.trigger(Trigger.ProcessingTime("10 seconds"))
.format("console")
.start()

queryKinesis.awaitTermination()

// KAFKA
var dfKafka = spark.readStream.format("kafka")
.option("subscribe", "scala-book")
.option("kafka.bootstrap.servers", sys.env.getOrElse("BOOTSTRAP_SERVERS",""))
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.jaas.config", sys.env.getOrElse("KAFKA_SASL_JAAS_CONFIG","")) // 假設這裡有配置
.option("kafka.request.timeout.ms", "10000")
.option("kafka.session.timeout.ms", "10000")
.option("failOnDataLoss", "true")
.load()

這段程式碼展示了如何使用Spark的結構化串流APIKinesisKafka讀取資料。對於Kinesis,它配置了串流名稱、區域和初始讀取位置,並從環境變數中獲取AWS憑證。讀取到的二進位資料被轉換為字串,然後使用from_json解析為結構化資料。writeStream部分設定了每10秒觸發一次處理,並將結果輸出到控制台。對於Kafka,它配置了訂閱的主題和Kafka伺服器,並處理了SASL認證相關的選項。這兩個範例都體現了Spark結構化串流的靈活性和易用性,能夠處理來自不同訊息佇列服務的串流資料。

此圖示:串流資料處理架構

@startuml
!define DISABLE_LINK
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

actor "資料產生者" as Producer

package "串流資料來源" as StreamingSources {
queue "Kafka" as KafkaSource
queue "Kinesis" as KinesisSource
queue "Azure Event Hubs" as EventHubsSource
queue "AWS SQS" as SQSSource
folder "物件儲存 (Delta Lake)" as ObjectStoreSource
}

package "串流處理引擎" as ProcessingEngine {
component "Apache Spark (結構化串流)" as SparkStreaming
component "Apache Flink" as Flink
component "雲端無伺服器功能" as ServerlessFunctions
}

package "資料接收器 (Sink)" as DataSinks {
database "資料庫 (OLTP/OLAP)" as DatabaseSink
folder "物件儲存 (Delta Lake)" as ObjectStoreSink
queue "訊息佇列 (下游系統)" as MessageQueueSink
component "儀表板/報表" as Dashboard
component "機器學習模型" as MLModel
component "即時警報系統" as AlertSystem
}

Producer --> KafkaSource
Producer --> KinesisSource
Producer --> EventHubsSource
Producer --> SQSSource
Producer --> ObjectStoreSource

KafkaSource --> SparkStreaming : 讀取增量資料
KinesisSource --> SparkStreaming : 讀取增量資料
EventHubsSource --> SparkStreaming : 讀取增量資料
SQSSource --> SparkStreaming : 讀取增量資料
ObjectStoreSource --> SparkStreaming : 讀取增量資料

SparkStreaming --> DatabaseSink : 寫入處理結果
SparkStreaming --> ObjectStoreSink : 寫入處理結果 (例如 Delta 格式)
SparkStreaming --> MessageQueueSink : 寫入處理結果
SparkStreaming --> Dashboard : 驅動即時儀表板
SparkStreaming --> MLModel : 更新模型特徵
SparkStreaming --> AlertSystem : 觸發警報

note right of SparkStreaming
- 微批次處理
- 追蹤已處理資料
- 支援多種來源與接收器
- 批次/串流 API 互通
end note

note "增量處理是核心價值" as IncrementalNote
SparkStreaming .. IncrementalNote
end note

@enduml

看圖說話:

此圖示清晰地描繪了串流資料處理的完整架構,從資料產生到最終消費的整個流程。資料由資料產生者產生後,會進入各種串流資料來源,例如Kafka、Kinesis、Azure Event HubsAWS SQS等訊息佇列,甚至可以是物件儲存(如Delta Lake)。這些來源的共同特點是能夠支援增量讀取,確保每個訊息或物件只被處理一次。接著,串流處理引擎(如Apache Spark結構化串流、Apache Flink或雲端無伺服器功能)會從這些來源讀取資料。Spark結構化串流的核心是微批次處理,它能夠追蹤已處理的資料,並根據預設的觸發時間(從即時到每日)執行處理邏輯。處理後的結果會被寫入各種資料接收器,包括資料庫、物件儲存(特別是Delta Lake格式)、訊息佇列,甚至直接驅動儀表板、機器學習模型更新即時警報系統。整個架構強調了增量處理的核心價值,它不僅能有效平衡資料新鮮度與成本,還能實現從批次到即時的靈活資料處理需求。

第五章:物件儲存與資料湖

使用串流來源

對於每個訊息,消費者遵循相同的基本結構,從readStream開始,並設定format以告知消費者從哪種類型的串流來源消費。其餘選項是來源特定的。這兩種來源都將具有Base64編碼的資料,需要先轉換為字串,然後典型的模式是從有效負載中解碼JSON,以便能夠處理來自來源的簡單DataFrame。最終結果如下圖所示:

此圖示:處理輸出到控制台

這段程式碼執行後,會將處理後的串流資料持續輸出到控制台,類似於:

-------------------------------------------
Batch: 0
-------------------------------------------
+---+----+
| id|name|
+---+----+
| 1|john|
| 2|jane|
+---+----+
-------------------------------------------
Batch: 1
-------------------------------------------
+---+-----+
| id| name|
+---+-----+
| 3|peter|
| 4|mary |
+---+-----+
...

物件儲存是另一個常見的串流資料來源。當資料從源系統放置到資料湖或湖倉一體的暫存區進行處理、豐富和儲存以進行分析處理時,這是一個非常典型的模式。暫存位置可以是CSV、JSON、Parquet湖倉一體格式(例如Delta Lake)。這裡有兩個範例——第一個是讀取CSV資料S3位置;第二個是作為來源的Delta資料表

// CSV 檔案作為串流來源
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession
.builder()
.appName("CSV Streaming Example")
.master("local[*]")
.getOrCreate()

val dfCsv = spark.readStream.format("csv")
.option("header","true")
.schema("id int, name string") // 必須指定 Schema
.load("s3a://scala-data-engineering/my/first/")

val queryCsv = dfCsv.writeStream
.trigger(Trigger.ProcessingTime("10 seconds"))
.format("console")
.start()
queryCsv.awaitTermination()

// DELTA 資料表作為串流來源
val dfDelta = spark.readStream.format("delta")
.load("s3a://scala-data-engineering/my/first_delta")

val queryDelta = dfDelta.writeStream
.trigger(Trigger.ProcessingTime("10 seconds"))
.format("console")
.start()
queryDelta.awaitTermination()

與訊息服務一樣,物件串流遵循相同的API模式。只需指定格式(在本例中為CSVDelta)並提供該格式特定的選項。這些格式可以替換為JSON、ORCParquet。請注意,CSVDelta(或JSON/Parquet/ORC)之間存在細微差別,即從Delta讀取時未指定Schema。這是因為Delta格式將Schema包含在物件位置中,讀取器將能夠在首次掃描Delta日誌時知道Schema。玄貓將在下一節中了解其中的細節。

處理與接收器

串流來源只是串流架構的第一部分。僅僅從上述來源之一讀取並不能獲得任何有價值的東西。為了獲得價值,資料需要以某種方式進行處理,然後儲存到某種類型的目的地。Spark中的處理可以根據手頭的用例採取多種形式。有時,簡單的轉換,例如將CSVJSON等原始形式轉換為ParquetDelta,以獲得更好的讀取效能,而無需更改資料結構本身。其他時候,需要進行連接以進行豐富或聚合以進行匯總,以進行更豐富的資料分析。

以下範例將引導玄貓了解串流架構中一些非常常見的資料處理需求。為了簡化起見,每個範例都將使用控制台接收器,將接收器的詳細資訊留到本節後面。

僅從原始檔案位置讀取而不進行任何轉換並寫入控制台接收器的最簡單處理範例已在上一節中介紹。這裡將討論的第一種類型的轉換是聯集(Unions)連接(Joins)。當兩個不同的串流來源需要匯集在一起並寫入接收器時,通常會使用聯集聯集的語法對於熟悉DataFrame API的人來說會非常熟悉。

在以下範例中可以替換任何串流來源,並且可以使用多個DataFrame

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession
.builder()
.appName("Streaming Union Example")
.master("local[*]")
.getOrCreate()

val df_a = spark.readStream.format("csv")
.option("header","true")
.schema("id int, name string")
.load("s3a://scala-data-engineering/my/first_stream_a/")

val df_b = spark.readStream.format("csv")
.option("header","true")
.schema("id int, name string")
.load("s3a://scala-data-engineering/my/first_stream_b/")

此圖示:Spark 結構化串流處理流程

@startuml
!define DISABLE_LINK
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

actor "資料產生者" as Producer

package "資料來源" as Sources {
queue "Kafka Topic" as Kafka
queue "Kinesis Stream" as Kinesis
folder "S3 (CSV/JSON/Parquet)" as S3Folder
folder "Delta Lake Table" as DeltaTableSource
}

package "Spark 結構化串流" as SparkStreamingEngine {
component "讀取串流 (readStream)" as ReadStream
component "資料轉換 (DataFrame Operations)" as Transform
component "寫入串流 (writeStream)" as WriteStream
rectangle "檢查點目錄" as CheckpointDir
}

package "資料接收器" as Sinks {
database "外部資料庫" as ExternalDB
folder "S3 (Parquet/Delta)" as S3Sink
queue "下游訊息佇列" as DownstreamQueue
component "控制台輸出" as ConsoleOutput
}

Producer --> Kafka
Producer --> Kinesis
Producer --> S3Folder : 放置新檔案
Producer --> DeltaTableSource : 寫入新資料

Kafka --> ReadStream : 讀取增量訊息
Kinesis --> ReadStream : 讀取增量記錄
S3Folder --> ReadStream : 監聽新檔案
DeltaTableSource --> ReadStream : 監聽新版本

ReadStream --> Transform : 獲取微批次資料
Transform --> WriteStream : 處理後的微批次資料

WriteStream --> ExternalDB : 寫入
WriteStream --> S3Sink : 寫入
WriteStream --> DownstreamQueue : 寫入
WriteStream --> ConsoleOutput : 寫入

ReadStream .. CheckpointDir : 記錄讀取進度
WriteStream .. CheckpointDir : 記錄寫入進度 (at-least-once)

note right of ReadStream
- format()
- option()
- load()
end note

note right of Transform
- select(), where(), withColumn()
- join(), union()
- aggregate()
end note

note right of WriteStream
- format()
- outputMode()
- trigger()
- start()
end note

@enduml

看圖說話:

此圖示詳細展示了Spark結構化串流的資料處理流程。首先,資料產生者將資料發送到多種資料來源,包括Kafka、Kinesis等訊息佇列,或S3資料夾、Delta Lake資料表等檔案儲存。Spark結構化串流引擎透過readStream組件從這些來源增量讀取資料readStream會持續監聽新資料,並將其作為微批次傳遞給資料轉換組件。在資料轉換階段,可以執行各種DataFrame操作,例如選擇、過濾、聯集、連接和聚合等,以豐富或精煉資料。處理後的資料隨後由writeStream組件寫入各種資料接收器,包括外部資料庫、S3儲存、下游訊息佇列或直接輸出到控制台。整個過程中,檢查點目錄扮演著關鍵角色,它記錄了讀取和寫入的進度,確保了處理的容錯性至少一次的語義。這種架構使得Spark能夠高效、可靠地處理持續不斷的資料流,並將其應用於多種業務場景。

縱觀現代企業對即時洞察的需求,串流資料架構的崛起不僅是技術演進,更深層地反映了一種從靜態分析到動態感知的管理思維轉變。它將數據從週期性的「歷史報告」解放出來,轉化為持續流動的「商業脈搏」,為組織提供了前所未有的敏捷性。

分析此架構的內涵,其核心挑戰並非單純的技術選型,而在於「資料新鮮度」與「營運成本」之間的策略權衡。這考驗著管理者對業務價值的深刻理解,判斷何種決策場景值得為「即時性」投入資源。Apache Spark 以其統一的批次與串流 API,顯著降低了技術實踐的門檻,但將多元來源的資料流整合為單一、可信的真實來源,並確保端到端的處理韌性,依然是從概念走向穩定運營的關鍵瓶頸。

展望未來,批次與串流處理的界線將會持續模糊,最終融合成一種無縫的「整合式數據處理」典範。這將推動企業的數據策略從被動的「回顧式分析」演進為主動的「預測性感知」,使組織能夠更即時地捕捉機會與規避風險。

玄貓認為,對於追求卓越營運的管理者而言,掌握串流架構的精髓,關鍵在於將其視為一種建立「即時反饋迴路」的策略工具。應優先將其應用於高價值的場景,如風險偵測或客戶體驗優化,以此為切入點,逐步培養整個組織駕馭即時數據流的核心能力。