成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

RAGFlow框架優化經驗分享(附代碼):圖文識別+動態分塊 、API調優+源碼修改

開發
這篇以 RAGFlow 框架為例,針對上述后三個問題結合目前團隊實踐經驗,給各位做個分享,大家辨證參考。

在這個過程中很高興能了解到很多之前沒涉獵過的行業和場景,其中經常被集中問到的問題大概有四個:中小企業做RAG知識庫落地選擇框架哪個比較好?如果選擇 RAGFlow 如何進行定制化開發?如何對文檔中的圖片進行識別檢索?如何對復雜文檔進行動態分塊等。

圖片

關于選擇什么框架的問題,仁者見仁,需要綜合對比的盆友可以參考我之前發的那篇文章企業RAG落地避坑指南:自主開發 vs 三大框架,核心配置與選型全解析。這篇以 RAGFlow 框架為例,針對上述后三個問題結合目前團隊實踐經驗,給各位做個分享,大家辨證參考。

畢竟,RAG并沒有“一招鮮”的神奇魔法,傳說那幾個大廠手里掌握的RAG”核心技術“,私以為也是經過了必要且復雜的“策略優化-管道設計-訓練-調優-發布”等專業開發流程,不過成熟的開源應用框架,無疑是更有想象空間的社會化大創新。

注:本篇會比較偏開發導向,非技術向盆友選擇性翻翻就好。

以下,enjoy:

1、優化實施路線圖

以上篇介紹的機械加工行業的維保場景為例RAGFlow+DeepSeek-R1:14b落地案例分享(足夠詳細):機加工行業設備維保場景,推薦先采用官方的 Python API 進行合理的優化配置后,再修改項目源碼,最后根據業務目標做必要的高級優化拓展。此處附上一張個人項目實施過程中積累的些優化要點,供各位參考:

圖片

1.1 階段一:API 優化配置

配置不同文檔類型的解析策略

調整檢索參數優化語義搜索質量

定制大模型提示詞以適應機械行業特點

1.2 階段二:基礎源碼修改

實現專業術語處理模塊

開發查詢路由機制

增加上下文增強功能

1.3 階段三:高級優化擴展

實現多級索引結構

開發高性能緩存機制

添加查詢日志分析系統

2、6 個官方 Python API 優化全解析

根據 RAGFlow 官方 Python API 文檔,為大家完整梳理了所有可進行 API 優化的模塊和參數。這些 API 調優可以不修改源碼,直接通過參數配置實現性能提升。

建議在看完之后還是去官網看下原文,親自動手試過一遍會有不一樣的體感。原文出處:

圖片

https://ragflow.io/docs/dev/python_api_reference

2.1 數據集管理

創建數據集 create_dataset

RAGFlow.create_dataset(
    name: str,
    avatar: str = "",
    description: str = "",
    embedding_model: str = "BAAI/bge-large-zh-v1.5",
    language: str = "English",
    permission: str = "me", 
    chunk_method: str = "naive",
    parser_config: DataSet.ParserConfig = None
)

優化參數:

  • embedding_model: 選擇合適的嵌入模型,影響檢索質量

中文場景推薦: "BAAI/bge-large-zh-v1.5"

英文場景可選: "BAAI/bge-large-en-v1.5"

  • language: 選擇與文檔匹配的語言
  • chunk_method: 關鍵參數,根據文檔類型選擇最佳分塊策略

"naive": 通用文檔

"paper": 論文/設備手冊

"book": 結構化書籍

"table": 表格數據

"qa": 問答格式文檔

"picture": 圖片文檔

"one": 整個文檔作為一個塊

"knowledge_graph": 知識圖譜

  • parser_config: 精細調整解析配置

chunk_token_num: 控制分塊大小 

delimiter: 自定義分隔符 

layout_recognize: 是否啟用布局識別 

raptor: 高級解析選項

更新數據集 DataSet.update

DataSet.update(update_message : dict)

優化參數:

embedding_model: 更換更適合的嵌入模型 

