返回文章列表

Python 操作 PostgreSQL 與 Elasticsearch 資料函式庫整合應用

本文深入探討如何使用 Python 操作 PostgreSQL 和 Elasticsearch 資料函式庫,涵蓋資料函式庫連線、資料插入、批次插入、資料提取、查詢、以及如何結合 Pandas DataFrames 和 Psycopg2 進行高效的資料函式庫操作。同時,文章也介紹瞭如何使用 Elasticsearch

資料函式庫 Web 開發

在資料工程領域,資料函式庫操作是不可或缺的一環。本文將示範如何使用 Python 連線並操作 PostgreSQL 關係型資料函式庫和 Elasticsearch 非關係型資料函式庫。首先,我們會使用 psycopg2 函式函式庫連線 PostgreSQL 資料函式庫,示範如何插入單筆和多筆資料,並說明提交變更和關閉連線的必要性。接著,我們會介紹如何使用 executemany() 方法進行批次資料插入,以提升效率。同時,也會示範如何從 PostgreSQL 資料函式庫提取資料,並使用 fetchall() 方法取得查詢結果。此外,文章還會介紹如何使用 Pandas DataFrames 查詢資料,簡化資料處理流程。最後,我們將探討如何使用 Elasticsearch 儲存和查詢 JSON 檔案,以及如何建立索引和進行資料操作。

連線資料函式庫

首先,需要建立與 PostgreSQL 資料函式庫的連線。這可以透過 psycopg2 函式庫來完成。以下是建立連線的步驟:

import psycopg2

# 連線資料函式庫
conn = psycopg2.connect(
    host="localhost",
    database="mydatabase",
    user="myuser",
    password="mypassword"
)

# 建立 cursor 物件
cur = conn.cursor()

資料插入

建立連線後,可以使用 SQL 指令將資料插入資料函式庫。以下是插入單一筆資料的範例:

# 定義 SQL 指令
query = "INSERT INTO users (id, name, street, city, zip) VALUES (%s, %s, %s, %s, %s)"

# 定義資料
data = (1, 'Big Bird', 'Sesame Street', 'Fakeville', '12345')

# 執行 SQL 指令
cur.execute(query, data)

多筆資料插入

如果需要插入多筆資料,可以使用以下方法:

# 定義 SQL 指令
query = "INSERT INTO users (id, name, street, city, zip) VALUES (%s, %s, %s, %s, %s)"

# 定義多筆資料
data = [
    (1, 'Big Bird', 'Sesame Street', 'Fakeville', '12345'),
    (2, 'Elmo', 'Sesame Street', 'Fakeville', '12345'),
    (3, 'Cookie Monster', 'Sesame Street', 'Fakeville', '12345')
]

# 執行 SQL 指令
for row in data:
    cur.execute(query, row)

提交變更

執行 SQL 指令後,需要提交變更才能使資料函式庫更新。可以使用以下方法:

# 提交變更
conn.commit()

閉合連線

最後,需要閉合連線以釋放資源。可以使用以下方法:

# 閉合連線
conn.close()

內容解密:

以上程式碼示範瞭如何連線 PostgreSQL 資料函式庫、插入單一筆資料和多筆資料,以及提交變更和閉合連線。這些步驟是基本的資料函式庫操作,需要仔細理解和掌握。

圖表翻譯:

以下是使用 Plantuml 圖表展示資料函式庫連線和資料插入的流程: 這個圖表展示了資料函式庫連線和資料插入的流程,幫助理解程式碼的邏輯和執行順序。

使用 Psycopg2 進行批次插入

在這個例子中,我們將使用 Psycopg2 來處理批次插入。首先,讓我們瞭解一下 Psycopg2 是什麼。Psycopg2 是一個 PostgreSQL 的 Python 驅動程式,允許您在 Python 中存取 PostgreSQL 資料函式庫。

步驟 1:匯入所需的函式函式庫

import psycopg2 as db
from faker import Faker

步驟 2:建立 Faker 物件和資料陣列

fake = Faker()
data = []
i = 2

步驟 3:建立假資料

for r in range(1000):
    data.append((i, fake.name(), fake.street_address(), fake.city(), fake.zipcode()))
    i += 1

步驟 4:轉換資料陣列為元組

data_for_db = tuple(data)

步驟 5:建立 Psycopg2 連線

conn_string = "dbname='dataengineering' host='localhost' user='postgres' password='postgres'"
conn = psycopg2.connect(conn_string)
cur = conn.cursor()

