返回文章列表

Pinecone向量資料函式庫高效查詢與RAG應用

本文探討如何使用 OpenAI 嵌入模型和 Pinecone 向量資料函式庫處理大規模客戶資料,包含資料分塊、嵌入、資料擴增、Pinecone 索引建立和高效查詢。此外,文章也示範如何結合 RAG 技術,利用 Pinecone 查詢結果生成自定義的市場訊息,展現向量資料函式庫在生成式 AI 應用中的價值。

資料函式庫 機器學習

在資料驅動的應用開發中,向量資料函式庫和生成式 AI 技術的結合日益重要。本文以 OpenAI 嵌入模型和 Pinecone 向量資料函式庫為例,示範如何處理大規模客戶資料,並結合 RAG 技術實作客製化訊息生成。首先,將客戶資料分塊並使用 OpenAI 的嵌入模型轉換為向量表示,接著進行資料擴增以模擬大規模資料情境。然後,使用 Pinecone 建立向量索引,並將擴增後的向量資料上傳至 Pinecone 資料函式庫。最後,示範如何利用 Pinecone 進行高效向量查詢,並結合查詢結果和 RAG 技術,使用 GPT-4o 生成自定義的市場訊息。

資料分塊與嵌入

資料分塊

將客戶記錄分塊處理,以提高資料管理的效率。

chunks = []
for line in lines:
    chunks.append(line)

print(f"總分塊數:{len(chunks)}")

輸出結果顯示總共有 10,000 個分塊。

內容解密:

  • 將每個客戶記錄作為一個獨立的分塊儲存在 chunks 列表中。
  • 這種做法有利於後續的資料處理和品質控制。

檢視分塊內容

檢查前幾個分塊的長度和內容,以瞭解分塊後的資料結構。

for i in range(3):
    print(len(chunks[i]))
    print(chunks[i])

輸出範例顯示每個分塊的長度和內容。

內容解密:

  • 使用迴圈遍歷前幾個分塊。
  • 列印每個分塊的長度和具體內容,以便檢查資料是否正確分塊。

嵌入模型選擇與實作

初始化嵌入模型

選擇適合的嵌入模型來進行資料嵌入。本文使用 OpenAI 提供的嵌入模型。

import openai
embedding_model = "text-embedding-3-small"
client = openai.OpenAI()

def get_embedding(text, model=embedding_model):
    text = text.replace("\n", " ")
    response = client.embeddings.create(input=[text], model=model)
    embedding = response.data[0].embedding
    return embedding

內容解密:

  • 初始化 OpenAI 使用者端並選擇嵌入模型。
  • 定義 get_embedding 函式,用於取得輸入文字的嵌入向量。

分批嵌入處理

為了避免 API 速率限制,將分塊資料分批進行嵌入處理。

start_time = time.time()
chunk_start = 0
chunk_end = 1000
pause_time = 3
embeddings = []
counter = 1

while chunk_end <= len(chunks):
    chunks_to_embed = chunks[chunk_start:chunk_end]
    current_embeddings = []
    for chunk in chunks_to_embed:
        embedding = get_embedding(chunk, model=embedding_model)
        current_embeddings.append(embedding)
    embeddings.extend(current_embeddings)
    chunk_start += 1000
    chunk_end += 1000
    counter += 1

# 處理剩餘的分塊
if chunk_end < len(chunks):
    remaining_chunks = chunks[chunk_end:]
    remaining_embeddings = [get_embedding(chunk, model=embedding_model) for chunk in remaining_chunks]
    embeddings.extend(remaining_embeddings)

輸出結果顯示批次處理的進度和總處理時間。

內容解密:

  • 使用 while 迴圈分批處理分塊資料,每次處理 1000 個分塊。
  • 在每批次處理之間暫停幾秒鐘,以避免觸發 API 速率限制。
  • 將所有嵌入向量儲存在 embeddings 列表中。

大規模向量資料處理與Pinecone索引建立