chunk_method: 調整分塊策略

 meta_fields: 更新元數據字段

2.2 文件管理 (FILE MANAGEMENT)

上傳文檔 DataSet.upload_documents

DataSet.upload_documents(document_list : list[dict])

優化參數:

display_name: 文件顯示名,方便檢索與管理 

blob: 文件內容

更新文檔 Document.update

Document.update(update_message : dict)

優化參數:

  • chunk_method: 文檔分塊方法 
  • parser_config: 文檔解析配置

對于圖文文檔,可設置: "layout_recognize": True

對于表格文檔,可設置: "html4excel": True

解析文檔 

DataSet.async_parse_documents(document_ids : list[str])

用于觸發文檔解析流程,支持批量處理。

2.3 分塊管理 (CHUNK MANAGEMENT)

添加分塊 Document.add_chunk

Document.add_chunk(content: str, important_keywords: list[str] = [])

優化參數:

important_keywords: 關鍵詞標注,增強檢索相關性

更新分塊 Chunk.update

Chunk.update(update_message : dict)

優化參數:

content: 更新分塊內容 

important_keywords: 更新關鍵詞

 available: 控制分塊可用性

檢索 RAGFlow.retrieve(關鍵API)

RAGFlow.retrieve(
    question: str = "",
    dataset_ids: list[str] = None,
    document_ids: list[str] = None,
    page: int = 1,
    page_size: int = 30,
    similarity_threshold: float = 0.2,
    vector_similarity_weight: float = 0.3,
    top_k: int = 1024,
    rerank_id: str = None,
    keyword: bool = False,
    highlight: bool = False
)

優化參數 (最重要的檢索相關參數):

  • similarity_threshold: 相似度閾值,影響召回范圍
  •  vector_similarity_weight: 向量相似度權重與關鍵詞匹配權重的比例

設置為 0-1 之間,值越大向量權重越高

工業領域建議參考0.3-0.5,平衡專業術語與語義理解

  • top_k: 參與向量檢索的 chunk 數量,影響檢索范圍 
  • rerank_id: 重排序模型 ID,提升檢索精度 
  • keyword: 開啟關鍵詞匹配,對專業領域極其有用 
  • highlight: 高亮匹配內容,幫助理解匹配原因

2.4 聊天助手管理 

創建聊天助手 RAGFlow.create_chat

RAGFlow.create_chat(
    name: str, 
    avatar: str = "", 
    dataset_ids: list[str] = [], 
    llm: Chat.LLM = None, 
    prompt: Chat.Prompt = None
)

優化參數:

  • llm: LLM 模型配置

model_name: 模型名稱 

temperature: 溫度,影響創造性 

top_p: 詞匯采樣范圍 

presence_penalty: 重復懲罰 

frequency_penalty: 頻率懲罰 

max_token: 最大輸出 token 數

  • prompt: 提示詞配置

similarity_threshold: 相似度閾值 

keywords_similarity_weight: 關鍵詞相似度權重 

top_n: 提供給 LLM 的 chunk 數量 

rerank_model: 重排序模型 

top_k: 重排序參與的候選數量 empty_response: 無匹配時的回復 

show_quote: 是否顯示引用來源 

prompt: 系統提示詞內容

更新聊天助手 Chat.update

Chat.update(update_message : dict)

優化參數: 同 create_chat 中的參數

2.5 會話管理 (SESSION MANAGEMENT)

創建會話 Chat.create_session

Chat.create_session(name: str = "New session")

提問 Session.ask

Session.ask(question: str = "", stream: bool = False, **kwargs)

優化參數:

stream: 流式輸出,提升用戶體驗

**kwargs: 可傳遞給 prompt 中定義的變量

2.6 代理管理 (AGENT MANAGEMENT)

創建代理會話 Agent.create_session

Agent.create_session(id, rag, **kwargs)

代理提問 Session.ask

Session.ask(question: str = "", stream: bool = False)

與普通會話的ask方法類似。

3、調整項目源碼思路參考

3.1 專業術語處理

