基于Agent的金融問答系統:RAG的檢索增強之ElasticSearch 原創
前言
本章內容,我們將在已經構建的agent框架基礎上,優化檢索器,為檢索器搭建ElasticSearch服務,實現問答系統的檢索增強。
檢索問題
通過測試天池大賽數據集的前100個問題,我們發現有很多問題RAG檢索不到,例如:
? {"id": 34, "question": "根據武漢興圖新科電子股份有限公司招股意向書,電子信息行業的上游涉及哪些企業?"}
通過查看日志,檢索器沒有檢索到相關信息:
優化方案
分析上述case原因,檢索器太過簡單所致。
class SimpleRetrieverWrapper():
"""自定義檢索器實現"""
def__init__(self, store, llm, **kwargs):
self.store = store
self.llm = llm
logger.info(f'檢索器所使用的Chat模型:{self.llm}')
defcreate_retriever(self):
logger.info(f'初始化自定義的Retriever')
chromadb_retriever = self.store.as_retriever()
return chromadb_retriever
基于以上問題,我們計劃使用集成檢索器,方案如下:
說明:
? 將檢索器改為使用??EnsembleRetriever?
?
? 集成檢索器其中之一使用??ElasticSearch?
?? 檢索器,這個檢索器通過連接??ElasticSearch?
? 服務,通過關鍵字查詢相關信息。
? 集成檢索器另外一個使用??MultiQueryRetriever?
? 檢索器,這個檢索器通過連接Chroma向量庫查詢信息。
關于MultiQueryRetriever和ElasticSearch,之前有文章做過基本內容的總結,詳情請查看課程總結】day29:大模型之深入了解Retrievers解析器。
優化步驟
1、搭建ES服務
第一步:安裝Docker,該內容不再贅述,具體請見10分鐘學會Docker的安裝和使用
第二步:創建網絡
docker network create es-net
第三步:拉取鏡像
docker pull elasticsearch:8.6.0
第四步:創建掛載點目錄
smart-finance-bot \
|- app \
|- docker \
|- elasticsearch \ # 創建elasticsearch掛載目錄
|- data \ # 創建數據目錄
|- config \ # 創建配置目錄
|- plugins \ # 創建插件目錄
第五步:命令行中輸入命令啟動Docker容器
docker run -d \
--restart=always \
--name es \
--network es-net \
-p 9200:9200 \
-p 9300:9300 \
--privileged \
-v /Users/deadwalk/Code/smart-finance-bot/docker/elasticsearch/data:/usr/share/elasticsearch/data \
-v /Users/deadwalk/Code/smart-finance-bot/docker/elasticsearch/plugins:/usr/share/elasticsearch/plugins \
-e "discovery.type=single-node" \
-e "ES_JAVA_OPTS=-Xms512m -Xmx512m" \
elasticsearch:8.6.0
注意:
? 上述的??/Users/deadwalk/Code/smart-finance-bot?
? 請根據本地路徑修改;
? 運行完畢后請使用??docker ps?
?確認容器已經啟動。
第六步:進入es容器
docker exec -it es /bin/bash
第七步:命令行輸入重置密碼命令(此處我們重置密碼為123abc)
bin/elasticsearch-reset-password -i -u elastic
第八步:使用瀏覽器訪問http://localhost:9200/,驗證服務可以使用
2、添加數據到ES服務
2.1、測試ES的連接
編寫ES連接測試代碼,驗證ES服務連接。
def test_es_connect():
from elasticsearch importElasticsearch
ELASTIC_PASSWORD ="123abc"
host ="localhost"
port =9200
schema ="https"
url =f"{schema}://elastic:{ELASTIC_PASSWORD}@{host}:{port}"
client =Elasticsearch(
url,
verify_certs=False,
)
print(client.info())
運行結果:
2.2、實現ElasticSearch連接代碼
代碼文件:??app/rag/elasticsearch_db.py?
?
# 引入
from langchain_core.retrievers importBaseRetriever
from langchain_core.documents importDocument
# ES需要導入的庫
from typing importList
import re
import jieba
import nltk
from nltk.corpus import stopwords
import time
from elasticsearch importElasticsearch
from elasticsearch.exceptions importConnectionError,AuthenticationException
from elasticsearch import helpers
import settings
from utils.logger_config importLoggerManager
from utils.util_nltk importUtilNltk
import os
import warnings
warnings.simplefilter("ignore")# 屏蔽 ES 的一些Warnings
utilnltk =UtilNltk()
logger =LoggerManager().logger
classTraditionDB:
defadd_documents(self, docs):
"""
將文檔添加到數據庫
"""
raiseNotImplementedError("Subclasses should implement this method!")
defget_store(self):
"""
獲得向量數據庫的對象實例
"""
raiseNotImplementedError("Subclasses should implement this method!")
classElasticsearchDB(TraditionDB):
def__init__(self,
schema=settings.ELASTIC_SCHEMA,
host=settings.ELASTIC_HOST,
port=settings.ELASTIC_PORT,
index_name=settings.ELASTIC_INDEX_NAME,
k=3
# docs=docs
):
# 定義索引名稱
self.index_name = index_name
self.k = k
try:
url =f"{schema}://elastic:{settings.ELASTIC_PASSWORD}@{host}:{port}"
logger.info(f'初始化ES服務連接:{url}')
self.es =Elasticsearch(
url,
verify_certs=False,
# ca_certs="./docker/elasticsearch/certs/ca/ca.crt",
# basic_auth=("elastic", settings.ELASTIC_PASSWORD)
)
response = self.es.info()# 嘗試獲取信息
logger.info(f'ES服務響應: {response}')
except(ConnectionError,AuthenticationException)as e:
logger.error(f'連接 Elasticsearch 失敗: {e}')
raise
exceptExceptionas e:
logger.error(f'發生其他錯誤: {e}')
logger.error(f'異常類型: {type(e).__name__}')# 記錄異常類型
raise
defto_keywords(self, input_string):
"""將句子轉成檢索關鍵詞序列"""
# 按搜索引擎模式分詞
word_tokens = jieba.cut_for_search(input_string)
# 加載停用詞表
stop_words =set(stopwords.words('chinese'))
# 去除停用詞
filtered_sentence =[w for w in word_tokens ifnot w in stop_words]
return' '.join(filtered_sentence)
defsent_tokenize(self, input_string):
"""按標點斷句,沒有用到"""
# 按標點切分
sentences = re.split(r'(?<=[。!?;?!])', input_string)
# 去掉空字符串
return[sentence for sentence in sentences if sentence.strip()]
defcreate_index(self):
"""如果索引不存在,則創建索引"""
ifnot self.es.indices.exists(index=self.index_name):
# 創建索引
self.es.indices.create(index=self.index_name, ignore=400)
defbluk_data(self, paragraphs):
"""批量進行數據灌庫"""
# 灌庫指令
actions =[
{
"_index": self.index_name,
"_source":{
"keywords": self.to_keywords(para.page_content),
"text": para.page_content
}
}
for para in paragraphs
]
# 文本灌庫
helpers.bulk(self.es, actions)
# # 灌庫是異步的
# time.sleep(2)
defflush(self):
# 刷新數據,數據入庫完成以后刷新數據
self.es.indices.flush()
defsearch(self, query_string):
"""關鍵詞檢索"""
# ES 的查詢語言
search_query ={
"match":{
"keywords": self.to_keywords(query_string)
}
}
res = self.es.search(index=self.index_name, query=search_query, size=self.k)
return[hit["_source"]["text"]for hit in res["hits"]["hits"]]
defdelete(self):
"""如果索引存在,則刪除索引"""
if self.es.indices.exists(index=self.index_name):
# 創建索引
self.es.indices.delete(index=self.index_name, ignore=400)
defadd_documents(self, docs):
self.bluk_data(docs)
self.flush()
說明:
? elasticsearch后續的插入操作中,使用到了nltk分詞,其代碼已經封裝在UtilNltk類中,具體代碼請查看Github倉庫代碼,本文不再贅述。
???LoggerManager?
?是代碼重構時,封裝的一個日志管理類,具體代碼請查看Github倉庫代碼,本文不再贅述。
2.3、修改PDF文件導入代碼
在settings.py中添加elasticsearch配置信息:
"""
ES數據庫相關的配置
"""
# ES服務開關:True表示開啟ES服務,False表示關閉ES服務
ELASTIC_ENABLE_ES = True
ELASTIC_PASSWORD = os.getenv("ELASTIC_PASSWORD", "123abc")
ELASTIC_HOST = os.getenv("ELASTIC_HOST", "localhost")
ELASTIC_PORT = os.getenv("ELASTIC_PORT", 9200)
ELASTIC_SCHEMA = "https"
ELASTIC_INDEX_NAME = "smart_test_index"
確認PDFProcessor.py中已經添加了對于Elasticsearch的插入操作支持,具體代碼在【項目實戰】基于Agent的金融問答系統:代碼重構已做介紹,所以本文不再贅述。
2.4、測試PDF文件導入代碼
在test_framework.py中添加如下代碼
def test_import_elasticsearch():
# from rag.elasticsearch_db import TraditionDB
from rag.elasticsearch_db importElasticsearchDB
from rag.pdf_processor importPDFProcessor
llm, chat, embed = settings.LLM, settings.CHAT, settings.EMBED
# 導入文件的文件目錄
directory ="./dataset/pdf"
# 創建 Elasticsearch 數據庫實例
es_db =ElasticsearchDB()
# 創建 PDFProcessor 實例
pdf_processor =PDFProcessor(directory=directory,
db_type="es",
es_client=es_db,
embed=embed)
# 處理 PDF 文件
pdf_processor.process_pdfs()
運行結果:
3、修改檢索器增加Elasticsearch檢索
代碼文件:??app/rag/retrievers.py?
?
from langchain_core.callbacks importCallbackManagerForRetrieverRun
from utils.logger_config importLoggerManager
from langchain_core.retrievers importBaseRetriever
from langchain_core.documents importDocument
from langchain.retrievers importEnsembleRetriever
from langchain.retrievers.multi_query importMultiQueryRetriever
from rag.elasticsearch_db importElasticsearchDB
# ES需要導入的庫
from typing importList
import logging
import settings
logger =LoggerManager().logger
classSimpleRetrieverWrapper():
"""自定義檢索器實現"""
def__init__(self, store, llm, **kwargs):
self.store = store
self.llm = llm
logger.info(f'檢索器所使用的Chat模型:{self.llm}')
defcreate_retriever(self):
logger.info(f'初始化自定義的Retriever')
# 初始化一個空的檢索器列表
retrievers =[]
weights =[]
# Step1:創建一個 多路召回檢索器 MultiQueryRetriever
chromadb_retriever = self.store.as_retriever()
mq_retriever =MultiQueryRetriever.from_llm(retriever=chromadb_retriever, llm=self.llm)
retrievers.append(mq_retriever)
weights.append(0.5)
logger.info(f'已啟用 MultiQueryRetriever')
# Step2:創建一個 ES 檢索器
if settings.ELASTIC_ENABLE_ES isTrue:
es_retriever =ElasticsearchRetriever()
retrievers.append(es_retriever)
weights.append(0.5)
logger.info(f'已啟用 ElasticsearchRetriever')
# 使用集成檢索器,將所有啟用的檢索器集合在一起
ensemble_retriever =EnsembleRetriever(retrievers=retrievers, weights=weights)
return ensemble_retriever
classElasticsearchRetriever(BaseRetriever):
def_get_relevant_documents(self, query: str, )->List[Document]:
"""Return the first k documents from the list of documents"""
es_connector =ElasticsearchDB()
query_result = es_connector.search(query)
logger.info(f"ElasticSearch檢索到資料文件個數:{len(query_result)}")
if query_result:
return[Document(page_content=doc)for doc in query_result]
return[]
asyncdef_aget_relevant_documents(self, query: str)->List[Document]:
"""(Optional) async native implementation."""
es_connector =ElasticsearchDB()
query_result = es_connector.search(query)
if query_result:
return[Document(page_content=doc)for doc in query_result]
return []
4、測試驗證
在test_framework.py中運行test_financebot_ex()函數,測試檢索功能。
def test_financebot_ex():
from finance_bot_ex import FinanceBotEx
# 使用Chroma 的向量庫
financebot = FinanceBotEx()
example_query = "根據武漢興圖新科電子股份有限公司招股意向書,電子信息行業的上游涉及哪些企業?"
financebot.handle_query(example_query)
運行結果: 連接ES后檢索到3個資料文件
使用多路召回,生成3個檢索問題
最終通過集成檢索器檢索到答案
優化效果
通過對天池大賽前100個問題的對比測試,我們最終得到如下對比驗證結果:
內容小結
- 集成檢索器:
可以有效提高檢索的效率,同時可以增加檢索的準確度。
可以添加多個檢索器并配置不同的權重,以實現靈活的組合。
- Elasticsearch
作為傳統搜索引擎,可以通過keyword_search檢索到相關內容。
使用時需要使用Docker搭建ES服務。
數據文件需要添加到ES服務中,方便檢索。
- MultiQueryRetriever
多路召回,將問題拆分成多個問題,然后進行檢索,最終合并結果。
本文轉載自公眾號一起AI技術 作者:Dongming
