成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

基于Agent的金融問答系統(tǒng):RAG檢索模塊初建成 原創(chuàng)

發(fā)布于 2024-11-22 15:46
瀏覽
0收藏

前言

我們在上一章《【項目實戰(zhàn)】基于Agent的金融問答系統(tǒng)之項目簡介》中簡單介紹了項目背景以及數(shù)據(jù)集情況,本章將介紹RAG模塊的實現(xiàn)。

功能列表

參考之前所學(xué)內(nèi)容《大模型之初識RAG》,我們需要實現(xiàn)如下功能:

  • 向量庫的基礎(chǔ)功能

     向量庫

     數(shù)據(jù)入庫

  • 文件導(dǎo)入

    PDF文件的讀取

    PDF文件的切分

    調(diào)用向量庫接口入庫

  • 文件檢索

    連接向量庫

    檢索器檢索文件

開發(fā)過程

1、規(guī)劃工程文件

項目開始之后,我們?nèi)绻軌蛞种谱≈苯訑]代碼的沖動,改為提前做好規(guī)劃,這會是一個好的習(xí)慣。 為此,我提前做了如下規(guī)劃:

代碼管理

  • 代碼使用Git進(jìn)行管理,這樣后期多人協(xié)作時方便代碼更新、代碼Merge、沖突解決等。
  • 由于國內(nèi)訪問Github優(yōu)勢會ban,所以我們將代碼倉庫放在Gitee上。
  • 為代碼倉庫起了一個響亮的名稱后,倉庫地址定為https://gitee.com/deadwalk/smart-finance-bot

項目目錄

考慮這個項目會涉及到前端、后端、數(shù)據(jù)集、模型等,所以項目目錄規(guī)劃如下:

smart-finance-bot \
|- dataset \  # 該目錄用于保存PDF以及SQLite數(shù)據(jù)
|- doc \  # 該目錄用于保存文檔類文件,例如:需求文檔、說明文檔、數(shù)據(jù)文檔
|- app \   # 該目錄用于服務(wù)端代碼
|- agent \ # 該目錄用于保存agent相關(guān)代碼
|- rag \   # 該目錄用于保存rag相關(guān)代碼
|-test \   # 該目錄用于保存測試類驅(qū)動相關(guān)代碼
|- conf \  # 該目錄用于保存配置文件
|-.qwen # 該文件保存QWen的配置文件(請自行添加對應(yīng)的API KEY)
|-.ernie # 該文件保存百度千帆的配置文件(請自行添加對應(yīng)的API KEY)
|- chatweb \   # 該目錄用于保存前端頁面代碼
|- scripts \   # 該目錄用于保存腳本文件,例如:啟動腳本、導(dǎo)入向量數(shù)據(jù)庫腳本等
|- test_result \   # 該目錄用于保存測試結(jié)果
|- docker \
|- backend \ # 該目錄對應(yīng)后端python服務(wù)的Dockerfile
|- frontend \ # 該目錄對應(yīng)前端python服務(wù)的Dockerfile

上述目錄中,??dataset?? 是直接使用git的submodul命令,直接將天池大賽提供的數(shù)據(jù)集引入到本項目中,方便后續(xù)使用。

引入方法:

git submodule add https://www.modelscope.cn/datasets/BJQW14B/bs_challenge_financial_14b_dataset.git dataset

命名規(guī)范

項目如果能夠約束統(tǒng)一的命名規(guī)范,這對于后續(xù)代碼的可讀性、可維護(hù)性會提供需要便利,在此我沿用了約定俗成的代碼命名規(guī)范:

  • 類名:使用大駝峰命名法,例如:??MyClass??
  • 函數(shù)名:使用小駝峰命名法,例如:??my_function??
  • 變量名:使用小駝峰命名法,例如:??my_variable??
  • 文件夾:使用小駝峰命名法。

整體命名時,要盡量見文知意。

2、實現(xiàn)基本的連接大模型的util庫

代碼文件及目錄:??app/utils/util.py??

from dotenv import load_dotenv
import os