需要在檢索引擎層面添加工業領域同義詞和術語映射:

# 需要修改源碼的示例邏輯
class CustomTerminologyProcessor:
    def __init__(self, terminology_mapping):
        self.terminology_mapping = terminology_mapping  # 同義詞映射表


    def process_query(self, query):
        # 專業術語標準化
        # 車間俚語轉換為標準術語
        processed_query = query
        for slang, standard_term in self.terminology_mapping.items():
            processed_query = processed_query.replace(slang, standard_term)
        return processed_query

修改點:

在查詢預處理階段添加定制的術語處理模塊

需要在 RAGFlow 的查詢管道中修改源碼添加此功能

3.2 多級索引結構實現

需要定制 Milvus 索引策略,實現基礎索引層和語義索引層的混合索引:

# 這部分需要修改源碼,以下是概念性代碼
class CustomHybridIndexBuilder:
    def __init__(self, vector_db_client):
        self.client = vector_db_client


    def create_scalar_indices(self, collection_name, fields):
        # 創建設備編號、故障代碼等標量索引
        for field in fields:
            self.client.create_index(collection_name, field, "scalar")


    def create_vector_indices(self, collection_name, fields):
        # 創建向量索引
        for field in fields:
            self.client.create_index(collection_name, field, {"index_type": "HNSW", "params": {"M": 16, "efConstruction": 200}})

修改點:

修改 RAGFlow 的索引構建模塊擴展

 Milvus 客戶端接口以支持多索引策略

3.3 查詢路由設計

需要實現定制化的查詢路由邏輯,識別不同類型的查詢并路由到最合適的檢索通道:

# 查詢路由器 - 需要源碼修改實現
class QueryRouter:
    def route_query(self, query_text):
        if self._is_equipment_code(query_text):
            return "exact_match", {"field": "equipment_code"}
        elif self._is_fault_code(query_text):
            return "exact_match", {"field": "fault_code"}
        elif self._is_parameter_query(query_text):
            return "parameter_lookup", {"field": "parameter_name"}
        else:
            return "semantic_search", {"model": "embedding_model"}

修改點:

在 RAGFlow 的查詢處理流程中添加查詢分類和路由機制

實現針對不同查詢類型的專用處理通道

3.4 上下文增強機制

增加查詢上下文增強,融入設備信息、歷史記錄等:

# 上下文增強器 - 需要修改源碼實現
class ContextEnhancer:
    def enhance_query(self, query, session_history, equipment_metadata=None):
        # 添加設備上下文信息
        if equipment_metadata:
            query_context = f"設備型號: {equipment_metadata['model']}, 生產年份: {equipment_metadata['year']}\n"
            query_context += query


        # 添加歷史查詢信息
        if session_history:
            relevant_history = self._extract_relevant_history(session_history, query)
            query_context = f"參考歷史信息: {relevant_history}\n" + query_context


        return query_context

修改點:

修改會話管理模塊,實現會話狀態跟蹤

增加設備元數據關聯機制

在查詢處理流程中加入上下文增強步驟

4、圖文結合文檔處理方案

依然是兩種方案,直接使用RAGFlow API方案優勢是更簡單,使用現有功能,無需額外的模型調用,也能夠直接顯示原始圖片,視覺效果更好,處理速度更快,不依賴外部 API,當然成本也無疑更低。

但使用一個多模態模型進行預處理的方案優勢也很明顯,圖片內容被轉換為文本,便于向量化和語義搜索,也可以依托多模態模型的能力,提供更豐富的圖片內容解釋。

圖片

目前實際測試下來,采用兩種方案的組合,效果更加穩定。

4.1 使用多模態預處理生成圖片描述

# 使用多模態模型生成圖片描述
processor = MultimodalDocumentProcessor(api_key="YOUR_API_KEY")
enhanced_docs = processor.process_pdf("設備手冊.pdf")

4.2 使用 RAGFlow 處理和存儲原始圖片