步驟 6:執行批次插入

cur.executemany("INSERT INTO table_name (id, name, address, city, zipcode) VALUES (%s, %s, %s, %s, %s)", data_for_db)
conn.commit()
cur.close()
conn.close()

內容解密:

在這個例子中,我們使用 Psycopg2 來連線 PostgreSQL 資料函式庫,並使用 executemany() 方法來執行批次插入。這個方法可以讓我們一次性插入多筆資料,從而提高效率。

圖表翻譯:

這個流程圖顯示了我們如何使用 Psycopg2 來進行批次插入。首先,我們建立 Psycopg2 連線,然後建立 Faker 物件和資料陣列。接下來,我們建立假資料,轉換資料陣列為元組,然後執行批次插入。最後,我們關閉 Psycopg2 連線。

PostgreSQL 資料函式庫操作

資料插入

要將資料插入 PostgreSQL 資料函式庫,需要使用 psycopg2 函式庫。以下是插入資料的步驟:

  1. 匯入 psycopg2 函式庫並建立資料函式庫連線。
  2. 建立一個 cursor 物件,用於執行 SQL 查詢。
  3. 定義一個 SQL 查詢,使用 insert into 陳述式將資料插入資料函式庫。
  4. 使用 executemany() 方法批次插入資料。
  5. 提交交易以儲存變更。

以下是插入資料的範例程式碼:

import psycopg2 as db

# 定義資料函式庫連線字串
conn_string = "dbname='dataengineering' host='localhost' user='postgres' password='postgres'"

# 建立資料函式庫連線
conn = db.connect(conn_string)

# 建立一個 cursor 物件
cur = conn.cursor()

# 定義 SQL 查詢
query = "insert into users (id, name, street, city, zip) values (%s, %s, %s, %s, %s)"

# 定義資料
data_for_db = [(1, 'John Doe', '123 Main St', 'Anytown', '12345'), ...]

# 執行查詢
cur.executemany(query, data_for_db)

# 提交交易
conn.commit()

資料提取

要從 PostgreSQL 資料函式庫提取資料,需要使用 psycopg2 函式庫。以下是提取資料的步驟:

  1. 匯入 psycopg2 函式庫並建立資料函式庫連線。
  2. 建立一個 cursor 物件,用於執行 SQL 查詢。
  3. 定義一個 SQL 查詢,使用 select 陳述式從資料函式庫提取資料。
  4. 執行查詢並取得結果。
  5. 使用 fetchall() 方法或迭代 cursor 物件取得結果。

以下是提取資料的範例程式碼:

import psycopg2 as db

# 定義資料函式庫連線字串
conn_string = "dbname='dataengineering' host='localhost' user='postgres' password='postgres'"

# 建立資料函式庫連線
conn = db.connect(conn_string)

# 建立一個 cursor 物件
cur = conn.cursor()

# 定義 SQL 查詢
query = "select * from users"

# 執行查詢
cur.execute(query)

# 取得結果
results = cur.fetchall()

# 或使用迭代 cursor 物件
for record in cur:
    print(record)

內容解密:

以上程式碼示範瞭如何使用 psycopg2 函式庫插入和提取 PostgreSQL 資料函式庫的資料。插入資料的過程涉及建立資料函式庫連線、建立 cursor 物件、定義 SQL 查詢、執行查詢和提交交易。提取資料的過程涉及建立資料函式庫連線、建立 cursor 物件、定義 SQL 查詢、執行查詢和取得結果。

資料函式庫查詢與資料擷取

在資料函式庫中,查詢和擷取資料是非常重要的步驟。以下將介紹如何使用Python的資料函式庫連線函式庫來進行這些操作。

查詢資料

首先,需要建立一個資料函式庫連線和cursor物件。然後,可以使用SQL語法來查詢資料函式庫中的資料。

import psycopg2

# 建立資料函式庫連線
conn = psycopg2.connect(
    database="mydatabase",
    user="myuser",
    password="mypassword",
    host="myhost",
    port="myport"
)

# 建立cursor物件
cur = conn.cursor()

# 查詢資料
cur.execute("SELECT * FROM mytable")

擷取資料

有幾種方法可以用來擷取查詢結果的資料:

  1. fetchall(): 擷取所有查詢結果的資料。
records = cur.fetchall()
print(records)
  1. fetchmany(howmany): 擷取指定數量的查詢結果的資料。
records = cur.fetchmany(10)
print(records)
  1. fetchone(): 擷取單一筆查詢結果的資料。