# 獲取當(dāng)前文件的目錄
current_dir = os.path.dirname(__file__)

# 構(gòu)建到 conf/.qwen 的相對路徑
conf_file_path_qwen = os.path.join(current_dir,'..','conf','.qwen')

# 加載千問環(huán)境變量
load_dotenv(dotenv_path=conf_file_path_qwen)

defget_qwen_models():
"""
    加載千問系列大模型
    """
# llm 大模型
from langchain_community.llms.tongyi importTongyi

    llm =Tongyi(model="qwen-max", temperature=0.1, top_p=0.7, max_tokens=1024)

# chat 大模型
from langchain_community.chat_models importChatTongyi

    chat =ChatTongyi(model="qwen-max", temperature=0.01, top_p=0.2, max_tokens=1024)
# embedding 大模型
from langchain_community.embeddings importDashScopeEmbeddings

    embed =DashScopeEmbeddings(model="text-embedding-v3")

return llm, chat, embed

在app/conf/.qwen中,添加對應(yīng)的API KEY,例如:

DASHSCOPE_API_KEY = sk-xxxxxx

3、實現(xiàn)向量庫基礎(chǔ)功能

向量庫文件考慮使用Chroma實現(xiàn),所以我們先實現(xiàn)一個向量庫的類,用于完成基本的向量庫連接、數(shù)據(jù)入庫操作。

代碼文件及目錄:??app/rag/chroma_conn.py??

import chromadb
from chromadb importSettings
from langchain_chroma importChroma


classChromaDB:
def__init__(self,
                 chroma_server_type="local",  # 服務(wù)器類型:http是http方式連接方式,local是本地讀取文件方式
                 host="localhost", port=8000,  # 服務(wù)器地址,http方式必須指定
                 persist_path="chroma_db",  # 數(shù)據(jù)庫的路徑:如果是本地連接,需要指定數(shù)據(jù)庫的路徑
                 collection_name="langchain",  # 數(shù)據(jù)庫的集合名稱
                 embed=None  # 數(shù)據(jù)庫的向量化函數(shù)
                 ):

        self.host = host
        self.port = port
        self.path = persist_path
        self.embed = embed
        self.store =None

# 如果是http協(xié)議方式連接數(shù)據(jù)庫
if chroma_server_type =="http":
            client = chromadb.HttpClient(host=host, port=port)

            self.store =Chroma(collection_name=collection_name,
                                embedding_function=embed,
                                client=client)

if chroma_server_type =="local":
            self.store =Chroma(collection_name=collection_name,
                                embedding_function=embed,
                                persist_directory=persist_path)

if self.store isNone:
raiseValueError("Chroma store init failed!")

defadd_with_langchain(self, docs):
"""
        將文檔添加到數(shù)據(jù)庫
        """
        self.store.add_documents(documents=docs)

defget_store(self):
"""
        獲得向量數(shù)據(jù)庫的對象實例
        """
return self.store

在實際項目實踐過程中,我們發(fā)現(xiàn)導(dǎo)入Chroma數(shù)據(jù)時使用本地化連接方式更快一些,所以對連接方式做了兩個參數(shù)的擴(kuò)展,local 代表本地連接,http 代表遠(yuǎn)程連接。

4、實現(xiàn)入庫功能

本著先跑通流程,再優(yōu)化交互過程的思路,對于PDF文件入向量庫的過程,我們先通過一段腳本實現(xiàn)(暫不做前端UI的交互)。

代碼文件及目錄:??app/rag/pdf_processor.py??

import os
import logging
import time
from tqdm import tqdm
from langchain_community.document_loaders importPyMuPDFLoader
from langchain_text_splitters importRecursiveCharacterTextSplitter
from rag.chroma_conn importChromaDB


classPDFProcessor:
def__init__(self,
                 directory,  # PDF文件所在目錄
                 chroma_server_type,  # ChromaDB服務(wù)器類型
                 persist_path,  # ChromaDB持久化路徑
                 embed):# 向量化函數(shù)

        self.directory = directory
        self.file_group_num =80# 每組處理的文件數(shù)
        self.batch_num =6# 每次插入的批次數(shù)量

        self.chunksize =500# 切分文本的大小
        self.overlap =100# 切分文本的重疊大小

        self.chroma_db =ChromaDB(chroma_server_type=chroma_server_type,
                                  persist_path=persist_path,
                                  embed=embed)