# 配置保留圖片的數據集
dataset = rag_object.create_dataset(
    name="圖文設備手冊",
    chunk_method="paper",
    parser_cnotallow={"layout_recognize": True}
)


# 上傳原始PDF文檔
with open("設備手冊.pdf", "rb") as f:
    dataset.upload_documents([{"display_name": "設備手冊.pdf", "blob": f.read()}])

4.3 創建能夠提供文本描述和圖片引用的助手

assistant = rag_object.create_chat(
    name="圖文設備專家",
    dataset_ids=[dataset.id],
    prompt=Chat.Prompt(
        prompt="""你是設備維修專家。回答時,請同時提供:
        1. 文字描述解釋故障和解決方案
        2. 引用相關圖片,包括圖片描述
        3. 告訴用戶可以參考哪些圖片獲取更多信息
        {knowledge}"""
    )
)

4.4 源碼修改的一些建議

增強圖片提取和處理:

修改 PDF 解析器,更準確地綁定文本和相關圖片

增加圖片內容分析功能,自動標注圖片類型(如"故障圖"、"結構圖"等)

實現圖文混合索引:

為圖片創建特殊索引,支持通過圖片內容或相關文本檢索圖片

在檢索結果中包含圖片 URL 或直接嵌入圖片

改進響應生成:

修改聊天助手的響應生成邏輯,自動識別圖片引用

在生成的回答中包含相關圖片或圖片鏈接

5、動態分塊策略參考

RAGFlow的chunk_method參數是在數據集級別或文檔級別設置的,不支持在單個文檔內部動態切換不同的分塊策略。但實際情況是,同一文檔中的不同部分可能需要不同的處理方式,比如針對段落、圖片、表格、圖表等,使用單一的分塊策略很難同時兼顧所有這些內容類型的特點。

5.1 4種分塊策略對比

源碼修改方案:

修改 RAGFlow 的文檔解析器,使其能夠識別文檔中的不同部分并應用不同的分塊策略,但這需要深入修改 RAGFlow 的核心處理邏輯,如果沒有深入理解RAGFlow的全局代碼,建議不要這么做。

文檔預處理方案:

在上傳到 RAGFlow 前預處理文檔,將其拆分成不同類型的子文檔。例如,將設備手冊拆分為純文本部分、表格部分、圖文部分等然后分別上傳到不同的數據集,每個數據集使用適合的分塊策略。

這個方案優點是,可以充分利用RAGFlow針對不同內容類型的專門分塊策略,但問題也很明顯,就是文檔上下文被拆分,可能影響整體理解。

自定義分塊方案:

不使用RAGFlow 的自動分塊,而是手動控制分塊,使用chunk_method="one"將整個文檔作為一個塊導入,然后使用自定義邏輯創建更細粒度的分塊。這種做法無疑可以實現最精細的控制、保留文檔完整性,當然缺點就是實現上會相對麻煩。

混合模型方案:

創建多個使用不同分塊策略的數據集,將同一文檔上傳到所有這些數據集,在檢索時查詢所有數據集并合并結果。這種方法保持文檔完整性,但創建多個使用不同分塊策略的副本,會造成存儲冗余,檢索時需要合并多個結果集。

5.2 推薦自定義方案

這種方法使用RAGFlow的API但完全控制分塊過程,最靈活且無需修改源碼。

圖片

完整保留原始文檔:

使用 chunk_method="one"將文檔整體上傳保留文檔的完整性和上下文關系

自定義內容識別:

使用 PyMuPDF 識別文檔中的不同內容類型:

文本、段落、表格內容圖片及其相關描述、章節標題和結構

動態創建精細分塊:

文本塊: 基于段落和語義分界

表格塊: 保留表格結構和行列關系

圖文塊: 關聯圖片和周圍的描述文本

添加分塊類型標記:

每個分塊添加類型標識符[text], [table], [image]等

方便檢索時區分不同類型的內容

豐富關鍵詞提取:

為每個分塊提取相關關鍵詞

保留章節上下文信息


from ragflow_sdk import RAGFlow
import fitz  # PyMuPDF
import re
import pandas as pd
import io
import logging
from typing import List, Dict, Any, Tuple


logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class DynamicChunker:
    """針對混合格式文檔的自定義分塊器"""


    def __init__(self, api_key: str, base_url: str):
        self.rag_object = RAGFlow(api_key=api_key, base_url=base_url)


    def process_document(self, pdf_path: str, dataset_name: str = None) -> str:
        """處理PDF文檔并實現自定義分塊"""
        # 1. 創建或獲取數據集
        if dataset_name:
            datasets = self.rag_object.list_datasets(name=dataset_name)
            if datasets:
                dataset = datasets[0]
                logger.info(f"使用現有數據集: {dataset.name}, ID: {dataset.id}")
            else:
                dataset = self.rag_object.create_dataset(
                    name=dataset_name,
                    embedding_model="BAAI/bge-large-zh-v1.5",
                    language="Chinese",
                    chunk_method="one"  # 使用"one"方式,保持文檔完整性
                )
                logger.info(f"創建新數據集: {dataset.name}, ID: {dataset.id}")
        else:
            dataset = self.rag_object.create_dataset(
                name="混合文檔庫",
                embedding_model="BAAI/bge-large-zh-v1.5",
                language="Chinese",
                chunk_method="one"
            )
            logger.info(f"創建新數據集: {dataset.name}, ID: {dataset.id}")


        # 2. 上傳文檔
        with open(pdf_path, "rb") as f:
            document_blob = f.read()


        docs = dataset.upload_documents([{
            "display_name": pdf_path.split("/")[-1],
            "blob": document_blob
        }])


        if not docs:
            logger.error("文檔上傳失敗")
            return None


        doc = docs[0]
        logger.info(f"文檔上傳成功, ID: {doc.id}")


        # 3. 分析文檔結構并創建自定義分塊
        chunks = self._create_custom_chunks(pdf_path)
        logger.info(f"創建了 {len(chunks)} 個自定義分塊")


        # 4. 手動添加分塊到文檔
        chunk_ids = []
        for chunk in chunks:
            content = chunk["content"]
            keywords = chunk["keywords"]
            chunk_type = chunk["type"]


            # 添加分塊類型標記以便未來檢索
            content_with_marker = f"[{chunk_type}]\n{content}"


            # 添加分塊
            new_chunk = doc.add_chunk(
                cnotallow=content_with_marker,
                important_keywords=keywords
            )


            chunk_ids.append(new_chunk.id)
            logger.info(f"添加分塊: 類型={chunk_type}, ID={new_chunk.id}, 關鍵詞數量={len(keywords)}")


        return doc.id


    def _create_custom_chunks(self, pdf_path: str) -> List[Dict[str, Any]]:
        """分析PDF并創建自定義分塊"""
        pdf_doc = fitz.open(pdf_path)
        chunks = []


        # 用于存儲當前上下文
        current_section = ""
        current_text_chunk = ""
        current_table_data = []
        current_keywords = []


        for page_idx, page in enumerate(pdf_doc):
            # 提取頁面文本
            text = page.get_text()


            # 檢測章節標題
            section_headers = self._detect_section_headers(text)
            if section_headers:
                # 如果有積累的文本塊,先保存
                if current_text_chunk:
                    chunks.append({
                        "content": current_text_chunk,
                        "keywords": list(set(current_keywords)),
                        "type": "text",
                        "section": current_section
                    })
                    current_text_chunk = ""
                    current_keywords = []


                # 更新當前章節
                current_section = section_headers[0]
                current_keywords.append(current_section)


            # 檢測表格
            tables = self._detect_tables(page)
            if tables:
                # 如果有積累的文本塊,先保存
                if current_text_chunk:
                    chunks.append({
                        "content": current_text_chunk,
                        "keywords": list(set(current_keywords)),
                        "type": "text",
                        "section": current_section
                    })
                    current_text_chunk = ""
                    current_keywords = []


                # 處理表格
                for table in tables:
                    table_content = self._format_table(table)
                    table_keywords = self._extract_keywords_from_table(table)


                    chunks.append({
                        "content": table_content,
                        "keywords": list(set(table_keywords + current_keywords)),
                        "type": "table",
                        "section": current_section
                    })


            # 檢測圖片
            images = self._detect_images(page)
            for img_idx, img in enumerate(images):
                # 提取圖片周圍的文本作為上下文
                img_context = self._extract_image_context(text, img["bbox"])


                # 如果找到圖片相關文本
                if img_context:
                    # 如果有積累的文本塊,先保存
                    if current_text_chunk:
                        chunks.append({
                            "content": current_text_chunk,
                            "keywords": list(set(current_keywords)),
                            "type": "text",
                            "section": current_section
                        })
                        current_text_chunk = ""
                        current_keywords = []


                    # 創建圖文塊
                    image_text_content = f"圖片描述: {img_context}\n圖片位置: 第{page_idx+1}頁"
                    image_keywords = self._extract_keywords(img_context)


                    chunks.append({
                        "content": image_text_content,
                        "keywords": list(set(image_keywords + current_keywords)),
                        "type": "image",
                        "section": current_section
                    })


                    # 防止重復處理同一段文本
                    text = text.replace(img_context, "", 1)


            # 處理剩余文本
            if text.strip():
                # 按段落拆分
                paragraphs = self._split_into_paragraphs(text)


                for para in paragraphs:
                    if len(para.strip()) < 10:  # 忽略太短的段落
                        continue


                    # 累積文本直到達到合適的大小
                    current_text_chunk += para + "\n\n"
                    current_keywords.extend(self._extract_keywords(para))


                    # 檢查是否應該創建新的文本塊
                    if len(current_text_chunk) > 1500:  # 大約300-500個詞
                        chunks.append({
                            "content": current_text_chunk,
                            "keywords": list(set(current_keywords)),
                            "type": "text",
                            "section": current_section
                        })
                        current_text_chunk = ""
                        current_keywords = []


        # 處理最后一個文本塊
        if current_text_chunk:
            chunks.append({
                "content": current_text_chunk,
                "keywords": list(set(current_keywords)),
                "type": "text",
                "section": current_section
            })


        return chunks


    def _detect_section_headers(self, text: str) -> List[str]:
        """檢測章節標題"""
        # 這是一個簡化的實現,可以根據實際文檔格式調整
        header_patterns = [
            r"^第[一二三四五六七八九十\d]+章\s*(.+)$",
            r"^\d+\.\d*\s+(.+)$",
            r"^[一二三四五六七八九十]+[、..]\s*(.+)$"
        ]


        headers = []
        for line in text.split("\n"):
            line = line.strip()
            for pattern in header_patterns:
                match = re.match(pattern, line)
                if match:
                    headers.append(line)
                    break


        return headers


    def _detect_tables(self, page) -> List[Any]:
        """檢測頁面中的表格"""
        # 這里使用簡化的檢測邏輯,實際應用中可能需要更復雜的表格檢測算法
        # 例如,可以尋找包含多個垂直和水平線的區域
        tables = []


        # 簡單表格檢測:查找水平線和垂直線的集中區域
        # 這里僅作為占位示例,實際實現會更復雜
        horizontal_lines = []
        vertical_lines = []


        for drawing in page.get_drawings():
            for item in drawing["items"]:
                if item["type"] == "l":  # 線段
                    x0, y0, x1, y1 = item["rect"]
                    if abs(y1 - y0) < 2:  # 水平線
                        horizontal_lines.append((x0, y0, x1, y1))
                    elif abs(x1 - x0) < 2:  # 垂直線
                        vertical_lines.append((x0, y0, x1, y1))


        # 簡單的表格判定:有足夠多的水平線和垂直線
        if len(horizontal_lines) >= 3 and len(vertical_lines) >= 2:
            # 找出所有線的邊界,作為表格邊界
            min_x = min([min(l[0], l[2]) for l in horizontal_lines + vertical_lines])
            max_x = max([max(l[0], l[2]) for l in horizontal_lines + vertical_lines])
            min_y = min([min(l[1], l[3]) for l in horizontal_lines + vertical_lines])
            max_y = max([max(l[1], l[3]) for l in horizontal_lines + vertical_lines])


            # 提取表格區域的文本
            table_rect = fitz.Rect(min_x, min_y, max_x, max_y)
            table_text = page.get_text("text", clip=table_rect)


            # 簡化的表格結構化(實際應用中需要更復雜的邏輯)
            table_rows = table_text.split("\n")
            table = []
            for row in table_rows:
                if row.strip():
                    cells = row.split()
                    if len(cells) >= 2:  # 至少有2個單元格才視為有效行
                        table.append(cells)


            if table:
                tables.append(table)


        return tables


    def _detect_images(self, page) -> List[Dict[str, Any]]:
        """檢測頁面中的圖片"""
        images = []


        # 獲取頁面上的圖片對象
        img_list = page.get_images(full=True)


        for img_idx, img_info in enumerate(img_list):
            xref = img_info[0]
            base_image = page.parent.extract_image(xref)


            if base_image:
                # 尋找圖片在頁面上的位置
                img_rects = []
                for rect in page.get_image_rects(xref):
                    img_rects.append(rect)


                if img_rects:
                    # 使用第一個找到的位置
                    bbox = img_rects[0]


                    images.append({
                        "bbox": [bbox.x0, bbox.y0, bbox.x1, bbox.y1]
                    })


        return images


    def _extract_image_context(self, text: str, bbox: List[float], context_size: int = 200) -> str:
        """提取圖片周圍的文本作為上下文"""
        # 在文本中查找可能的圖片標題,如"圖1","Figure 1"等
        caption_patterns = [
            r"圖\s*\d+[\..:]?\s*(.+?)(?:\n|$)",
            r"Figure\s*\d+[\.:]?\s*(.+?)(?:\n|$)",
            r"圖表\s*\d+[\..:]?\s*(.+?)(?:\n|$)"
        ]


        for pattern in caption_patterns:
            matches = re.finditer(pattern, text, re.IGNORECASE)
            for match in matches:
                return match.group(0)


        # 如果找不到明確的圖片標題,則嘗試提取圖片周圍的文本
        # 這是一個簡化實現,實際應用中可能需要更復雜的邏輯
        lines = text.split("\n")
        total_length = 0
        start_line = 0


        # 估計圖片在文本中的位置
        # 這是一個非常粗略的估計,實際應用中需要更精確的方法
        relative_position = bbox[1] / 1000  # 假設頁面高度為1000
        target_position = int(len(text) * relative_position)


        # 找到大致對應的行
        for i, line in enumerate(lines):
            total_length += len(line) + 1  # +1 for newline
            if total_length > target_position:
                start_line = max(0, i - 2)  # 從前兩行開始
                break


        # 提取上下文(當前行及其前后幾行)
        context_lines = lines[max(0, start_line):min(len(lines), start_line + 5)]
        return "\n".join(line for line in context_lines if len(line.strip()) > 5)


    def _format_table(self, table: List[List[str]]) -> str:
        """格式化表格為文本格式"""
        if not table:
            return ""


        # 創建pandas DataFrame
        df = pd.DataFrame(table[1:], columns=table[0] if len(table) > 1 else None)


        # 轉換為字符串形式
        result = io.StringIO()
        df.to_csv(result, sep="|", index=False)
        return result.getvalue()


    def _extract_keywords_from_table(self, table: List[List[str]]) -> List[str]:
        """從表格中提取關鍵詞"""
        keywords = []


        # 表頭作為關鍵詞
        if table and len(table) > 0:
            keywords.extend(table[0])


        # 第一列可能是行標題,也加入關鍵詞
        for row in table[1:] if len(table) > 1 else []:
            if row and len(row) > 0:
                keywords.append(row[0])


        return keywords


    def _split_into_paragraphs(self, text: str) -> List[str]:
        """將文本拆分為段落"""
        # 按照多個換行符拆分
        paragraphs = re.split(r"\n\s*\n", text)
        return [p.strip() for p in paragraphs if p.strip()]


    def _extract_keywords(self, text: str) -> List[str]:
        """從文本中提取關鍵詞"""
        # 這是一個簡化的關鍵詞提取方法
        # 實際應用中可以使用更復雜的NLP技術,如TF-IDF、TextRank等


        # 1. 移除停用詞和標點
        stop_words = {"的", "了", "和", "與", "或", "在", "是", "有", "被", "將", "把"}
        cleaned_text = re.sub(r'[^\w\s]', ' ', text)
        words = cleaned_text.split()
        filtered_words = [w for w in words if w not in stop_words and len(w) > 1]


        # 2. 簡單詞頻統計
        word_count = {}
        for word in filtered_words:
            if word in word_count:
                word_count[word] += 1
            else:
                word_count[word] = 1


        # 3. 選擇頻率最高的幾個詞作為關鍵詞
        sorted_words = sorted(word_count.items(), key=lambda x: x[1], reverse=True)
        return [word for word, count in sorted_words[:10] if count > 1]


