Haystack 提供兩種評估檢索器的方法:使用內建 eval() 方法和自定義 Pipeline 結合 EvalRetriever。eval() 方法適用於開放域和封閉域問答系統,但對於需要根據特定 ID 篩選的資料集(例如 SubjQA)則需要自定義流程。本文採用自定義 Pipeline 方法,可以更彈性地設定評估指標和查詢流程。除了召回率,平均精確度也是重要的評估指標,它更重視正確答案在結果排序中的位置。為了根據產品評估召回率並匯總結果,我們使用 Pipeline 建構評估流程,每個節點代表一個處理步驟,透過 run() 方法接收輸入並產生輸出。首先,建立 Label 物件列表,包含問題、答案、ID 和後設資料等資訊,然後將這些標籤寫入索引,以便後續評估使用。接著,建立 run_pipeline 函式,將每個問題-答案對饋送到評估管道,並追蹤正確的檢索結果。透過調整 top_k_retriever 引數,可以控制檢索的結果數量,並使用篩選器限制檢索範圍。
在 Haystack 中評估檢索器的方法
在 Haystack 中,有兩種方法可以評估檢索器(retriever):
- 使用檢索器內建的
eval()方法:此方法可用於開放域和封閉域的問答系統,但不適用於像 SubjQA 這樣的資料集,因為每個檔案都與單一產品配對,需要根據產品 ID 進行篩選。 - 建立自定義的
Pipeline,將檢索器與EvalRetriever類別結合:這使得實作自定義指標和查詢流程成為可能。
檢索器評估指標
除了召回率(recall)之外,另一個重要的評估指標是平均精確度(mean average precision, mAP),它獎勵能夠將正確答案排在檔案排名較前面的檢索器。
自定義評估流程
由於需要根據產品評估召回率並在所有產品上匯總,因此選擇第二種方法。每個 Pipeline 節點代表一個類別,該類別透過 run() 方法接收輸入並產生輸出。
PipelineNode 類別
class PipelineNode:
def __init__(self):
self.outgoing_edges = 1
def run(self, **kwargs):
...
return (outputs, "outgoing_edge_name")
EvalRetrieverPipeline 類別
from haystack.pipeline import Pipeline
from haystack.eval import EvalDocuments
class EvalRetrieverPipeline:
def __init__(self, retriever):
self.retriever = retriever
self.eval_retriever = EvalDocuments()
pipe = Pipeline()
pipe.add_node(component=self.retriever, name="ESRetriever", inputs=["Query"])
pipe.add_node(component=self.eval_retriever, name="EvalRetriever", inputs=["ESRetriever"])
self.pipeline = pipe
pipe = EvalRetrieverPipeline(es_retriever)
建立標籤索引
首先,建立一個 Label 物件列表,透過迴圈遍歷測試集中的每個問題,提取匹配的答案和額外的後設資料。
Label 物件
from haystack import Label
labels = []
for i, row in dfs["test"].iterrows():
meta = {"item_id": row["title"], "question_id": row["id"]}
if len(row["answers.text"]):
for answer in row["answers.text"]:
label = Label(
question=row["question"],
answer=answer,
id=i,
origin=row["id"],
meta=meta,
is_correct_answer=True,
is_correct_document=True,
no_answer=False)
labels.append(label)
else:
label = Label(
question=row["question"],
answer="",
id=i,
origin=row["id"],
meta=meta,
is_correct_answer=True,
is_correct_document=True,
no_answer=True)
labels.append(label)
寫入標籤索引
document_store.write_labels(labels, index="label")
print(f"""Loaded {document_store.get_label_count(index="label")} question-answer pairs""")
評估檢索器
建立一個函式,將每個問題-答案對饋送到評估管道,並追蹤正確的檢索結果。
run_pipeline 函式
def run_pipeline(pipeline, top_k_retriever=10, top_k_reader=4):
for l in labels_agg:
_ = pipeline.pipeline.run(
query=l.question,
top_k_retriever=top_k_retriever,
top_k_reader=top_k_reader,
top_k_eval_documents=top_k_retriever,
labels=l,
filters={"item_id": [l.meta["item_id"]], "split": ["test"]})
內容解密:
此函式遍歷所有聚合的標籤,並對每個問題執行檢索和評估。top_k_retriever 和 top_k_reader 引數控制檢索和閱讀的結果數量。filters 引數用於根據產品 ID 和測試集進行篩選。
改善問答系統的檢索與閱讀器評估
在前面的章節中,我們已經瞭解瞭如何建立一個基本的問答系統。現在,我們將探討如何評估和改善檢索器(Retriever)和閱讀器(Reader)的效能。
評估檢索器
首先,我們來評估檢索器的效能。檢索器的任務是從檔案資料函式庫中檢索出與問題相關的檔案。我們使用 EvalRetrieverPipeline 來評估檢索器的效能。
run_pipeline(pipe, top_k_retriever=3)
print(f"Recall@3: {pipe.eval_retriever.recall:.2f}")
內容解密:
run_pipeline函式用於執行檢索流程,引數top_k_retriever=3表示檢索器傳回前 3 個最相關的檔案。pipe.eval_retriever.recall用於計算檢索器的召回率(Recall),即檢索器正確檢索出的相關檔案比例。
為了進一步評估檢索器的效能,我們建立了一個函式 evaluate_retriever,該函式迴圈多個 top_k_retriever 值,並計算每個值的召回率。
def evaluate_retriever(retriever, topk_values = [1,3,5,10,20]):
topk_results = {}
for topk in topk_values:
# 建立 Pipeline
p = EvalRetrieverPipeline(retriever)
# 對測試集中的每個問題-答案對執行檢索
run_pipeline(p, top_k_retriever=topk)
# 取得評估指標
topk_results[topk] = {"recall": p.eval_retriever.recall}
return pd.DataFrame.from_dict(topk_results, orient="index")
es_topk_df = evaluate_retriever(es_retriever)
內容解密:
evaluate_retriever函式接受一個檢索器和一系列top_k_retriever值,傳回一個 DataFrame,其中包含每個top_k_retriever值對應的召回率。- 我們使用
EvalRetrieverPipeline建立了一個評估 Pipeline,並對每個top_k_retriever值執行檢索流程。
接下來,我們繪製了召回率隨 top_k_retriever 值的變化圖。
def plot_retriever_eval(dfs, retriever_names):
fig, ax = plt.subplots()
for df, retriever_name in zip(dfs, retriever_names):
df.plot(y="recall", ax=ax, label=retriever_name)
plt.xticks(df.index)
plt.ylabel("Top-k Recall")
plt.xlabel("k")
plt.show()
plot_retriever_eval([es_topk_df], ["BM25"])
內容解密:
plot_retriever_eval函式接受一系列 DataFrame 和對應的檢索器名稱,繪製召回率隨top_k_retriever值的變化圖。- 從圖中可以看出,當
k=5時,召回率有一個拐點,之後召回率趨於穩定。
密集段落檢索(DPR)
為了進一步改善檢索器的效能,我們引入了密集段落檢索(DPR)技術。DPR 使用兩個 BERT 模型分別對問題和檔案進行編碼,將它們對映到一個高維向量空間中。
from haystack.retriever.dense import DensePassageRetriever
dpr_retriever = DensePassageRetriever(document_store=document_store,
query_embedding_model="facebook/dpr-question_encoder-single-nq-base",
passage_embedding_model="facebook/dpr-ctx_encoder-single-nq-base",
embed_title=False)
document_store.update_embeddings(retriever=dpr_retriever)
內容解密:
DensePassageRetriever用於建立一個 DPR 檢索器,需要指定檔案資料函式庫、問題編碼模型和檔案編碼模型。document_store.update_embeddings用於更新檔案資料函式庫中的向量表示。
我們對 DPR 檢索器進行了評估,並與 BM25 檢索器進行了比較。
dpr_topk_df = evaluate_retriever(dpr_retriever)
plot_retriever_eval([es_topk_df, dpr_topk_df], ["BM25", "DPR"])
內容解密:
- 從圖中可以看出,DPR 檢索器在小
k值下表現更好,但整體上與 BM25 檢索器相差不大。
評估閱讀器
在抽取式問答中,閱讀器的評估指標主要有兩個:精確匹配(EM)和 F1 分數。
- 精確匹配(EM):如果預測答案與真實答案完全匹配,則 EM = 1,否則 EM = 0。
- F1 分數:衡量預測答案與真實答案之間的精確度和召回率的調和平均值。
閱讀器的評估對於問答系統的整體效能至關重要。在後續章節中,我們將探討如何評估和改善閱讀器的效能。
改善我們的問答系統
讓我們來看看如何透過匯入FARM的一些輔助函式並將它們應用到一個簡單的例子中,以瞭解這些評估指標是如何工作的:
from farm.evaluation.squad_evaluation import compute_f1, compute_exact
pred = "about 6000 hours"
label = "6000 hours"
print(f"EM: {compute_exact(label, pred)}")
print(f"F1: {compute_f1(label, pred)}")
輸出結果為:
EM: 0
F1: 0.8
內容解密:
compute_exact和compute_f1是用於計算 EM(Exact Match)和 F1 分數的函式。- EM 用於衡量預測答案與真實答案是否完全一致。
- F1 分數則是預測答案與真實答案之間的相似度指標,根據詞袋模型的 token 層級比較。
在底層實作中,這些函式首先對預測和標籤進行歸一化處理,包括移除標點符號、修正空白字元以及轉換為小寫字母。然後,這些歸一化的字串被標記化為詞袋,之後在 token 層級上計算評估指標。從這個簡單的例子中,我們可以看到 EM 是一個比 F1 分數更嚴格的指標:只要在預測中新增一個 token,就會使 EM 變為零。另一方面,F1 分數可能無法捕捉到真正錯誤的答案。
評估讀取器
為了評估讀取器,我們將建立一個新的流程,其中包含兩個節點:一個讀取器節點和一個評估讀取器的節點。我們將使用 EvalReader 類別來計算對應的 EM 和 F1 分數。
from haystack.eval import EvalAnswers
def evaluate_reader(reader):
score_keys = ['top_1_em', 'top_1_f1']
eval_reader = EvalAnswers(skip_incorrect_retrieval=False)
pipe = Pipeline()
pipe.add_node(component=reader, name="QAReader", inputs=["Query"])
pipe.add_node(component=eval_reader, name="EvalReader", inputs=["QAReader"])
for l in labels_agg:
doc = document_store.query(l.question, filters={"question_id": [l.origin]})
_ = pipe.run(query=l.question, documents=doc, labels=l)
return {k: v for k, v in eval_reader.__dict__.items() if k in score_keys}
reader_eval = {}
reader_eval["Fine-tune on SQuAD"] = evaluate_reader(reader)
內容解密:
evaluate_reader函式用於評估讀取器的效能,計算其在測試資料集上的 EM 和 F1 分數。EvalAnswers類別用於計算讀取器的預測結果與真實答案之間的 EM 和 F1 分數。skip_incorrect_retrieval=False確保檢索器總是將上下文傳遞給讀取器。
繪製評估結果
def plot_reader_eval(reader_eval):
fig, ax = plt.subplots()
df = pd.DataFrame.from_dict(reader_eval)
df.plot(kind="bar", ylabel="Score", rot=0, ax=ax)
ax.set_xticklabels(["EM", "F1"])
plt.legend(loc='upper left')
plt.show()
plot_reader_eval(reader_eval)
內容解密:
plot_reader_eval函式用於繪製讀取器評估結果的柱狀圖,直觀地展示 EM 和 F1 分數。
網域適應
雖然在 SQuAD 上進行微調的模型通常能夠很好地泛化到其他領域,但對於 SubjQA 資料集,我們的模型的 EM 和 F1 分數卻遠低於在 SQuAD 上的表現。這種泛化能力的缺乏也在其他擷取式問答資料集中被觀察到,被認為是 Transformer 模型特別容易過擬合 SQuAD 的證據。
為了提高讀取器的效能,我們可以透過在 SubjQA 訓練集上進一步微調我們的 MiniLM 模型來實作網域適應。FARMReader 提供了一個 train() 方法,用於在 SQuAD JSON 格式的資料上進行訓練。
def create_paragraphs(df):
paragraphs = []
id2context = dict(zip(df["review_id"], df["context"]))
for review_id, review in id2context.items():
qas = []
review_df = df.query(f"review_id == '{review_id}'")
id2question = dict(zip(review_df["id"], review_df["question"]))
# 建構 qas 陣列
# ...
內容解密:
create_paragraphs函式用於將資料轉換為 SQuAD JSON 格式,建立與每個產品 ID 相關聯的段落陣列。- 每個段落元素包含一個上下文(即評論)和一個 qas 陣列,其中包含問題-答案對。