AI Agents-7 | Muti-Agent的架構解析 原創
在當今快速發展的AI領域,多智能體架構正逐漸成為解決復雜任務的強大工具。從簡單的單智能體系統到復雜的多智能體協同,我們看到了AI在任務管理、資源分配和決策效率上的巨大潛力。今天,就讓我們深入探討多智能體架構的魅力,看看它是如何通過不同的模式和策略,為我們的生活和工作帶來變革的。
一、單智能體與多智能體架構:選擇適合的路徑
在AI的世界里,單智能體架構就像是一個“全能型選手”,它試圖用一個智能體來完成所有任務,無論是瀏覽網頁還是處理文件操作。這種架構在任務簡單且明確時非常高效,但隨著任務復雜度的增加和工具數量的增多,單智能體系統往往會陷入困境。比如,當智能體需要處理過多的工具或面對過于復雜的上下文時,它可能會開始犯錯,甚至產生次優或錯誤的結果。
而多智能體架構則像是一個“團隊作戰”的模式,每個智能體專注于自己的領域和工具集。這種架構在面對復雜、動態的用例時表現得尤為出色,尤其是在需要專業知識和協作的場景中。例如,一個軟件開發項目中,可以有規劃者、研究者、數學專家等多個智能體協同工作,每個智能體各司其職,共同完成任務。
二、多智能體架構的模式:解鎖協同的力量
多智能體架構的魅力在于其多樣性和靈活性。不同的任務和場景需要不同的協同模式,下面我們來看看幾種常見的模式。
(一)并行模式:多任務處理的高效方式
并行模式是多智能體架構中最直觀的一種。在這種模式下,多個智能體同時處理任務的不同部分。比如,我們需要對一段文本進行總結、翻譯和情感分析,三個智能體可以同時開始工作,分別完成各自的任務。這種方式大大提高了任務處理的效率,因為它可以充分利用多核處理器的并行計算能力。
代碼示例:
from typing import Dict, Any, TypedDict
from langgraph.graph import StateGraph, END
from langchain_core.runnables import RunnableConfig
from textblob import TextBlob
import re
import time
# 定義狀態
class AgentState(TypedDict):
text: str
summary: str
translation: str
sentiment: str
summary_time: float
translation_time: float
sentiment_time: float
# 總結智能體
def summarize_agent(state: AgentState) -> Dict[str, Any]:
print("總結智能體:正在運行")
start_time = time.time()
try:
text = state["text"]
ifnot text.strip():
return {
"summary": "未提供文本進行總結。",
"summary_time": 0.0
}
time.sleep(2)
sentences = re.split(r'(?<=[.!?]) +', text.strip())
scored_sentences = [(s, len(s.split())) for s in sentences if s]
top_sentences = [s for s, _ in sorted(scored_sentences, key=lambda x: x[1], reverse=True)[:2]]
summary = " ".join(top_sentences) if top_sentences else"文本太短,無法總結。"
processing_time = time.time() - start_time
print(f"總結智能體:完成,耗時 {processing_time:.2f} 秒")
return {
"summary": summary,
"summary_time": processing_time
}
except Exception as e:
return {
"summary": f"總結錯誤:{str(e)}",
"summary_time": 0.0
}
# 翻譯智能體
def translate_agent(state: AgentState) -> Dict[str, Any]:
print("翻譯智能體:正在運行")
start_time = time.time()
try:
text = state["text"]
ifnot text.strip():
return {
"translation": "未提供文本進行翻譯。",
"translation_time": 0.0
}
time.sleep(3)
translation = (
"El nuevo parque en la ciudad es una maravillosa adición. "
"Las familias disfrutan de los espacios abiertos, y a los ni?os les encanta el parque infantil. "
"Sin embargo, algunas personas piensan que el área de estacionamiento es demasiado peque?a."
)
processing_time = time.time() - start_time
print(f"翻譯智能體:完成,耗時 {processing_time:.2f} 秒")
return {
"translation": translation,
"translation_time": processing_time
}
except Exception as e:
return {
"translation": f"翻譯錯誤:{str(e)}",
"translation_time": 0.0
}
# 情感分析智能體
def sentiment_agent(state: AgentState) -> Dict[str, Any]:
print("情感分析智能體:正在運行")
start_time = time.time()
try:
text = state["text"]
ifnot text.strip():
return {
"sentiment": "未提供文本進行情感分析。",
"sentiment_time": 0.0
}
time.sleep(1.5)
blob = TextBlob(text)
polarity = blob.sentiment.polarity
subjectivity = blob.sentiment.subjectivity
sentiment = "Positive"if polarity > 0else"Negative"if polarity < 0else"Neutral"
result = f"{sentiment} (Polarity: {polarity:.2f}, Subjectivity: {subjectivity:.2f})"
processing_time = time.time() - start_time
print(f"情感分析智能體:完成,耗時 {processing_time:.2f} 秒")
return {
"sentiment": result,
"sentiment_time": processing_time
}
except Exception as e:
return {
"sentiment": f"情感分析錯誤:{str(e)}",
"sentiment_time": 0.0
}
# 合并節點
def join_parallel_results(state: AgentState) -> AgentState:
return state
# 構建圖
def build_parallel_graph() -> StateGraph:
workflow = StateGraph(AgentState)
parallel_branches = {
"summarize_node": summarize_agent,
"translate_node": translate_agent,
"sentiment_node": sentiment_agent
}
for name, agent in parallel_branches.items():
workflow.add_node(name, agent)
workflow.add_node("branch", lambda state: state)
workflow.add_node("join", join_parallel_results)
workflow.set_entry_point("branch")
for name in parallel_branches:
workflow.add_edge("branch", name)
workflow.add_edge(name, "join")
workflow.add_edge("join", END)
return workflow.compile()
# 主函數
def main():
text = (
"The new park in the city is a wonderful addition. Families are enjoying the open spaces, "
"and children love the playground. However, some people think the parking area is too small."
)
initial_state: AgentState = {
"text": text,
"summary": "",
"translation": "",
"sentiment": "",
"summary_time": 0.0,
"translation_time": 0.0,
"sentiment_time": 0.0
}
print("\n構建新圖...")
app = build_parallel_graph()
print("\n開始并行處理...")
start_time = time.time()
config = RunnableConfig(parallel=True)
result = app.invoke(initial_state, cnotallow=config)
total_time = time.time() - start_time
print("\n=== 并行任務結果 ===")
print(f"輸入文本:\n{text}\n")
print(f"總結:\n{result['summary']}\n")
print(f"翻譯(西班牙語):\n{result['translation']}\n")
print(f"情感分析:\n{result['sentiment']}\n")
print("\n=== 處理時間 ===")
processing_times = {
"summary": result["summary_time"],
"translation": result["translation_time"],
"sentiment": result["sentiment_time"]
}
for agent, time_taken in processing_times.items():
print(f"{agent.capitalize()}: {time_taken:.2f} 秒")
print(f"\n總耗時: {total_time:.2f} 秒")
print(f"各任務總耗時: {sum(processing_times.values()):.2f} 秒")
print(f"并行節省時間: {sum(processing_times.values()) - total_time:.2f} 秒")
if __name__ == "__main__":
main()
(二)順序模式:按部就班的協同
順序模式則是一種“接力棒”式的協同方式。在這種模式下,任務按照一定的順序依次傳遞給不同的智能體,每個智能體的輸出成為下一個智能體的輸入。比如,在一個多步驟的審批流程中,團隊負責人先審批,然后是部門經理,最后是財務總監。這種方式適合那些需要嚴格順序執行的任務。
代碼示例:
from typing import Dict
from langgraph.graph import StateGraph, MessagesState, END
from langchain_core.runnables import RunnableConfig
from langchain_core.messages import HumanMessage, AIMessage
import json
# 團隊負責人
def team_lead_agent(state: MessagesState, config: RunnableConfig) -> Dict:
print("團隊負責人:開始審批")
messages = state["messages"]
proposal = json.loads(messages[0].content)
title = proposal.get("title", "")
amount = proposal.get("amount", 0.0)
ifnot title or amount <= 0:
status = "Rejected"
comment = "團隊負責人:提案因缺少標題或金額無效而被拒絕。"
goto = END
else:
status = "Approved by Team Lead"
comment = "團隊負責人:提案完整且已批準。"
goto = "dept_manager"
print(f"團隊負責人:審批完成 - {status}")
messages.append(AIMessage(
cnotallow=json.dumps({"status": status, "comment": comment}),
additional_kwargs={"agent": "team_lead", "goto": goto}
))
return {"messages": messages}
# 部門經理
def dept_manager_agent(state: MessagesState, config: RunnableConfig) -> Dict:
print("部門經理:開始審批")
messages = state["messages"]
team_lead_msg = next((m for m in messages if m.additional_kwargs.get("agent") == "team_lead"), None)
proposal = json.loads(messages[0].content)
amount = proposal.get("amount", 0.0)
if json.loads(team_lead_msg.content)["status"] != "Approved by Team Lead":
status = "Rejected"
comment = "部門經理:因團隊負責人拒絕而跳過。"
goto = END
elif amount > 100000:
status = "Rejected"
comment = "部門經理:預算超出限制。"
goto = END
else:
status = "Approved by Department Manager"
comment = "部門經理:預算在限制范圍內。"
goto = "finance_director"
print(f"部門經理:審批完成 - {status}")
messages.append(AIMessage(
cnotallow=json.dumps({"status": status, "comment": comment}),
additional_kwargs={"agent": "dept_manager", "goto": goto}
))
return {"messages": messages}
# 財務總監
def finance_director_agent(state: MessagesState, config: RunnableConfig) -> Dict:
print("財務總監:開始審批")
messages = state["messages"]
dept_msg = next((m for m in messages if m.additional_kwargs.get("agent") == "dept_manager"), None)
proposal = json.loads(messages[0].content)
amount = proposal.get("amount", 0.0)
if json.loads(dept_msg.content)["status"] != "Approved by Department Manager":
status = "Rejected"
comment = "財務總監:因部門經理拒絕而跳過。"
elif amount > 50000:
status = "Rejected"
comment = "財務總監:預算不足。"
else:
status = "Approved"
comment = "財務總監:批準且可行。"
print(f"財務總監:審批完成 - {status}")
messages.append(AIMessage(
cnotallow=json.dumps({"status": status, "comment": comment}),
additional_kwargs={"agent": "finance_director", "goto": END}
))
return {"messages": messages}
# 路由函數
def route_step(state: MessagesState) -> str:
for msg in reversed(state["messages"]):
goto = msg.additional_kwargs.get("goto")
if goto:
print(f"路由:智能體 {msg.additional_kwargs.get('agent')} 設置跳轉到 {goto}")
return goto
return END
# 構建 LangGraph
builder = StateGraph(MessagesState)
builder.add_node("team_lead", team_lead_agent)
builder.add_node("dept_manager", dept_manager_agent)
builder.add_node("finance_director", finance_director_agent)
builder.set_entry_point("team_lead")
builder.add_conditional_edges("team_lead", route_step, {
"dept_manager": "dept_manager",
END: END
})
builder.add_conditional_edges("dept_manager", route_step, {
"finance_director": "finance_director",
END: END
})
builder.add_conditional_edges("finance_director", route_step, {
END: END
})
workflow = builder.compile()
# 主運行器
def main():
initial_state = {
"messages": [
HumanMessage(
cnotallow=json.dumps({
"title": "New Equipment Purchase",
"amount": 40000.0,
"department": "Engineering"
})
)
]
}
result = workflow.invoke(initial_state)
messages = result["messages"]
proposal = json.loads(messages[0].content)
print("\n=== 審批結果 ===")
print(f"提案標題:{proposal['title']}")
final_status = "Unknown"
comments = []
for msg in messages[1:]:
if isinstance(msg, AIMessage):
try:
data = json.loads(msg.content)
if"status"in data:
final_status = data["status"]
if"comment"in data:
comments.append(data["comment"])
except Exception:
continue
print(f"最終狀態:{final_status}")
print("評論:")
for comment in comments:
print(f" - {comment}")
if __name__ == "__main__":
main()
(三)循環模式:持續改進的協同
循環模式則像是一個“精益求精”的過程。在這種模式下,智能體們會不斷地迭代,根據其他智能體的反饋來改進自己的輸出。比如,在代碼編寫和測試的場景中,代碼編寫智能體會根據測試智能體的反饋不斷優化代碼,直到代碼通過所有測試為止。
代碼示例:
from typing import Dict, Any, List
from langgraph.graph import StateGraph, END
from langchain_core.runnables import RunnableConfig
import textwrap
# 定義狀態
class EvaluationState(Dict[str, Any]):
code: str = ""
feedback: str = ""
passed: bool = False
iteration: int = 0
max_iterations: int = 3
history: List[Dict] = []
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.setdefault("code", "")
self.setdefault("feedback", "")
self.setdefault("passed", False)
self.setdefault("iteration", 0)
self.setdefault("max_iterations", 3)
self.setdefault("history", [])
# 代碼編寫智能體
def code_writer_agent(state: EvaluationState, config: RunnableConfig) -> Dict[str, Any]:
print(f"迭代 {state['iteration'] + 1} - 代碼編寫智能體:生成代碼")
print(f"迭代 {state['iteration'] + 1} - 代碼編寫智能體:收到反饋:{state['feedback']}")
iteration = state["iteration"] + 1
feedback = state["feedback"]
if iteration == 1:
code = textwrap.dedent("""
def factorial(n):
result = 1
for i in range(1, n + 1):
result *= i
return result
""")
writer_feedback = "初始代碼已生成。"
elif"factorial(0)"in feedback.lower():
code = textwrap.dedent("""
def factorial(n):
if n == 0:
return 1
result = 1
for i in range(1, n + 1):
result *= i
return result
""")
writer_feedback = "已修復 n=0 的處理。"
elif"factorial(-1)"in feedback.lower() or"negative"in feedback.lower():
code = textwrap.dedent("""
def factorial(n):
if n < 0:
raise ValueError("Factorial not defined for negative numbers")
if n == 0:
return 1
result = 1
for i in range(1, n + 1):
result *= i
return result
""")
writer_feedback = "已添加對負數輸入的錯誤處理。"
else:
code = state["code"]
writer_feedback = "未發現進一步改進的內容。"
print(f"迭代 {iteration} - 代碼編寫智能體:代碼生成完成")
return {
"code": code,
"feedback": writer_feedback,
"iteration": iteration
}
# 代碼測試智能體
def code_tester_agent(state: EvaluationState, config: RunnableConfig) -> Dict[str, Any]:
print(f"迭代 {state['iteration']} - 代碼測試智能體:測試代碼")
code = state["code"]
try:
test_cases = [
(0, 1),
(1, 1),
(5, 120),
(-1, None)
]
namespace = {}
exec(code, namespace)
factorial = namespace.get('factorial')
ifnot callable(factorial):
return {"passed": False, "feedback": "未找到階乘函數。"}
feedback_parts = []
passed = True
for input_val, expected in test_cases:
try:
result = factorial(input_val)
if expected isNone:
passed = False
feedback_parts.append(f"測試失敗:factorial({input_val}) 應該拋出錯誤。")
elif result != expected:
passed = False
feedback_parts.append(f"測試失敗:factorial({input_val}) 返回 {result},預期為 {expected}。")
except ValueError as ve:
if expected isnotNone:
passed = False
feedback_parts.append(f"測試失敗:factorial({input_val}) 意外拋出 ValueError:{str(ve)}")
except Exception as e:
passed = False
feedback_parts.append(f"測試失敗:factorial({input_val}) 拋出錯誤:{str(e)}")
feedback = "所有測試通過!"if passed else"\n".join(feedback_parts)
print(f"迭代 {state['iteration']} - 代碼測試智能體:測試完成 - {'通過' if passed else '失敗'}")
history = state["history"]
history.append({
"iteration": state["iteration"],
"code": code,
"feedback": feedback,
"passed": passed
})
return {
"passed": passed,
"feedback": feedback,
"history": history
}
except Exception as e:
print(f"迭代 {state['iteration']} - 代碼測試智能體:測試失敗")
return {"passed": False, "feedback": f"測試錯誤:{str(e)}"}
# 條件邊:決定是否繼續循環
def should_continue(state: EvaluationState) -> str:
if state["passed"] or state["iteration"] >= state["max_iterations"]:
print(f"迭代 {state['iteration']} - {'循環停止:測試通過' if state['passed'] else '循環停止:達到最大迭代次數'}")
return"end"
print(f"迭代 {state['iteration']} - 循環繼續:測試失敗")
return"code_writer"
# 構建 LangGraph 工作流
workflow = StateGraph(EvaluationState)
workflow.add_node("code_writer", code_writer_agent)
workflow.add_node("code_tester", code_tester_agent)
workflow.set_entry_point("code_writer")
workflow.add_edge("code_writer", "code_tester")
workflow.add_conditional_edges(
"code_tester",
should_continue,
{
"code_writer": "code_writer",
"end": END
}
)
app = workflow.compile()
# 運行工作流
def main():
initial_state = EvaluationState()
result = app.invoke(initial_state)
print("\n=== 評估結果 ===")
print(f"最終狀態:{'通過' if result['passed'] else '失敗'},經過 {result['iteration']} 次迭代")
print(f"最終代碼:\n{result['code']}")
print(f"最終反饋:\n{result['feedback']}")
print("\n迭代歷史:")
for attempt in result["history"]:
print(f"迭代 {attempt['iteration']}:")
print(f" 代碼:\n{attempt['code']}")
print(f" 反饋:{attempt['feedback']}")
print(f" 是否通過:{attempt['passed']}\n")
if __name__ == "__main__":
main()
(四)路由器模式:智能分配任務
路由器模式則像是一個“任務分發中心”。在這種模式下,一個中央路由器智能體根據任務或輸入的內容,決定調用哪些智能體。比如,在一個客戶支持系統中,路由器可以根據客戶的問題內容,將問題分配給不同的支持團隊,如賬單團隊、技術支持團隊或普通咨詢團隊。
代碼示例:
from typing import Dict, Any, TypedDict, Literal
from langgraph.graph import StateGraph, END
from langchain_core.runnables import RunnableConfig
import re
import time
# 定義狀態
class TicketState(TypedDict):
ticket_text: str
category: str
resolution: str
processing_time: float
# 路由器智能體
def router_agent(state: TicketState) -> Dict[str, Any]:
print("路由器智能體:分析工單...")
start_time = time.time()
ticket_text = state["ticket_text"].lower()
if any(keyword in ticket_text for keyword in ["billing", "payment", "invoice", "charge"]):
category = "Billing"
elif any(keyword in ticket_text for keyword in ["technical", "bug", "error", "crash"]):
category = "Technical"
elif any(keyword in ticket_text for keyword in ["general", "question", "inquiry", "info"]):
category = "General"
else:
category = "Unknown"
processing_time = time.time() - start_time
print(f"路由器智能體:分類為 '{category}',耗時 {processing_time:.2f} 秒")
return {
"category": category,
"processing_time": processing_time
}
# 賬單團隊
def billing_team_agent(state: TicketState) -> Dict[str, Any]:
print("賬單團隊:處理工單...")
start_time = time.time()
ticket_text = state["ticket_text"]
resolution = f"賬單團隊:已審核工單 '{ticket_text}'。請檢查您的賬單詳情或聯系我們的賬單部門以獲取進一步幫助。"
processing_time = time.time() - start_time
time.sleep(1)
print(f"賬單團隊:完成,耗時 {processing_time:.2f} 秒")
return {
"resolution": resolution,
"processing_time": state["processing_time"] + processing_time
}
# 技術支持團隊
def technical_team_agent(state: TicketState) -> Dict[str, Any]:
print("技術支持團隊:處理工單...")
start_time = time.time()
ticket_text = state["ticket_text"]
resolution = f"技術支持團隊:已審核工單 '{ticket_text}'。請嘗試重啟您的設備或提交詳細的錯誤日志以供進一步調查。"
processing_time = time.time() - start_time
time.sleep(1.5)
print(f"技術支持團隊:完成,耗時 {processing_time:.2f} 秒")
return {
"resolution": resolution,
"processing_time": state["processing_time"] + processing_time
}
# 普通咨詢團隊
def general_team_agent(state: TicketState) -> Dict[str, Any]:
print("普通咨詢團隊:處理工單...")
start_time = time.time()
ticket_text = state["ticket_text"]
resolution = f"普通咨詢團隊:已審核工單 '{ticket_text}'。更多信息,請參考我們的常見問題解答或通過電子郵件聯系我們。"
processing_time = time.time() - start_time
time.sleep(0.8)
print(f"普通咨詢團隊:完成,耗時 {processing_time:.2f} 秒")
return {
"resolution": resolution,
"processing_time": state["processing_time"] + processing_time
}
# 手動審核團隊
def manual_review_agent(state: TicketState) -> Dict[str, Any]:
print("手動審核團隊:處理工單...")
start_time = time.time()
ticket_text = state["ticket_text"]
resolution = f"手動審核:工單 '{ticket_text}' 無法分類。標記為人工審核。請手動分配到適當的團隊。"
processing_time = time.time() - start_time
time.sleep(0.5)
print(f"手動審核團隊:完成,耗時 {processing_time:.2f} 秒")
return {
"resolution": resolution,
"processing_time": state["processing_time"] + processing_time
}
# 路由函數
def route_ticket(state: TicketState) -> Literal["billing_team", "technical_team", "general_team", "manual_review"]:
category = state["category"]
print(f"路由:工單分類為 '{category}'")
if category == "Billing":
return"billing_team"
elif category == "Technical":
return"technical_team"
elif category == "General":
return"general_team"
else:
return"manual_review"
# 構建路由器模式的圖
def build_router_graph() -> StateGraph:
workflow = StateGraph(TicketState)
workflow.add_node("router", router_agent)
workflow.add_node("billing_team", billing_team_agent)
workflow.add_node("technical_team", technical_team_agent)
workflow.add_node("general_team", general_team_agent)
workflow.add_node("manual_review", manual_review_agent)
workflow.set_entry_point("router")
workflow.add_conditional_edges(
"router",
route_ticket,
{
"billing_team": "billing_team",
"technical_team": "technical_team",
"general_team": "general_team",
"manual_review": "manual_review"
}
)
workflow.add_edge("billing_team", END)
workflow.add_edge("technical_team", END)
workflow.add_edge("general_team", END)
workflow.add_edge("manual_review", END)
return workflow.compile()
# 運行工作流
def main():
test_tickets = [
"I have a billing issue with my last invoice. It seems I was overcharged.",
"My app keeps crashing with a technical error. Please help!",
"I have a general question about your services. Can you provide more info?",
"I need assistance with something unrelated to billing or technical issues."
]
for ticket_text in test_tickets:
initial_state: TicketState = {
"ticket_text": ticket_text,
"category": "",
"resolution": "",
"processing_time": 0.0
}
print(f"\n=== 處理工單:'{ticket_text}' ===")
app = build_router_graph()
start_time = time.time()
result = app.invoke(initial_state, cnotallow=RunnableConfig())
total_time = time.time() - start_time
print("\n=== 工單結果 ===")
print(f"分類:{result['category']}")
print(f"解決方案:{result['resolution']}")
print(f"總處理時間:{result['processing_time']:.2f} 秒")
print(f"總耗時:{total_time:.2f} 秒")
print("-" * 50)
if __name__ == "__main__":
main()
(五)聚合模式:整合輸出
聚合模式則像是一個“信息匯總中心”。在這種模式下,多個智能體分別完成自己的任務后,將輸出傳遞給一個聚合智能體,由聚合智能體將所有結果整合成一個最終結果。比如,在社交媒體情感分析中,可以有多個智能體分別收集不同平臺的帖子并進行情感分析,最后由聚合智能體生成一份綜合報告。
代碼示例:
from typing import Dict, Any, TypedDict, List
from langgraph.graph import StateGraph, END
from langchain_core.runnables import RunnableConfig
from textblob import TextBlob
import time
from typing_extensions import Annotated
from operator import add
# 定義狀態
class SocialMediaState(TypedDict):
twitter_posts: List[str]
instagram_posts: List[str]
reddit_posts: List[str]
twitter_sentiment: Dict[str, float]
instagram_sentiment: Dict[str, float]
reddit_sentiment: Dict[str, float]
final_report: str
processing_time: Annotated[float, add]
# 收集推特帖子
def collect_twitter_posts(state: SocialMediaState) -> Dict[str, Any]:
print("推特智能體:收集帖子...")
start_time = time.time()
posts = [
"Loving the new product from this brand! Amazing quality.",
"Terrible customer service from this brand. Very disappointed."
]
time.sleep(1)
processing_time = time.time() - start_time
print(f"推特智能體:完成,耗時 {processing_time:.2f} 秒")
return {
"twitter_posts": posts,
"processing_time": processing_time
}
# 收集 Instagram 帖子
def collect_instagram_posts(state: SocialMediaState) -> Dict[str, Any]:
print("Instagram 智能體:收集帖子...")
start_time = time.time()
posts = [
"Beautiful design by this brand! #loveit",
"Not impressed with the latest release. Expected better."
]
time.sleep(1.2)
processing_time = time.time() - start_time
print(f"Instagram 智能體:完成,耗時 {processing_time:.2f} 秒")
return {
"instagram_posts": posts,
"processing_time": processing_time
}
# 收集 Reddit 帖子
def collect_reddit_posts(state: SocialMediaState) -> Dict[str, Any]:
print("Reddit 智能體:收集帖子...")
start_time = time.time()
posts = [
"This brand is awesome! Great value for money.",
"Had a bad experience with their support team. Not happy."
]
time.sleep(0.8)
processing_time = time.time() - start_time
print(f"Reddit 智能體:完成,耗時 {processing_time:.2f} 秒")
return {
"reddit_posts": posts,
"processing_time": processing_time
}
# 分析推特情感
def analyze_twitter_sentiment(state: SocialMediaState) -> Dict[str, Any]:
print("推特情感智能體:分析情感...")
start_time = time.time()
posts = state["twitter_posts"]
polarities = [TextBlob(post).sentiment.polarity for post in posts]
avg_polarity = sum(polarities) / len(polarities) if polarities else0.0
time.sleep(0.5)
processing_time = time.time() - start_time
print(f"推特情感智能體:完成,耗時 {processing_time:.2f} 秒")
return {
"twitter_sentiment": {"average_polarity": avg_polarity, "num_posts": len(posts)},
"processing_time": processing_time
}
# 分析 Instagram 情感
def analyze_instagram_sentiment(state: SocialMediaState) -> Dict[str, Any]:
print("Instagram 情感智能體:分析情感...")
start_time = time.time()
posts = state["instagram_posts"]
polarities = [TextBlob(post).sentiment.polarity for post in posts]
avg_polarity = sum(polarities) / len(polarities) if polarities else0.0
time.sleep(0.6)
processing_time = time.time() - start_time
print(f"Instagram 情感智能體:完成,耗時 {processing_time:.2f} 秒")
return {
"instagram_sentiment": {"average_polarity": avg_polarity, "num_posts": len(posts)},
"processing_time": processing_time
}
# 分析 Reddit 情感
def analyze_reddit_sentiment(state: SocialMediaState) -> Dict[str, Any]:
print("Reddit 情感智能體:分析情感...")
start_time = time.time()
posts = state["reddit_posts"]
polarities = [TextBlob(post).sentiment.polarity for post in posts]
avg_polarity = sum(polarities) / len(polarities) if polarities else0.0
time.sleep(0.4)
processing_time = time.time() - start_time
print(f"Reddit 情感智能體:完成,耗時 {processing_time:.2f} 秒")
return {
"reddit_sentiment": {"average_polarity": avg_polarity, "num_posts": len(posts)},
"processing_time": processing_time
}
# 聚合結果
def aggregate_results(state: SocialMediaState) -> Dict[str, Any]:
print("聚合智能體:生成最終報告...")
start_time = time.time()
twitter_sentiment = state["twitter_sentiment"]
instagram_sentiment = state["instagram_sentiment"]
reddit_sentiment = state["reddit_sentiment"]
total_posts = (twitter_sentiment["num_posts"] +
instagram_sentiment["num_posts"] +
reddit_sentiment["num_posts"])
weighted_polarity = (
twitter_sentiment["average_polarity"] * twitter_sentiment["num_posts"] +
instagram_sentiment["average_polarity"] * instagram_sentiment["num_posts"] +
reddit_sentiment["average_polarity"] * reddit_sentiment["num_posts"]
) / total_posts if total_posts > 0else0.0
overall_sentiment = ("Positive"if weighted_polarity > 0else
"Negative"if weighted_polarity < 0else"Neutral")
report = (
f"總體情感:{overall_sentiment} (平均極性:{weighted_polarity:.2f})\n"
f"推特情感:{twitter_sentiment['average_polarity']:.2f} (帖子數:{twitter_sentiment['num_posts']})\n"
f"Instagram 情感:{instagram_sentiment['average_polarity']:.2f} (帖子數:{instagram_sentiment['num_posts']})\n"
f"Reddit 情感:{reddit_sentiment['average_polarity']:.2f} (帖子數:{reddit_sentiment['num_posts']})"
)
time.sleep(0.3)
processing_time = time.time() - start_time
print(f"聚合智能體:完成,耗時 {processing_time:.2f} 秒")
return {
"final_report": report,
"processing_time": processing_time
}
# 構建聚合模式的圖
def build_aggregator_graph() -> StateGraph:
workflow = StateGraph(SocialMediaState)
workflow.add_node("collect_twitter", collect_twitter_posts)
workflow.add_node("collect_instagram", collect_instagram_posts)
workflow.add_node("collect_reddit", collect_reddit_posts)
workflow.add_node("analyze_twitter", analyze_twitter_sentiment)
workflow.add_node("analyze_instagram", analyze_instagram_sentiment)
workflow.add_node("analyze_reddit", analyze_reddit_sentiment)
workflow.add_node("aggregate", aggregate_results)
workflow.add_node("branch", lambda state: state)
workflow.set_entry_point("branch")
workflow.add_edge("branch", "collect_twitter")
workflow.add_edge("branch", "collect_instagram")
workflow.add_edge("branch", "collect_reddit")
workflow.add_edge("collect_twitter", "analyze_twitter")
workflow.add_edge("collect_instagram", "analyze_instagram")
workflow.add_edge("collect_reddit", "analyze_reddit")
workflow.add_edge("analyze_twitter", "aggregate")
workflow.add_edge("analyze_instagram", "aggregate")
workflow.add_edge("analyze_reddit", "aggregate")
workflow.add_edge("aggregate", END)
return workflow.compile()
# 運行工作流
def main():
initial_state: SocialMediaState = {
"twitter_posts": [],
"instagram_posts": [],
"reddit_posts": [],
"twitter_sentiment": {"average_polarity": 0.0, "num_posts": 0},
"instagram_sentiment": {"average_polarity": 0.0, "num_posts": 0},
"reddit_sentiment": {"average_polarity": 0.0, "num_posts": 0},
"final_report": "",
"processing_time": 0.0
}
print("\n開始社交媒體情感分析...")
app = build_aggregator_graph()
start_time = time.time()
config = RunnableConfig(parallel=True)
result = app.invoke(initial_state, cnotallow=config)
total_time = time.time() - start_time
print("\n=== 情感分析結果 ===")
print(result["final_report"])
print(f"\n總處理時間:{result['processing_time']:.2f} 秒")
print(f"總耗時:{total_time:.2f} 秒")
if __name__ == "__main__":
main()
(六)網絡模式(Network or Horizontal)
在多智能體系統中,網絡模式是一種去中心化的架構,智能體之間以多對多的方式直接通信,形成一個分散的網絡。這種架構非常適合那些沒有明確智能體層級或調用順序的任務。
優點與挑戰
優點:
- 分布式協作:每個智能體都可以獨立運行,即使部分智能體失敗,系統仍然可以正常工作。
- 群體決策:智能體之間可以相互協作,共同做出決策。
挑戰:
- 通信管理:智能體之間的通信可能會變得復雜,導致效率低下。
- 重復工作:如果沒有良好的協調機制,智能體可能會重復執行相同的工作。
代碼示例:
from typing import Literal
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.types import Command
model = ChatOpenAI()
# 定義智能體 1
def agent_1(state: MessagesState) -> Command[Literal["agent_2", "agent_3", END]]:
response = model.invoke(state["messages"])
next_agent = response.get("next_agent", END)
return Command(
goto=next_agent,
update={"messages": [response["content"]]}
)
# 定義智能體 2
def agent_2(state: MessagesState) -> Command[Literal["agent_1", "agent_3", END]]:
response = model.invoke(state["messages"])
next_agent = response.get("next_agent", END)
return Command(
goto=next_agent,
update={"messages": [response["content"]]}
)
# 定義智能體 3
def agent_3(state: MessagesState) -> Command[Literal["agent_1", "agent_2", END]]:
response = model.invoke(state["messages"])
next_agent = response.get("next_agent", END)
return Command(
goto=next_agent,
update={"messages": [response["content"]]}
)
# 構建圖
builder = StateGraph(MessagesState)
builder.add_node(agent_1)
builder.add_node(agent_2)
builder.add_node(agent_3)
builder.add_edge(START, "agent_1")
network = builder.compile()
(七)交接模式(Handoffs)
在多智能體架構中,交接模式是一種常見的交互方式,其中一個智能體在完成自己的任務后,將控制權交給另一個智能體。這種模式允許智能體在執行過程中靈活地將任務傳遞給其他智能體,甚至可以返回到自己。
代碼示例:
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.types import Command
# 定義一個智能體
def agent(state: MessagesState) -> Command[Literal["another_agent"]]:
# 假設 get_next_agent 是一個函數,用于決定下一個智能體
next_agent = get_next_agent(state)
return Command(
goto=next_agent,
update={"messages": ["Message from current agent"]}
)
(八)監督者模式(Supervisor)
監督者模式是一種集中式的架構,其中有一個中央監督者(Supervisor)智能體,負責決定下一個調用的智能體。這種模式適合需要集中管理和協調的場景。
代碼示例:
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.types import Command
model = ChatOpenAI()
# 定義監督者
def supervisor(state: MessagesState) -> Command[Literal["agent_1", "agent_2", END]]:
response = model.invoke(state["messages"])
next_agent = response.get("next_agent", END)
return Command(goto=next_agent)
# 定義智能體 1
def agent_1(state: MessagesState) -> Command[Literal["supervisor"]]:
response = model.invoke(state["messages"])
return Command(goto="supervisor", update={"messages": [response]})
# 定義智能體 2
def agent_2(state: MessagesState) -> Command[Literal["supervisor"]]:
response = model.invoke(state["messages"])
return Command(goto="supervisor", update={"messages": [response]})
# 構建圖
builder = StateGraph(MessagesState)
builder.add_node(supervisor)
builder.add_node(agent_1)
builder.add_node(agent_2)
builder.add_edge(START, "supervisor")
supervisor_graph = builder.compile()
(九)層次化模式(Hierarchical)
層次化模式是一種樹狀結構,其中高級智能體(監督者)管理低級智能體。這種架構適合大型系統,可以清晰地劃分角色和職責。
代碼示例:
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.types import Command
model = ChatOpenAI()
# 定義團隊 1 的監督者
def team_1_supervisor(state: MessagesState) -> Command[Literal["team_1_agent_1", "team_1_agent_2", END]]:
response = model.invoke(state["messages"])
next_agent = response.get("next_agent", END)
return Command(goto=next_agent)
# 定義團隊 1 的智能體 1
def team_1_agent_1(state: MessagesState) -> Command[Literal["team_1_supervisor"]]:
response = model.invoke(state["messages"])
return Command(goto="team_1_supervisor", update={"messages": [response]})
# 定義團隊 1 的智能體 2
def team_1_agent_2(state: MessagesState) -> Command[Literal["team_1_supervisor"]]:
response = model.invoke(state["messages"])
return Command(goto="team_1_supervisor", update={"messages": [response]})
# 構建團隊 1 的圖
team_1_builder = StateGraph(MessagesState)
team_1_builder.add_node(team_1_supervisor)
team_1_builder.add_node(team_1_agent_1)
team_1_builder.add_node(team_1_agent_2)
team_1_builder.add_edge(START, "team_1_supervisor")
team_1_graph = team_1_builder.compile()
# 定義頂層監督者
def top_level_supervisor(state: MessagesState) -> Command[Literal["team_1_graph", END]]:
response = model.invoke(state["messages"])
next_team = response.get("next_team", END)
return Command(goto=next_team)
# 構建頂層圖
top_builder = StateGraph(MessagesState)
top_builder.add_node(top_level_supervisor)
top_builder.add_node("team_1_graph", team_1_graph)
top_builder.add_edge(START, "top_level_supervisor")
top_builder.add_edge("team_1_graph", "top_level_supervisor")
top_graph = top_builder.compile()
(十)自定義工作流(Custom Multi-Agent Workflow)
自定義工作流允許開發者根據具體需求定義智能體之間的調用順序。這種模式結合了顯式控制流和動態控制流的優點,提供了更高的靈活性。
代碼示例:
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, MessagesState, START
model = ChatOpenAI()
# 定義智能體 1
def agent_1(state: MessagesState):
response = model.invoke(state["messages"])
return {"messages": [response]}
# 定義智能體 2
def agent_2(state: MessagesState):
response = model.invoke(state["messages"])
return {"messages": [response]}
# 構建圖
builder = StateGraph(MessagesState)
builder.add_node(agent_1)
builder.add_node(agent_2)
builder.add_edge(START, "agent_1")
builder.add_edge("agent_1", "agent_2")
custom_workflow = builder.compile()
三、智能體之間的通信:協同的關鍵
在多智能體系統中,智能體之間的通信是協同工作的核心。我們需要考慮以下幾個問題:
- 通過圖狀態還是工具調用通信?在大多數架構中,智能體通過圖狀態通信。而在工具調用的架構中,通信的內容則是工具調用的參數。
- 不同狀態模式的智能體如何通信?如果一個智能體需要與其他智能體有不同的狀態模式,可以通過定義子圖智能體或具有私有輸入狀態模式的智能體節點來實現。
- 共享消息列表如何通信?最常見的通信方式是通過共享狀態通道(通常是消息列表)。智能體可以選擇共享完整的思考過程(“草稿”),也可以只共享最終結果。
四、多智能體架構的優勢與挑戰
多智能體架構為我們解決復雜任務提供了強大的工具,但也帶來了新的挑戰。它通過并行、順序、路由器和聚合等多種工作流模式,實現了高效的協同工作。然而,隨著智能體數量的增加和任務復雜度的提升,如何管理智能體之間的通信、避免重復工作以及確保系統的可擴展性,成為了我們需要面對的問題。
五、結論
多智能體架構為我們提供了一個全新的視角來解決復雜任務。通過不同的協同模式和通信機制,我們可以構建出高效、靈活且可擴展的系統。無論是并行處理、順序執行、智能路由還是結果聚合,多智能體架構都能根據具體需求提供合適的解決方案。在未來,隨著AI技術的不斷發展,多智能體架構必將在更多領域發揮更大的作用,為我們的生活和工作帶來更多的便利和效率。
本文轉載自??Halo咯咯?? 作者:基咯咯
