FastAPI + APScheduler:異步定時(shí)任務(wù)調(diào)度器完全指南(支持持久化)
在實(shí)際開發(fā)中,我們常常會(huì)遇到需要周期性執(zhí)行任務(wù)的場景,例如:
- 每晚 00:00 清理無效數(shù)據(jù)
- 每分鐘拉取遠(yuǎn)程接口狀態(tài)
- 每小時(shí)生成業(yè)務(wù)報(bào)表
在 FastAPI 項(xiàng)目中,我們可以借助 APScheduler 實(shí)現(xiàn)功能強(qiáng)大的 異步定時(shí)任務(wù)調(diào)度器。本文將帶你一步步封裝支持以下功能的調(diào)度模塊:
- 同時(shí)支持同步函數(shù)和異步函數(shù)
- 支持 cron、interval 等多種調(diào)度方式
- 支持任務(wù)持久化(SQLite)
- 支持動(dòng)態(tài)添加、移除、暫停任務(wù)
- 支持任務(wù)狀態(tài)查看
項(xiàng)目結(jié)構(gòu)
fastapi_scheduler/
├── main.py
├── scheduler/
│ ├── __init__.py
│ ├── job_store.py # 數(shù)據(jù)庫持久化
│ ├── tasks.py # 所有任務(wù)函數(shù)
│ └── scheduler.py # APScheduler 封裝
安裝依賴
pip install fastapi apscheduler aiosqlite
aiosqlite 用于異步訪問 SQLite,APScheduler 可用 SQLite 持久化任務(wù)。
(1) scheduler/job_store.py:配置數(shù)據(jù)庫持久化
通過配置 SQLAlchemyJobStore,我們可以讓任務(wù):
- 重啟后自動(dòng)恢復(fù)(斷點(diǎn)續(xù)跑)
- 不怕服務(wù)異常重啟
- 管理定時(shí)任務(wù)變得可靠
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
jobstores = {
"default": SQLAlchemyJobStore(url="sqlite:///jobs.sqlite")
}
你也可以換成 PostgreSQL、MySQL:
url="mysql+pymysql://user:pass@localhost/dbname"
(2) scheduler/tasks.py:定義定時(shí)任務(wù)函數(shù)
import asyncio
from datetime import datetime
def print_sync():
print(f"[{datetime.now()}] ? 同步任務(wù)執(zhí)行")
async def print_async():
print(f"[{datetime.now()}] ?? 異步任務(wù)開始")
await asyncio.sleep(1)
print(f"[{datetime.now()}] ? 異步任務(wù)完成")
(3) scheduler/scheduler.py:封裝 APScheduler 管理器
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.job import Job
from scheduler.job_store import jobstores
scheduler = AsyncIOScheduler(jobstores=jobstores)
defstart():
print("?? 定時(shí)任務(wù)調(diào)度器啟動(dòng)")
scheduler.start()
defadd_interval_job(func, seconds: int, job_id: str, **kwargs) -> Job:
return scheduler.add_job(func, IntervalTrigger(seconds=seconds), id=job_id, **kwargs)
defadd_cron_job(func, cron_expr: str, job_id: str, **kwargs) -> Job:
trigger = CronTrigger.from_crontab(cron_expr)
return scheduler.add_job(func, trigger, id=job_id, **kwargs)
defremove_job(job_id: str):
scheduler.remove_job(job_id)
defpause_job(job_id: str):
scheduler.pause_job(job_id)
defresume_job(job_id: str):
scheduler.resume_job(job_id)
defget_jobs():
return scheduler.get_jobs()
(4) main.py:啟動(dòng) FastAPI 和定時(shí)任務(wù)
from fastapi import FastAPI
from scheduler.scheduler import start, add_interval_job, add_cron_job
from scheduler.tasks import print_sync, print_async
app = FastAPI()
@app.on_event("startup")
asyncdefstartup_event():
start()
add_interval_job(print_sync, seconds=30, job_id="job_sync")
add_interval_job(print_async, seconds=45, job_id="job_async")
add_cron_job(print_sync, cron_expr="*/1 * * * *", job_id="job_cron")
@app.get("/")
asyncdefindex():
return {"message": "?? FastAPI + APScheduler 啟動(dòng)成功!"}
cron 表達(dá)式語法速查表
表達(dá)式位置 | 含義 | 示例 |
分鐘 | 0–59 |
(每5分鐘) |
小時(shí) | 0–23 |
(每天0點(diǎn)) |
日期 | 1–31 |
(每月1號(hào)) |
月份 | 1–12 |
(每月) |
星期 | 0–6(周日為0) |
(每周日) |
?? 示例:
- */10 * * * * → 每 10 分鐘執(zhí)行一次
- 0 0 * * * → 每天凌晨 0 點(diǎn)執(zhí)行
- 0 9 * * 1 → 每周一早上 9 點(diǎn)執(zhí)行
總結(jié)
我們完成了一個(gè)完整的 異步任務(wù)調(diào)度器封裝方案,支持:
- 同步 / 異步任務(wù)統(tǒng)一管理
- 靈活使用 cron 或 interval 觸發(fā)器
- 任務(wù)持久化
- 快速擴(kuò)展,方便維護(hù)