成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

實戰攻略:使用KubeMQ簡化多LLM集成流程

譯文 精選
人工智能
將多個大語言模型集成至應用程序當中往往是項艱巨的挑戰,各類不同API及通信協議的協同處理,以及如何確保請求路由的復雜性難題往往令人望而生畏。

譯者 | 核子可樂

審校 | 重樓

將多個大語言模型集成至應用程序當中往往是項艱巨的挑戰,各類不同API及通信協議的協同處理,以及如何確保請求路由的復雜性難題往往令人望而生畏。

好在可以使用消息代理與路由機制更優雅地解決此類問題,在解決痛點的同時實現多個關鍵優勢。

本文將向大家介紹具體操作步驟。這里以KubeMQ為例,配合代碼示例來指導大家逐步建立一套可與OpenAI及Anthropic Claude交互的路由體系。

使用消息代理作為大模型路由工具的主要優勢

1. 簡化集成

通過使用消息代理作為路由機制,我們可以將不同大模型API交互所涉及的復雜性抽象出來,從而簡化客戶端代碼并降低出錯幾率。

2. 多模型用例

消息代理能夠實現多模型或專門用于不同任務的模型間的通信(如一個模型用于摘要,另一模型用于情緒分析)。其可以確保請求被有效路由至適當模型,使得應用程序能夠利用各模型的優勢且無需額外開銷。

3. 批處理與大規模推理

對于需要批處理或大規模推理任務的應用程序,消息代理通過在大模型繁忙或不可用時建立請求隊列,從而實現異步處理。這將確保不會丟失任何數據或請求,即使是在繁重的工作負載下也能提供可靠的處理響應。

4. 冗余與回退保證

對于特別關注正常運行時間的用例,消息代理可確保無縫回退至替代環境。例如,如果與提供OpenAI模型的某家云服務商發生連接失敗,KubeMQ可自動切換至另一服務商。這樣的冗余設計保證AI不間斷操作,有助于增強服務可靠性與客戶滿意度。

5. 處理高流量應用程序

消息代理能夠將傳入的請求分發至多個大模型實例或副本,防止過載并確保平衡運行。這種負載均衡設計對于高流量應用程序至關重要,可使其在不影響性能的前提下有效擴展。

使用KubeMQ建立大模型路由機制:集成OpenAI與Claude

現在,我們將分步了解如何使用KubeMQ設置能夠與OpenAI和Anthropic Claude交互的路由機制。