record = cur.fetchone()
print(record)

查詢結果的資料結構

無論是使用哪種方法來擷取資料,查詢結果的資料都會以陣列的形式傳回。即使只查詢一筆資料,也會傳回一個包含單一元素的陣列。

data = cur.fetchone()
print(data[0])  # 存取第一個欄位的值

查詢結果的資料數量

可以使用 rowcount 屬性來取得查詢結果的資料數量。

print(cur.rowcount)  # 列印查詢結果的資料數量

目前資料位置

可以使用 rownumber 屬性來取得目前的資料位置。

print(cur.rownumber)  # 列印目前的資料位置

將資料寫入CSV檔

可以使用 copy_to() 方法將查詢結果的資料寫入CSV檔。

import csv

# 開啟CSV檔
with open('data.csv', 'w', newline='') as csvfile:
    # 將資料寫入CSV檔
    cur.copy_to(csvfile, 'mytable')

關閉連線

最後,別忘了關閉資料函式庫連線。

conn.close()

使用Python連線PostgreSQL資料函式庫

要連線PostgreSQL資料函式庫,需要使用psycopg2函式庫。以下是連線資料函式庫的步驟:

步驟1:安裝psycopg2函式庫

可以使用pip安裝psycopg2函式庫:

pip install psycopg2

步驟2:匯入psycopg2函式庫

import psycopg2

步驟3:建立連線

建立連線需要提供資料函式庫名稱、主機名稱、使用者名稱和密碼:

conn_string = "dbname='dataengineering' host='localhost' user='postgres' password='postgres'"
conn = psycopg2.connect(conn_string)

步驟4:建立cursor物件

cursor物件用於執行SQL查詢:

cur = conn.cursor()

步驟5:執行SQL查詢

可以使用cursor物件執行SQL查詢:

cur.execute("SELECT * FROM users")

步驟6:取得查詢結果

可以使用fetchall()方法取得查詢結果:

rows = cur.fetchall()

步驟7:關閉連線

關閉連線:

conn.close()

使用DataFrames查詢資料

可以使用pandas函式庫的DataFrames查詢資料。以下是步驟:

步驟1:匯入pandas函式庫

import pandas as pd

步驟2:建立連線

建立連線需要提供資料函式庫名稱、主機名稱、使用者名稱和密碼:

conn_string = "dbname='dataengineering' host='localhost' user='postgres' password='postgres'"
conn = psycopg2.connect(conn_string)

步驟3:執行SQL查詢

可以使用read_sql()方法執行SQL查詢:

df = pd.read_sql("SELECT * FROM users", conn)

步驟4:取得查詢結果

查詢結果會儲存在DataFrames中:

print(df)

步驟5:關閉連線

關閉連線:

conn.close()

使用Elasticsearch儲存和查詢資料

Elasticsearch是一種NoSQL資料函式庫,可以儲存和查詢JSON檔案。以下是步驟:

步驟1:安裝Elasticsearch

可以使用pip安裝Elasticsearch:

pip install elasticsearch

步驟2:匯入Elasticsearch函式庫

from elasticsearch import Elasticsearch

步驟3:建立連線

建立連線需要提供Elasticsearch主機名稱和埠:

es = Elasticsearch([{'host': 'localhost', 'port': 9200}])

步驟4:建立索引

建立索引需要提供索引名稱和對映:

es.indices.create(index='users', body={'mappings': {'properties': {'name': {'type': 'text'}, 'age': {'type': 'integer'}}}})

步驟5:儲存資料

可以使用index()方法儲存資料:

es.index(index='users', body={'name': 'John', 'age': 30})

步驟6:查詢資料

可以使用search()方法查詢資料:

result = es.search(index='users', body={'query': {'match': {'name': 'John'}}})

內容解密:

以上步驟展示瞭如何使用Python連線PostgreSQL資料函式庫、使用DataFrames查詢資料和使用Elasticsearch儲存和查詢資料。這些步驟可以幫助您瞭解如何使用Python進行資料函式庫操作和資料分析。

圖表翻譯:

這個圖表展示了Python連線PostgreSQL資料函式庫、查詢資料、儲存資料到Elasticsearch和查詢結果的流程。

Elasticsearch 入門

Elasticsearch 是一個強大的搜尋和分析引擎,能夠快速地處理大量的資料。要使用 Elasticsearch,首先需要安裝 elasticsearch 函式庫。

安裝 Elasticsearch 函式庫

