返回文章列表

Haystack 檢索器評估與效能提升

本文探討 Haystack 中檢索器評估方法,比較內建 eval() 方法和自定義 Pipeline 搭配 EvalRetriever

機器學習 自然語言處理

Haystack 提供兩種評估檢索器的方法:使用內建 eval() 方法和自定義 Pipeline 結合 EvalRetrievereval() 方法適用於開放域和封閉域問答系統,但對於需要根據特定 ID 篩選的資料集(例如 SubjQA)則需要自定義流程。本文採用自定義 Pipeline 方法,可以更彈性地設定評估指標和查詢流程。除了召回率,平均精確度也是重要的評估指標,它更重視正確答案在結果排序中的位置。為了根據產品評估召回率並匯總結果,我們使用 Pipeline 建構評估流程,每個節點代表一個處理步驟,透過 run() 方法接收輸入並產生輸出。首先,建立 Label 物件列表,包含問題、答案、ID 和後設資料等資訊,然後將這些標籤寫入索引,以便後續評估使用。接著,建立 run_pipeline 函式,將每個問題-答案對饋送到評估管道,並追蹤正確的檢索結果。透過調整 top_k_retriever 引數,可以控制檢索的結果數量,並使用篩選器限制檢索範圍。

在 Haystack 中評估檢索器的方法

在 Haystack 中,有兩種方法可以評估檢索器(retriever):

  1. 使用檢索器內建的 eval() 方法:此方法可用於開放域和封閉域的問答系統,但不適用於像 SubjQA 這樣的資料集,因為每個檔案都與單一產品配對,需要根據產品 ID 進行篩選。
  2. 建立自定義的 Pipeline,將檢索器與 EvalRetriever 類別結合:這使得實作自定義指標和查詢流程成為可能。

檢索器評估指標

除了召回率(recall)之外,另一個重要的評估指標是平均精確度(mean average precision, mAP),它獎勵能夠將正確答案排在檔案排名較前面的檢索器。

自定義評估流程

由於需要根據產品評估召回率並在所有產品上匯總,因此選擇第二種方法。每個 Pipeline 節點代表一個類別,該類別透過 run() 方法接收輸入並產生輸出。

PipelineNode 類別

class PipelineNode:
    def __init__(self):
        self.outgoing_edges = 1

    def run(self, **kwargs):
        ...
        return (outputs, "outgoing_edge_name")

EvalRetrieverPipeline 類別

from haystack.pipeline import Pipeline
from haystack.eval import EvalDocuments

class EvalRetrieverPipeline:
    def __init__(self, retriever):
        self.retriever = retriever
        self.eval_retriever = EvalDocuments()
        pipe = Pipeline()
        pipe.add_node(component=self.retriever, name="ESRetriever", inputs=["Query"])
        pipe.add_node(component=self.eval_retriever, name="EvalRetriever", inputs=["ESRetriever"])
        self.pipeline = pipe

pipe = EvalRetrieverPipeline(es_retriever)

建立標籤索引

首先,建立一個 Label 物件列表,透過迴圈遍歷測試集中的每個問題,提取匹配的答案和額外的後設資料。

Label 物件

from haystack import Label

labels = []
for i, row in dfs["test"].iterrows():
    meta = {"item_id": row["title"], "question_id": row["id"]}
    if len(row["answers.text"]):
        for answer in row["answers.text"]:
            label = Label(
                question=row["question"], 
                answer=answer, 
                id=i, 
                origin=row["id"],
                meta=meta, 
                is_correct_answer=True, 
                is_correct_document=True,
                no_answer=False)
            labels.append(label)
    else:
        label = Label(
            question=row["question"], 
            answer="", 
            id=i, 
            origin=row["id"],
            meta=meta, 
            is_correct_answer=True, 
            is_correct_document=True,
            no_answer=True)
        labels.append(label)

寫入標籤索引

document_store.write_labels(labels, index="label")
print(f"""Loaded {document_store.get_label_count(index="label")} question-answer pairs""")

評估檢索器

建立一個函式,將每個問題-答案對饋送到評估管道,並追蹤正確的檢索結果。

run_pipeline 函式

def run_pipeline(pipeline, top_k_retriever=10, top_k_reader=4):
    for l in labels_agg:
        _ = pipeline.pipeline.run(
            query=l.question,
            top_k_retriever=top_k_retriever,
            top_k_reader=top_k_reader,
            top_k_eval_documents=top_k_retriever,
            labels=l,
            filters={"item_id": [l.meta["item_id"]], "split": ["test"]})

內容解密:

