返回文章列表

Unity Catalog 資料儲存與存取控制

本文介紹如何在 Unity Catalog 中管理資料儲存位置,並利用目錄、結構描述、資料表等物件實作精細的存取控制。同時示範如何使用 Python 程式碼生成不同格式的範例資料,以及如何建立 DLT 管道處理資料並合併成 streaming tables。此外,文章也說明瞭如何使用 Data Lineage

資料工程 雲端服務

Unity Catalog 提供集中式的資料治理和存取控制,讓開發者能更有效地管理資料湖倉中的資料。本文除了說明如何使用 Unity Catalog 的核心元件(如目錄、結構描述、資料表)儲存資料外,也示範如何使用 Python 和 Faker 函式庫產生不同格式(txt、csv、pdf)的測試資料,並將這些資料儲存至指定的磁碟區路徑。程式碼中也包含了模擬真實環境資料生成間隔的暫停機制。接著,文章介紹如何使用 DLT 建立 streaming tables,將不同來源的資料合併,方便後續分析。最後,文章詳細說明瞭如何使用 Data Lineage REST API 擷取表格和欄位的血緣資訊,並輔以 Databricks UI 操作示範,讓使用者能清楚追蹤資料的來源、轉換過程以及上下游的依賴關係,確保資料的可靠性和可追溯性。

在 Unity Catalog 中管理資料位置

統一管理資料儲存與存取控制

本章節將介紹如何在 Unity Catalog 中使用不同的安全物件來儲存資料並維護精細的存取控制。我們將探討如何使用目錄、結構描述、資料表、檢視表、磁碟區和外部位置來儲存資料。

建立範例筆記本

首先,我們建立一個新的筆記本,並新增以下程式碼片段:

from faker import Faker
import time
import random
Faker.seed(631)
fake = Faker()

# 隨機生成檔案
num_docs = 5
num_sentences_per_doc = 100
doc_types = ["txt", "pdf", "csv"]
volume_path = f"/Volumes/{catalog_name}/{schema_name}/{volume_name}"

for _ in range(num_docs):
    paragraph = fake.paragraph(nb_sentences=num_sentences_per_doc)
    # 隨機選擇檔案格式
    doc_type = doc_types[random.randrange(2)]
    print(doc_type)
    if doc_type == "txt":
        doc_name = f"{fake.pystr()}.txt"
        save_doc_as_text(doc_name, volume_path, paragraph)
    elif doc_type == "pdf":
        doc_name = f"{fake.pystr()}.pdf"
        save_doc_as_pdf(doc_name, volume_path, paragraph)
    elif doc_type == "csv":
        doc_name = f"{fake.pystr()}.csv"
        save_doc_as_csv(doc_name, volume_path, paragraph)

    # 暫停一段隨機時間
    sleep_time = random.randint(3, 30)
    print(f"Sleeping for {sleep_time} seconds...\n\n")
    time.sleep(sleep_time)

程式碼解密:

  1. Faker 函式庫的使用:用於生成假資料,包括文欄位落。
  2. 隨機檔案生成:根據設定的數量和句子數量生成隨機檔案。
  3. 檔案格式選擇:隨機選擇 txtpdfcsv 格式儲存檔案。
  4. 儲存檔案:根據選擇的檔案格式呼叫相應的儲存函式。
  5. 暫停機制:模擬真實環境中檔案生成的間歇性。

儲存檔案為不同格式的輔助函式

儲存為 PDF 檔案

def save_doc_as_pdf(file_name, save_path, paragraph):
    """將段落文字儲存為 PDF 檔案的輔助函式"""
    from reportlab.pdfgen.canvas import Canvas
    from reportlab.lib.pagesizes import letter
    from reportlab.lib.units import cm
    tmp_path = f"/local_disk0/tmp/{file_name}"
    volume_path = f"{save_path}/{file_name}"
    canvas = Canvas(tmp_path, pagesize=letter)
    lines = paragraph.split(".")
    textobject = canvas.beginText(5*cm, 25*cm)
    for line in lines:
        textobject.textLine(line)
    canvas.drawText(textobject)
    canvas.save()
    print(f"Saving PDF file at : {tmp_path}")
    copyfile(tmp_path, volume_path)

內容解密:

  1. 使用 reportlab 函式庫:生成 PDF 檔案。
  2. 文書處理:將段落文字按句號分割,並逐行寫入 PDF。
  3. 檔案儲存:先存到暫存路徑,再複製到指定儲存路徑。

儲存為 CSV 檔案

