RAG系列:切分優化 - 基于句子余弦距離的語義切分
引言
傳統的文檔切分方法通常采用基于特定字符和固定長度的切分策略,這種方法雖然實現簡單,但在實際應用中容易割裂完整的語義單元,導致后續的信息檢索與理解受到影響。
相比之下,一種更智能的切分方法是基于句子余弦距離的語義切分。它不再依據特定字符和固定長度進行機械切分,而是對每個句子進行 embedding,以此來計算相鄰句子的余弦距離,再通過算法算出一個相對合理的切分點(某個距離值),最后將不大于該閾值的相鄰句子聚合在一起作為一個文檔塊,從而實現文檔語義切分。
例如 句子_1、句子_2、句子_3 之間的余弦距離都小于該閾值,而 句子_3 與 句子_4 的余弦距離大于該閾值,則在 句子_3 和 句子_4 之間增加切分點,最終的切分結果就是把 句子_1、句子_2、句子_3 聚合在一個文檔塊中,句子_4 在其它的文檔塊中。
實現原理
基于余弦距離的語義切分大致分為以下5個步驟:
langchain-experimental
中的 SemanticChunker[1] 實現了基于余弦距離的語義切分,因此本文我將通過 SemanticChunker
的源碼來帶大家了解語義切分的實現原理。
以下是 SemanticChunker
的初始化參數,后面根據不同步驟所需要的參數來了解這些參數的具體作用。
class SemanticChunker(
# 向量模型
embeddings: Embeddings,
# 向前向后取 buffer_size 個句子一起 embedding
buffer_size: int = 1,
# 是否在元數據添加開始切分的位置(以文檔字符長度計算)
add_start_index: bool = False,
# 切分點計算方法
breakpoint_threshold_type: BreakpointThresholdType = "percentile",
# 切分點計算閾值
breakpoint_threshold_amount: float | None = None,
# 切分后的文檔塊數量
number_of_chunks: int | None = None,
# 句子切分規則
sentence_split_regex: str = r"(?<=[.?!])\s+",
# 最小文檔塊大小
min_chunk_size: int | None = None
)
句子切分
這一步是通過特定規則將文檔切分為一個個句子,在 SemanticChunker
中通過參數 sentence_split_regex
來設置規則進行切分,默認值為 r"(?<=[.?!])\s+"
,這是以英文的句號、問號、感嘆號來進行切分的,而且是對比較規范的英文行文,也就是這三種標點后還跟空白字符的。如果要對中文文檔切分,那就需要將這個正則表達式替換成能切分中文的,例如:r"(?<=[。?!\n])"
,也就是以中文的句號、問號、感嘆號以及換行符來進行切分。
SemanticChunker
的實現源碼如下:
import re
def _get_single_sentences_list(self, text: str) -> List[str]:
return re.split(self.sentence_split_regex, text)
句子 embedding
這一步是將每個句子進行 embedding,理論上接著就以每個句子 embedding 結果來計算相鄰句子的距離就可以了。但通過實際操作發現對單個句子處理噪音比較大,后續切分的效果并不理想,因此 SemanticChunker
通過 buffer_size
來控制當前句子前、后各取幾個句子組成一組來計算 embedding 并計算余弦距離。例如buffer_size
設置為為1(默認值),表示取當前句子前、后各取1個句子組成一組來計算 embedding。
SemanticChunker
的實現源碼如下:
首先根據buffer_size
得到當前句子的組合。
def combine_sentences(sentences: List[dict], buffer_size: int = 1) -> List[dict]:
for i inrange(len(sentences)):
# 創建一個字符串變量來保存連接的句子
combined_sentence = ""
# 添加當前句子之前 buffer_size 個句子
for j inrange(i - buffer_size, i):
if j >= 0:
combined_sentence += sentences[j]["sentence"] + " "
# 添加當前句子
combined_sentence += sentences[i]["sentence"]
# 添加當前句子之后 buffer_size 個句子
for j inrange(i + 1, i + 1 + buffer_size):
if j < len(sentences):
combined_sentence += " " + sentences[j]["sentence"]
# 將合并好的句子存儲在當前的句子 combined_sentence 中
sentences[i]["combined_sentence"] = combined_sentence
return sentences
然后根據通過參數 embeddings
傳入的向量模型對句子組合進行 embedding。
def _calculate_sentence_distances(self, single_sentences_list: List[str]) -> Tuple[List[float], List[dict]]:
_sentences = [
{"sentence": x, "index": i} for i, x in enumerate(single_sentences_list)
]
sentences = combine_sentences(_sentences, self.buffer_size)
embeddings = self.embeddings.embed_documents(
[x["combined_sentence"] for x in sentences]
)
for i, sentence in enumerate(sentences):
sentence["combined_sentence_embedding"] = embeddings[i]
return calculate_cosine_distances(sentences)
計算相鄰句子(組)余弦距離
這一步就是通過計算相鄰句子(組) 的余弦相似度來得到相鄰句子(組) 的余弦距離。
將橫軸記為句子(組)的序號,縱軸為相鄰句子(組) 的余弦距離,就可得到下面類似的圖:
SemanticChunker
的實現源碼如下:
from langchain_community.utils.math import cosine_similarity
defcalculate_cosine_distances(sentences: List[dict]) -> Tuple[List[float], List[dict]]:
distances = []
for i inrange(len(sentences) - 1):
embedding_current = sentences[i]["combined_sentence_embedding"]
embedding_next = sentences[i + 1]["combined_sentence_embedding"]
# 計算余弦相似度
similarity = cosine_similarity([embedding_current], [embedding_next])[0][0]
# 轉換成余弦距離
distance = 1 - similarity
distances.append(distance)
# 保存余弦距離
sentences[i]["distance_to_next"] = distance
# 【可選】最后一個句子的處理
# sentences[-1]['distance_to_next'] = None # 或其它默認值
return distances, sentences
計算切分點
如何計算切分點,SemanticChunker
給出了4種方法:
- percentile: 分位法,默認方法。將所有余弦距離在第 X 分位數的值作為閾值,并在那些余弦距離超過該閾值的位置進行切分。第 X 分位數可通過
breakpoint_threshold_amount
設置,默認為 95。還可以通過number_of_chunks
指定切分后的文檔塊總數量,采用線性插值的方式反向推導出該分位數;
SemanticChunker
的實現源碼如下:
import numpy as np
def_calculate_breakpoint_threshold(self, distances: List[float]) -> Tuple[float, List[float]]:
# 第一種方式:指定分位數
return cast(
float,
np.percentile(distances, self.breakpoint_threshold_amount),
), distances
# 第二種方式:通過 number_of_chunks 反向推導分位數
x1, y1 = len(distances), 0.0
x2, y2 = 1.0, 100.0
x = max(min(self.number_of_chunks, x1), x2)
if x2 == x1:
y = y2
else:
y = y1 + ((y2 - y1) / (x2 - x1)) * (x - x1) # 線性插值
y = min(max(y, 0), 100)
return cast(
float,
np.percentile(distances, y),),
distances
- standard_deviation: 標準差偏離法,是統計學中表示偏離的常規方法,這種方法比較適合正態分布。將所有余弦距離的平均值加上 X 倍的所有余弦距離標準差的值作為閾值,并在那些余弦距離超過該閾值的位置進行切分。X 倍可通過
breakpoint_threshold_amount
設置,默認為 3,這是最常用的值;
SemanticChunker
的實現源碼如下:
import numpy as np
def _calculate_breakpoint_threshold(self, distances: List[float]) -> Tuple[float, List[float]]:
return cast(
float,
np.mean(distances) +
self.breakpoint_threshold_amount * np.std(distances),),
distances
- interquartile: 四分位距法,是統計學中表示偏離的另一種常規方法,這種方法計算分位數,所以數據分布不那么正態問題也不大。將所有余弦距離的平均值加上 X 倍的所有余弦距離四分位距的值作為閾值,并在那些余弦距離超過該閾值的位置進行切分。X 倍可通過
breakpoint_threshold_amount
設置,默認為 1.5,也是最常用的值;
SemanticChunker
的實現源碼如下:
import numpy as np
def _calculate_breakpoint_threshold(self, distances: List[float]) -> Tuple[float, List[float]]:
# 取出25分位(下四分位)和75分位(上四分位)的數值
q1, q3 = np.percentile(distances, [25, 75])
# 計算兩個分位的差值(四分位距)
iqr = q3 - q1
return np.mean(distances) +
self.breakpoint_threshold_amount * iqr, distances
- gradient: 梯度法。首先計算所有余弦距離的變化梯度,變化梯度計算出來后,就可以知道哪個地方余弦距離變化得快,然后將所有變化梯度在第 X 分位數的值作為閾值,并在那些余弦距離超過該閾值的位置進行切分。第 X 分位數可通過
breakpoint_threshold_amount
設置,默認為 95。
SemanticChunker
的實現源碼如下:
import numpy as np
def _calculate_breakpoint_threshold(self, distances: List[float]) -> Tuple[float, List[float]]:
# 計算所有余弦距離的變化梯度
distance_gradient = np.gradient(distances, range(0, len(distances)))
return cast(
float,
np.percentile(distance_gradient,
self.breakpoint_threshold_amount)),
distance_gradient
按切分點切分
通過第4步各方法得到切分點后,就可以按切分點對文檔進行切分(通過設置min_chunk_size
控制合并較小的塊),就可得到下面類似的圖(包括切分位置以及切片):
SemanticChunker
的實現源碼如下:
def split_text(self, text: str,) -> List[str]:
# 計算相鄰句子的余弦距離
distances, sentences = self._calculate_sentence_distances(single_sentences_list)
# 計算切分點
breakpoint_distance_threshold, breakpoint_array = self._calculate_breakpoint_threshold(distances)
indices_above_thresh = [
i
for i, x inenumerate(breakpoint_array)
if x > breakpoint_distance_threshold
]
chunks = []
start_index = 0
# 遍歷切分點來分割句子
for index in indices_above_thresh:
end_index = index
group = sentences[start_index : end_index + 1]
combined_text = " ".join([d["sentence"] for d in group])
# 通過設置 min_chunk_size 來合并較小的文檔塊
if (
self.min_chunk_size isnotNone
andlen(combined_text) < self.min_chunk_size
):
continue
chunks.append(combined_text)
start_index = index + 1
if start_index < len(sentences):
combined_text = " ".join([d["sentence"] for d in sentences[start_index:]])
chunks.append(combined_text)
return chunks
代碼實踐
原 TypeScript 項目已使用 Python 進行了重構,后續將優先使用 Python 進行代碼實踐和講解。
其中:RAG.libs 中是封裝好的各種不同作用的模塊,如 RAG/libs/text_splitter.py 是封裝好的文檔切分器,RAG/libs/evaluator.py 是封裝好的評估器,因此文中不再貼具體的代碼,如需查看具體代碼實現,請移步到 github 代碼倉庫中查看。
本文完整代碼地址[2]:https://github.com/laixiangran/ai-learn-python/blob/main/RAG/examples/06_semantic_splitting.py
先看下基于句子余弦距離的語義切分的評估結果:
從評估結果來看,相較于 RecursiveCharacterTextSplitter
的切分方法,在上下文召回率、上下文相關性以及答案準確性都有不同程度的提升。
加載文件
from RAG.libs.file_loader import FileLoader
file_loader = FileLoader(
file_path="RAG/datas/2024少兒編程教育行業發展趨勢報告.md",
provider="customMarkdownLoader",
)
documents = file_loader.load()
語義切分
因為我們的文檔只要是中文,因此需要將 sentence_split_regex
修改成可對中文切分的規則,如:r"(?<=[。?!\n])"
。
from langchain_experimental.text_splitter import SemanticChunker
from RAG.libs.embedding import Embedding
# 向量模型
embeddings = Embedding(model="nomic-embed-text", provider="ollama")
# 使用 SemanticChunker 切分
text_splitter = SemanticChunker(
embeddings=embeddings,
breakpoint_threshold_type="percentile",
sentence_split_regex=r"(?<=[。?!\n])",
)
documents = text_splitter.split_documents(documents=documents)
切分后處理
使用SemanticChunker
進行切分,會出現較短或者較長的切片。比如通過percentile
進行切分后的結果可以看到,最小的文檔塊大小只有 1,最大的文檔塊大小有 3346。因此,為了更好的檢索效果,我們一般需要=對較長的文檔做二次切分、對較短的文檔進行合并和添加標題等其它切分后處理。
count 75 // 總數
mean 731 // 平均值
std 685 // 標準差
min 1 // 最小值
25% 218 // 25分位值
50% 568 // 50分位值
75% 990 // 75分位值
90% 1535 // 90分位值
97% 2577 // 97分位值
99% 2876 // 99分位值
max 3346 // 最大值
將較大的文檔塊進行二次切分、合并較小的塊和添加標題:
from RAG.libs.text_splitter import (
TextSplitter,
merge_small_documents,
add_headers_to_documents,
)
# 將較大的文檔塊進行二次切分
text_splitter = TextSplitter(
provider="recursiveCharacter",
chunk_size=500,
chunk_overlap=0,
)
documents = text_splitter.split_documents(documents)
# 合并較小的塊和添加標題
documents = merge_small_documents(documents, merge_max_length=100)
documents = add_headers_to_documents(documents)
效果評估
from RAG.libs.evaluator import BatchEvaluator
eval_result = BatchEvaluator(
chat_model=chat_model,
vector_store=vector_store,
qa_data=qa_data,
top_k=3,
filter=filter,
output_path=output_path,
)
結語
本文介紹了一種更智能的切分方法 - 基于句子余弦距離的語義切分,并通過 langchain-experimental
中的 SemanticChunker
的源碼來帶大家了解了語義切分的實現原理。
從最后評估結果來看,相較于 RecursiveCharacterTextSplitter
的切分方法,在上下文召回率、上下文相關性以及答案準確性都有不同程度的提升,這說明通過基于句子余弦距離的語義切分方法對文檔切分優化具有一定的可行性,大家可以根據自己的實際情況進一步驗證,歡迎大家留言交流。
引用鏈接
[1]
SemanticChunker: https://github.com/langchain-ai/langchain-experimental/blob/main/libs/experimental/langchain_experimental/text_splitter.py#L99
[2]
本文完整代碼地址: https://github.com/laixiangran/ai-learn-python/blob/main/RAG/examples/06_semantic_splitting.py