成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

RAG系列:切分優化 - 基于句子余弦距離的語義切分

人工智能
本文介紹了一種更智能的切分方法 - 基于句子余弦距離的語義切分,并通過 langchain-experimental 中的 SemanticChunker 的源碼來帶大家了解了語義切分的實現原理。

引言

傳統的文檔切分方法通常采用基于特定字符和固定長度的切分策略,這種方法雖然實現簡單,但在實際應用中容易割裂完整的語義單元,導致后續的信息檢索與理解受到影響。

相比之下,一種更智能的切分方法是基于句子余弦距離的語義切分。它不再依據特定字符和固定長度進行機械切分,而是對每個句子進行 embedding,以此來計算相鄰句子的余弦距離,再通過算法算出一個相對合理的切分點(某個距離值),最后將不大于該閾值的相鄰句子聚合在一起作為一個文檔塊,從而實現文檔語義切分。

例如 句子_1、句子_2、句子_3 之間的余弦距離都小于該閾值,而 句子_3 與 句子_4 的余弦距離大于該閾值,則在 句子_3 和 句子_4 之間增加切分點,最終的切分結果就是把 句子_1、句子_2、句子_3 聚合在一個文檔塊中,句子_4 在其它的文檔塊中。

實現原理

基于余弦距離的語義切分大致分為以下5個步驟:

圖片

langchain-experimental中的 SemanticChunker[1] 實現了基于余弦距離的語義切分,因此本文我將通過 SemanticChunker 的源碼來帶大家了解語義切分的實現原理。

以下是 SemanticChunker 的初始化參數,后面根據不同步驟所需要的參數來了解這些參數的具體作用。

class SemanticChunker(
    # 向量模型
    embeddings: Embeddings,
    # 向前向后取 buffer_size 個句子一起 embedding
    buffer_size: int = 1, 
    # 是否在元數據添加開始切分的位置(以文檔字符長度計算)
    add_start_index: bool = False,
    # 切分點計算方法
    breakpoint_threshold_type: BreakpointThresholdType = "percentile",
    # 切分點計算閾值
    breakpoint_threshold_amount: float | None = None,
    # 切分后的文檔塊數量
    number_of_chunks: int | None = None,
    # 句子切分規則
    sentence_split_regex: str = r"(?<=[.?!])\s+",
    # 最小文檔塊大小
    min_chunk_size: int | None = None
)

句子切分

這一步是通過特定規則將文檔切分為一個個句子,在 SemanticChunker 中通過參數 sentence_split_regex 來設置規則進行切分,默認值為 r"(?<=[.?!])\s+",這是以英文的句號、問號、感嘆號來進行切分的,而且是對比較規范的英文行文,也就是這三種標點后還跟空白字符的。如果要對中文文檔切分,那就需要將這個正則表達式替換成能切分中文的,例如:r"(?<=[。?!\n])",也就是以中文的句號、問號、感嘆號以及換行符來進行切分。

SemanticChunker的實現源碼如下:

import re

def _get_single_sentences_list(self, text: str) -> List[str]:
    return re.split(self.sentence_split_regex, text)

句子 embedding

這一步是將每個句子進行 embedding,理論上接著就以每個句子 embedding 結果來計算相鄰句子的距離就可以了。但通過實際操作發現對單個句子處理噪音比較大,后續切分的效果并不理想,因此 SemanticChunker 通過 buffer_size 來控制當前句子前、后各取幾個句子組成一組來計算 embedding 并計算余弦距離。例如buffer_size設置為為1(默認值),表示取當前句子前、后各取1個句子組成一組來計算 embedding。

SemanticChunker的實現源碼如下:

首先根據buffer_size得到當前句子的組合。

def combine_sentences(sentences: List[dict], buffer_size: int = 1) -> List[dict]:
    for i inrange(len(sentences)):
        # 創建一個字符串變量來保存連接的句子
        combined_sentence = ""

        # 添加當前句子之前 buffer_size 個句子
        for j inrange(i - buffer_size, i):
            if j >= 0:
                combined_sentence += sentences[j]["sentence"] + " "

        # 添加當前句子
        combined_sentence += sentences[i]["sentence"]

        # 添加當前句子之后 buffer_size 個句子
        for j inrange(i + 1, i + 1 + buffer_size):
            if j < len(sentences):
                combined_sentence += " " + sentences[j]["sentence"]

        # 將合并好的句子存儲在當前的句子 combined_sentence 中
        sentences[i]["combined_sentence"] = combined_sentence

    return sentences

