如何利用OpenAI、NATS和Streamlight徹底改變實時警報
譯文本文將介紹如何使用Streamlight、NATS和OpenAI這些非??岬墓ぞ撸赑ython中構建一個全棧事件驅動的天氣警報聊天應用程序。該應用程序可以實時收集天氣信息,使用人工智能了解警報標準,并將這些警報發送到用戶界面。
這段內容和代碼示例對于那些希望了解現代實時警報系統如何與大型語言模型(LLM)協調工作以及如何實現的開發人員來說非常有幫助。
人們也可以采用GitHub上的源代碼自己進行嘗試。
幕后的力量
以下了解人工智能天氣警報聊天應用程序是如何工作的,并將原始數據轉換為可操作的警報,實時了解天氣變化。應用程序的核心是一個用Python實現的響應式后端,由NATS提供支持,以確保實時數據處理和消息管理。集成OpenAI的GPT模型,使對話式人工智能能夠理解警報的性質,并響應用戶的查詢。用戶可以使用自然語言指定他們的警報標準, 然后GPT模型將對其進行解釋。
圖1實時警報應用架構
實時數據采集
從后端自各種來源連續異步收集天氣數據開始。應用程序現在使用api.weatherapi.com服務,每10秒實時獲取一次天氣信息。這些數據包括全球各地溫度、濕度、降水等參數。這段代碼異步獲取愛沙尼亞當前的天氣數據,但應用程序可以改進為從用戶輸入動態設置位置:
async def fetch_weather_data():
api_url = f"http://api.weatherapi.com/v1/current.json?key={weather_api_key}&q=estonia"
try:
async with aiohttp.ClientSession() as session:
async with session.get(api_url) as response:
if response.status == 200:
return await response.json()
else:
logging.error(f"Error fetching weather data: HTTP {response.status}")
return None
except Exception as e:
logging.error(f"Error fetching weather data: {e}")
return None
NATS在數據流中的作用
backend.py文件中main()函數中的代碼段演示了NATS的集成,用于驅動消息傳遞、連續的天氣監測和警報。使用NATS.py庫將NATS集成到Python代碼中。首先,在NATs://localhost:4222建立運行在Docker中的NATS服務器的連接。
nats_client = await nats.connect("nats://localhost:4222")
然后,定義一個異步message_handler函數。該函數訂閱并處理聊天主題上從NATS服務器接收到的消息。如果消息以“Set Alert:”開頭(將其附加在前端),它將提取并更新用戶的警報標準。
async def message_handler(msg):
nonlocal user_alert_criteria
data = msg.data.decode()
if data.startswith("Set Alert:"):
user_alert_criteria = data[len("Set Alert:"):].strip()
logging.info(f"User alert criteria updated: {user_alert_criteria}")
await nats_client.subscribe("chat", cb=message_handler)
后端服務集成了天氣API和Open AI Chat Completion API等外部服務。如果同時存在天氣數據和用戶警報標準,該應用程序會為OpenAI的GPT模型構建一個提示,以確定天氣是否符合用戶的標準。該提示要求人工智能根據用戶的標準分析當前天氣,并以“是”或“否”和簡短的天氣摘要做出回應。
一旦人工智能確定傳入的天氣數據符合用戶的警報標準,它就會制作個性化的警報消息,并向NATS服務器上的chat_response主題發布天氣警報,以更新前端應用程序的最新變化。此消息包含用戶友好的通知,旨在通知和建議用戶。例如,它可能會提示,“小心!愛沙尼亞明天會下雨。別忘了帶傘!”
while True:
current_weather = await fetch_weather_data()
if current_weather and user_alert_criteria:
logging.info(f"Current weather data: {current_weather}")
prompt = f"Use the current weather: {current_weather} information and user alert criteria: {user_alert_criteria}. Identify if the weather meets these criteria and return only YES or NO with a short weather temperature info without explaining why."
response_text = await get_openai_response(prompt)
if response_text and "YES" in response_text:
logging.info("Weather conditions met user criteria.")
ai_response = f"Weather alert! Your specified conditions have been met. {response_text}"
await nats_client.publish("chat_response", payload=ai_response.encode())
else:
logging.info("Weather conditions did not meet user criteria.")
else:
logging.info("No current weather data or user alert criteria set.")await asyncio.sleep(10)
實時發送和接收警報
了解一下后端和前端之間的整體通信流程。
- 通過使用Streamlit構建的簡單聊天界面(請參閱frontend.py文件),用戶可以使用自然語言輸入天氣警報標準并提交。
alert_criteria = st.text_input("Set your weather alert criteria", key="alert_criteria", disabled=st.session_state['alert_set'])
- Streamlit前端代碼通過NATS消息傳遞與后端服務交互。它將這些標準發布到聊天主題上的NATS服務器。
def send_message_to_nats_handler(message):
with NATSClient() as client:
client.connect()
client.publish("chat", payload=message.encode())
client.subscribe("chat_response", callback=read_message_from_nats_handler)
client.wait()
if set_alert_btn:
st.session_state['alert_set'] = True
st.success('Alert criteria set')
send_message_to_nats_handler(f"Set Alert: {alert_criteria}")
正如在前一節中看到的,后端服務監聽聊天主題,接收標準,獲取當前天氣數據,并使用人工智能來確定是否應該觸發警報。如果滿足條件,后端服務將向chat_response主題發送警報消息。前端接收此消息并更新用戶界面(UI)以通知用戶。
def read_message_from_nats_handler(msg):
message = msg.payload.decode()
st.session_state['conversation'].append(("AI", message))
st.markdown(f"<span style='color: red;'></span> AI: {message}", unsafe_allow_html=True)
進行嘗試
要詳細探索實時天氣警報聊天應用程序并親自嘗試,可以訪問前面鏈接的GitHub存儲庫。該存儲庫包含所有必要的代碼、詳細的設置說明和幫助入門的附加文檔。在設置完成之后,就可以啟動Streamlit前端和Python后端。設置天氣警報標準,并查看系統如何處理實時天氣數據以了解情況。
圖2警報應用程序的Streamlight UI
建立流處理管道
實時天氣警報聊天應用程序演示了NATS在分布式系統中用于實時消息傳遞的強大用例,允許在面向用戶的前端和數據處理后端之間進行有效的通信。但是,應該考慮幾個關鍵步驟,以確保呈現給用戶的信息是相關的、準確的和可操作的。在應用程序中,只是獲取實時的原始天氣數據,并將其直接發送到OpenAI或前端。有時,需要在數據到達外部服務之前對其進行實時轉換,以便對其進行過濾、豐富、聚合或規范化。需要開始考慮創建具有多個階段的流處理管道。
例如,并非從API獲取的所有數據都與每個用戶相關,可以在初始階段過濾掉不必要的信息。此外,數據可以采用各種格式,特別是如果需要從多個API獲取信息以獲得全面警報,這就需要對這些數據進行規范化。在下一階段,使用額外的場景或原始數據的信息來豐富數據,使其更有用。這可能包括將當前天氣狀況與歷史數據進行比較,以識別異常模式,或者使用另一個外部API添加基于位置的見解,例如針對特定地區天氣狀況的特定建議。在后期階段,可能會匯總每小時的溫度數據,以給出白天的平均溫度或突出顯示白天達到的峰值溫度。
下一個步驟
當涉及到在生產環境中轉換數據、部署、運行和擴展應用程序時,你可能希望使用Python中的專用框架(例如GlassFlow)來構建復雜的流處理管道。GlassFlow為流處理提供了一個完全托管的無服務器基礎設施,不必考慮設置或維護,應用程序可以輕松處理大量數據和用戶請求。它提供了高級狀態管理功能,可以更輕松地跟蹤用戶警報標準和其他應用程序狀態。而應用程序可以根據其用戶群進行擴展,而不會影響性能。
原文標題:Revolutionizing Real-Time Alerts With AI, NATS, and Streamlit,作者:Bobur Umurzokov
鏈接:https://dzone.com/articles/revolutionizing-real-time-alerts-with-ai-nats-and。