# 使用示例
if __name__ == "__main__":
    chunker = DynamicChunker(
        api_key="YOUR_API_KEY",
        base_url="http://YOUR_BASE_URL:9380"
    )


    doc_id = chunker.process_document("設備維修手冊.pdf", "動態分塊測試")
    print(f"處理完成,文檔ID: {doc_id}")

(完)

責任編輯:龐桂玉 來源: 韋東東
相關推薦

2010-09-26 13:48:51

JVM調優

2011-07-01 10:09:50

ASP.NET

2018-05-09 08:35:59

2025-02-24 01:00:00

LINQ核心技術語言集成

2010-01-13 18:09:09

VB.NET動態生成代

2021-05-12 13:40:16

JVM調優經驗

2011-05-16 17:36:05

SEO

2011-05-23 17:56:14

網站優化

2012-01-10 16:22:25

Web

2009-12-16 15:23:33

Ruby on rai

2018-07-18 12:12:20

Spark大數據代碼

2009-12-18 17:01:37

Ruby基礎代碼

2023-11-23 09:26:50

Java調優

2012-01-10 14:35:08

JavaJVM

2011-06-20 13:54:41

Qt 動態 切換

2011-07-07 18:39:22

SEO

2009-12-17 09:49:18

Ruby代碼管理