此函式遍歷所有聚合的標籤,並對每個問題執行檢索和評估。top_k_retrievertop_k_reader 引數控制檢索和閱讀的結果數量。filters 引數用於根據產品 ID 和測試集進行篩選。

改善問答系統的檢索與閱讀器評估

在前面的章節中,我們已經瞭解瞭如何建立一個基本的問答系統。現在,我們將探討如何評估和改善檢索器(Retriever)和閱讀器(Reader)的效能。

評估檢索器

首先,我們來評估檢索器的效能。檢索器的任務是從檔案資料函式庫中檢索出與問題相關的檔案。我們使用 EvalRetrieverPipeline 來評估檢索器的效能。

run_pipeline(pipe, top_k_retriever=3)
print(f"Recall@3: {pipe.eval_retriever.recall:.2f}")

內容解密:

  • run_pipeline 函式用於執行檢索流程,引數 top_k_retriever=3 表示檢索器傳回前 3 個最相關的檔案。
  • pipe.eval_retriever.recall 用於計算檢索器的召回率(Recall),即檢索器正確檢索出的相關檔案比例。

為了進一步評估檢索器的效能,我們建立了一個函式 evaluate_retriever,該函式迴圈多個 top_k_retriever 值,並計算每個值的召回率。

def evaluate_retriever(retriever, topk_values = [1,3,5,10,20]):
    topk_results = {}
    for topk in topk_values:
        # 建立 Pipeline
        p = EvalRetrieverPipeline(retriever)
        # 對測試集中的每個問題-答案對執行檢索
        run_pipeline(p, top_k_retriever=topk)
        # 取得評估指標
        topk_results[topk] = {"recall": p.eval_retriever.recall}
    return pd.DataFrame.from_dict(topk_results, orient="index")

es_topk_df = evaluate_retriever(es_retriever)

內容解密:

  • evaluate_retriever 函式接受一個檢索器和一系列 top_k_retriever 值,傳回一個 DataFrame,其中包含每個 top_k_retriever 值對應的召回率。
  • 我們使用 EvalRetrieverPipeline 建立了一個評估 Pipeline,並對每個 top_k_retriever 值執行檢索流程。

接下來,我們繪製了召回率隨 top_k_retriever 值的變化圖。

def plot_retriever_eval(dfs, retriever_names):
    fig, ax = plt.subplots()
    for df, retriever_name in zip(dfs, retriever_names):
        df.plot(y="recall", ax=ax, label=retriever_name)
    plt.xticks(df.index)
    plt.ylabel("Top-k Recall")
    plt.xlabel("k")
    plt.show()

plot_retriever_eval([es_topk_df], ["BM25"])

內容解密:

  • plot_retriever_eval 函式接受一系列 DataFrame 和對應的檢索器名稱,繪製召回率隨 top_k_retriever 值的變化圖。
  • 從圖中可以看出,當 k=5 時,召回率有一個拐點,之後召回率趨於穩定。

密集段落檢索(DPR)

為了進一步改善檢索器的效能,我們引入了密集段落檢索(DPR)技術。DPR 使用兩個 BERT 模型分別對問題和檔案進行編碼,將它們對映到一個高維向量空間中。

from haystack.retriever.dense import DensePassageRetriever

dpr_retriever = DensePassageRetriever(document_store=document_store,
                                      query_embedding_model="facebook/dpr-question_encoder-single-nq-base",
                                      passage_embedding_model="facebook/dpr-ctx_encoder-single-nq-base",
                                      embed_title=False)

document_store.update_embeddings(retriever=dpr_retriever)

內容解密:

  • DensePassageRetriever 用於建立一個 DPR 檢索器,需要指定檔案資料函式庫、問題編碼模型和檔案編碼模型。
  • document_store.update_embeddings 用於更新檔案資料函式庫中的向量表示。

我們對 DPR 檢索器進行了評估,並與 BM25 檢索器進行了比較。

dpr_topk_df = evaluate_retriever(dpr_retriever)
plot_retriever_eval([es_topk_df, dpr_topk_df], ["BM25", "DPR"])

內容解密:

  • 從圖中可以看出,DPR 檢索器在小 k 值下表現更好,但整體上與 BM25 檢索器相差不大。

評估閱讀器

在抽取式問答中,閱讀器的評估指標主要有兩個:精確匹配(EM)和 F1 分數。

  • 精確匹配(EM):如果預測答案與真實答案完全匹配,則 EM = 1,否則 EM = 0。
  • F1 分數:衡量預測答案與真實答案之間的精確度和召回率的調和平均值。