# 配置日志
        logging.basicConfig(
            level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
)

defload_pdf_files(self):
"""
        加載目錄下的所有PDF文件
        """
        pdf_files =[]
for file in os.listdir(self.directory):
if file.lower().endswith('.pdf'):
                pdf_files.append(os.path.join(self.directory, file))

        logging.info(f"Found {len(pdf_files)} PDF files.")
return pdf_files

defload_pdf_content(self, pdf_path):
"""
        讀取PDF文件內(nèi)容
        """
        pdf_loader =PyMuPDFLoader(file_path=pdf_path)
        docs = pdf_loader.load()
        logging.info(f"Loading content from {pdf_path}.")
return docs

defsplit_text(self, documents):
"""
        將文本切分成小段
        """
# 切分文檔
        text_splitter =RecursiveCharacterTextSplitter(
            chunk_size=self.chunksize,
            chunk_overlap=self.overlap,
            length_function=len,
            add_start_index=True,
)

        docs = text_splitter.split_documents(documents)

        logging.info("Split text into smaller chunks with RecursiveCharacterTextSplitter.")
return docs

definsert_docs_chromadb(self, docs, batch_size=6):
"""
        將文檔插入到ChromaDB
        """
# 分批入庫
        logging.info(f"Inserting {len(docs)} documents into ChromaDB.")

# 記錄開始時間
        start_time = time.time()
        total_docs_inserted =0

# 計算總批次
        total_batches =(len(docs)+ batch_size -1)// batch_size

with tqdm(total=total_batches, desc="Inserting batches", unit="batch")as pbar:
for i inrange(0,len(docs), batch_size):
# 獲取當(dāng)前批次的樣本
                batch = docs[i:i + batch_size]

# 將樣本入庫
                self.chroma_db.add_with_langchain(batch)
# self.chroma_db.async_add_with_langchain(batch)

# 更新已插入的文檔數(shù)量
                total_docs_inserted +=len(batch)

# 計算并顯示當(dāng)前的TPM
                elapsed_time = time.time()- start_time  # 計算已用時間(秒)
if elapsed_time >0:# 防止除以零
                    tpm =(total_docs_inserted / elapsed_time)*60# 轉(zhuǎn)換為每分鐘插入的文檔數(shù)
                    pbar.set_postfix({"TPM":f"{tpm:.2f}"})# 更新進(jìn)度條的后綴信息

# 更新進(jìn)度條
                pbar.update(1)

defprocess_pdfs_group(self, pdf_files_group):
# 讀取PDF文件內(nèi)容
        pdf_contents =[]

for pdf_path in pdf_files_group:
# 讀取PDF文件內(nèi)容
            documents = self.load_pdf_content(pdf_path)

# 將documents 逐一添加到pdf_contents
            pdf_contents.extend(documents)

# 將文本切分成小段
        docs = self.split_text(pdf_contents)

# 將文檔插入到ChromaDB
        self.insert_docs_chromadb(docs, self.batch_num)

defprocess_pdfs(self):
# 獲取目錄下所有的PDF文件
        pdf_files = self.load_pdf_files()

        group_num = self.file_group_num

# group_num 個PDF文件為一組,分批處理
for i inrange(0,len(pdf_files), group_num):
            pdf_files_group = pdf_files[i:i + group_num]
            self.process_pdfs_group(pdf_files_group)

print("PDFs processed successfully!")

5、測試導(dǎo)入功能

因為Python的導(dǎo)入庫的原因(一般都是從工作目錄查找),所以我們在項目根目錄下創(chuàng)建test_framework.py,方便后續(xù)統(tǒng)一測試工作。

smart-finance-bot \
    |- app \   # 該目錄用于服務(wù)端代碼
        |- rag \   # 該目錄用于保存rag相關(guān)代碼
            |- pdf_processor.py
            |- chroma_conn.py
        |- test_framework.py