全部示例代碼均保存在KubeMQ的GitHub repo當中(https://github.com/kubemq-io/kubemq-llm-router)。

準備工作

在開始之前,請確保已安裝以下內容:

  • Python 3.7或更高版本。
  • 本地安裝Docker
  • 擁有有效的OpenAIAnthropic API密鑰。
  • KubeMQ令牌(可從KubeMQ官網處獲取)。
  • kubemq-cq Python包:
Plain Text
pip install kubemq-cq
  • .env文件中包含的AIP密鑰:
Plain Text
OPENAI_API_KEY=your_openai_api_key
ANTHROPIC_API_KEY=your_anthropic_api_key

設置KubeMQ

首先,我們需要確保KubeMQ能夠正常運行。這里使用Docker進行部署:

Shell
docker run -d --rm \
 -p 8080:8080 \
 -p 50000:50000 \
 -p 9090:9090 \
 -e KUBEMQ_TOKEN="your_token" \
 kubemq/kubemq-community:latest

端口說明:

  • 8080公開KubeMQ REST API
  • 50000打開 gRPC端口以進行實施意見-服務器通信
  • 9090公開KubeMQ REST網關

注意: your_token部分替換為的真實KubeMQ令牌。

創建大模型路由服務器

大模型路由將充當客戶端與大模型之間的中介,負責監聽特定渠道的查詢并將其路由至適當的大模型。

server.py

Python
import time
from kubemq.cq import Client, QueryMessageReceived, QueryResponseMessage, QueriesSubscription, CancellationToken
from langchain.chat_models import ChatOpenAI
from langchain.llms import Anthropic
import os
from dotenv import load_dotenv
import threading

load_dotenv()

class LLMRouter:
 def __init__(self):
 self.openai_llm = ChatOpenAI(
 api_key=os.getenv("OPENAI_API_KEY"),
 model_name="gpt-3.5-turbo"
 )
 self.claude_llm = Anthropic(
 api_key=os.getenv("ANTHROPIC_API_KEY"),
 model="claude-3"
 )
 self.client = Client(address="localhost:50000")

 def handle_openai_query(self, request: QueryMessageReceived):
 try:
 message = request.body.decode('utf-8')
 result = self.openai_llm(message)
 response = QueryResponseMessage(
 query_received=request,
 is_executed=True,
 body=result.encode('utf-8')
 )
 self.client.send_response_message(response)
 except Exception as e:
 self.client.send_response_message(QueryResponseMessage(
 query_received=request,
 is_executed=False,
 error=str(e)
 ))

 def handle_claude_query(self, request: QueryMessageReceived):
 try:
 message = request.body.decode('utf-8')
 result = self.claude_llm(message)
 response = QueryResponseMessage(
 query_received=request,
 is_executed=True,
 body=result.encode('utf-8')
 )
 self.client.send_response_message(response)
 except Exception as e:
 self.client.send_response_message(QueryResponseMessage(
 query_received=request,
 is_executed=False,
 error=str(e)
 ))

 def run(self):
 def on_error(err: str):
 print(f"Error: {err}")

 def subscribe_openai():
 self.client.subscribe_to_queries(
 subscription=QueriesSubscription(
 channel="openai_requests",
 on_receive_query_callback=self.handle_openai_query,
 on_error_callback=on_error,
 ),
 cancel=CancellationToken()
 )

 def subscribe_claude():
 self.client.subscribe_to_queries(
 subscription=QueriesSubscription(
 channel="claude_requests",
 on_receive_query_callback=self.handle_claude_query,
 on_error_callback=on_error,
 ),
 cancel=CancellationToken()
 )

 threading.Thread(target=subscribe_openai).start()
 threading.Thread(target=subscribe_claude).start()

 print("LLM Router running on channels: openai_requests, claude_requests")
 try:
 while True:
 time.sleep(1)
 except KeyboardInterrupt:
 print("Shutting down...")

if __name__ == "__main__":
 router = LLMRouter()
 router.run()

說明:

  • 初始化。

A.為API密鑰加載環境變量。

B.初始化OpenAI和Anthropic大模型的客戶端。

C.設置KubeMQ客戶端。

  • 處理查詢。

A.handle_openai_query和 handle_claude_query負責解碼傳入消息,將其傳遞給相應大模型,而后發回響應。

B.捕捉錯誤并將 is_executed 標記設置為 False

  • 訂閱。

A.此路由將訂閱兩個小道:openai_requests 和 claude_requests。

B.使用線程并行處理訂閱。

  • 運行服務器。

A.run方法啟動訂閱并保持服務器運行,直至中斷。

開發大模型客戶端

客戶端向大模型路由發送查詢,指定要使用的模型。

client.py

Python
from kubemq.cq import Client, QueryMessage
import json

class LLMClient:
 def __init__(self, address="localhost:50000"):
 self.client = Client(address=address)

 def send_message(self, message: str, model: str) -> dict:
 channel = f"{model}_requests"
 response = self.client.send_query_request(QueryMessage(
 channel=channel,
 body=message.encode('utf-8'),
 timeout_in_seconds=30
 ))
 if response.is_error:
 return {"error": response.error}
 else:
 return {"response": response.body.decode('utf-8')}

if __name__ == "__main__":
 client = LLMClient()
 models = ["openai", "claude"]
 message = input("Enter your message: ")
 model = input(f"Choose model ({'/'.join(models)}): ")
 if model in models:
 response = client.send_message(message, model)
 if "error" in response:
 print(f"Error: {response['error']}")
 else:
 print(f"Response: {response['response']}")
 else:
 print("Invalid model selected")

說明:

  • 初始化。

A.設置KubeMQ客戶端。

  • 發送消息。

A.send_message 方法根據所選模型構建適當通道。

B.向路由發送查詢消息并等待響應。

C.處理錯誤并解碼響應主體。

  • 用戶交互。

A.提示用戶輸入消息并選擇模型。

B.從大模型處輸出響應。

通過REST發送和接收

對于傾向或需要RESTful通信的服務或客戶端,KubeMQ亦可提供REST端點。

通過REST發送請求

端點:

Plain Text
POST http://localhost:9090/send/request

標頭:

Plain Text
Content-Type: application/json

實體:

JSON
{
 "RequestTypeData": 2,
 "ClientID": "LLMRouter-sender",
 "Channel": "openai_requests",
 "BodyString": "What is the capital of France?",
 "Timeout": 30000
}

負載細節:

  • RequestTypeData指定請求類型(查詢為2)。
  • ClientID發送請求的客戶端標識符。
  • Channel與大模型(openai_requestsclaude_requests)對應的通道。
  • BodyString要發送至大模型的消息。
  • Timeout等待響應的時間(單位為毫秒)。

接收響應

響應是一個包含大模型輸出或錯誤消息的JSON對象。

總結

在消息代理(KubeMQ)的幫助下,我們建立起可擴展且高效的路由機制,能夠與多個大模型進行交互。此設置允許客戶端無縫向不同模型發送查詢,并可擴展以引入更多模型或功能。

這種方法的好處包括:

  1. 簡化集成。大家可以將與不同大模型API交互與涉及的復雜性抽象出來,簡化客戶端代碼并降低出錯幾率。
  2. 多模型支持。有效將請求路由至專門用于不同任務的適當模型。
  3. 可靠性。確保在大模型繁忙或不可用時,數據不致丟失。
  4. 冗余。提供后備機制以保持不間斷操作。
  5. 可擴展性。通過在多個大模型實例間分配請求以應對高流量需求。
原文標題:Simplifying Multi-LLM Integration With KubeMQ,作者:John Vester


責任編輯:華軒 來源: 51CTO
相關推薦

2010-05-10 16:01:54

2024-02-21 07:48:37

KubeSlice云原生Kubernetes

2024-01-11 16:24:12

人工智能RAG

2023-11-30 15:56:54

大型語言模型人工智能

2024-12-17 08:05:34

大型語言模型MetaAILLM

2025-01-08 08:21:16

2025-06-03 08:40:00

LM StudioLLM人工智能

2019-09-29 15:21:18

SOAR工具信息安全網絡安全

2024-12-16 07:00:00

2009-02-16 09:45:00

網絡設備管理系統

2025-06-13 02:10:00

2016-01-27 09:39:13

JoobyJava Web流程

2015-02-11 09:15:46

云部署嵌套虛擬化PaaS

2024-07-02 09:10:36

Nginx配置主機

2021-04-23 20:59:02

ThreadLocal內存

2018-01-29 12:39:56

數據庫MongoDB集群

2013-07-15 15:23:03

iOS多線程GCD

2010-08-23 10:59:02

SAP金融服務CRM

2022-02-21 10:29:26

物聯網招聘IOT

2011-01-24 13:20:49

點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 精品亚洲一区二区三区 | 成人精品一区亚洲午夜久久久 | h漫在线观看 | 精品国产乱码久久久久久图片 | 不卡一区| 国产亚洲精品美女久久久久久久久久 | 国产在线播 | 偷拍自拍网 | 麻豆视频在线免费观看 | 99福利视频| 精品亚洲一区二区三区四区五区 | 日日操天天射 | 国产精品一区二区不卡 | 日批日韩在线观看 | 欧美一级免费 | 国产99久久精品一区二区永久免费 | 欧美日一区二区 | 天天色天天色 | a黄视频 | 亚洲精品电影在线 | 麻豆av电影网| www亚洲成人 | 在线日韩福利 | 亚洲国产成人精品女人久久久 | 日韩欧美在线一区 | 国产a区| 蜜桃毛片 | 中文在线视频 | 精品视频一区二区三区 | 亚洲国产精品一区二区久久 | 精品国产欧美 | 国产成人综合一区二区三区 | 国产激情99 | 欧美成人精品在线观看 | 欧美激情va永久在线播放 | 国产成人免费一区二区60岁 | 日韩欧美国产一区二区 | 亚洲精品一区中文字幕乱码 | 亚洲成人在线免费 | 一区二区三区在线 | 美女久久视频 |