然后根據通過參數 embeddings傳入的向量模型對句子組合進行 embedding。

def _calculate_sentence_distances(self, single_sentences_list: List[str]) -> Tuple[List[float], List[dict]]:
    _sentences = [
        {"sentence": x, "index": i} for i, x in enumerate(single_sentences_list)
    ]
    sentences = combine_sentences(_sentences, self.buffer_size)
    embeddings = self.embeddings.embed_documents(
        [x["combined_sentence"] for x in sentences]
    )
    for i, sentence in enumerate(sentences):
        sentence["combined_sentence_embedding"] = embeddings[i]

    return calculate_cosine_distances(sentences)

計算相鄰句子(組)余弦距離

這一步就是通過計算相鄰句子(組) 的余弦相似度來得到相鄰句子(組) 的余弦距離。

將橫軸記為句子(組)的序號,縱軸為相鄰句子(組) 的余弦距離,就可得到下面類似的圖:

圖片

SemanticChunker的實現源碼如下:

from langchain_community.utils.math import cosine_similarity

defcalculate_cosine_distances(sentences: List[dict]) -> Tuple[List[float], List[dict]]:
    distances = []
    for i inrange(len(sentences) - 1):
        embedding_current = sentences[i]["combined_sentence_embedding"]
        embedding_next = sentences[i + 1]["combined_sentence_embedding"]

        # 計算余弦相似度
        similarity = cosine_similarity([embedding_current], [embedding_next])[0][0]

        # 轉換成余弦距離
        distance = 1 - similarity
        distances.append(distance)

        # 保存余弦距離
        sentences[i]["distance_to_next"] = distance

    # 【可選】最后一個句子的處理
    # sentences[-1]['distance_to_next'] = None  # 或其它默認值

    return distances, sentences

計算切分點

如何計算切分點,SemanticChunker給出了4種方法:

  • percentile: 分位法,默認方法。將所有余弦距離在第 X 分位數的值作為閾值,并在那些余弦距離超過該閾值的位置進行切分。第 X 分位數可通過breakpoint_threshold_amount 設置,默認為 95。還可以通過 number_of_chunks 指定切分后的文檔塊總數量,采用線性插值的方式反向推導出該分位數;

SemanticChunker的實現源碼如下:

import numpy as np

def_calculate_breakpoint_threshold(self, distances: List[float]) -> Tuple[float, List[float]]:
# 第一種方式:指定分位數
return cast(
      float,
      np.percentile(distances, self.breakpoint_threshold_amount),
  ), distances

# 第二種方式:通過 number_of_chunks 反向推導分位數
  x1, y1 = len(distances), 0.0
  x2, y2 = 1.0, 100.0
  x = max(min(self.number_of_chunks, x1), x2)
if x2 == x1:
      y = y2
else:
      y = y1 + ((y2 - y1) / (x2 - x1)) * (x - x1) # 線性插值
  y = min(max(y, 0), 100)
    return cast(
      float,
      np.percentile(distances, y),), 
      distances
  • standard_deviation: 標準差偏離法,是統計學中表示偏離的常規方法,這種方法比較適合正態分布。將所有余弦距離的平均值加上 X 倍的所有余弦距離標準差的值作為閾值,并在那些余弦距離超過該閾值的位置進行切分。X 倍可通過breakpoint_threshold_amount 設置,默認為 3,這是最常用的值;

SemanticChunker的實現源碼如下:

import numpy as np

def _calculate_breakpoint_threshold(self, distances: List[float]) -> Tuple[float, List[float]]:
  return cast(
    float,
    np.mean(distances) + 
      self.breakpoint_threshold_amount * np.std(distances),), 
    distances
  • interquartile: 四分位距法,是統計學中表示偏離的另一種常規方法,這種方法計算分位數,所以數據分布不那么正態問題也不大。將所有余弦距離的平均值加上 X 倍的所有余弦距離四分位距的值作為閾值,并在那些余弦距離超過該閾值的位置進行切分。X 倍可通過breakpoint_threshold_amount 設置,默認為 1.5,也是最常用的值;

SemanticChunker的實現源碼如下:

import numpy as np

