返回文章列表

Apache Spark數據庫整合:SQL查詢與JDBC實戰

本文深入探討 Apache Spark 的數據處理能力,從 Spark SQL 的核心操作與臨時視圖創建開始,展示如何靈活查詢 DataFrame 與 Dataset。接著,文章將重點轉向資料庫整合,詳細介紹 Spark JDBC API 的原理與應用,說明如何透過標準化介面連接、讀取及寫入關聯式資料庫。最後,透過建立 MySQL 資料表的實例,為後續的資料庫操作提供具體實戰基礎。

數據工程 大數據技術

在現代數據工程實務中,單純處理記憶體內的數據集已不足以應對複雜的業務場景。本篇文章將從 Apache Spark 內建的 SQL 查詢能力出發,探討如何利用 DataFrame 與 Dataset API 進行高效的數據篩選與檢視。更重要的是,我們將焦點延伸至外部數據源的整合,特別是企業環境中無所不在的關聯式資料庫。文章將深入解析 Spark JDBC API 的設計理念與實作方式,展示如何建立 Spark 與 MySQL 等資料庫之間的橋樑。這項技能不僅讓數據工程師能無縫地在分散式運算框架與傳統資料庫之間讀寫數據,更是構建完整、端到端的數據管線不可或缺的關鍵環節,實現從數據提取、轉換到載入的完整流程。

數據工程高科技養成:從理論到實踐的玄貓指引

數據工程核心理論與實踐

第三章:Apache Spark與其核心API:DataFrame、Dataset與Spark SQL

Spark SQL
select函數
personDf.select($"personId").show(10)

上述命令使用personDf這個DataFrame,並在其上呼叫select函數。玄貓接著指定personId欄位,然後呼叫show函數,傳遞兩個參數。第一個參數是要返回的記錄數量。第二個參數是指定顯示時欄位中的數據是否會被截斷。預設值為1true,這將會截斷值。玄貓將第一個參數設置為10以顯示記錄數量,並將第二個參數設置為0(或false)以不截斷欄位值。結果將類似於:

此圖示:selectshow操作的結果

@startuml
!define DISABLE_LINK
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

rectangle "Spark 控制台輸出 (show)" as ConsoleOutputShow {
header "DataFrame.select($\"personId\").show(10, false)"
component "+--------+" as HeaderLine
component "|personId|" as Header
component "+--------+" as HeaderLine2
component "| 1|" as Row1
component "| 2|" as Row2
component "| 3|" as Row3
component "| 4|" as Row4
component "| 5|" as Row5
component "| 6|" as Row6
component "| 7|" as Row7
component "| 8|" as Row8
component "| 9|" as Row9
component "| 10|" as Row10
component "+--------+" as FooterLine
}

HeaderLine -down-> Header
Header -down-> HeaderLine2
HeaderLine2 -down-> Row1
Row1 -down-> Row2
Row2 -down-> Row3
Row3 -down-> Row4
Row4 -down-> Row5
Row5 -down-> Row6
Row6 -down-> Row7
Row7 -down-> Row8
Row8 -down-> Row9
Row9 -down-> Row10
Row10 -down-> FooterLine
@enduml

看圖說話:

此圖示展示了Spark中selectshow操作在控制台的輸出結果。當玄貓對personDf這個DataFrame執行select($"personId").show(10, false)時,Spark會從DataFrame中選取personId這個欄位,並顯示前10條記錄。show(10, false)中的false參數確保了欄位值不會被截斷。輸出結果以表格形式呈現,包含一個標題行|personId|,隨後是10行數據,每行顯示一個personId的值。這種輸出方式直觀地展示了數據的子集,對於快速驗證數據內容和格式非常有用。

同樣的方法也適用於Dataset。例如,personDs.select($"personId").show(10,0)會返回相同的結果。玄貓已經展示了如何透過DataFrameDataset API來存取Spark SQL,但讀者也可以將SQL作為字串傳遞給API的SQL函數。為此,玄貓需要為數據創建臨時視圖 (temporary views)

創建臨時視圖

另一種在Spark物件上運行SQL的方法是從讀者的DatasetDataFrame物件創建一個臨時視圖。以下是操作方法:

personDs.createOrReplaceTempView("personDsView")
spark.sql("select * from personDsView").show(10,0)

在上述範例中,玄貓使用createOrReplaceTempView函數定義了一個名為personDsView的臨時視圖。在下一行中,玄貓使用SparkSession物件的sql函數將SQL查詢傳遞給Dataset。這是結果:

此圖示:select語句返回的數據

@startuml
!define DISABLE_LINK
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