在大規模資料處理中,資料的嵌入(embedding)和向量儲存(vector store)的建立是至關重要的步驟。本文將介紹如何使用OpenAI嵌入模型和Pinecone向量資料函式庫來處理大規模的客戶資料。

資料嵌入與驗證

首先,我們需要將客戶資料進行分塊(chunking)並嵌入到向量空間中。嵌入的過程是使用OpenAI的嵌入模型來實作的。嵌入後的向量可以用來表示原始資料的語義資訊。

# 顯示第一個嵌入向量
print("First embedding:", embeddings[0])

內容解密:

這段程式碼用於顯示第一個嵌入向量的內容,以驗證嵌入過程是否正確執行。嵌入向量是一個多維數值陣列,代表了對應文字的語義特徵。

# 檢查分塊和嵌入向量的數量
num_chunks = len(chunks)
print(f"Number of chunks: {num_chunks}")
print(f"Number of embeddings: {len(embeddings)}")

內容解密:

這段程式碼用於檢查分塊的數量和嵌入向量的數量是否一致,以確保資料處理的正確性。如果數量不一致,可能會導致後續的處理出現錯誤。

資料擴增

為了模擬大規模資料的處理,我們需要對現有的資料進行擴增。資料擴增的過程是透過複製現有的分塊和嵌入向量來實作的。

# 定義擴增大小
dsize = 5 
total = dsize * len(chunks)
print("Total size", total)

# 初始化新的列表來儲存擴增後的資料
duplicated_chunks = []
duplicated_embeddings = []

# 複製分塊和嵌入向量
for i in range(len(chunks)):
    for _ in range(dsize):
        duplicated_chunks.append(chunks[i])
        duplicated_embeddings.append(embeddings[i])

# 檢查擴增後的資料數量
print(f"Number of duplicated chunks: {len(duplicated_chunks)}")
print(f"Number of duplicated embeddings: {len(duplicated_embeddings)}")

內容解密:

這段程式碼用於將現有的分塊和嵌入向量複製指定的次數,以達到擴增資料的目的。擴增後的資料可以用來模擬大規模資料的處理。

建立Pinecone索引

Pinecone是一個用於向量相似性搜尋的資料函式庫。我們需要建立一個Pinecone索引來儲存我們的向量資料。

# 初始化Pinecone連線
api_key = os.environ.get('PINECONE_API_KEY') 
pc = Pinecone(api_key=api_key)

# 定義索引名稱、雲端提供者和地區
index_name = 'bank-index-50000'
cloud = 'aws'
region = 'us-east-1'
spec = ServerlessSpec(cloud=cloud, region=region)

# 建立索引
if index_name not in pc.list_indexes().names():
    pc.create_index(
        index_name,
        dimension=1536, 
        metric='cosine',
        spec=spec
    )
    
# 連線到索引
index = pc.Index(index_name)
index.describe_index_stats()

內容解密:

這段程式碼用於建立一個Pinecone索引,並定義了索引的名稱、維度、相似性度量指標等引數。建立索引後,我們可以連線到該索引並檢視其狀態。

上傳向量資料到Pinecone索引

上傳向量資料到Pinecone索引是透過upsert操作來實作的。我們需要將嵌入向量和對應的中繼資料上傳到索引中。

# 定義上傳函式
def upsert_to_pinecone(data, batch_size):
    for i in range(0, len(data), batch_size):
        batch = data[i:i+batch_size]
        index.upsert(vectors=batch)

# 定義批次大小計算函式
def get_batch_size(data, limit=4000000): 
    total_size = 0
    batch_size = 0
    for item in data:
        item_size = sum([sys.getsizeof(v) for v in item.values()])
        if total_size + item_size > limit:
            break
        total_size += item_size
        batch_size += 1
    return batch_size

# 上傳資料到Pinecone索引
def batch_upsert(data):
    total = len(data)
    i = 0
    while i < total:
        batch_size = get_batch_size(data[i:])
        batch = data[i:i + batch_size]
        if batch:
            upsert_to_pinecone(batch, batch_size)
            i += batch_size
            print(f"Upserted {i}/{total} items...")
        else:
            break