def _calculate_breakpoint_threshold(self, distances: List[float]) -> Tuple[float, List[float]]:
  # 取出25分位(下四分位)和75分位(上四分位)的數值
  q1, q3 = np.percentile(distances, [25, 75])

  # 計算兩個分位的差值(四分位距)
  iqr = q3 - q1

  return np.mean(distances) + 
            self.breakpoint_threshold_amount * iqr, distances
  • gradient: 梯度法。首先計算所有余弦距離的變化梯度,變化梯度計算出來后,就可以知道哪個地方余弦距離變化得快,然后將所有變化梯度在第 X 分位數的值作為閾值,并在那些余弦距離超過該閾值的位置進行切分。第 X 分位數可通過breakpoint_threshold_amount 設置,默認為 95。

SemanticChunker的實現源碼如下:

import numpy as np

def _calculate_breakpoint_threshold(self, distances: List[float]) -> Tuple[float, List[float]]:
  # 計算所有余弦距離的變化梯度
  distance_gradient = np.gradient(distances, range(0, len(distances)))
  return cast(
      float,
      np.percentile(distance_gradient,
                    self.breakpoint_threshold_amount)),
      distance_gradient

按切分點切分

通過第4步各方法得到切分點后,就可以按切分點對文檔進行切分(通過設置min_chunk_size控制合并較小的塊),就可得到下面類似的圖(包括切分位置以及切片):

圖片

SemanticChunker的實現源碼如下:

def split_text(self, text: str,) -> List[str]:
      # 計算相鄰句子的余弦距離
      distances, sentences = self._calculate_sentence_distances(single_sentences_list)

      # 計算切分點
      breakpoint_distance_threshold, breakpoint_array = self._calculate_breakpoint_threshold(distances)

      indices_above_thresh = [
          i
          for i, x inenumerate(breakpoint_array)
          if x > breakpoint_distance_threshold
      ]

      chunks = []
      start_index = 0

      # 遍歷切分點來分割句子
      for index in indices_above_thresh:
          end_index = index
          group = sentences[start_index : end_index + 1]
          combined_text = " ".join([d["sentence"] for d in group])
        
          # 通過設置 min_chunk_size 來合并較小的文檔塊
          if (
              self.min_chunk_size isnotNone
              andlen(combined_text) < self.min_chunk_size
          ):
              continue
          chunks.append(combined_text)

          start_index = index + 1

      if start_index < len(sentences):
          combined_text = " ".join([d["sentence"] for d in sentences[start_index:]])
          chunks.append(combined_text)
      return chunks

代碼實踐

原 TypeScript 項目已使用 Python 進行了重構,后續將優先使用 Python 進行代碼實踐和講解。

其中:RAG.libs 中是封裝好的各種不同作用的模塊,如 RAG/libs/text_splitter.py 是封裝好的文檔切分器,RAG/libs/evaluator.py 是封裝好的評估器,因此文中不再貼具體的代碼,如需查看具體代碼實現,請移步到 github 代碼倉庫中查看。

本文完整代碼地址[2]:https://github.com/laixiangran/ai-learn-python/blob/main/RAG/examples/06_semantic_splitting.py

先看下基于句子余弦距離的語義切分的評估結果:

圖片

從評估結果來看,相較于 RecursiveCharacterTextSplitter 的切分方法,在上下文召回率、上下文相關性以及答案準確性都有不同程度的提升。

加載文件

from RAG.libs.file_loader import FileLoader

file_loader = FileLoader(
    file_path="RAG/datas/2024少兒編程教育行業發展趨勢報告.md",
    provider="customMarkdownLoader",
)
documents = file_loader.load()

語義切分

因為我們的文檔只要是中文,因此需要將 sentence_split_regex修改成可對中文切分的規則,如:r"(?<=[。?!\n])"

from langchain_experimental.text_splitter import SemanticChunker
from RAG.libs.embedding import Embedding

# 向量模型
embeddings = Embedding(model="nomic-embed-text", provider="ollama")

# 使用 SemanticChunker 切分
text_splitter = SemanticChunker(
    embeddings=embeddings,
    breakpoint_threshold_type="percentile",
    sentence_split_regex=r"(?<=[。?!\n])",
)
documents = text_splitter.split_documents(documents=documents)

切分后處理

使用SemanticChunker進行切分,會出現較短或者較長的切片。比如通過percentile進行切分后的結果可以看到,最小的文檔塊大小只有 1,最大的文檔塊大小有 3346。因此,為了更好的檢索效果,我們一般需要=對較長的文檔做二次切分、對較短的文檔進行合并和添加標題等其它切分后處理。

