返回文章列表

Spark 資料轉換:處理 JSON 陣列與嵌入式 XML

本文探討在 Spark 環境中處理複雜資料結構的兩種關鍵技術。首先,文章示範如何利用 `explode` 函數將 JSON 資料中的陣列欄位展開為多行,以便進行後續的聚合運算。其次,針對 JSON 檔案中內嵌 XML 字串的場景,本文介紹如何整合 Databricks 的 `spark-xml` 函式庫,透過 `from_xml` 與 `schema_of_xml` 函數,動態解析 XML 內容並將其轉換為結構化的 Spark DataFrame 欄位,展現 Spark 處理混合與巢狀資料的彈性。

資料工程 巨量資料

在當代資料工程實務中,資料源的格式日益複雜,純粹的表格式資料已非主流。企業時常需要處理半結構化資料,例如包含巢狀陣列的 JSON 物件,或是將 XML 格式的資訊嵌入在其他資料流中。這些複雜的資料結構雖然提供了豐富的資訊層次,卻也對資料處理與分析帶來挑戰。若無法有效進行扁平化或結構化解析,後續的聚合、統計與機器學習應用將難以進行。本文將聚焦於 Apache Spark 框架,深入探討其處理這類複雜資料類型的核心轉換機制。我們將透過具體範例,解析如何將巢狀的 JSON 陣列展開,以及如何運用外部函式庫解析嵌入式 XML,展示將非結構化資訊轉化為可分析洞見的關鍵步驟。

第六章:深入理解資料轉換

處理複雜資料集類型