內容解密:

這段程式碼定義了上傳函式upsert_to_pinecone和批次大小計算函式get_batch_size,用於將資料分批上傳到Pinecone索引中,以避免單次請求過大導致的效能問題。batch_upsert函式則負責協調整個上傳過程,並顯示上傳進度。

使用Pinecone向量資料函式庫進行高效查詢與RAG生成式AI應用

在現代化的資料驅動應用中,高效的向量檢索和生成式AI技術正逐漸成為關鍵的核心能力。本文將探討如何利用Pinecone向量資料函式庫進行大規模資料的索引與查詢,並結合RAG(Retrieval-Augmented Generation)技術實作自定義的生成式AI應用。

建立Pinecone向量索引

首先,我們需要為資料建立向量索引。以下程式碼展示瞭如何將資料批次嵌入並上傳至Pinecone:

# 為每個資料專案生成唯一ID
ids = [str(i) for i in range(1, len(duplicated_chunks) + 1)]

# 準備上傳資料
data_for_upsert = [
    {"id": str(id), "values": emb, "metadata": {"text": chunk}}
    for id, (chunk, emb) in zip(ids, zip(duplicated_chunks, duplicated_embeddings))
]

# 批次上傳資料至Pinecone
batch_upsert(data_for_upsert)

# 測量上傳時間
response_time = time.time() - start_time
print(f"上傳回應時間:{response_time:.2f} 秒")

內容解密:

  1. ID生成:為每個資料區塊生成唯一的ID,確保資料的可識別性。
  2. 資料準備:將資料區塊、對應的向量嵌入和元資料封裝成Pinecone所需的格式。
  3. 批次上傳:使用batch_upsert函式將準備好的資料分批次上傳至Pinecone索引。
  4. 效能測量:記錄上傳過程所需的時間,以評估系統效能。

查詢Pinecone向量儲存

接下來,我們需要驗證查詢的回應時間和結果品質。以下程式碼展示瞭如何查詢向量儲存並顯示結果:

# 定義查詢結果顯示函式
def display_results(query_results):
    for match in query_results['matches']:
        print(f"ID: {match['id']}, 相似度: {match['score']}")
        if 'metadata' in match and 'text' in match['metadata']:
            print(f"文字: {match['metadata']['text']}")

# 取得查詢嵌入向量
query_embedding = get_embedding(query_text, model=embedding_model)

# 執行查詢並顯示結果
query_results = index.query(vector=query_embedding, top_k=1, include_metadata=True)
display_results(query_results)

# 測量查詢回應時間
response_time = time.time() - start_time
print(f"查詢回應時間:{response_time:.2f} 秒")

內容解密:

  1. 查詢結果處理:定義display_results函式來解析和顯示查詢結果,包括匹配的ID、相似度和原始文字。
  2. 查詢嵌入:使用相同的嵌入模型將查詢文字轉換為向量表示。
  3. 執行查詢:在Pinecone索引中執行查詢,檢索最相似的向量。
  4. 效能評估:測量並輸出查詢所需的時間,以確保系統回應速度。

RAG生成式AI應用

最後,我們結合RAG技術,利用Pinecone的查詢結果生成自定義的市場訊息:

# 使用GPT-4o生成自定義訊息
def generate_custom_message(target_vector):
    # 查詢Pinecone索引取得相似向量
    query_results = index.query(vector=target_vector, top_k=5, include_metadata=True)
    
    # 處理查詢結果並生成自定義訊息
    # ...(省略具體實作細節)
    
    return custom_message

# 示例:為特定市場細分生成訊息
target_vector = get_embedding("目標市場細分描述", model=embedding_model)
custom_message = generate_custom_message(target_vector)
print("生成的自定義訊息:", custom_message)

內容解密:

  1. 目標向量:使用特定的市場細分描述生成目標向量。
  2. 查詢與處理:在Pinecone中查詢最相似的向量,並提取相關資訊。
  3. 生成訊息:結合檢索到的資訊,利用GPT-4o生成自定義的市場訊息。