count 75 // 總數
mean 731 // 平均值
std 685 // 標準差
min 1 // 最小值
25% 218 // 25分位值
50% 568 // 50分位值
75% 990 // 75分位值
90% 1535 // 90分位值
97% 2577 // 97分位值
99% 2876 // 99分位值
max 3346 // 最大值

將較大的文檔塊進行二次切分、合并較小的塊和添加標題:

from RAG.libs.text_splitter import (
    TextSplitter,
    merge_small_documents,
    add_headers_to_documents,
)

# 將較大的文檔塊進行二次切分
text_splitter = TextSplitter(
    provider="recursiveCharacter",
    chunk_size=500,
    chunk_overlap=0,
)
documents = text_splitter.split_documents(documents)

# 合并較小的塊和添加標題
documents = merge_small_documents(documents, merge_max_length=100)
documents = add_headers_to_documents(documents)

效果評估

from RAG.libs.evaluator import BatchEvaluator

eval_result = BatchEvaluator(
    chat_model=chat_model,
    vector_store=vector_store,
    qa_data=qa_data,
    top_k=3,
    filter=filter,
    output_path=output_path,
)

結語

本文介紹了一種更智能的切分方法 - 基于句子余弦距離的語義切分,并通過 langchain-experimental 中的 SemanticChunker 的源碼來帶大家了解了語義切分的實現原理。

從最后評估結果來看,相較于 RecursiveCharacterTextSplitter 的切分方法,在上下文召回率、上下文相關性以及答案準確性都有不同程度的提升,這說明通過基于句子余弦距離的語義切分方法對文檔切分優化具有一定的可行性,大家可以根據自己的實際情況進一步驗證,歡迎大家留言交流。

引用鏈接

[1] SemanticChunker: https://github.com/langchain-ai/langchain-experimental/blob/main/libs/experimental/langchain_experimental/text_splitter.py#L99

[2] 本文完整代碼地址: https://github.com/laixiangran/ai-learn-python/blob/main/RAG/examples/06_semantic_splitting.py

責任編輯:龐桂玉 來源: 燃哥講AI
相關推薦

2025-06-10 04:30:00

2024-09-04 09:11:42

2022-01-07 14:00:35

分庫分表業務量

2019-11-25 10:12:59

Python技巧工具

2011-08-18 16:03:48

數據切分MySQL

2025-04-02 04:00:00

RAG分塊優化

2025-05-22 06:48:50

RAGAI應用開發框架DeepSeek

2017-07-17 14:45:43

數據庫DB分庫切分策略

2024-09-29 00:00:02

2021-03-17 16:15:55

數據MySQL 架構

2017-08-28 16:40:07

Region切分觸發策略

2017-12-08 10:42:49

HBase切分細節

2017-06-19 16:45:41

數據庫水平切分用戶中心

2024-06-24 14:32:33

2025-05-26 09:57:46

2025-05-07 08:35:11

2023-10-10 14:03:47

swap排序解法

2024-02-05 14:12:37

大模型RAG架構

2011-08-11 18:54:01

數據庫分頁查詢

2022-06-07 14:47:43

飛書智能問答模型
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 视频在线亚洲 | 国产精品色 | 久久久久久久久久久久久久国产 | 久久精品欧美一区二区三区不卡 | 久久精品一区二 | 香蕉久久a毛片 | 国产成人影院 | 免费看黄视频网站 | 久久99深爱久久99精品 | 日韩亚洲一区二区 | 国产日韩欧美在线观看 | 国产视频二区在线观看 | 日韩在线一区二区三区 | 一区二区三区国产 | 国产精品一区二区欧美 | 天天夜干 | 国产欧美精品在线观看 | 亚洲另类春色偷拍在线观看 | 亚洲综合精品 | 成人免费淫片aa视频免费 | 粉嫩av | 亚洲视频一区 | 美女人人操 | 欧美成人猛片aaaaaaa | 亚洲一区二区久久 | 亚洲一区视频在线 | 男女啪啪网址 | 国产一区二区精品在线 | 国产精品精品视频 | 久久伊人影院 | 午夜免费福利电影 | cao在线 | 福利视频一区二区三区 | 日本色高清 | 日韩精品在线免费观看视频 | 亚洲一级在线 | 欧美在线视频观看 | 久久伊人精品 | 逼逼网| 午夜亚洲| 99精品在线观看 |