2019-08-13 09:04:22

Linux性能調優

2020-02-26 15:35:17

Spring Boot項目優化JVM調優

2017-07-21 08:55:13

TomcatJVM容器
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 亚洲www啪成人一区二区 | 视频在线观看一区 | 国产精品久久精品 | 天天成人综合网 | 国产一级在线 | 欧美日韩成人影院 | 日韩三区在线观看 | 久久国产精品一区二区 | 成人精品免费视频 | 亚洲精品久久久久久久久久久 | 国产精品久久片 | 成人午夜激情 | 婷婷久久五月 | 亚洲一区成人 | 男女视频在线观看免费 | 精品久久久久香蕉网 | 国产成人精品一区二区三区在线 | 亚洲性视频网站 | 国产精品久久久久久久久免费 | 日韩国产欧美一区 | 狠狠干天天干 | 久久精品99久久 | 欧美激情一区二区三级高清视频 | 亚洲视频在线看 | 中国一级特黄真人毛片免费观看 | 免费视频一区二区 | 人人做人人澡人人爽欧美 | 国产精品久久久久久中文字 | 国产精品久久久久久久久久久久 | 亚洲视频在线观看 | 国产日韩欧美在线观看 | 午夜在线小视频 | 金莲网| 九九伊人sl水蜜桃色推荐 | 成年视频在线观看福利资源 | 亚洲精品成人在线 | 国产婷婷色综合av蜜臀av | 国产 欧美 日韩 一区 | 高清国产午夜精品久久久久久 | 日韩欧美一二三区 | 久久久精品久久久 |