可以使用 pip3 來安裝 elasticsearch 函式庫:

pip3 install elasticsearch

這將安裝最新版本的 elasticsearch 函式庫。如果您按照第 2 章的指示安裝了 Elasticsearch,則需要安裝相應版本的 elasticsearch 函式庫。您可以使用以下程式碼來驗證安裝和檢查版本:

import elasticsearch
print(elasticsearch.__version__)

這應該會輸出版本號,例如 (7.6.0)

將資料插入 Elasticsearch

在查詢 Elasticsearch 之前,需要將一些資料載入索引。要載入資料,需要建立連線,然後發出命令給 Elasticsearch。以下是將紀錄新增到 Elasticsearch 的步驟:

  1. 匯入函式庫:
from elasticsearch import Elasticsearch
from faker import Faker
fake = Faker()
  1. 建立連線到 Elasticsearch:
es = Elasticsearch()

這段程式碼假設您的 Elasticsearch 例項正在 localhost 上執行。如果不是,則可以指定 IP 地址:

es = Elasticsearch({'127.0.0.1'})
  1. 現在可以發出命令給 Elasticsearch 例項。索引方法允許新增資料。該方法需要索引名稱、檔案型別和主體。主體是傳送給 Elasticsearch 的 JSON 物件。以下程式碼建立一個 JSON 物件新增到資料函式庫,然後使用索引方法將其新增到 Elasticsearch:
data = {'name': fake.name(), 'address': fake.address()}
es.index(index='my_index', body=data)

這將建立一個名為 my_index 的索引,並將 data 物件新增到其中。

圖表翻譯:

@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle

title Python 操作 PostgreSQL 與 Elasticsearch 資料函式庫整合應用

package "資料庫架構" {
    package "應用層" {
        component [連線池] as pool
        component [ORM 框架] as orm
    }

    package "資料庫引擎" {
        component [查詢解析器] as parser
        component [優化器] as optimizer
        component [執行引擎] as executor
    }

    package "儲存層" {
        database [主資料庫] as master
        database [讀取副本] as replica
        database [快取層] as cache
    }
}

pool --> orm : 管理連線
orm --> parser : SQL 查詢
parser --> optimizer : 解析樹
optimizer --> executor : 執行計畫
executor --> master : 寫入操作
executor --> replica : 讀取操作
cache --> executor : 快取命中

master --> replica : 資料同步

note right of cache
  Redis/Memcached
  減少資料庫負載
end note

@enduml

這個圖表顯示了建立連線、發出命令和新增資料的過程。

內容解密:

上述程式碼使用 elasticsearch 函式庫來建立連線到 Elasticsearch 例項,然後使用索引方法將資料新增到索引中。 fake 物件用於生成隨機資料。 es.index() 方法需要索引名稱、檔案型別和主體。主體是傳送給 Elasticsearch 的 JSON 物件。

使用Elasticsearch進行資料儲存和查詢

Elasticsearch是一種強大的搜尋和分析引擎,允許您以高效和可擴充套件的方式儲存和查詢大型資料集。在本節中,我們將探討如何使用Elasticsearch進行資料儲存和查詢。

從技術架構視角來看,Psycopg2 提供了穩定的 PostgreSQL 資料函式庫連線與操作方案,其 executemany() 方法有效提升了批次資料插入的效能。然而,單純依靠 SQL 查詢在處理複雜資料分析時略顯不足。Pandas DataFrames 的引入,則有效彌補了這項缺憾,其與 Psycopg2 的整合,讓開發者能以更直觀的方式操作和分析資料。更進一步,Elasticsearch 的加入,為系統帶來了全文檢索和更靈活的資料查詢能力,尤其適用於需要高效率搜尋和分析大量非結構化資料的場景。然而,匯入 Elasticsearch 也意味著更高的系統複雜度和維護成本。對於資源有限的團隊,需要仔細權衡其帶來的效益與成本。玄貓認為,結合 PostgreSQL 的關聯式資料儲存能力,Pandas 的資料處理能力,以及 Elasticsearch 的搜尋和分析能力,可以構建一個功能強大且靈活的資料平臺,足以應對多樣化的資料處理需求。未來,預期這三種技術的整合將更加緊密,並在雲端原生環境下發揮更大的作用。對於希望提升資料處理能力的團隊,建議優先掌握 Psycopg2 和 Pandas 的整合應用,逐步引入 Elasticsearch 以滿足更進階的搜尋和分析需求。