閱讀器的評估對於問答系統的整體效能至關重要。在後續章節中,我們將探討如何評估和改善閱讀器的效能。

改善我們的問答系統

讓我們來看看如何透過匯入FARM的一些輔助函式並將它們應用到一個簡單的例子中,以瞭解這些評估指標是如何工作的:

from farm.evaluation.squad_evaluation import compute_f1, compute_exact
pred = "about 6000 hours"
label = "6000 hours"
print(f"EM: {compute_exact(label, pred)}")
print(f"F1: {compute_f1(label, pred)}")

輸出結果為:

EM: 0
F1: 0.8

內容解密:

  • compute_exactcompute_f1 是用於計算 EM(Exact Match)和 F1 分數的函式。
  • EM 用於衡量預測答案與真實答案是否完全一致。
  • F1 分數則是預測答案與真實答案之間的相似度指標,根據詞袋模型的 token 層級比較。

在底層實作中,這些函式首先對預測和標籤進行歸一化處理,包括移除標點符號、修正空白字元以及轉換為小寫字母。然後,這些歸一化的字串被標記化為詞袋,之後在 token 層級上計算評估指標。從這個簡單的例子中,我們可以看到 EM 是一個比 F1 分數更嚴格的指標:只要在預測中新增一個 token,就會使 EM 變為零。另一方面,F1 分數可能無法捕捉到真正錯誤的答案。

評估讀取器

為了評估讀取器,我們將建立一個新的流程,其中包含兩個節點:一個讀取器節點和一個評估讀取器的節點。我們將使用 EvalReader 類別來計算對應的 EM 和 F1 分數。

from haystack.eval import EvalAnswers

def evaluate_reader(reader):
    score_keys = ['top_1_em', 'top_1_f1']
    eval_reader = EvalAnswers(skip_incorrect_retrieval=False)
    pipe = Pipeline()
    pipe.add_node(component=reader, name="QAReader", inputs=["Query"])
    pipe.add_node(component=eval_reader, name="EvalReader", inputs=["QAReader"])
    for l in labels_agg:
        doc = document_store.query(l.question, filters={"question_id": [l.origin]})
        _ = pipe.run(query=l.question, documents=doc, labels=l)
    return {k: v for k, v in eval_reader.__dict__.items() if k in score_keys}

reader_eval = {}
reader_eval["Fine-tune on SQuAD"] = evaluate_reader(reader)

內容解密:

  • evaluate_reader 函式用於評估讀取器的效能,計算其在測試資料集上的 EM 和 F1 分數。
  • EvalAnswers 類別用於計算讀取器的預測結果與真實答案之間的 EM 和 F1 分數。
  • skip_incorrect_retrieval=False 確保檢索器總是將上下文傳遞給讀取器。

繪製評估結果

def plot_reader_eval(reader_eval):
    fig, ax = plt.subplots()
    df = pd.DataFrame.from_dict(reader_eval)
    df.plot(kind="bar", ylabel="Score", rot=0, ax=ax)
    ax.set_xticklabels(["EM", "F1"])
    plt.legend(loc='upper left')
    plt.show()

plot_reader_eval(reader_eval)

內容解密:

  • plot_reader_eval 函式用於繪製讀取器評估結果的柱狀圖,直觀地展示 EM 和 F1 分數。

網域適應

雖然在 SQuAD 上進行微調的模型通常能夠很好地泛化到其他領域,但對於 SubjQA 資料集,我們的模型的 EM 和 F1 分數卻遠低於在 SQuAD 上的表現。這種泛化能力的缺乏也在其他擷取式問答資料集中被觀察到,被認為是 Transformer 模型特別容易過擬合 SQuAD 的證據。

為了提高讀取器的效能,我們可以透過在 SubjQA 訓練集上進一步微調我們的 MiniLM 模型來實作網域適應。FARMReader 提供了一個 train() 方法,用於在 SQuAD JSON 格式的資料上進行訓練。

def create_paragraphs(df):
    paragraphs = []
    id2context = dict(zip(df["review_id"], df["context"]))
    for review_id, review in id2context.items():
        qas = []
        review_df = df.query(f"review_id == '{review_id}'")
        id2question = dict(zip(review_df["id"], review_df["question"]))
        # 建構 qas 陣列
        # ...

內容解密:

  • create_paragraphs 函式用於將資料轉換為 SQuAD JSON 格式,建立與每個產品 ID 相關聯的段落陣列。
  • 每個段落元素包含一個上下文(即評論)和一個 qas 陣列,其中包含問題-答案對。