返回文章列表

Scala數據工程:整合Doobie實現函數式資料庫操作

本文探討在 Scala 數據工程實踐中,如何整合純函數式 JDBC 函式庫 Doobie 以強化資料庫操作。文章首先介紹透過工廠方法集中管理 SparkSession 的創建,接著深入解析 Doobie 的核心概念,如 ConnectionIO 與 Transactor。內容將實際演示如何配置專案依賴、建立資料庫連接,並執行 SQL 查詢,同時將結果安全地映射至 Scala Case Class。此方法不僅提升了程式碼的健壯性與可讀性,也為複雜的資料庫事務處理提供了更優雅的解決方案。

數據工程 軟體開發

在現代數據工程架構中,雖然 Apache Spark 提供了強大的 JDBC 介面進行大規模資料讀寫,但在處理更細緻的資料庫操作時,例如資料定義語言(DDL)執行、臨時表管理或複雜的交易控制,其原生 API 往往顯得功能有限。為了應對這些挑戰,開發者常尋求更專業的函式庫來彌補此一差距。本文將引入 Doobie,一個專為 Scala 設計的純函數式 JDBC 層。它透過提供型別安全、組合性強的抽象(如 ConnectionIO),讓工程師能以宣告式與更穩健的方式編寫資料庫互動邏輯,從而提升數據管道的可靠性與可維護性,這對於需要精確控制資料庫狀態的 ETL 流程至關重要。

數據工程核心理論與實踐

第四章:資料庫操作與Spark JDBC API

創建SparkSession的工廠方法

到目前為止,玄貓一直在為每個應用程式創建一個SparkSession物件。如果能提供一個簡單的介面,讓玄貓範例中的每個應用程式檔案都能創建一個Spark會話,那將會更好。例如,玄貓可以創建一個Spark物件,其中包含一個initSparkSession方法,如下所示:

package com.blackcat.dewithscala.utils

import org.apache.spark.sql.SparkSession

object Spark {
def initSparkSession(appName: String): SparkSession = SparkSession
.builder()
.appName(appName)
.master("local[*]")
.getOrCreate()
}

這個Spark物件提供了一個靜態方法initSparkSession,它接受一個appName字串作為參數,並返回一個配置好的SparkSession實例。這種設計將SparkSession的創建邏輯集中管理,使得所有應用程式都可以透過一個統一且簡潔的介面來獲取Spark會話,避免了重複的程式碼。

任何應用程式檔案現在都可以透過以下方式創建一個SparkSession物件:

val session = Spark.initSparkSession("app-name")

這個簡單的工廠方法提供了一種創建SparkSession物件的便捷方式,玄貓從現在開始將使用它。

到目前為止,玄貓已經探討了如何從資料表讀取記錄。然而,能夠刪除和創建資料表、執行授權、更新資料庫統計資訊等操作並不少見。例如,為了避免對下游應用程式(例如商業智慧(BI)儀表板)產生影響,讀者可能希望創建一組臨時的事實表和維度表,載入它們(在運行檢查以確保數據準確後),更新資料庫統計資訊以提高查詢運行時效能,刪除現有的事實表和維度表,然後重命名這些臨時表。

在下一節中,玄貓將探討如何使用doobie——一個開源的、純函數式JDBC函式庫——透過玄貓的資料庫介面提供這些功能。

執行各種資料庫操作

在本節中,玄貓將擴展玄貓的Database API,以提供額外的功能,例如創建資料表、刪除資料表、將查詢結果作為集合返回等。但在玄貓繼續之前,玄貓需要更新玄貓的依賴項:

object Version {
val spark = "3.3.1"
val deequ = "2.0.4-spark-3.3"
val pureconfig = "0.17.2"
val doobie = "1.0.0-RC1"
}

val mainDeps: Seq[ModuleID] = Seq(
"org.apache.spark" %% "spark-sql" % Version.spark,
"com.amazon.deequ" %% "deequ" % Version.deequ,
"com.github.pureconfig" %% "pureconfig" % Version.pureconfig,
"org.tpolecat" %% "doobie-core" % Version.doobie,
"org.tpolecat" %% "doobie-hikari" % Version.doobie, // 數據庫連接池
"org.tpolecat" %% "doobie-mysql" % Version.doobie, // MySQL驅動
"org.tpolecat" %% "doobie-postgres" % Version.doobie, // PostgreSQL驅動
"org.tpolecat" %% "doobie-h2" % Version.doobie // H2驅動,用於測試
)

這段程式碼定義了專案中使用的各種函式庫版本,並將它們組織成一個mainDeps序列,方便在build.sbt中統一管理。除了Spark、Deequ和PureConfig,這裡還引入了doobie函式庫及其相關模組,包括doobie-core(核心功能)、doobie-hikari(高效能連接池)、doobie-mysql(MySQL支援)、doobie-postgres(PostgreSQL支援)和doobie-h2(H2資料庫支援,常用於測試)。這些依賴項的引入為玄貓接下來使用doobie進行資料庫操作奠定了基礎。

