FastAPI + RabbitMQ:構(gòu)建高性能異步任務(wù)系統(tǒng)
在現(xiàn)代微服務(wù)架構(gòu)中,任務(wù)解耦 和 異步處理 是系統(tǒng)擴(kuò)展能力的關(guān)鍵。本文將帶你使用 FastAPI + RabbitMQ 構(gòu)建一個(gè)簡單的 異步任務(wù)隊(duì)列,模擬一個(gè)耗時(shí)的任務(wù)(如發(fā)送郵件),由后臺獨(dú)立 worker 消費(fèi)執(zhí)行。
技術(shù)棧
- FastAPI(主服務(wù),負(fù)責(zé)接收請求)
- RabbitMQ(消息隊(duì)列)
- aio-pika(Python 異步 RabbitMQ 客戶端)
系統(tǒng)架構(gòu)簡圖
用戶請求 --> FastAPI(推送任務(wù)) --> RabbitMQ
↓
Worker(消費(fèi)執(zhí)行)
安裝依賴
pip install fastapi uvicorn aio-pika pydantic
RabbitMQ 環(huán)境準(zhǔn)備
可以用 Docker 啟動 RabbitMQ 服務(wù):
docker run -d --hostname rabbit --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
- 5672:RabbitMQ 消息端口
- 15672:Web 管理后臺(默認(rèn)賬號密碼都是 guest)
FastAPI 應(yīng)用(producer)
# app/main.py
from fastapi import FastAPI
from pydantic import BaseModel
import asyncio
import aio_pika
app = FastAPI()
class TaskRequest(BaseModel):
user_email: str
message: str
RABBITMQ_URL = "amqp://guest:guest@localhost/"
@app.on_event("startup")
async def startup():
app.state.rabbit_connection = await aio_pika.connect_robust(RABBITMQ_URL)
@app.on_event("shutdown")
async def shutdown():
await app.state.rabbit_connection.close()
@app.post("/send-task")
async def send_task(task: TaskRequest):
channel = await app.state.rabbit_connection.channel()
queue = await channel.declare_queue("task_queue", durable=True)
# 構(gòu)造消息
msg_body = task.json().encode()
message = aio_pika.Message(body=msg_body, delivery_mode=aio_pika.DeliveryMode.PERSISTENT)
await channel.default_exchange.publish(message, routing_key="task_queue")
return {"status": "success", "msg": "任務(wù)已入隊(duì)"}
消費(fèi)者 Worker(consumer)
# worker.py
import asyncio
import json
import aio_pika
RABBITMQ_URL = "amqp://guest:guest@localhost/"
async def main():
connection = await aio_pika.connect_robust(RABBITMQ_URL)
channel = await connection.channel()
await channel.set_qos(prefetch_count=1)
queue = await channel.declare_queue("task_queue", durable=True)
async def on_message(message: aio_pika.IncomingMessage):
async with message.process():
data = json.loads(message.body)
print(f"?? 收到任務(wù):發(fā)送郵件至 {data['user_email']},內(nèi)容:{data['message']}")
await asyncio.sleep(2) # 模擬耗時(shí)任務(wù)
print("? 郵件發(fā)送完成")
print("?? Worker 正在等待任務(wù)...")
await queue.consume(on_message)
if __name__ == "__main__":
asyncio.run(main())
啟動服務(wù)
啟動 FastAPI:
uvicorn app.main:app --reload
啟動 worker:
python worker.py
測試
發(fā)送 POST 請求到 /send-task:
POST http://localhost:8000/send-task
{
"user_email": "test@example.com",
"message": "歡迎使用 FastAPI + RabbitMQ!"
}
終端會看到 worker 消費(fèi)消息并執(zhí)行任務(wù)的輸出。
總結(jié)
通過 FastAPI + RabbitMQ,可以輕松實(shí)現(xiàn)異步任務(wù)分發(fā)系統(tǒng):
- 主服務(wù)響應(yīng)快速,避免卡頓
- 異步 worker 后臺處理,任務(wù)解耦
- RabbitMQ 提供可靠、高可用的消息傳遞機(jī)制