def save_doc_as_csv(file_name, save_path, paragraph):
    """將段落文字儲存為 CSV 檔案的輔助函式"""
    import csv
    tmp_path = f"/local_disk0/tmp/{file_name}"
    volume_path = f"{save_path}/{file_name}"
    print(f"Saving CSV file at : {tmp_path}")
    with open(tmp_path, 'w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(["Id", "Sentence"])
        i = 1
        for line in paragraph.split("."):
            writer.writerow([i, line])
            i = i + 1
    copyfile(tmp_path, volume_path)

內容解密:

  1. 使用 csv 函式庫:生成 CSV 檔案。
  2. 文書處理:將段落文字按句號分割,並逐行寫入 CSV,包含 ID 和句子內容。
  3. 檔案儲存:先存到暫存路徑,再複製到指定儲存路徑。

建立 DLT 管道

我們建立了一個新的 DLT 管道,用於串流處理隨機生成的檔案並進行簡單的文字擷取。

建立 streaming tables

@dlt.table(
    name="text_docs_silver",
    comment="為生成式 AI 管道合併的文字檔案。"
)
def text_docs_silver():
    text_docs_df = dlt.read("text_docs_raw").withColumn("type", F.lit("text"))
    csv_docs_df = dlt.read("csv_docs_raw").withColumn("type", F.lit("csv"))
    pdf_docs_df = dlt.read("pdf_docs_raw").withColumn("type", F.lit("pdf"))
    combined_df = text_docs_df.union(csv_docs_df).union(pdf_docs_df)
    return combined_df

程式碼解密:

  1. DLT 表格定義:建立一個名為 text_docs_silver 的 DLT 表格。
  2. 讀取原始資料:從 text_docs_rawcsv_docs_rawpdf_docs_raw 三個原始資料表中讀取資料。
  3. 合併資料:將三個資料表合併成一個統一的資料表 combined_df

使用 Unity Catalog 檢視資料血統

在本章中,我們將探討資料血統在 Databricks Data Intelligence Platform 中的關鍵作用。您將學習如何追蹤資料來源、視覺化資料集轉換、識別上下游依賴關係,並使用 Catalog Explorer 的血統圖功能進行檔案記錄。在本章結束時,您將具備確保資料來自可信來源並在變更發生前發現問題所需的技能。

本章主要內容

  • 在 Unity Catalog 中介紹資料血統
  • 使用 Data Lineage REST API 追蹤資料來源
  • 視覺化上下游資料轉換
  • 識別依賴關係和影響
  • 實作實驗室 - 在組織中記錄資料血統

技術需求

在 Unity Catalog 中介紹資料血統

資料血統是指能夠追蹤 Unity Catalog(UC)中安全物件(如表格)之間的關係,以便使用者可以檢視資料資產如何從上游來源形成並驗證下游依賴關係。

圖 7.1 – 資料血統追蹤資料流動及其如何被內部程式隨時間轉換

在 Databricks 中,使用者可以近乎實時地追蹤資料資產的血統,以便資料管理員確保他們正在使用最新的資產。此外,Unity Catalog 中的資料血統跨越多個工作區,這些工作區連線到同一個 Unity Catalog 中繼儲存,允許資料專業人員全面瞭解資料集如何被轉換以及彼此之間的關聯。

使用 Data Lineage REST API 追蹤資料來源

與 Databricks Data Intelligence Platform 中的許多安全物件一樣,有多種方法可以檢索與物件相關的詳細血統資訊。其中一種常見模式是透過 Data Lineage REST API 檢索特定物件的血統資訊。

目前,Data Lineage REST API 僅限於檢索表格血統資訊以及欄位血統資訊的唯讀檢視。

UC 物件HTTP 動詞端點描述
表格GET/api/2.0/lineage-tracking/table-lineage給定 UC 表格名稱,檢索上游和下游表格連線的列表,以及有關其相關筆記本連線的資訊
欄位GET/api/2.0/lineage-tracking/column-lineage給定 UC 表格名稱和欄位名稱,檢索上游和下游欄位連線的列表
表格 7.1 – Data Lineage REST API 擷取有關 UC 表格和欄位物件的上游和下游連線資訊

然而,預計 Data Lineage REST API 將隨著時間的推移而演進,為資料管理員提供額外的功能,以檢索資訊甚至操縱平台內資料資產的端對端血統。

讓我們看看如何使用 Lineage Tracking API 檢索由本章附帶的 GitHub 儲存函式庫中的資料集產生器筆記本建立的表格的上游和下游連線資訊。

首先,我們將在 Databricks 工作區中建立一個全新的筆記本,並匯入 requests Python 函式庫。我們將專門使用 Python requests 函式庫向 Databricks REST API 傳送資料血統請求並解析來自 Databricks 控制平面的回應:

import requests

程式碼解密:

此段程式碼匯入了 requests 函式庫,這是一個用於在 Python 中傳送 HTTP 請求的流行函式庫。在此範例中,我們將使用它向 Databricks REST API 傳送請求以檢索資料血統資訊。

建立並啟動一個通用叢集以附加筆記本並執行筆記本儲存格。您需要產生一個個人存取權杖(PAT)以向 Databricks REST 端點進行身份驗證並傳送 Data Lineage API 請求。強烈建議將 PAT 儲存在 Databricks 秘密物件中,以避免意外洩漏對 Databricks 工作區的身份驗證詳細資訊。

使用 Data Lineage REST API 追蹤資料來源

重要注意事項

以下的程式碼片段僅供說明用途,您需要更新工作區名稱以符合您的 Databricks 工作區名稱,以及 API 權杖的值。

使用 requests 函式庫傳送請求到 Data Lineage API

讓我們使用 requests 函式庫來傳送請求到 Data Lineage API,指定完整的端點:

response = requests.get(
    f"https://{WORKSPACE_NAME}.cloud.databricks.com/api/2.0/lineage-tracking/table-lineage",
    headers={
        "Authorization": f"Bearer {API_TOKEN}"
    },
    json={
        "table_name": FULLY_QUALIFIED_TABLE_NAME,
        "include_entity_lineage": "true"
    }
)
print(response.json())

內容解密:

  • 使用 requests.get 方法傳送 GET 請求到 Data Lineage API 的 table-lineage 端點。
  • 請求中包含 Authorization 標頭,使用 Bearer 權杖進行身份驗證。
  • json 引數中指定了 table_nameinclude_entity_lineage,用於檢索表的沿襲資訊。

解析 Data Lineage API 的回應

接下來,讓我們新增一些輔助函式,用於解析 Data Lineage API 的回應,並以易於理解的方式列印連線資訊:

def print_table_info(conn_type, table_info_json):
    info = table_info_json["tableInfo"]
    print(f"""
+
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
--+
| {conn_type.upper()} Table Connection Info
|
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
--|
| Table name: {info['name']}
|
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
--|
| Catalog name: {info['catalog_name']}
|
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
--|
| Table type: {info['table_type']}
|
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
--|
| Lineage timestamp: {info['lineage_timestamp']}
+
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
--+
""")

def print_notebook_info(conn_type, notebook_info):
    print(f"""
+
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
--+
| {conn_type.upper()} Notebook Connection Info:
|
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
--|
| Workspace id: {str(notebook_info['workspace_id'])}
|
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
--|
| Notebook id: {str(notebook_info['notebook_id'])}
|
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
--|
| Timestamp: {notebook_info['lineage_timestamp']}
+
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
--+
""")

內容解密:

  • print_table_info 函式用於列印表格連線資訊,包括表格名稱、目錄名稱、表格型別等。
  • print_notebook_info 函式用於列印筆記本連線資訊,包括工作區 ID、筆記本 ID 和時間戳記。

更新回應處理邏輯

現在,讓我們更新之前的程式碼片段,用於擷取表格沿襲資訊,並呼叫這些輔助函式:

if response.status_code == 200:
    connection_flows = ["upstreams", "downstreams"]
    for flow in connection_flows:
        if flow in response.json():
            connections = response.json()[flow]
            for conn in connections:
                if "tableInfo" in conn:
                    print_table_info(flow, conn)
                elif "notebookInfos" in conn:
                    for notebook_info in conn["notebookInfos"]:
                        print_notebook_info(flow, notebook_info)

內容解密:

  • 檢查回應狀態碼是否為 200,如果是,則處理回應資料。
  • 遍歷 upstreamsdownstreams 連線,並呼叫相應的輔助函式列印連線資訊。

檢索欄位沿襲資訊

Data Lineage API 不僅可以追蹤表格之間的連線,還可以檢索欄位的沿襲資訊。讓我們檢索 description 欄位的沿襲資訊:

column_name = "description"
response = requests.get(
    f"https://{WORKSPACE_NAME}.cloud.databricks.com/api/2.0/lineage-tracking/column-lineage",
    headers={
        "Authorization": f"Bearer {API_TOKEN}"
    },
    json={
        "table_name": FULLY_QUALIFIED_TABLE_NAME,
        "column_name": column_name
    }
)

def print_column_info(conn_type, column_info):
    print(f"""
Connection flow: {conn_type.upper()}
Column name: {column_info['name']}
Catalog name: {column_info['catalog_name']}
Schema name: {column_info['schema_name']}
Table name: {column_info['table_name']}
Table type: {column_info['table_type']}
Lineage timestamp: {column_info['lineage_timestamp']}
""")

if response.status_code == 200:
    if "upstream_cols" in response.json():
        print("| Upstream cols:")
        for column_info in response.json()['upstream_cols']:
            print_column_info("Upstream", column_info)
    if "downstream_cols" in response.json():
        print("| Downstream cols:")
        for column_info in response.json()['downstream_cols']:
            print_column_info("Downstream", column_info)

內容解密:

  • 使用 requests.get 方法傳送 GET 請求到 Data Lineage API 的 column-lineage 端點。
  • 請求中包含 table_namecolumn_name,用於檢索特定欄位的沿襲資訊。
  • print_column_info 函式用於列印欄位連線資訊。

使用 Databricks UI 追蹤資料沿襲

除了使用 Data Lineage API,我們還可以使用 Databricks UI 追蹤資料沿襲。首先,執行資料產生器筆記本,建立多個資料集。然後,在 Databricks 工作區中,開啟 Catalog Explorer,搜尋並點選 combined_table 資料表。在右側面板中,切換到 Lineage 標籤頁,即可檢視資料表的沿襲資訊。

圖表翻譯:

此圖示展示了使用 Databricks UI 追蹤資料沿襲的過程,包括執行資料產生器筆記本、開啟 Catalog Explorer 和檢視沿襲資訊等步驟。