doobie是一個用於Scala的函數式JDBC層,它基於函數式I/O、單子效應等設計模式。玄貓不會深入探討該函式庫的工作原理或它提供的所有功能。玄貓將只關注該函式庫中允許玄貓編寫具有所需功能的資料庫API的方面。

在本節中,玄貓更新了玄貓的依賴項,以便與doobie協作。在下一節中,玄貓將看到doobie的實際應用。

使用資料庫

為了與資料庫協作,玄貓首先需要建立連接。在doobie中,玄貓透過創建一個ConnectionIO[A]類型的物件來實現這一點,該物件指定了在連接中發生的計算。要使用ConnectionIO[A]類型的物件,玄貓需要一個連接來協作。在doobie中有多種方法可以實現,但最簡單的選項是透過Transactor物件。Transactor物件知道如何連接到資料庫、處理連接等。讀者可以閱讀更多關於連接的資訊。

在以下程式碼片段中,玄貓查詢MySQL資料庫以檢索文字和當前時間戳:

package com.blackcat.dewithscala.chapter4

import doobie._
import doobie.implicits._
import cats.effect.unsafe.implicits.global // 引入全局執行上下文

import com.blackcat.dewithscala.utils.Database
import cats.syntax.all._ // 引入cats的語法糖

object WorkingWithDoobie extends App {
private val db = Database("my_db") // 獲取資料庫配置

// 定義一個ConnectionIO,查詢字串"hello"
private val constant: ConnectionIO[String] =
sql"select 'hello'".query[String].unique

// 定義一個ConnectionIO,查詢隨機雙精度浮點數
private val random: ConnectionIO[Double] =
sql"select rand();".query[Double].unique

// 創建一個Transactor物件
private val transactor = Transactor.fromDriverManager[IO](
db.scheme, // 使用db.scheme作為驅動名稱,這通常是錯誤的,應為db.driver
db.jbdcURL,
db.username.value,
db.password.value
)
}

這段程式碼展示了doobie的基本用法,用於與資料庫進行互動。它首先透過Database("my_db")獲取資料庫配置。然後,定義了兩個ConnectionIO物件:constant用於查詢一個固定字串,random用於查詢一個隨機數。ConnectionIO是doobie中表示資料庫操作的抽象。最後,程式碼嘗試創建一個Transactor物件,它是doobie與實際資料庫建立連接的橋樑。Transactor.fromDriverManager方法用於從JDBC驅動管理器創建一個Transactor,它需要資料庫驅動名稱(這裡誤用了db.scheme,正確應是驅動類名,如com.mysql.cj.jdbc.Driver)、JDBC URL、使用者名稱和密碼。這個Transactor將用於執行之前定義的ConnectionIO操作。

數據工程核心理論與實踐

第四章:資料庫操作與Spark JDBC API

使用資料庫
db.password.value
)

private val run = for {
c <- constant
r <- random
} yield (println(s"($c,$r)"))

run.transact(transactor).unsafeRunSync()

這段程式碼展示了如何執行doobie的ConnectionIO操作。run是一個for推導式,它將constantrandom這兩個ConnectionIO操作組合起來,並在成功後列印出它們的結果。transact(transactor)方法將ConnectionIO轉換為一個IO(Effect Monad),表示一個可能產生副作用的計算。最後,unsafeRunSync()方法執行這個IO計算並阻塞當前執行緒直到結果可用。這是一個「不安全」的操作,因為它直接執行副作用並可能拋出異常,通常只在應用程式的「世界末日」(即程式的頂層)調用。

final case class Airports(
iata_code: String,
airport: String,
city: String,
state: String,
country: String,
latitude: Double,
longitude: Double
)

private val airports: List[Airports] =
sql"select * from my_db.airports"
.query[Airports] // Query0[Airports]
.to[List] // ConnectionIO[List[Airports]]
.transact(transactor) // IO[List[Airports]]
.unsafeRunSync() // List[Airports]

// 列印北卡羅來納州有機場的城市
airports
.collect { case a if a.state === "NC" => a.city }
.foreach(a => println(a))

private val airports2: List[Airports] =
db.records[Airports]("select * from my_db.airports") // 假設db物件有records方法

airports2
.collect { case a if a.state === "NC" => a.city }
.foreach(a => println(a))

private val airports3 =
db.records[(String, String, String, String, String, Double,
Double)](
"select * from my_db.airports"
)
println(a))
}

這段程式碼展示了doobie如何將資料庫查詢結果映射到Scala物件。首先,定義了一個Airportscase class,它與my_db.airports資料表的結構相匹配。然後,使用sql"..."字串內插器構建SQL查詢,query[Airports]將查詢結果映射到Airports物件。to[List]ConnectionIO[Airports]轉換為ConnectionIO[List[Airports]]。最後,透過transact(transactor).unsafeRunSync()執行查詢並獲取Airports物件的列表。隨後,程式碼過濾出北卡羅來納州(NC)的機場,並列印出它們的城市名稱。程式碼中還展示了假設的db.records方法,這暗示了可以進一步抽象資料庫操作,使其更易於使用,並且可以將結果映射到case class或元組。

sql內插器由doobie提供(在本例中,連接到MySQL資料庫)。事實上,cats.effect中有一整套用於副作用操作的unsafe函數。此類函數應在應用程式的末尾調用,有時被稱為「世界末日」。