rectangle "Spark 控制台輸出 (SQL 查詢)" as ConsoleOutputSQL {
header "spark.sql(\"select * from personDsView\").show(10, false)"
component "+--------+---------+--------+" as HeaderLineSQL
component "|personId|firstName|lastName|" as HeaderSQL
component "+--------+---------+--------+" as HeaderLineSQL2
component "| 1| John | Doe |" as Row1SQL
component "| 2| Jane | Smith|" as Row2SQL
component "| 3| Peter | Jones |" as Row3SQL
component "| 4| Mary | Brown |" as Row4SQL
component "| 5| Mike | Davis |" as Row5SQL
component "| 6| Lisa | White |" as Row6SQL
component "| 7| Chris | Green |" as Row7SQL
component "| 8| Sarah | Black |" as Row8SQL
component "| 9| Paul | King |" as Row9SQL
component "| 10| Nancy | Clark |" as Row10SQL
component "+--------+---------+--------+" as FooterLineSQL
}

HeaderLineSQL -down-> HeaderSQL
HeaderSQL -down-> HeaderLineSQL2
HeaderLineSQL2 -down-> Row1SQL
Row1SQL -down-> Row2SQL
Row2SQL -down-> Row3SQL
Row3SQL -down-> Row4SQL
Row4SQL -down-> Row5SQL
Row5SQL -down-> Row6SQL
Row6SQL -down-> Row7SQL
Row7SQL -down-> Row8SQL
Row8SQL -down-> Row9SQL
Row9SQL -down-> Row10SQL
Row10SQL -down-> FooterLineSQL
@enduml

看圖說話:

此圖示展示了透過Spark SQL查詢臨時視圖personDsView後的控制台輸出。當玄貓將personDs這個Dataset註冊為一個名為personDsView的臨時視圖後,就可以像查詢傳統資料庫表一樣,使用標準SQL語法來查詢它。圖中顯示的select * from personDsView查詢,返回了personIdfirstNamelastName這三個欄位的所有數據(前10條記錄)。這種方法極大地簡化了數據分析師和開發者在Spark中處理數據的方式,讓他們能夠利用熟悉的SQL語言進行數據探索和轉換,而無需深入了解底層的Spark API細節。

上述兩種方法都使用Scala物件作為可查詢的SQL數據源!

第四章:資料庫操作與Spark JDBC API

在本章中,玄貓將探討如何與關聯式資料庫 (relational databases) 互動。資料庫仍然是數據管線讀取數據和寫入數據最常見的來源之一,因此理解如何高效地與它們協作至關重要。玄貓將從理解Spark JDBC API開始,並探討以下主題:

  • 深入理解Spark JDBC API
  • 使用Spark JDBC API
  • 載入資料庫配置
  • 建立資料庫介面
  • 執行各種資料庫操作

技術要求

深入理解Spark JDBC API

JDBC (Java Database Connectivity) 是一個應用程式程式設計介面 (API) 的規範,它允許Java應用程式存取資料庫。JDBC驅動程式 (JDBC driver) 是針對特定資料庫的規範的實際實現。

為了與資料庫協作,玄貓需要理解Spark提供的JDBC API。首先,讓玄貓看看Spark提供的介面:

val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load()

數據工程核心理論與實踐

第四章:資料庫操作與Spark JDBC API

深入理解Spark JDBC API
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load()

上述程式碼展示了使用Spark JDBC API進行資料庫讀取操作的典型範例。format("jdbc")指定了數據源類型為JDBC。接著,透過option方法配置連接參數:url定義了資料庫的連接字串,dbtable指定了要讀取的資料表,userpassword則提供了資料庫的驗證憑證。最後,load()方法執行讀取操作,將數據載入到一個DataFrame中。

以下是資料庫寫入操作的類似範例:

jdbcDF.write
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.save()

這個範例展示了如何將DataFrame中的數據寫入資料庫。與讀取操作類似,format("jdbc")指定了JDBC數據源。option方法同樣用於配置資料庫連接和目標資料表。最後,save()方法執行寫入操作,將DataFrame的內容持久化到指定的資料庫表中。

format方法可以接受多種值,例如.json.csv.parquet等。option方法用於指定額外選項。上述兩個範例涵蓋了從資料庫讀取或寫入時必須指定的最低限度選項。對於全面的選項集,讀者可以參考Spark官方文檔中關於JDBC數據源的部分。

值得注意的是,使用者憑證絕不應以明文形式提供。它們應該透過更安全的機制進行管理,例如密碼管理系統或環境變數。在上一節中,玄貓介紹了Spark提供的JDBC API,接下來玄貓將透過實際範例來展示如何使用JDBC API與資料庫互動。

使用Spark JDBC API

在上一節中,玄貓介紹了Spark JDBC API及其一些最常用的選項。為了更好地理解它們,玄貓需要看到它們的實際應用,這將是本節的重點。現在是時候開始使用玄貓在第二章中安裝的MySQL社群伺服器了。

請按照以下步驟創建本章範例所需的一些資料表:

  1. 使用以下命令登入MySQL服務:
mysql --local-infile=1 -u root -p

輸入密碼後,將會看到類似以下的歡迎訊息:

Enter password:
Welcome to the MySQL monitor. Commands end with ; or \g.

Your MySQL connection id is 11
Server version: 8.0.32 MySQL Community Server - GPL

Oracle is a registered trademark of Oracle Corporation and/or
its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current
input statement.
mysql> use my_db
Reading table information for completion of table and column
names
You can turn off this feature to get a quicker startup with -A
Database changed