代碼文件: ??app/test_framework.py??

# 測試導(dǎo)入PDF到向量庫主流程
deftest_import():
from rag.pdf_processor importPDFProcessor
from utils.util import get_qwen_models

    llm , chat , embed = get_qwen_models()
# embed = get_huggingface_embeddings()

    directory ="./app/dataset/pdf"
    persist_path ="chroma_db"
    server_type ="local"

# 創(chuàng)建 PDFProcessor 實例
    pdf_processor =PDFProcessor(directory=directory,
                                 chroma_server_type=server_type,
                                 persist_path=persist_path,
                                 embed=embed)

# 處理 PDF 文件
    pdf_processor.process_pdfs()

if __name__ =="__main__":
    test_import()

1.通過命令行啟動ChromaDB服務(wù)端:

chroma run --path chroma_db --port 8000 --host localhost

2.運(yùn)行test_framework.py

運(yùn)行效果:

基于Agent的金融問答系統(tǒng):RAG檢索模塊初建成-AI.x社區(qū)


備注:一般測試框架會使用Pytest并且編寫相應(yīng)的單元測試函數(shù),本次項目中由于項目較小且函數(shù)返回結(jié)果不固定,所以就沒有寫UnitTest。如果想了解Pytest的使用示例,可以參考我的其他代碼倉庫,例如:UnitTest的使用

6、實現(xiàn)檢索功能

代碼文件: ??app/rag/rag.py??

import logging
from langchain_core.prompts importChatPromptTemplate
from langchain_core.runnables importRunnablePassthrough
from langchain_core.runnables.base importRunnableLambda
from langchain_core.output_parsers importStrOutputParser
from.chroma_conn importChromaDB

# 配置日志記錄
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s')


classRagManager:
def__init__(self,
                 chroma_server_type="http",
                 host="localhost", port=8000,
                 persist_path="chroma_db",
                 llm=None, embed=None):
        self.llm = llm
        self.embed = embed

        chrom_db =ChromaDB(chroma_server_type=chroma_server_type,
                            host=host, port=port,
                            persist_path=persist_path,
                            embed=embed)
        self.store = chrom_db.get_store()

defget_chain(self, retriever):
"""獲取RAG查詢鏈"""

# RAG系統(tǒng)經(jīng)典的 Prompt (A 增強(qiáng)的過程)
        prompt =ChatPromptTemplate.from_messages([
("human","""You are an assistant for question-answering tasks. Use the following pieces 
          of retrieved context to answer the question. 
          If you don't know the answer, just say that you don't know. 
          Use three sentences maximum and keep the answer concise.
          Question: {question} 
          Context: {context} 
          Answer:""")
])
# 將 format_docs 方法包裝為 Runnable
        format_docs_runnable =RunnableLambda(self.format_docs)
# RAG 鏈
        rag_chain =(
{"context": retriever | format_docs_runnable,
"question":RunnablePassthrough()}
| prompt
| self.llm
|StrOutputParser()
)

return rag_chain

defformat_docs(self, docs):
# 返回檢索到的資料文件名稱
        logging.info(f"檢索到資料文件個數(shù):{len(docs)}")
        retrieved_files ="\n".join([doc.metadata["source"]for doc in docs])
        logging.info(f"資料文件分別是:\n{retrieved_files}")

        retrieved_content ="\n\n".join(doc.page_content for doc in docs)
        logging.info(f"檢索到的資料為:\n{retrieved_content}")

return retrieved_content

defget_retriever(self, k=4, mutuality=0.3):
        retriever = self.store.as_retriever(search_type="similarity_score_threshold",
                                            search_kwargs={"k": k,"score_threshold": mutuality})

return retriever

defget_result(self, question, k=4, mutuality=0.3):
"""獲取RAG查詢結(jié)果"""

        retriever = self.get_retriever(k, mutuality)

        rag_chain = self.get_chain(retriever)

return rag_chain.invoke(input=question)

