譯者 | 核子可樂
審校 | 重樓
將多個大語言模型集成至應用程序當中往往是項艱巨的挑戰,各類不同API及通信協議的協同處理,以及如何確保請求路由的復雜性難題往往令人望而生畏。
好在可以使用消息代理與路由機制更優雅地解決此類問題,在解決痛點的同時實現多個關鍵優勢。
本文將向大家介紹具體操作步驟。這里以KubeMQ為例,配合代碼示例來指導大家逐步建立一套可與OpenAI及Anthropic Claude交互的路由體系。
使用消息代理作為大模型路由工具的主要優勢
1. 簡化集成
通過使用消息代理作為路由機制,我們可以將不同大模型API交互所涉及的復雜性抽象出來,從而簡化客戶端代碼并降低出錯幾率。
2. 多模型用例
消息代理能夠實現多模型或專門用于不同任務的模型間的通信(如一個模型用于摘要,另一模型用于情緒分析)。其可以確保請求被有效路由至適當模型,使得應用程序能夠利用各模型的優勢且無需額外開銷。
3. 批處理與大規模推理
對于需要批處理或大規模推理任務的應用程序,消息代理通過在大模型繁忙或不可用時建立請求隊列,從而實現異步處理。這將確保不會丟失任何數據或請求,即使是在繁重的工作負載下也能提供可靠的處理響應。
4. 冗余與回退保證
對于特別關注正常運行時間的用例,消息代理可確保無縫回退至替代環境。例如,如果與提供OpenAI模型的某家云服務商發生連接失敗,KubeMQ可自動切換至另一服務商。這樣的冗余設計保證AI不間斷操作,有助于增強服務可靠性與客戶滿意度。
5. 處理高流量應用程序
消息代理能夠將傳入的請求分發至多個大模型實例或副本,防止過載并確保平衡運行。這種負載均衡設計對于高流量應用程序至關重要,可使其在不影響性能的前提下有效擴展。
使用KubeMQ建立大模型路由機制:集成OpenAI與Claude
現在,我們將分步了解如何使用KubeMQ設置能夠與OpenAI和Anthropic Claude交互的路由機制。
全部示例代碼均保存在KubeMQ的GitHub repo當中(https://github.com/kubemq-io/kubemq-llm-router)。
準備工作
在開始之前,請確保你已安裝以下內容:
- Python 3.7或更高版本。
- 本地安裝Docker。
- 擁有有效的OpenAI和Anthropic API密鑰。
- KubeMQ令牌(可從KubeMQ官網處獲取)。
- kubemq-cq Python包:
Plain Text
pip install kubemq-cq
- .env文件中包含你的AIP密鑰:
Plain Text
OPENAI_API_KEY=your_openai_api_key
ANTHROPIC_API_KEY=your_anthropic_api_key
設置KubeMQ
首先,我們需要確保KubeMQ能夠正常運行。這里使用Docker進行部署:
Shell
docker run -d --rm \
-p 8080:8080 \
-p 50000:50000 \
-p 9090:9090 \
-e KUBEMQ_TOKEN="your_token" \
kubemq/kubemq-community:latest
端口說明:
- 8080 – 公開KubeMQ REST API
- 50000 – 打開 gRPC端口以進行實施意見-服務器通信
- 9090 – 公開KubeMQ REST網關
注意: 將 your_token部分替換為你的真實KubeMQ令牌。
創建大模型路由服務器
大模型路由將充當客戶端與大模型之間的中介,負責監聽特定渠道的查詢并將其路由至適當的大模型。
server.py
Python
import time
from kubemq.cq import Client, QueryMessageReceived, QueryResponseMessage, QueriesSubscription, CancellationToken
from langchain.chat_models import ChatOpenAI
from langchain.llms import Anthropic
import os
from dotenv import load_dotenv
import threading
load_dotenv()
class LLMRouter:
def __init__(self):
self.openai_llm = ChatOpenAI(
api_key=os.getenv("OPENAI_API_KEY"),
model_name="gpt-3.5-turbo"
)
self.claude_llm = Anthropic(
api_key=os.getenv("ANTHROPIC_API_KEY"),
model="claude-3"
)
self.client = Client(address="localhost:50000")
def handle_openai_query(self, request: QueryMessageReceived):
try:
message = request.body.decode('utf-8')
result = self.openai_llm(message)
response = QueryResponseMessage(
query_received=request,
is_executed=True,
body=result.encode('utf-8')
)
self.client.send_response_message(response)
except Exception as e:
self.client.send_response_message(QueryResponseMessage(
query_received=request,
is_executed=False,
error=str(e)
))
def handle_claude_query(self, request: QueryMessageReceived):
try:
message = request.body.decode('utf-8')
result = self.claude_llm(message)
response = QueryResponseMessage(
query_received=request,
is_executed=True,
body=result.encode('utf-8')
)
self.client.send_response_message(response)
except Exception as e:
self.client.send_response_message(QueryResponseMessage(
query_received=request,
is_executed=False,
error=str(e)
))
def run(self):
def on_error(err: str):
print(f"Error: {err}")
def subscribe_openai():
self.client.subscribe_to_queries(
subscription=QueriesSubscription(
channel="openai_requests",
on_receive_query_callback=self.handle_openai_query,
on_error_callback=on_error,
),
cancel=CancellationToken()
)
def subscribe_claude():
self.client.subscribe_to_queries(
subscription=QueriesSubscription(
channel="claude_requests",
on_receive_query_callback=self.handle_claude_query,
on_error_callback=on_error,
),
cancel=CancellationToken()
)
threading.Thread(target=subscribe_openai).start()
threading.Thread(target=subscribe_claude).start()
print("LLM Router running on channels: openai_requests, claude_requests")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("Shutting down...")
if __name__ == "__main__":
router = LLMRouter()
router.run()
說明:
- 初始化。
A.為API密鑰加載環境變量。
B.初始化OpenAI和Anthropic大模型的客戶端。
C.設置KubeMQ客戶端。
- 處理查詢。
A.handle_openai_query和 handle_claude_query負責解碼傳入消息,將其傳遞給相應大模型,而后發回響應。
B.捕捉錯誤并將 is_executed 標記設置為 False。
- 訂閱。
A.此路由將訂閱兩個小道:openai_requests 和 claude_requests。
B.使用線程并行處理訂閱。
- 運行服務器。
A.run方法啟動訂閱并保持服務器運行,直至中斷。
開發大模型客戶端
客戶端向大模型路由發送查詢,指定要使用的模型。
client.py
Python
from kubemq.cq import Client, QueryMessage
import json
class LLMClient:
def __init__(self, address="localhost:50000"):
self.client = Client(address=address)
def send_message(self, message: str, model: str) -> dict:
channel = f"{model}_requests"
response = self.client.send_query_request(QueryMessage(
channel=channel,
body=message.encode('utf-8'),
timeout_in_seconds=30
))
if response.is_error:
return {"error": response.error}
else:
return {"response": response.body.decode('utf-8')}
if __name__ == "__main__":
client = LLMClient()
models = ["openai", "claude"]
message = input("Enter your message: ")
model = input(f"Choose model ({'/'.join(models)}): ")
if model in models:
response = client.send_message(message, model)
if "error" in response:
print(f"Error: {response['error']}")
else:
print(f"Response: {response['response']}")
else:
print("Invalid model selected")
說明:
- 初始化。
A.設置KubeMQ客戶端。
- 發送消息。
A.send_message 方法根據所選模型構建適當通道。
B.向路由發送查詢消息并等待響應。
C.處理錯誤并解碼響應主體。
- 用戶交互。
A.提示用戶輸入消息并選擇模型。
B.從大模型處輸出響應。
通過REST發送和接收
對于傾向或需要RESTful通信的服務或客戶端,KubeMQ亦可提供REST端點。
通過REST發送請求
端點:
Plain Text
POST http://localhost:9090/send/request
標頭:
Plain Text
Content-Type: application/json
實體:
JSON
{
"RequestTypeData": 2,
"ClientID": "LLMRouter-sender",
"Channel": "openai_requests",
"BodyString": "What is the capital of France?",
"Timeout": 30000
}
負載細節:
- RequestTypeData – 指定請求類型(查詢為2)。
- ClientID – 發送請求的客戶端標識符。
- Channel – 與大模型(openai_requests或claude_requests)對應的通道。
- BodyString – 要發送至大模型的消息。
- Timeout – 等待響應的時間(單位為毫秒)。
接收響應
響應是一個包含大模型輸出或錯誤消息的JSON對象。
總結
在消息代理(KubeMQ)的幫助下,我們建立起可擴展且高效的路由機制,能夠與多個大模型進行交互。此設置允許客戶端無縫向不同模型發送查詢,并可擴展以引入更多模型或功能。
這種方法的好處包括:
- 簡化集成。大家可以將與不同大模型API交互與涉及的復雜性抽象出來,簡化客戶端代碼并降低出錯幾率。
- 多模型支持。有效將請求路由至專門用于不同任務的適當模型。
- 可靠性。確保在大模型繁忙或不可用時,數據不致丟失。
- 冗余。提供后備機制以保持不間斷操作。
- 可擴展性。通過在多個大模型實例間分配請求以應對高流量需求。