上述步驟展示了如何使用mysql命令列客戶端登入MySQL伺服器,並選擇要操作的資料庫my_db--local-infile=1選項允許從本地檔案載入數據,這在後續步驟中會用到。

  1. 如下所示創建airlinesairportsflights資料表:

這是針對airlines表的創建語句:

mysql> create table airlines(
-> iata_code varchar(50),
-> airline varchar(500)
-> );
Query OK, 0 rows affected (0.02 sec)

此語句創建了一個名為airlines的資料表,包含兩個欄位:iata_code(IATA航空公司代碼,變長字串,最大長度50)和airline(航空公司名稱,變長字串,最大長度500)。

這是針對airports表的創建語句:

mysql> create table airports(
-> iata_code varchar(50),
-> airport varchar(500),
-> city varchar(100),
-> state varchar(100),
-> country varchar(100),
-> latitude double,
-> longitude double
-> );
Query OK, 0 rows affected (0.01 sec)

此語句創建了一個名為airports的資料表,用於儲存機場資訊。它包含iata_code(IATA機場代碼)、airport(機場名稱)、city(城市)、state(州/省)、country(國家),以及latitude(緯度)和longitude(經度)這兩個雙精度浮點數欄位。

這是針對flights表的創建語句:

mysql> create table flights(
-> year smallint,
-> month smallint,
-> day smallint,
-> day_of_week smallint,
-> airline varchar(50),
-> flight_number smallint,
-> tail_number varchar(50),
-> origin_airport varchar(50),
-> destination_airport varchar(50),
-> scheduled_departure varchar(15),
-> departure_time varchar(15),
-> departure_delay smallint,
-> taxi_out smallint,
-> wheels_off varchar(15),
-> scheduled_time smallint,
-> elapsed_time smallint,
-> air_time smallint,
-> distance smallint,
-> wheels_on varchar(15),
-> taxi_in smallint,
-> scheduled_arrival varchar(15),
-> arrival_time varchar(15),
-> arrival_delay smallint,
-> diverted smallint,
-> cancelled smallint,
-> cancellation_reason varchar(500),
-> air_system_delay smallint,
-> security_delay smallint,
-> airline_delay smallint,
-> late_aircraft_delay smallint,
-> weather_delay smallint
-> );
Query OK, 0 rows affected (0.01 sec)

此語句創建了一個名為flights的資料表,用於儲存航班詳細資訊。它包含了從日期、時間、航空公司、航班號到各種延誤原因等豐富的欄位,數據類型涵蓋了小型整數、變長字串和雙精度浮點數,以詳細記錄航班的各個方面。

  1. 將數據檔案(例如airlines.csvairports.csvflights.csv)下載到讀者的主目錄。然後將檔案載入到資料表中:
mysql> LOAD DATA LOCAL INFILE 'airlines.csv' INTO TABLE airlines
-> FIELDS TERMINATED BY ','
-> ENCLOSED BY '"'
-> LINES TERMINATED BY '\n'
-> IGNORE 1 LINES;
Query OK, 14 rows affected (0.02 sec)

Records: 14 Deleted: 0 Skipped: 0 Warnings: 0

這個LOAD DATA LOCAL INFILE語句用於從本地檔案airlines.csv中將數據高效地載入到airlines資料表中。FIELDS TERMINATED BY ','指定欄位之間以逗號分隔,ENCLOSED BY '"'表示欄位值可能被雙引號包圍,LINES TERMINATED BY '\n'定義行結束符為換行符,而IGNORE 1 LINES則指示資料庫跳過檔案的第一行,通常是標題行。這是一個在MySQL中快速批量導入數據的常用方法。

從個人價值觀對職涯選擇的影響考量,數據工程師的養成路徑,其核心在於將抽象理論轉化為具體的系統整合能力。本章節從 Spark 內部查詢的彈性,延伸至與外部關聯式資料庫的串接,完整勾勒出此一關鍵能力的養成藍圖。

此發展路徑的價值,首先體現在新舊系統的整合能力上。精通 Spark JDBC API 不僅是技術操作,更是將現代分散式運算框架無縫嵌入企業既有數據資產的策略性技能。其次,從 DataFrame API 到臨時視圖的靈活切換,反映了工程師在面對不同協作對象(如數據分析師)時的溝通效率與適應性。然而,真正的成長瓶頸往往不在語法本身,而在於從「能動」邁向「穩固」的過程,例如文中所提的憑證管理,這正是區分資淺與資深工程師的關鍵思維落差。

展望未來,數據生態系統的複雜性只增不減。能夠同時駕馭 Spark 的高效能運算與傳統資料庫的穩定性、並在兩者之間建立高效數據橋樑的工程師,將擁有不可取代的職涯議價能力,從單一工具的操作者蛻變為數據流動的架構師。

玄貓認為,此修養路徑已展現足夠效益,適合關注長期成長的管理者採用。對於高階經理人而言,培養團隊不僅要掌握這些 API 的應用,更要建立起在不同數據存取策略間進行權衡的架構思維,才能真正釋放數據工程的完整商業潛力。