讀者可能已經注意到,這裡沒有SparkContext參與。讀者可能還注意到,為了創建資料庫連接,玄貓需要提供驅動程式詳細資訊。在玄貓繼續之前,讓玄貓更新玄貓的配置,以便驅動程式資訊作為Database物件的一部分可用。為此,玄貓需要執行以下步驟:

  1. 玄貓需要更新配置物件中的Databasecase class,如下所示:
case class Database(
driver: String,
name: String,
scheme: String,
host: String,
port: String,
username: Opaque,
password: Opaque
)

這裡在Databasecase class中添加了一個driver: String欄位,用於儲存JDBC驅動程式的類名。這使得資料庫配置更加完整,將所有連接相關的資訊都集中管理。

  1. 然後,玄貓更新Database特徵:
sealed trait Database {
def driver: String
def scheme: String
def host: String
def port: String
def name: String
def jbdcURL: String
def username: Opaque
def password: Opaque
}

Database特徵中添加了def driver: String,確保所有實現Database特徵的類別都必須提供驅動程式資訊。sealed關鍵字限制了實現該特徵的類別只能在同一個檔案中定義,這有助於編譯器進行模式匹配檢查。

  1. 然後,玄貓在私有類別中提供driver方法的具體實現:
object Database {
def apply(name: String): Database = new DatabaseImplementation(name)

private class DatabaseImplementation(dbname: String) extends Database {
private val db = getDB(dbname).get // 這裡假設getDB一定會找到,實際應用中應處理Option
def driver = db.driver
def scheme = db.scheme
def host = db.host
def port = db.port
def name = db.name
def jbdcURL = s"$scheme://$host:$port/$name"
def username = db.username
def password = db.password
}
}

DatabaseImplementation中,driver方法現在直接從底層的db物件(即DatabaseConfig實例)中獲取driver屬性。這確保了Database特介徵中定義的driver方法得到了正確的實現。

  1. 配置檔案也需要更新,以包含驅動程式資訊:
db = [
{
driver = "com.mysql.cj.jdbc.Driver"
name = "my_db"
scheme = "jdbc:mysql"
host = "localhost"
port = "3306"
username = ${?MYSQL_USER}
password = ${?MYSQL_PASS}
}
]

db配置中為my_db資料庫添加了driver = "com.mysql.cj.jdbc.Driver"這一行。這使得驅動程式的類名可以在配置中指定,並在運行時被PureConfig載入,從而實現了完全外部化的資料庫連接配置。

透過這些更改,玄貓可以按以下方式定義Transactor物件:

val transactor = Transactor.fromDriverManager[IO](
db.driver,
db.jbdcURL,
db.username.value,
db.password.value
)

現在,Transactor.fromDriverManager可以正確地使用db.driver來獲取JDBC驅動程式的類名,從而建立正確的資料庫連接。

在本章前面,玄貓創建了一個my_db.airports資料表並向其中載入了一些記錄。這是該資料表的Schema:

mysql> describe my_db.airports;

+-----------+--------------+------+------+--------+-------+
| Field | Type | Null | Key | Default| Extra |
+-----------+--------------+------+------+--------+-------+
| iata_code | varchar(50) | YES | | NULL | |
| airport | varchar(500) | YES | | NULL | |
| city | varchar(100) | YES | | NULL | |
| state | varchar(100) | YES | | NULL | |
| country | varchar(100) | YES | | NULL | |
| latitude | double | YES | | NULL | |

結論:從工具使用者到架構定義者的思維躍升

【採用視角:創新與突破視角】

深入剖析本章從 Spark JDBC 到 doobie 的技術演進,可以發現這不僅是函式庫的替換,更是一次從「數據操作者」邁向「數據架構師」的思維典範轉移。初始的 SparkSession 工廠方法,體現了工程師對程式碼標準化與效率的追求;然而,doobie 的引入,則標誌著對數據處理完整生命週期的掌控渴望。

傳統 Spark JDBC API 擅長大規模數據的讀寫,但在精細的資料庫綱要(Schema)管理、事務控制與狀態變更上有所局限。doobie 則透過函數式編程的抽象(如 ConnectionIOTransactor),將資料庫操作從指令式的副作用轉化為可組合、具備型別安全的純粹描述。這項轉變的價值在於,它將工程師從被動的數據管道執行者,提升為主動的、能夠精準定義與管理數據基礎設施狀態的架構定義者。其挑戰也正在於此——開發者需突破傳統命令式思維,擁抱函數式I/O的管理哲學。

展望未來,大數據框架(如 Spark)與純函數式組件(如 doobie)的深度融合,將是構建高可靠性、高可維護性數據平台的關鍵趨勢。這種融合模式能有效隔離複雜的副作用,大幅提升系統的健壯性與可預測性。

玄貓認為,對於追求技術深度與架構穩健性的高階工程師而言,掌握這種融合模式已是關鍵。學習重點不應僅止於 doobie 的 API 操作,更在於理解其背後函數式編程對副作用的管理哲學,這將是構築下一代可靠數據系統的核心競爭力。