以上是實現(xiàn)了一個使用基本檢索器的RAG,其中:

  • 代碼中通過chroma_conn.py模塊連接到ChromaDB數(shù)據(jù)庫,并使用ChromaDB的as_retriever方法創(chuàng)建一個檢索器。

7、測試檢索效果

在test_framework.py中添加RAG的測試調(diào)用函數(shù)。

代碼文件:??app/test_framework.py??

# 測試RAG主流程
deftest_rag():
from rag.rag importRagManager
from utils.util import get_qwen_models

    llm, chat, embed = get_qwen_models()
    rag =RagManager(host="localhost", port=8000, llm=llm, embed=embed)

    result = rag.get_result("湖南長遠(yuǎn)鋰科股份有限公司變更設(shè)立時作為發(fā)起人的法人有哪些?")

print(result)


if __name__ =="__main__":
    test_rag()# RAG測試函數(shù)
# test_import()         # 批量導(dǎo)入PDF測試函數(shù)

注釋掉批量導(dǎo)入函數(shù),開啟test_rag()函數(shù),運(yùn)行效果:

基于Agent的金融問答系統(tǒng):RAG檢索模塊初建成-AI.x社區(qū)

至此,我們完成了RAG模塊的基本功能,它包括PDF文件的批量導(dǎo)入以及檢索功能。

內(nèi)容小結(jié)

  • 首先,我們創(chuàng)建了一個ChromaDB的類,封裝了基礎(chǔ)的Chroma連接、插入、檢索。
  • 其次,我們實現(xiàn)了PDFProcessor類,該類中會調(diào)用ChromaDB類的插入函數(shù),將批量讀取的PDF文件進(jìn)行切分后保存至向量庫。
  • 然后,我們實現(xiàn)了RagManager類,該類中封裝了RAG的檢索鏈,并定義了檢索的參數(shù)。
  • 最后,我們實現(xiàn)了一個測試函數(shù),用于測試RAG的檢索功能。
  • 除此之外,有兩個注意事項:

    在項目初期,進(jìn)行合理的項目文件目錄規(guī)劃,可以有效減少項目維護(hù)的難度。

    在項目行進(jìn)中,通過搭建測試框架,可以方便函數(shù)驗證以及后續(xù)重構(gòu)的回歸測試。


本文轉(zhuǎn)載自公眾號一起AI技術(shù) 作者:Dongming

原文鏈接:??https://mp.weixin.qq.com/s/ZMwZSsmms03U-GRRwnxMNg??


?著作權(quán)歸作者所有,如需轉(zhuǎn)載,請注明出處,否則將追究法律責(zé)任
標(biāo)簽
收藏
回復(fù)
舉報
回復(fù)
相關(guān)推薦
主站蜘蛛池模板: 久久国产精品一区二区 | 国产精品欧美一区二区三区不卡 | 亚洲+变态+欧美+另类+精品 | 草久在线 | 亚洲综合色视频在线观看 | 在线亚洲欧美 | 永久av| 91精品国产色综合久久不卡蜜臀 | 精品91久久| 手机av免费在线 | 婷婷99 | 久久久亚洲一区 | 日韩国产一区二区 | 精精国产xxxx视频在线播放 | 国产精品1区2区3区 欧美 中文字幕 | 国产精品美女久久久久久免费 | 天天操伊人 | 精品一区二区久久久久久久网站 | 亚洲欧美精品久久 | 亚洲精品www. | 成人在线黄色 | www.色午夜.com | 欧美日韩成人在线观看 | 欧美久 | 亚洲精品国产偷自在线观看 | 国产免费a视频 | 91看片| 国产男女猛烈无遮掩视频免费网站 | 中文在线www | 亚洲精品久久久久久久久久吃药 | 国产日韩一区二区三区 | 色综合久久天天综合网 | 国产精品一卡二卡三卡 | 精品视频一二区 | 欧美久久久久 | 国产亚洲精品久久久久久豆腐 | 国产精品福利网站 | 精品久久久久久亚洲精品 | 欧美日韩一区二区三区在线观看 | 一区二区三区视频免费观看 | 欧美一区二区免费电影 |