構建AI Agent的最優學習路線總結:MCP、CrewAI和Streamlit
如何通過 MCP(Model Context Protocol) 創建一個結合網絡搜索、AI 代理和圖像生成的現代研究助手
引言:AI 驅動的研究未來已來
想象一下,您擁有一個個人研究助手,可以即時搜索網絡、分析信息、生成全面的總結,甚至創建相關圖像——所有這些都通過一個美觀的 Web 界面完成。如果這個助手還能由尖端的 AI 代理驅動,與外部工具和 API 無縫協作,會怎樣?
歡迎體驗 MCP-Powered Study Assistant —— 一款結合現代 AI 技術的革命性應用:
? ?? CrewAI 代理,用于智能研究和寫作
? ?? Model Context Protocol (MCP),實現無縫工具集成
? ?? Streamlit,提供直觀的 Web 界面
? ?? 通過 Brave Search API 實現實時網絡搜索
? ?? 通過 Segmind API 進行 AI 圖像生成
這不僅僅是另一個 ChatGPT 包裝器——它是一個完整的研究生態系統,展示了 AI 驅動應用的未來。在本綜合指南中,我們將詳細介紹每個實現細節,從 MCP 服務器創建到美觀的 UI 設計。
圖片
技術棧:創新的構建模塊
核心框架
? CrewAI:多代理 AI 框架,用于協調智能工作流程
? MCP (Model Context Protocol):AI 與工具集成的標準化協議
? Streamlit:用于 AI 應用的現代 Web 框架
? Python 3.8+:主要編程語言
外部 API 與服務
? Brave Search API:實時網絡搜索功能
? Segmind API:最先進的 AI 圖像生成
? Groq -llama-3.3–70b-versatile:用于智能處理的大型語言模型
開發工具
? JSON:數據交換格式
? Subprocess:模塊化架構的進程管理
? File System:結果存儲與檢索
? CSS:增強用戶體驗的自定義樣式
當前系統狀態:
圖片
架構概述:整體連接方式
我們的 Study Assistant 遵循復雜的多層架構:
?? Streamlit UI → ?? API Layer → ?? CrewAI Agents → ?? MCP Protocol → ??? MCP Servers → ?? External APIs
流程分解:
1.用戶輸入:通過 Streamlit 界面輸入主題
2.流程編排:API 層管理研究工作流程
3.AI 代理激活:CrewAI 代理開始協作研究
4.工具集成:MCP 協議將代理與專用服務器連接
5.數據收集:搜索和圖像服務器收集相關內容
6.結果處理:生成并解析文件以供顯示
7.用戶體驗:美觀的選項卡界面呈現所有結果
實現深入解析:構建每個組件
1. 設置 CrewAI 代理
系統的核心在于兩個專用 AI 代理:
Research Agent:網絡搜索專家
researcher =Agent(
role='Research Specialist',
goal='Conduct comprehensive research on {topic}',
backstory='Expert at finding and analyzing information',
tools=[search_tool],
verbose=True
)
Writer Agent:內容綜合專家
writer =Agent(
role='Content Writer',
goal='Create comprehensive study materials',
backstory='Skilled at organizing complex information',
tools=[image_tool],
verbose=True
)
2. 構建 MCP 服務器
Search Server (servers/search_server.py)
@server.call_tool()
async def search_web(arguments: dict)-> list[TextContent]:
"""Brave Search API integration"""
query = arguments.get("query","")
headers ={"X-Subscription-Token": BRAVE_API_KEY}
params={"q": query,"count":10}
response = requests.get(BRAVE_SEARCH_URL, headers=headers,params=params)
results = response.json()
return[TextContent(type="text", text=json.dumps(results))]
Image Server (servers/image_server.py)
@server.call_tool()
async def generate_image(arguments: dict)-> list[TextContent]:
"""Segmind API image generation"""
prompt = arguments.get("prompt","")
data ={
"prompt": prompt,
"style":"photographic",
"samples":1
}
response = requests.post(SEGMIND_URL, jsnotallow=data, headers=headers)
# Save and return image path
3. 創建 Streamlit 界面
美觀的 UI 與自定義樣式
def apply_custom_css():
st.markdown("""
<style>
.main-header {
background: linear-gradient(90deg,#667eea 0%, #764ba2 100%);
padding:2rem;
border-radius:10px;
color: white;
text-align: center;
margin-bottom:2rem;
}
.result-card {
background: white;
padding:1.5rem;
border-radius:10px;
box-shadow:02px4px rgba(0,0,0,0.1);
margin:1rem0;
}
</style>
""", unsafe_allow_html=True)
多選項卡結果顯示
def display_results():
tab1, tab2, tab3 = st.tabs([
"?? Search Results",
"?? Summary",
"?? Generated Images"
])
with tab1:
display_search_results()
with tab2:
display_summary_with_download()
with tab3:
display_image_gallery()
4. API 層實現
main_api.py 作為關鍵橋梁:
def run_research(topic: str)->Dict:
"""Execute research workflow"""
try:
# Run main.py as subprocess
result = subprocess.run(
[sys.executable,"main.py", topic],
capture_output=True,
text=True,
timeout=300# 5-minute timeout
)
return{
"search_results": extract_search_results(),
"summary": extract_summary_from_output(result.stdout),
"images": get_generated_images(),
"success":True
}
except subprocess.TimeoutExpired:
return{"success":False,"error":"Research timeout"}
核心功能:獨特之處
? 智能網絡搜索
?實時 Brave Search API 集成
?結構化結果解析和過濾
?基于相關性的內容排序
多代理協作
?專用研究和寫作代理
?自動化工作流程編排
?上下文感知的信息綜合
AI 圖像生成
?與主題相關的視覺內容創建
?多種圖像風格選項
?自動提示優化
美觀的 Web 界面
?響應式設計與自定義 CSS
?選項卡式結果組織
?所有內容可下載
系統監控
?實時 MCP 服務器狀態
?錯誤處理與恢復
?性能指標跟蹤
高級實現技巧
- 錯誤處理策略
def robust_mcp_call(server_path: str, max_retries:int=3):
for attempt in range(max_retries):
try:
# MCP server communication
return call_mcp_server(server_path)
exceptExceptionas e:
if attempt == max_retries -1:
st.error(f"?? Server Caiunavailable: {e}")
time.sleep(2** attempt)# Exponential backoff
- 結果提取模式
def extract_summary_from_output(output: str)-> str:
patterns =[
r"FINAL RESULT:\s*(.+?)(?=\n\n|\Z)",
r"## Final Answer:\s*(.+?)(?=\n\n|\Z)",
r"Summary:\s*(.+?)(?=\n\n|\Z)"
]
for pattern in patterns:
match = re.search(pattern, output, re.DOTALL | re.IGNORECASE)
if match:
return clean_summary_text(match.group(1))
return"Summary extraction failed"
- 性能優化
異步 MCP 服務器調用?并行文件處理?智能緩存策略?資源清理自動化
部署指南:讓您的助手上線
- 環境設置
pip install -r requirements.txt
# 配置 API 密鑰
export BRAVE_API_KEY="your-brave-key"
export SEGMIND_API_KEY="your-segmind-key"
export OPENAI_API_KEY="your-openai-key"
- 啟動序列
# 啟動 MCP 服務器
python servers/search_server.py &
python servers/image_server.py &
# 啟動 Streamlit 應用
streamlit run streamlit_app.py
生產環境考慮
?使用 Docker 進行容器部署?高流量負載均衡?結果持久化的數據庫集成?API 速率限制與監控
- 完整代碼實現
├── ?? main.py# ?? 核心 CrewAI 應用與代理
├── ??requirements.txt# ?? Python 依賴
├── ??debug_summary.py# ?? 摘要提取調試工具
├── ??app.py# ?? 美觀的 Web 界面
├── ??setup_nodejs.py# ?? Node.js 設置工具
├── ??test_python_version.py# ?? Python 版本兼容性測試
├── ??segmin.py# ?? Segmind API 工具
├── ??servers/# ?? MCP 服務器實現│
│├── ??search_server.py# ?? Brave Search MCP 服務器 (Python)
│├── ??image_server.py# ?? Segmind 圖像 MCP 服務器 (Python)
│├── ??search_results/# ?? 生成的搜索數據│
│ └── … (研究主題)│
│ └── ??images/# ??? 生成的 AI 圖像│
└── … (主題圖像)│ ├── ??pycache/# ?? Python 緩存文件└── ??.venv/ # ?? 虛擬環境
??? 架構分解
??? 前端層
?streamlit_app.py — 具有美觀 UI 的主要 Web 界面?streamlit_app_backup.py — 安全備份版本
??? API 與集成層
?main_api.py — Streamlit 與 CrewAI 之間的橋梁?app.py — 替代界面實現
??? AI 核心層
?main.py — CrewAI 代理 (Research + Writer)
?debug_summary.py — 摘要提取工具
??? MCP 服務器層
?servers/search_server.py — 通過 Brave API 進行網絡搜索?servers/image_server.py — 通過 Segmind API 進行圖像生成
??? 數據存儲層
?servers/search_results/ — 包含搜索數據的 JSON 文件(40+ 主題)?servers/images/ — 生成的 AI 圖像(30+ 視覺效果)
??? 配置與工具
?requirements.txt — 依賴管理?setup_nodejs.py — 環境設置?test_python_version.py — 兼容性測試
MCP 服務器 - Image_Server
from typing importAny
import httpx
from mcp.server.fastmcp importFastMCP
import os
import requests
import base64
import logging
from pathlib importPath
from dotenv import load_dotenv
load_dotenv()
# 設置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("image_server")
# 初始化 FastMCP 服務器
mcp =FastMCP("image_server")
# 獲取當前目錄
current_dir =Path(__file__).parent
output_dir = current_dir /"images"
os.makedirs(output_dir, exist_ok=True)
# 驗證 API 密鑰
api_key = os.getenv("SEGMIND_API_KEY")
ifnot api_key:
logger.error("SEGMIND_API_KEY environment variable is not set!")
raiseRuntimeError("Missing Segmind API key")
url ="https://api.segmind.com/v1/imagen-4"
@mcp.tool(name="image_creation_openai", descriptinotallow="Create an image using Segmind API")
def image_creation_openai(query: str Latinos, image_name: str)-> str:
try:
logger.info(f"Creating image for query: {query}")
# 請求負載
data ={
"prompt": f"Generate an image: {query}",
"negative_prompt":"blurry, pixelated",
"aspect_ratio":"4:3"
}
headers ={'x-api-key': os.getenv("SEGMIND_API_KEY")}
# 添加超時和錯誤處理
try:
response = requests.post(url, jsnotallow=data, headers=headers, timeout=30)
response.raise_for_status()
except requests.exceptions.RequestExceptionas e:
logger.error(f"API request failed: {e}")
return{"success":False,"error": f"API request failed: {str(e)}"}
# 保存圖像
image_path = output_dir / f"{image_name}.jpeg"
with open(image_path,"wb")as f:
f.write(response.content)
logger.info(f"Image saved to {image_path}")
return{"success":True,"image_path": str(image_path)}
exceptExceptionas e:
logger.exception("Image creation failed")
return{"success":False,"error": str(e)}
if __name__ =="__main__":
logger.info("Starting Image Creation MCP Server")
try:
mcp.run(transport="stdio")
exceptExceptionas e:
logger.exception("Server crashed")
# 在 Windows 中添加暫停以查看錯誤
input("Press Enter to exit...")
raise
MCP 服務器 - Search_Server
from typing importAny,Dict,List
import requests
from mcp.server.fastmcp importFastMCP
import os
import logging
import json
from pathlib importPath
from dotenv import load_dotenv
load_dotenv()
# 設置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("search_server")
# 初始化 FastMCP 服務器
mcp =FastMCP("search_server")
# 獲取當前目錄
current_dir =Path(__file__).parent
results_dir = current_dir /"search_results"
os.makedirs(results_dir, exist_ok=True)
# 驗證 API 密鑰
api_key = os.getenv("BRAVE_API_KEY")
ifnot api_key:
logger.warning("BRAVE_API_KEY environment variable is not set!")
logger.warning("Search functionality will be limited or unavailable")
# Brave Search API 端點
BRAVE_SEARCH_URL ="https://api.search.brave.com/res/v1/web/search"
@mcp.tool(name="brave_search", descriptinotallow="Search the web using Brave Search API")
def brave_search(query: str, count:int=10)->Dict[str,Any]:
"""
使用BraveSearch API 搜索網絡
參數:
query:搜索查詢字符串
count:返回的結果數量(最大20)
返回:
包含搜索結果的字典
"""
try:
logger.info(f"Searching for: {query}")
ifnot api_key:
return{
"success":False,
"error":"BRAVE_API_KEY not configured",
"results":[]
}
# 將計數限制在合理范圍內
count = max(1, min(count,20))
# 請求頭
headers ={
"Accept":"application/json",
"Accept-Encoding":"gzip",
"X-Subscription-Token": api_key
}
# 請求參數
params={
"q": query,
"count": count,
"search_lang":"en",
"country":"US",
"safesearch":"moderate",
"freshness":"pw",# 過去一周以獲取更新的結果
"text_decorations":False,
"spellcheck":True
}
# 發起 API 請求
try:
response = requests.get(
BRAVE_SEARCH_URL,
headers=headers,
params=params,
timeout=30
)
response.raise_for_status()
except requests.exceptions.RequestExceptionas e:
logger.error(f"Search API request failed: {e}")
return{
"success":False,
"error": f"Search API request failed: {str(e)}",
"results":[]
}
# 解析響應
try:
data = response.json()
except json.JSONDecodeErroras e:
logger.error(f"Failed to parse search response: {e}")
return{
"success":False,
"error":"Failed to parse search response",
"results":[]
}
# 提取并格式化結果
search_results =[]
web_results = data.get("web",{}).get("results",[])
for result in web_results:
search_result ={
"title": result.get("title",""),
"url": result.get("url",""),
"description": result.get("description",""),
"published": result.get("published",""),
"thumbnail": result.get("thumbnail",{}).get("src","")if result.get("thumbnail")else""
}
search_results.append(search_result)
# 將結果保存到文件以供參考
try:
results_file = results_dir / f"search_{query.replace(' ', '_')[:50]}.json"
with open(results_file,'w', encoding='utf-8')as f:
json.dump({
"query": query,
"timestamp": data.get("query",{}).get("posted_at",""),
"results": search_results
}, f, indent=2, ensure_ascii=False)
logger.info(f"Search results saved to {results_file}")
exceptExceptionas e:
logger.warning(f"Failed to save search results: {e}")
logger.info(f"Found {len(search_results)} search results")
return{
"success":True,
"query": query,
"total_results": len(search_results),
"results": search_results
}
exceptExceptionas e:
logger.exception("Search operation failed")
return{
"success":False,
"error": str(e),
"results":[]
}
@mcp.tool(name="search_news", descriptinotallow="Search for news using Brave Search API")
def search_news(query: str, count:int=5)->Dict[str,Any]:
"""
使用BraveSearch API 搜索新聞
參數:
query:搜索查詢字符串
count:返回的新聞結果數量(最大20)
返回:
包含新聞搜索結果的字典
"""
try:
logger.info(f"Searching news for: {query}")
ifnot api_key:
return{
"success":False,
"error":"BRAVE_API_KEY not configured",
"results":[]
}
# 將計數限制在合理范圍內
count = max(1, min(count,20))
# 請求頭
headers ={
"Accept":"application/json",
"Accept-Encoding":"gzip",
"X-Subscription-Token": api_key
}
# 新聞搜索的請求參數
params={
"q": query,
"count": count,
"search_lang":"en",
"country":"US",
"safesearch":"moderate",
"freshness":"pd",# 過去一天以獲取最新新聞
"text_decorations":False,
"result_filter":"news"# 專注于新聞結果
}
# 發起 API 請求
try:
response = requests.get(
BRAVE_SEARCH_URL,
headers=headers,
params=params,
timeout=30
)
response.raise_for_status()
except requests.exceptions.RequestExceptionas e:
logger.error(f"News search API request failed: {e}")
return{
"success":False,
"error": f"News search API request failed: {str(e)}",
"results":[]
}
# 解析響應
try:
data = response.json()
except json.JSONDecodeErroras e:
logger.error(f"Failed to parse news search response: {e}")
return{
"success":False,
"error":"Failed to parse news search response",
"results":[]
}
# 提取新聞結果
news_results =[]
# 檢查響應中的新聞部分
news_data = data.get("news",{}).get("results",[])
ifnot news_data:
# 如果沒有專用新聞部分,則回退到網頁結果
news_data = data.get("web",{}).get("results",[])
for result in news_data:
news_result ={
"title": result.get("title",""),
"url": result.get("url",""),
"description": result.get("description",""),
"published": result.get("age", result.get("published","")),
"source": result.get("profile",{}).get("name","")if result.get("profile")else"",
"thumbnail": result.get("thumbnail",{}).get("src","")if result.get("thumbnail")else""
}
news_results.append(news_result)
logger.info(f"Found {len(news_results)} news results")
return{
"success":True,
"query": query,
"total_results": len(news_results),
"results": news_results
}
exceptExceptionas e:
logger.exception("News search operation failed")
return{
"success":False,
"error": str(e),
"results":[]
}
if __name__ =="__main__":
logger.info("Starting Brave Search MCP Server")
try:
mcp.run(transport="stdio")
exceptExceptionas e:
logger.exception("Search server crashed")
# 在 Windows 中添加暫停以查看錯誤
input("Press Enter to exit...")
raise
代理 - main.py
from crewai importAgent,Task,Crew, LLM
from crewai_tools importMCPServerAdapter
from mcp importStdioServerParameters
import sys
import platform
from pathlib importPath
import os
import warnings
from pydantic importPydanticDeprecatedSince20
from dotenv import load_dotenv
import traceback
import subprocess
from pydantic importBaseModel,Field
classSummary(BaseModel):
summary: str =Field(descriptinotallow="研究成果的詳細摘要")
image_path: str =Field(descriptinotallow="代理創建的圖像文件路徑")
# 加載環境變量
load_dotenv()
def get_available_llm():
"""從環境變量中獲取第一個可用的 LLM"""
llm_configs =[
{
"name":"Groq Llama",
"model":"groq/llama-3.3-70b-versatile",
"api_key_env":"GROQ_API_KEY",
"temperature":0.7
},
{
"name":"OpenAI GPT-4",
"model":"gpt-4o-mini",
"api_key_env":"OPENAI_API_KEY",
"temperature":0.7
},
{
"name":"Anthropic Claude",
"model":"claude-3-haiku-20240307",
"api_key_env":"ANTHROPIC_API_KEY",
"temperature":0.7
},
{
"name":"Ollama Local",
"model":"ollama/llama3.2",
"api_key_env":None,# 本地模型無需 API 密鑰
"temperature":0.7
}
]
print("?? 檢查可用的 LLM 提供者...")
for config in llm_configs:
try:
if config["api_key_env"]isNone:
# 對于本地模型如 Ollama,嘗試無需 API 密鑰
print(f"? 嘗試 {config['name']} (本地)...")
llm = LLM(
model=config["model"],
temperature=config["temperature"],
max_tokens=1000,
)
print(f"? 使用 {config['name']}: {config['model']}")
return llm
else:
api_key = os.getenv(config["api_key_env"])
if api_key:
print(f"? 嘗試 {config['name']}...")
llm = LLM(
model=config["model"],
temperature=config["temperature"],
api_key=api_key
)
print(f"? 使用 {config['name']}: {config['model']}")
return llm
else:
print(f"?? {config['name']} API 密鑰未在環境中找到")
exceptExceptionas e:
print(f"? {config['name']} 失敗: {str(e)[:100]}...")
continue
# 如果全部失敗,則回退到基本配置
print("?? 使用回退 LLM 配置...")
return LLM(
model="groq/llama-3.3-70b-versatile",
temperature=0.7,
api_key=os.getenv("GROQ_API_KEY","")
)
# 配置具有回退選項的 LLM
llm = get_available_llm()
# 抑制警告
warnings.filterWarnings("ignore", category=PydanticDeprecatedSince20)
# 獲取當前目錄
base_dir =Path(__file__).parent.resolve()
print(f"Python 可執行文件: {sys.executable}")
print(f"當前目錄: {os.getcwd()}")
print(f"基礎目錄: {base_dir}")
# 確定適用于 Windows 的正確 npx 命令
npx_cmd ="npx.cmd"if platform.system()=="Windows"else"npx"
def check_npx_availability():
"""檢查 npx 是否可用且正常工作"""
try:
result = subprocess.run([npx_cmd,"--version"],
capture_output=True, text=True, timeout=10)
if result.returncode ==0:
print(f"? NPX 可用: {result.stdout.strip()}")
returnTrue
else:
print(f"? NPX 檢查失敗: {result.stderr}")
returnFalse
exceptExceptionas e:
print(f"? NPX 不可用: {e}")
returnFalse
def check_python_server():
"""檢查 Python 圖像服務器是否存在"""
server_path = base_dir /"servers"/"image_server.py"
if server_path.exists():
print(f"? 找到 Python 圖像服務器: {server_path}")
returnTrue
else:
print(f"? 未找到 Python 圖像服務器: {server_path}")
returnFalse
def check_search_server():
"""檢查 Python 搜索服務器是否存在"""
server_path = base_dir /"servers"/"search_server.py"
if server_path.exists():
print(f"? 找到 Python 搜索服務器: {server_path}")
returnTrue
else:
print(f"? 未找到 Python 搜索服務器: {server_path}")
returnFalse
def get_working_servers():
"""獲取工作服務器配置列表"""
working_servers =[]
print("\n"+"="*50)
print("診斷 MCP 服務器")
print("="*50)
# 首先檢查 Python 圖像服務器(最有可能工作)
python_server_available = check_python_server()
if python_server_available:
image_server_params =StdioServerParameters(
command="python",
args=[
str(base_dir /"servers"/"image_server.py"),
],
env={"UV_PYTHON":"3.12",**os.environ},
)
working_servers.append(("Image Server", image_server_params))
print("? 圖像服務器已配置")
else:
print("? 跳過圖像服務器(未找到服務器文件)")
# 檢查 Python 搜索服務器
search_server_available = check_search_server()
if search_server_available:
search_server_params =StdioServerParameters(
command="python",
args=[
str(base_dir /"servers"/"search_server.py"),
],
env={"UV_PYTHON":"3.12",**os.environ},
)
working_servers.append(("Python Search Server", search_server_params))
print("? Python 搜索服務器已配置")
else:
print("? 跳過 Python 搜索服務器(未找到服務器文件)")
# 僅為文件系統服務器檢查 NPX 可用性
npx_available = check_npx_availability()
# 僅在 Node.js 版本足夠新時添加 NPX 服務器
if npx_available:
node_version_check = check_node_version()
if node_version_check:
# 文件系統服務器配置
filesystem_server_params =StdioServerParameters(
command=npx_cmd,
args=[
"-y",
"@modelcontextprotocol/server-filesystem",
os.path.join(os.path.expanduser("~"),"Downloads")
],
)
working_servers.append(("Filesystem Server", filesystem_server_params))
print("? 文件系統服務器已配置")
else:
print("?? 由于 Node.js 版本兼容性問題,跳過 NPX 文件系統服務器")
print("?? 要啟用文件系統服務器,請將 Node.js 更新到 18+ 或 20+ 版本")
print(" 訪問: https://nodejs.org/en/download/")
else:
print("? 跳過 NPX 文件系統服務器(NPX 不可用)")
print(f"\n找到 {len(working_servers)} 個服務器配置")
return working_servers
def check_node_version():
"""檢查 Node.js 版本是否兼容"""
try:
result = subprocess.run(["node","--version"],
capture_output=True, text=True, timeout=10)
if result.returncode ==0:
version = result.stdout.strip()
print(f"Node.js 版本: {version}")
# 提取主版本號
major_version =int(version.lstrip('v').split('.')[0])
if major_version >=18:
print("? Node.js 版本兼容")
returnTrue
else:
print(f"?? Node.js 版本 {version} 可能過舊(推薦 v18+)")
returnFalse
returnFalse
exceptExceptionas e:
print(f"? 無法檢查 Node.js 版本: {e}")
returnFalse
classCustomMCPServerAdapter(MCPServerAdapter):
"""具有增加超時的自定義 MCP 服務器適配器"""
def __init__(self,*args,**kwargs):
super().__init__(*args,**kwargs)
self.timeout =90# 將超時增加到 90 秒
def test_servers_individually(server_configs):
"""單獨測試每個服務器以識別問題服務器"""
working_servers =[]
print("\n"+"="*50)
print("單獨測試服務器")
print("="*50)
for name, server_params in server_configs:
print(f"\n測試 {name}...")
try:
withCustomMCPServerAdapter([server_params])as tools:
print(f"? {name} 連接成功!")
print(f" 可用工具: {[tool.name for tool in tools]}")
working_servers.append(server_params)
exceptExceptionas e:
print(f"? {name} 失敗: {str(e)[:100]}...")
continue
return working_servers
def create_agent_and_tasks(tools=None):
"""創建代理和任務(帶或不帶工具)"""
tools_list = tools or[]
# 根據可用工具調整角色和任務
if tools_list:
tool_names =[getattr(tool,'name','unknown')for tool in tools_list]
print(f"代理將有權訪問: {tool_names}")
role ="AI Research Creator with Tools"
goal ="使用可用的 MCP 工具深入研究主題,創建全面的圖表,并保存摘要"
backstory ="擅長使用 MCP 工具收集信息、創建視覺表示并保存研究成果的 AI 研究者和創作者。"
else:
role ="AI Research Creator"
goal ="使用內置知識深入研究和分析主題"
backstory ="擅長分析主題并使用可用知識提供詳細見解的 AI 研究者。"
agent =Agent(
role=role,
goal=goal,
backstory=backstory,
tools=tools_list,
llm=llm,
verbose=True,
)
if tools_list:
research_task =Task(
descriptinotallow="使用可用的 MCP 工具深入研究主題 '{topic}'。如果有圖像創建工具,創建一個深入的圖表,展示主題的工作原理,包括關鍵組件、流程和關系。",
expected_output="全面的研究摘要,如果可能,包括成功創建的圖表/圖像,說明主題。",
agent=agent,
)
summary_task =Task(
descriptinotallow="創建研究成果的詳細摘要。如果有文件系統工具,將其保存為 Downloads 文件夾中的文本文件。包括關鍵見解、重要細節和對創建的任何圖表的引用。",
expected_output="研究成果的詳細摘要,如果有文件系統訪問權限,優選保存為文本文件。最終響應應采用 pydantic 模型 Summary 的格式",
agent=agent,
output_pydantic=Summary
)
else:
research_task =Task(
descriptinotallow="使用您的知識深入研究和分析主題 '{topic}'。提供有關其工作原理的詳細見解,包括關鍵組件、流程和關系。",
expected_output="對主題的全面分析和解釋,包含詳細見解。",
agent=agent,
)
summary_task =Task(
descriptinotallow="創建分析的詳細摘要,突出主題的最重要方面、關鍵見解和實際意義。",
expected_output="結構良好的摘要,包含主題的關鍵發現和見解。最終響應應采用 pydantic 模型 Summary 的格式",
agent=agent,
output_pydantic=Summary,
markdown=True,# 啟用最終輸出的 markdown 格式
output_file="report.md"
)
return agent,[research_task, summary_task]
def main():
"""運行 CrewAI 應用的主函數"""
# 獲取可用服務器配置
server_configs = get_working_servers()
ifnot server_configs:
print("\n?? 無可用 MCP 服務器。僅以回退模式運行。")
run_fallback_mode()
return
# 單獨測試服務器以找到工作中的服務器
working_server_params = test_servers_individually(server_configs)
ifnot working_server_params:
print("\n?? 無 MCP 服務器工作。以回退模式運行。")
run_fallback_mode()
return
try:
print(f"\n? 使用 {len(working_server_params)} 個工作中的 MCP 服務器")
print("初始化 MCP 服務器適配器...")
withCustomMCPServerAdapter(working_server_params)as tools:
print(f"成功連接到 MCP 服務器!")
print(f"可用工具: {[tool.name for tool in tools]}")
# 使用 MCP 工具創建代理和任務
agent, tasks = create_agent_and_tasks(tools)
# 創建具有錯誤處理的 crew
crew =Crew(
agents=[agent],
tasks=tasks,
verbose=True,
reasnotallow=True,
)
# 獲取用戶輸入
topic = input("\n請輸入要研究的主題: ").strip()
ifnot topic:
topic ="artificial intelligence"
print(f"未提供主題,使用默認值: {topic}")
# 使用重試機制執行 crew
max_retries =2
for attempt in range(max_retries +1):
try:
print(f"\n開始研究: {topic} (嘗試 {attempt + 1})")
result = crew.kickoff(inputs={"topic": topic})
# print("\n" + "="*50)
# print("來自代理的最終結果")
# print("="*50)
response = result["summary"]
print(response)
print(f"摘要任務輸出: {tasks[1].output}")
return response
exceptExceptionas e:
if attempt < max_retries:
print(f"?? 嘗試 {attempt + 1} 失敗: {str(e)[:100]}...")
print(f"?? 重試... ({attempt + 2}/{max_retries + 1})")
continue
else:
print(f"? 所有嘗試均失敗。錯誤: {e}")
raise e
exceptExceptionas e:
print(f"使用 MCP 工具運行時出錯: {e}")
traceback.print_exc()
print("\n回退到無 MCP 工具的基本代理...")
run_fallback_mode()
def run_fallback_mode():
"""在無 MCP 工具的情況下運行應用"""
print("\n"+"="*50)
print("以回退模式運行")
print("="*50)
# 創建不帶 MCP 工具但帶 LLM 的回退代理
agent, tasks = create_agent_and_tasks()
crew =Crew(
agents=[agent],
tasks=tasks,
verbose=True,
reasnotallow=True,
)
# 獲取回退模式的輸入
topic = input("請輸入要研究的主題(回退模式): ").strip()
ifnot topic:
topic ="artificial intelligence"
print(f"未提供主題,使用默認值: {topic}")
print(f"\n開始研究: {topic}(無 MCP 工具)")
result = crew.kickoff(inputs={"topic": topic})
print("\n"+"="*50)
print("最終結果(回退模式):")
print("="*50)
print(result["summary"])
return result["summary"]
if __name__ =="__main__":
print("?? 啟動 CrewAI MCP 演示")
print("\n?? 設置說明:")
print(" 要使用更多 MCP 服務器,請將 Node.js 更新到 v18+: https://nodejs.org")
print(" 在 .env 文件中添加 API 密鑰以支持更多 LLM 提供者")
print(" 支持: GROQ_API_KEY, OPENAI_API_KEY, ANTHROPIC_API_KEY, BRAVE_API_KEY")
result = main()
#print(result)
使用 Streamlit 的用戶界面 - app.py
import streamlit as st
import subprocess
import sys
import os
from pathlib importPath
import glob
from PIL importImage
import re
def find_venv_python():
"""從虛擬環境中找到正確的 Python 可執行文件"""
current_dir =Path(__file__).parent
possible_venv_paths =[
os.path.join(current_dir,".venv","Scripts","python.exe"),
os.path.join(current_dir,"venv","Scripts","python.exe"),
os.path.join(current_dir,".venv","bin","python"),
os.path.join(current_dir,"venv","bin","python"),
]
for path in possible_venv_paths:
if os.path.exists(path):
return path
return sys.executable
def run_research(topic):
"""使用給定主題運行 main.py 并返回結果"""
current_dir =Path(__file__).parent
python_executable = find_venv_python()
# 準備帶 UTF-8 編碼的環境
env = os.environ.copy()
env['PYTHONIOENCODING']='utf-8'
env['PYTHONLEGACYWINDOWSSTDIO']='1'
try:
# 作為子進程運行 main.py
process = subprocess.Popen(
[python_executable,"main.py"],
cwd=current_dir,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
encoding='utf-8',
errors='replace',
env=env
)
# 將主題作為輸入發送
stdout, stderr = process.communicate(input=topic +"\n", timeout=300)
if process.returncode ==0:
# 從 stdout 中提取最終結果
return extract_final_result(stdout),None
else:
returnNone, f"錯誤(返回代碼 {process.returncode}):\n{stderr}"
except subprocess.TimeoutExpired:
process.kill()
returnNone,"研究在 5 分鐘后超時"
exceptExceptionas e:
returnNone, f"意外錯誤: {str(e)}"
def extract_final_result(output):
"""從 main.py CrewAI 輸出中提取最終結果"""
lines = output.split('\n')
# 首先,嘗試找到最終結果部分
final_result_start =-1
for i, line in enumerate(lines):
if"FINAL RESULT:"in line or"==================================================\nFINAL RESULT:"in output:
final_result_start = i
break
if final_result_start !=-1:
# 提取 “FINAL RESULT:” 之后直到結束的內容
result_lines =[]
for line in lines[final_result_start:]:
# 跳過 “FINAL RESULT:” 行本身
if"FINAL RESULT:"in line:
# 如果同一行存在標記后的內容,則獲取
content_after = line.split("FINAL RESULT:",1)
if len(content_after)>1and content_after[1].strip():
result_lines.append(content_after[1].strip())
continue
# 跳過 CrewAI 格式化和空行
cleaned_line = re.sub(r'[╭│╰═─└├┤┬┴┼╔╗╚╝║╠╣╦╩╬▓??]','', line)
cleaned_line = cleaned_line.strip()
if cleaned_line:
result_lines.append(cleaned_line)
if result_lines:
return'\n'.join(result_lines).strip()
# 第二次嘗試:尋找 ## Final Answer 模式
final_answer_lines =[]
capturing =False
for line in lines:
if"## Final Answer"in line or"Final Answer:"in line:
capturing =True
# 如果標記后有內容,則包含
if"Final Answer:"in line:
content = line.split("Final Answer:",1)
if len(content)>1and content[1].strip():
final_answer_lines.append(content[1].strip())
continue
if capturing:
# 跳過 CrewAI 框圖字符和進度指示器
cleaned = re.sub(r'[╭│╰═─└├┤┬┴┼╔╗╚╝║╠╣╦╩╬▓?????????]','', line)
cleaned = cleaned.strip()
# 在某些模式下停止,表示答案結束
if any(pattern in line.lower()for pattern in[
'crew execution completed','task completion','crew completion',
'└──','assigned to:','status:','used'
]):
break
# 僅包含實質性內容
if cleaned and len(cleaned)>10:
final_answer_lines.append(cleaned)
if final_answer_lines:
return'\n'.join(final_answer_lines).strip()
# 第三次嘗試:在 crew 完成消息前獲取最后一段實質性內容
substantial_blocks =[]
current_block =[]
for line in lines:
# 跳過明顯的 CrewAI UI 元素
if any(skip in line for skip in['╭','│','╰','??','??','└──','Assigned to:','Status:']):
if current_block:
substantial_blocks.append('\n'.join(current_block))
current_block =[]
continue
cleaned = line.strip()
if cleaned and len(cleaned)>30:# 僅實質性行
current_block.append(cleaned)
elif current_block:# 空行結束一個塊
substantial_blocks.append('\n'.join(current_block))
current_block =[]
# 添加最后一個塊
if current_block:
substantial_blocks.append('\n'.join(current_block))
# 返回最后一個實質性塊(很可能是最終答案)
if substantial_blocks:
return substantial_blocks[-1].strip()
return"研究成功完成。請檢查控制臺輸出以獲取詳細結果。"
def get_latest_images():
"""從 images 文件夾獲取最新圖像"""
images_dir =Path("servers/images")
ifnot images_dir.exists():
return[]
# 獲取所有圖像文件
image_extensions =['*.jpg','*.jpeg','*.png','*.gif','*.bmp']
image_files =[]
for ext in image_extensions:
image_files.extend(glob.glob(str(images_dir / ext)))
ifnot image_files:
return[]
# 按修改時間排序(最新優先)
image_files.sort(key=os.path.getmtime, reverse=True)
# 返回前 5 個最新圖像
return image_files[:1]
def main():
st.set_page_config(
page_title="CrewAI-MCP 研究助手",
page_icnotallow="??",
layout="wide"
)
st.title("?? CrewAI-MCP 學習助手")
st.markdown("輸入一個主題以進行研究并生成帶視覺圖表的全面見解。")
# 主題輸入
topic = st.text_input(
"研究主題:",
placeholder="例如,解釋光合作用過程、機器學習算法等。",
help="輸入您想要詳細研究的任何主題"
)
# 研究按鈕
if st.button("?? 開始研究", type="primary", disabled=not topic.strip()):
if topic.strip():
with st.spinner(f"?? 正在研究 '{topic}'... 這可能需要幾分鐘。"):
result, error = run_research(topic.strip())
print(f"來自 CREWAI 的結果: {result}")
if result:
st.success("? 研究成功完成!")
print(f"來自 CREWAI 的結果: {result}")
# 將結果存儲在會話狀態中
st.session_state['research_result']= result
st.session_state['research_topic']= topic.strip()
st.session_state['latest_images']= get_latest_images()
else:
st.error(f"? 研究失敗: {error}")
# 并排顯示結果和圖像
if'research_result'in st.session_state:
# 創建分隔線
st.divider()
st.subheader(f"研究結果: {st.session_state.get('research_topic', '未知主題')}")
# 創建兩列以并排顯示
col1, col2 = st.columns([2,1])# 結果占 2/3 寬度,圖像占 1/3 寬度
# 左側列 - 研究結果
with col1:
st.markdown("### ?? 摘要結果")
# 以 markdown 格式顯示結果
result_text = st.session_state['research_result']
pattern = re.compile(r'\x1b\[[\d;]*m')
result_text = pattern.sub('', result_text)
# 為長內容創建可滾動容器
with st.container():
st.markdown(result_text)
# 為結果添加下載按鈕
st.download_button(
label="?? 下載結果為文本",
data=result_text,
file_name=f"research_{st.session_state.get('research_topic', 'topic').replace(' ', '_')}.txt",
mime="text/plain"
)
# 右側列 - 生成的圖像
with col2:
st.markdown("### ?? 生成的圖像")
images = st.session_state.get('latest_images',[])
if images:
st.success(f"找到 {len(images)} 張圖像")
# 垂直堆疊顯示圖像
for idx, image_path in enumerate(images):
try:
# 打開并顯示圖像
img =Image.open(image_path)
st.image(
img,
captinotallow=f"生成: {Path(image_path).name}",
use_container_width=True
)
# 為每張圖像添加下載按鈕
with open(image_path,"rb")as file:
st.download_button(
label=f"?? 下載",
data=file.read(),
file_name=Path(image_path).name,
mime="image/jpeg",
key=f"download_img_{idx}"
)
# 如果有多張圖像,添加圖像間距
if idx < len(images)-1:
st.markdown("---")
exceptExceptionas e:
st.error(f"加載圖像出錯: {str(e)}")
else:
st.info("??? 研究完成后,圖像將顯示在此處。")
with st.expander("?? 關于圖像"):
st.markdown("""
**工作原理:**
-研究期間自動生成圖像
-保存到`servers/images/`文件夾
-按創建時間排序顯示在此處
-每張圖像都有下載按鈕
""")
if __name__ =="__main__":
main()
結果與性能
您將獲得:
? 快速研究:30–60 秒完成全面研究
? 視覺增強:為每個主題生成 AI 圖像
? 結構化輸出:組織良好、可下載的結果
? 深入見解:多源信息綜合
? 用戶友好:直觀的 Web 界面
性能指標:
? 搜索速度:網絡結果約 5–10 秒
? 圖像生成:每張圖像約 15–30 秒
? 摘要創建:全面分析約 20–40 秒
? 整體工作流程:總計約 60–120 秒
未來展望:下一階段的演進
即時增強
? PDF 分析:上傳和分析文檔
? 視頻內容:YouTube 視頻摘要
? 語音界面:語音轉文本研究查詢
? 移動應用:原生 iOS/Android 應用
高級功能
? 知識圖譜:視覺關系映射
? 數據可視化:交互式圖表和圖形
? 引用管理:學術參考生成
? 協作研究:多用戶工作空間
技術改進
? RAG 集成:用于更好上下文的向量數據庫
? 實時更新:實時研究監控
? 個性化:用戶特定偏好
? 企業安全:高級認證
生態系統擴展
? 插件架構:第三方集成
? 分析儀表板:使用洞察和指標
? 教育工具:閃卡、測驗、思維導圖
? 企業版本:團隊協作功能
參考與資源
核心文檔
? CrewAI Framework: https://docs.crewai.com
? Model Context Protocol: https://modelcontextprotocol.io
? Streamlit Documentation: https://docs.streamlit.io
? Brave Search API: https://api.search.brave.com/app/documentation
技術資源
? MCP Python SDK: https://github.com/modelcontextprotocol/python-sdk
? Segmind API Docs: https://docs.segmind.com
? Groq Models Reference: https://console.groq.com/docs/models
? CrewAI MCP details: https://docs.crewai.com/mcp/overview
結論:您的 AI 研究革命現在開始
我們剛剛構建了一個非凡的成果——一個完整的 AI 驅動研究生態系統,展示了智能應用的未來。這不僅僅是代碼,而是關于我們如何與信息交互的轉型。
我們取得的成就:
? ? 無縫集成:MCP 協議將 AI 代理與現實世界工具連接
? ? 美觀界面:現代、響應式的 Web 應用
? ? 實際價值:具有可下載結果的真實研究能力
? ? 可擴展架構:企業級應用的基礎 ? ? 面向未來:采用尖端技術構建
更廣闊的圖景:
該項目展示了多種 AI 技術的融合: ? 多代理系統和諧工作 ? 協議驅動的工具集成 ? 以用戶為中心的設計 ? 現實世界的 API 利用
您的下一步:
1.構建它:遵循我們的實現指南
2. 定制它:添加您自己的功能和樣式
3. 擴展它:為您的團隊或組織部署
4.分享它:為開源社區貢獻
最終思考:
Model Context Protocol 代表了 AI 應用開發的范式轉變。通過標準化 AI 代理與外部工具的交互方式,MCP 打開了無限可能的大門。
您的 Study Assistant 只是開始。有了這個基礎,您可以構建:
? 企業研究平臺
? 教育 AI 導師
? 商業智能儀表板
? 科學研究工具
AI 的未來是協作的、上下文相關的、以用戶為中心的——您現在已經具備構建它的能力。
準備好革命化研究了嗎?立即開始構建您的 MCP 驅動的 Study Assistant,加入 AI 創新的下一波!??