In modern data engineering architectures, integrating traditional relational database sources into big-data processing pipelines is a key step. Through its JDBC data source API, Apache Spark provides a standardized, efficient interface that lets engineers seamlessly read, query, and transform data stored in databases such as MySQL and PostgreSQL. Continuing from the previous section, this chapter takes a hands-on look at the Spark JDBC API. We start with basic table reads, extend to SQL subqueries for finer-grained filtering, and show how to customize DataFrame output for analysis and debugging. This is more than a walkthrough of mechanics; it illustrates the core idea of bringing scattered data silos into a unified processing platform.
Data Engineering: Core Theory and Practice
Chapter 4: Database Operations and the Spark JDBC API
Using the Spark JDBC API
mysql> LOAD DATA LOCAL INFILE 'airports.csv' INTO TABLE airports
-> FIELDS TERMINATED BY ','
-> ENCLOSED BY '"'
-> LINES TERMINATED BY '\n'
-> IGNORE 1 LINES;
Query OK, 322 rows affected, 6 warnings (0.00 sec)
Records: 322 Deleted: 0 Skipped: 0 Warnings: 6
This command loads the data from airports.csv into the airports table. As with airlines.csv, it specifies the field and line delimiters and skips the header row. Six warnings were raised here, most likely because some values do not exactly match the table definition.
mysql> LOAD DATA LOCAL INFILE 'flights.csv' INTO TABLE flights
-> FIELDS TERMINATED BY ','
-> LINES TERMINATED BY '\n'
-> IGNORE 1 LINES;
Query OK, 5819079 rows affected, 65535 warnings (42.91 sec)
Records: 5819079 Deleted: 0 Skipped: 0 Warnings: 24361132
This command loads flights.csv into the flights table. Because flights.csv is very large (nearly 6 million records), the load takes longer and produces a large number of warnings. These warnings are typically caused by null values, malformed data, or type conversions that do not match the table definition.
The warnings stem from inconsistencies in the data. You can inspect the first 10 of them in detail with the show warnings limit 10; command:
mysql> show warnings limit 10;
For our purposes, these warnings can be ignored. You may, however, want to check the record counts to confirm that all three tables loaded completely. Here is how:
mysql> select count(1) from airlines; select count(1) from airports; select count(1) from flights;
+----------+
| count(1) |
+----------+
| 14 |
+----------+
1 row in set (0.00 sec)
+----------+
| count(1) |
+----------+
| 322 |
+----------+
1 row in set (0.00 sec)
+----------+
| count(1) |
+----------+
| 5819079 |
+----------+
1 row in set (0.64 sec)
These queries verify the record count in each table, confirming that airlines has 14 records, airports has 322, and flights has 5,819,079.
Spark does not ship with a MySQL JDBC driver. Before you can connect, you need to download Connector/J from the MySQL website and add its .jar file to the classpath. Our GitHub repository already includes the driver, so once the project is compiled on your machine it should be picked up automatically.
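If you manage dependencies with sbt, the driver can also be pulled in as a build dependency instead of a hand-copied .jar. The coordinates and version below are illustrative assumptions; pick a version compatible with your MySQL server:

```scala
// build.sbt (sketch): pull in MySQL Connector/J via sbt.
// Version 8.0.33 is an assumption, not taken from the text above.
libraryDependencies += "mysql" % "mysql-connector-java" % "8.0.33"
```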
Here is a simple application that reads records from the airports table:
package com.blackcat.dewithscala.chapter4

import org.apache.spark.sql.SparkSession

object ReadTable extends App {
  private[chapter4] val session = SparkSession
    .builder()
    .appName("de-with-scala")
    .master("local[*]")
    .getOrCreate()

  private[chapter4] val airportsDF = session.read
    .format("jdbc")
    .option("url", "jdbc:mysql://localhost:3306/my_db")
    .option("user", "root")
    .option("password", "****") // replace with your actual password
    .option("dbtable", "airports")
    .load()

  airportsDF.show()
}
The snippet above is a complete Spark application that reads the airports table from MySQL. It first creates a SparkSession, then uses session.read.format("jdbc") to select the JDBC data source. The option calls configure the connection parameters: url, user, password, and dbtable. Finally, load() performs the read and returns the result as the airportsDF DataFrame, and airportsDF.show() prints its contents to the console.
The following screenshot shows the output printed to the console:
Figure: the airports table loaded and printed to the console
+---------+----------------------+-------------+-----+-------+--------+----------+
|iata_code|airport               |city         |state|country|latitude|longitude |
+---------+----------------------+-------------+-----+-------+--------+----------+
|ABE      |Lehigh Valley Intl    |Allentown    |PA   |USA    |40.65236|-75.44040 |
|ABI      |Abilene Rgnl          |Abilene      |TX   |USA    |32.41132|-99.68190 |
|ABQ      |Albuquerque Intl Sun  |Albuquerque  |NM   |USA    |35.04022|-106.60919|
|ABR      |Aberdeen Rgnl         |Aberdeen     |SD   |USA    |45.44906|-98.42183 |
|ABY      |Southwest Georgia Rgnl|Albany       |GA   |USA    |31.53535|-84.19447 |
|ACK      |Nantucket Mem         |Nantucket    |MA   |USA    |41.25228|-70.06018 |
|ACT      |Waco Rgnl             |Waco         |TX   |USA    |31.61129|-97.23052 |
|ACV      |Arcata                |Arcata       |CA   |USA    |40.97811|-124.10862|
|ACY      |Atlantic City Intl    |Atlantic City|NJ   |USA    |39.45758|-74.57717 |
|ADK      |Adak                  |Adak         |AK   |USA    |51.87806|-176.64603|
+---------+----------------------+-------------+-----+-------+--------+----------+
only showing top 20 rows
Reading the figure:
This figure shows the console result of airportsDF.show() after the Spark application reads the airports table from MySQL. By default, show() displays the first 20 records of the DataFrame and truncates long field values. The figure shows the table's structure, with the columns iata_code, airport, city, state, country, latitude, and longitude, and the first ten airport records. The "only showing top 20 rows" message at the bottom is a reminder that this is only a subset of the data, and that field values may be cut short by the default truncation.
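For a small table like airports a single-connection read is fine, but for the nearly six-million-row flights table it can help to read in parallel using Spark's JDBC partitioning options. A minimal sketch, assuming flights has a numeric day_of_week column to partition on (the column name and bounds here are assumptions, not taken from the text above):

```scala
// Sketch: partitioned JDBC read. Spark opens numPartitions connections,
// each issuing a query bounded by a slice of [lowerBound, upperBound]
// on partitionColumn.
val flightsDF = session.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/my_db")
  .option("user", "root")
  .option("password", "****") // replace with your actual password
  .option("dbtable", "flights")
  .option("partitionColumn", "day_of_week") // assumed numeric column
  .option("lowerBound", "1")
  .option("upperBound", "7")
  .option("numPartitions", "4")
  .load()
```

Note that lowerBound and upperBound only control how the ranges are sliced; rows outside the bounds are still read, landing in the first or last partition.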
By default, the show method prints 20 records and truncates field values. You can change that behavior as follows:
airportsDF.show(5,false)
This displays the first 5 records of airportsDF, and the false argument disables truncation so every field value is shown in full.
The following screenshot shows the first five untruncated records from the airports table:
Figure: untruncated airports data: five records printed to the console without truncating fields
+---------+-----------------------------------+-----------+-----+-------+---------+-----------+
|iata_code|airport                            |city       |state|country|latitude |longitude  |
+---------+-----------------------------------+-----------+-----+-------+---------+-----------+
|ABE      |Lehigh Valley International Airport|Allentown  |PA   |USA    |40.652362|-75.440402 |
|ABI      |Abilene Regional Airport           |Abilene    |TX   |USA    |32.411319|-99.681903 |
|ABQ      |Albuquerque International Sunport  |Albuquerque|NM   |USA    |35.040222|-106.609194|
|ABR      |Aberdeen Regional Airport          |Aberdeen   |SD   |USA    |45.449061|-98.421831 |
|ABY      |Southwest Georgia Regional Airport |Albany     |GA   |USA    |31.535354|-84.194473 |
+---------+-----------------------------------+-----------+-----+-------+---------+-----------+
Reading the figure:
This figure shows the first five records of the airports table as printed by airportsDF.show(5, false). Unlike the default show() behavior, the false argument ensures every field value is displayed in full, with no truncation. This matters when checking data completeness and accuracy, especially for columns holding long strings. The figure lists the complete values of iata_code, airport, city, state, country, latitude, and longitude, so the raw data can be inspected at a glance.
The dbtable option also accepts a SQL query. For example, to list all airports in North Carolina (NC), you can use the following code:
val ncAirportsDF = session.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/my_db")
  .option("user", "root")
  .option("password", "****") // replace with your actual password
  .option(
    "dbtable",
    "(select airport, city from airports where state = 'NC') qry"
  )
  .load()

ncAirportsDF.show(numRows = 100, truncate = 200, vertical = true)
This shows how the dbtable option accepts a subquery. By wrapping select airport, city from airports where state = 'NC' in parentheses and giving it the alias qry, Spark treats the subquery's result as a temporary table to read from. In the show call, numRows = 100 displays up to 100 records, truncate = 200 keeps field values intact up to 200 characters, and vertical = true prints the DataFrame as vertically stacked records, which improves readability for wide DataFrames with many columns.
The following screenshot shows the DataFrame printed as a vertically stacked table:
Figure: stacked output: printing the DataFrame as a stacked table rather than a flat one
-RECORD 0---------------------------------------------
 airport | Charlotte Douglas International Airport
 city    | Charlotte
-RECORD 1---------------------------------------------
 airport | Raleigh-Durham International Airport
 city    | Raleigh/Durham
-RECORD 2---------------------------------------------
 airport | Piedmont Triad International Airport
 city    | Greensboro
-RECORD 3---------------------------------------------
 airport | Asheville Regional Airport
 city    | Asheville
-RECORD 4---------------------------------------------
 airport | Wilmington International Airport
 city    | Wilmington
...
Reading the figure:
This figure shows the stacked console output produced when show is called with vertical = true. Instead of the usual horizontal table, each record's field names and values are laid out vertically. For DataFrames with many columns or long values, this format is significantly more readable and avoids losing information to truncation. The figure shows the airport and city values for North Carolina (NC) airports, with each record delimited by a -RECORD X--- separator so the details of every row stand out clearly.
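Since Spark 2.4 there is also a query option that takes the SQL statement directly, without the parentheses-plus-alias wrapping that dbtable requires (query and dbtable are mutually exclusive). A sketch of the equivalent read:

```scala
// Sketch: the same NC filter using the `query` option instead of `dbtable`.
val ncAirportsDF = session.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/my_db")
  .option("user", "root")
  .option("password", "****") // replace with your actual password
  .option("query", "select airport, city from airports where state = 'NC'")
  .load()
```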
In this section we explored how to read database tables through the JDBC API. You may have noticed, however, that we passed the database credentials in plain text, which should be avoided in all circumstances.
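As a first step away from hard-coded secrets, the credentials can be read from the environment at startup rather than written into the source. A minimal sketch (the variable names MYSQL_USER and MYSQL_PASSWORD are our own choice, not a Spark convention):

```scala
// Sketch: take credentials from environment variables; fail fast if absent.
val user = sys.env.getOrElse("MYSQL_USER", "root")
val password = sys.env.getOrElse("MYSQL_PASSWORD",
  sys.error("MYSQL_PASSWORD is not set"))

val airportsDF = session.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/my_db")
  .option("user", user)
  .option("password", password)
  .option("dbtable", "airports")
  .load()
```

This still falls short of production practice, since environment variables can leak through process listings and logs; a dedicated secrets manager is the more robust endpoint.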
Looking back at this integration exercise, we see not only the potential of combining Spark with traditional databases, but also the core trade-offs data engineers face in pursuit of performance. From full-table reads to flexible SQL subqueries, the Spark JDBC API proved highly adaptable in practice. At the same time, the warnings raised during data loading and the plain-text credentials expose the gap between "making it work" and "production-ready". The former tests your tolerance for imperfect data; the latter goes straight to system security and operational discipline, maturity indicators no data project can afford to ignore.
Looking ahead, data platforms will put even more emphasis on secure, seamless integration. Abstracting the data access layer and automating credential management (for example by integrating Vault or a cloud key service) will become a key measure of a data engineer's depth, beyond mere feature delivery.
In our view, then, mastering JDBC connectivity is the foundation, but the real professional value lies in anticipating and systematically addressing the security and operational challenges it raises. That is not just a technical choice; it is a matter of engineering ethics and long-term thinking.