| observations |
+--------------------+
| [8.3]|
| [9.9, 6.5]|
| [4.9, 6.5, 9.0]|
| [4.2, 8.8, 3.2]|
| [5.7]|
|[6.4, 5.2, 5.5, 4.1]|
| [3.2]|
| [7.5, 6.1]|
|[8.0, 9.9, 3.4, 8...|
|[5.1, 1.3, 7.7, 6...|
+--------------------+

玄貓已經讀入了一些JSON資料,其中包含一個帶有值陣列的欄位。具體來說,observations欄位是一個Double陣列。以其目前的形狀,它不是很實用,但玄貓可以將巢狀值拆分為行,然後執行聚合,如下面的程式碼所示。

dfDevicesJson
.select($"id", explode($"observations") // 將 observations 陣列展開為多行
.alias("observation"))
.groupBy("id") // 根據 id 進行分組
.agg(
max($"observation").alias("max_obs"), // 計算最大值
min($"observation").alias("min_obs"), // 計算最小值
avg($"observation").alias("avg_obs")) // 計算平均值

在上述範例中,玄貓將玄貓的observations資料展開為每條記錄中每個元素獨立的行,並帶有父記錄的ID。玄貓現在可以按ID進行分組,並計算每個ID的最大值、最小值和平均值。

+---+-------+-------+------------------+
| id|max_obs|min_obs| avg_obs|
+---+-------+-------+------------------+
| 0| 8.3| 8.3| 8.3|
| 1| 9.9| 6.5| 8.2|
| 2| 9.0| 4.9| 6.8|
| 3| 8.8| 3.2|5.3999999999999995|
| 4| 5.7| 5.7| 5.7|
| 5| 6.4| 4.1| 5.300000000000001|
| 6| 3.2| 3.2| 3.2|
| 7| 7.5| 6.1| 6.8|
| 8| 10.0| 3.4| 7.959999999999999|
| 9| 9.2| 1.3| 5.88|
+---+-------+-------+------------------+

現在,讓玄貓看看玄貓將如何管理JSON檔案中嵌入的XML資料

val dfDevicesXMLInJson = spark.read.json(
"src/main/scala/com/packt/dewithscala/chapter6/data/devicesWithXML.json")
dfDevicesXMLInJson.printSchema()

這是輸出:

root
|-- country: string (nullable = true)
|-- device_id: string (nullable = true)
|-- event_ts: string (nullable = true)
|-- event_type: string (nullable = true)
|-- id: long (nullable = true)
|-- line: string (nullable = true)
|-- manufacturer: string (nullable = true)
|-- observations: array (nullable = true)
| |-- element: double (containsNull = true)
|-- xmlObservations: string (nullable = true)

玄貓現在在玄貓的資料中多了一個名為xmlObservationsstring類型欄位。這是observations欄位的XML表示,玄貓現在需要解析它以產生與玄貓先前程式碼相同的結果。然而,這並不像看起來那麼容易。

以下是xmlObservations欄位中的資料範例:

<observations>
<observation>8.3</observation>
</observations>
<observations>
<observation>9.9</observation>
<observation>6.5</observation>
</observations>

玄貓將不得不引入一個名為spark-xml的函式庫,來自Databricks

import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml

這兩個導入將允許玄貓使用兩個函數。

如下面的程式碼所示,from_xml函數將解析欄位中的XML,而schema_of_xml將讀取XML值並生成Spark可以用來讀取資料的Schema:

val dfXmlFun = dfDevicesXMLInJson
.withColumn("parsed", from_xml($"xmlObservations", // 解析 xmlObservations 欄位
schema_of_xml(dfDevicesXMLInJson // 自動推斷 XML 的 Schema
.select("xmlObservations").as[String])))
dfXmlFun.printSchema()

這是輸出:

root
|-- country: string (nullable = true)
|-- device_id: string (nullable = true)
|-- event_ts: string (nullable = true)
|-- event_type: string (nullable = true)
|-- id: long (nullable = true)
|-- line: string (nullable = true)
|-- manufacturer: string (nullable = true)
|-- observations: array (nullable = true)
| |-- element: double (containsNull = true)
|-- xmlObservations: string (nullable = true)
|-- parsed: struct (nullable = true)
| |-- observation: array (nullable = true)
| | |-- element: double (containsNull = true)

在上述程式碼中,玄貓將這兩個函數與withColumn方法一起使用,以添加一個新欄位。

此圖示:複雜資料類型處理與XML解析流程

@startuml
!define DISABLE_LINK
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

package "JSON 陣列處理" as JsonArrayProcessing {
rectangle "dfDevicesJson: DataFrame" as DevicesJsonDF
component "select($"id", explode($"observations").alias(\"observation\"))" as ExplodeObservations
component "groupBy(\"id\").agg(max, min, avg)" as AggregateObservations
rectangle "dfAggregatedObservations: DataFrame" as AggregatedObsDF
}

package "嵌入式 XML 資料處理" as EmbeddedXmlProcessing {
file "devicesWithXML.json" as JsonWithXmlFile
component "spark.read.json(\"devicesWithXML.json\")" as ReadJsonWithXml
component "printSchema()" as PrintSchemaXml
rectangle "dfDevicesXMLInJson: DataFrame" as DevicesXmlInJsonDF
component "import com.databricks.spark.xml.functions.from_xml" as ImportFromXml
component "import com.databricks.spark.xml.schema_of_xml" as ImportSchemaOfXml
component "withColumn(\"parsed\", from_xml($"xmlObservations", schema_of_xml(...)))" as ParseXmlColumn
component "printSchema()" as PrintParsedSchema
rectangle "dfXmlFun: DataFrame" as XmlParsedDF
}

DevicesJsonDF --> ExplodeObservations
ExplodeObservations --> AggregateObservations
AggregateObservations --> AggregatedObsDF : 處理 JSON 陣列並聚合

JsonWithXmlFile --> ReadJsonWithXml
ReadJsonWithXml --> PrintSchemaXml
PrintSchemaXml --> DevicesXmlInJsonDF : 讀取 JSON 內嵌 XML 資料

ImportFromXml -[hidden]-> ParseXmlColumn
ImportSchemaOfXml -[hidden]-> ParseXmlColumn
DevicesXmlInJsonDF --> ParseXmlColumn
ParseXmlColumn --> PrintParsedSchema
PrintParsedSchema --> XmlParsedDF : 解析 XML 欄位並顯示新 Schema

note right of AggregatedObsDF
- 將 JSON 陣列展開後進行統計分析
- 適用於處理一對多關係的數值資料
end note

note right of XmlParsedDF
- 使用 spark-xml 函式庫解析嵌入式 XML
- 自動推斷 XML 結構並轉換為 Spark DataFrame 類型
end note

@enduml

看圖說話:

此圖示清晰地描繪了兩種處理複雜資料類型的方法:JSON陣列處理嵌入式XML資料處理

JSON陣列處理部分,玄貓從dfDevicesJson DataFrame開始,該DataFrame包含一個observations欄位,其值為陣列。透過explode函數,玄貓將這個陣列中的每個元素轉換為獨立的行,並與其父記錄的id關聯。隨後,玄貓對這些展開的資料按id進行groupBy,並執行maxminavg等聚合操作,最終生成了dfAggregatedObservations,它提供了每個id對應的觀察值的統計摘要。

嵌入式XML資料處理部分,玄貓展示了如何處理JSON檔案中包含XML字串的場景。玄貓首先讀取devicesWithXML.json檔案,得到dfDevicesXMLInJson。這個DataFrame包含一個名為xmlObservations的字串欄位,其中儲存著XML格式的觀察資料。為了有效解析這些XML,玄貓引入了Databricksspark-xml函式庫中的from_xmlschema_of_xml函數。from_xml函數負責解析XML字串,而schema_of_xml則自動推斷XML結構並生成相應的Schema。透過withColumn操作,玄貓將解析後的XML作為一個新的結構化欄位parsed添加到DataFrame中,最終生成dfXmlFun,並展示了其更新後的Schema。這個流程突顯了Spark在處理混合格式和巢狀複雜資料時的強大和靈活性。

縱觀現代資料處理的多元挑戰,從直接處理JSON陣列到解析嵌入式XML,我們看見了處理複雜性的層次遞進。前者依賴explode這類內建函數即可完成結構扁平化;後者則需借助spark-xml外部函式庫,以from_xml實現從非結構化字串到結構化資料的關鍵躍遷,這體現了從標準化操作到客製化解決方案的思維突破。

這種處理混合格式資料的能力,預示了未來資料工程師必須具備跨格式解析與動態結構推斷的系統思考框架。隨著資料源日益混雜,單一工具或方法的侷限性將愈發明顯。

玄貓認為,掌握這種從「規則」到「例外」的處理彈性,不僅是技術能力的展現,更是高階資料從業者在面對未知挑戰時,建立技術護城河與維持競爭優勢的核心策略。