成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

FastAPI + APScheduler:異步定時(shí)任務(wù)調(diào)度器完全指南(支持持久化)

開發(fā)
在 FastAPI 項(xiàng)目中,我們可以借助 APScheduler 實(shí)現(xiàn)功能強(qiáng)大的 異步定時(shí)任務(wù)調(diào)度器。本文將帶你一步步封裝支持以下功能的調(diào)度模塊。

在實(shí)際開發(fā)中,我們常常會(huì)遇到需要周期性執(zhí)行任務(wù)的場景,例如:

  • 每晚 00:00 清理無效數(shù)據(jù)
  • 每分鐘拉取遠(yuǎn)程接口狀態(tài)
  • 每小時(shí)生成業(yè)務(wù)報(bào)表

在 FastAPI 項(xiàng)目中,我們可以借助 APScheduler 實(shí)現(xiàn)功能強(qiáng)大的 異步定時(shí)任務(wù)調(diào)度器。本文將帶你一步步封裝支持以下功能的調(diào)度模塊:

  • 同時(shí)支持同步函數(shù)和異步函數(shù)
  • 支持 cron、interval 等多種調(diào)度方式
  • 支持任務(wù)持久化(SQLite)
  • 支持動(dòng)態(tài)添加、移除、暫停任務(wù)
  • 支持任務(wù)狀態(tài)查看

項(xiàng)目結(jié)構(gòu)

fastapi_scheduler/
├── main.py
├── scheduler/
│   ├── __init__.py
│   ├── job_store.py       # 數(shù)據(jù)庫持久化
│   ├── tasks.py           # 所有任務(wù)函數(shù)
│   └── scheduler.py       # APScheduler 封裝

安裝依賴

pip install fastapi apscheduler aiosqlite

aiosqlite 用于異步訪問 SQLite,APScheduler 可用 SQLite 持久化任務(wù)。

(1) scheduler/job_store.py:配置數(shù)據(jù)庫持久化

通過配置 SQLAlchemyJobStore,我們可以讓任務(wù):

  • 重啟后自動(dòng)恢復(fù)(斷點(diǎn)續(xù)跑)
  • 不怕服務(wù)異常重啟
  • 管理定時(shí)任務(wù)變得可靠
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

jobstores = {
    "default": SQLAlchemyJobStore(url="sqlite:///jobs.sqlite")
}

你也可以換成 PostgreSQL、MySQL:

url="mysql+pymysql://user:pass@localhost/dbname"

(2) scheduler/tasks.py:定義定時(shí)任務(wù)函數(shù)

import asyncio
from datetime import datetime

def print_sync():
    print(f"[{datetime.now()}] ? 同步任務(wù)執(zhí)行")

async def print_async():
    print(f"[{datetime.now()}] ?? 異步任務(wù)開始")
    await asyncio.sleep(1)
    print(f"[{datetime.now()}] ? 異步任務(wù)完成")

(3) scheduler/scheduler.py:封裝 APScheduler 管理器

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.job import Job
from scheduler.job_store import jobstores

scheduler = AsyncIOScheduler(jobstores=jobstores)

defstart():
    print("?? 定時(shí)任務(wù)調(diào)度器啟動(dòng)")
    scheduler.start()

defadd_interval_job(func, seconds: int, job_id: str, **kwargs) -> Job:
    return scheduler.add_job(func, IntervalTrigger(seconds=seconds), id=job_id, **kwargs)

defadd_cron_job(func, cron_expr: str, job_id: str, **kwargs) -> Job:
    trigger = CronTrigger.from_crontab(cron_expr)
    return scheduler.add_job(func, trigger, id=job_id, **kwargs)

defremove_job(job_id: str):
    scheduler.remove_job(job_id)

defpause_job(job_id: str):
    scheduler.pause_job(job_id)

defresume_job(job_id: str):
    scheduler.resume_job(job_id)

defget_jobs():
    return scheduler.get_jobs()

(4) main.py:啟動(dòng) FastAPI 和定時(shí)任務(wù)

from fastapi import FastAPI
from scheduler.scheduler import start, add_interval_job, add_cron_job
from scheduler.tasks import print_sync, print_async

app = FastAPI()

@app.on_event("startup")
asyncdefstartup_event():
    start()
    add_interval_job(print_sync, seconds=30, job_id="job_sync")
    add_interval_job(print_async, seconds=45, job_id="job_async")
    add_cron_job(print_sync, cron_expr="*/1 * * * *", job_id="job_cron")

@app.get("/")
asyncdefindex():
    return {"message": "?? FastAPI + APScheduler 啟動(dòng)成功!"}

cron 表達(dá)式語法速查表

表達(dá)式位置

含義

示例

分鐘

0–59

*/5

(每5分鐘)

小時(shí)

0–23

0

(每天0點(diǎn))

日期

1–31

1

(每月1號(hào))

月份

1–12

*

(每月)

星期

0–6(周日為0)

0

(每周日)

?? 示例:

  • */10 * * * * → 每 10 分鐘執(zhí)行一次
  • 0 0 * * * → 每天凌晨 0 點(diǎn)執(zhí)行
  • 0 9 * * 1 → 每周一早上 9 點(diǎn)執(zhí)行

總結(jié)

我們完成了一個(gè)完整的 異步任務(wù)調(diào)度器封裝方案,支持:

  • 同步 / 異步任務(wù)統(tǒng)一管理
  • 靈活使用 cron 或 interval 觸發(fā)器
  • 任務(wù)持久化
  • 快速擴(kuò)展,方便維護(hù)
責(zé)任編輯:趙寧寧 來源: Ssoul肥魚
相關(guān)推薦

2023-12-19 08:09:06

Python定時(shí)任務(wù)Cron表達(dá)式

2020-04-01 16:10:02

PythonAPScheduler調(diào)度

2024-05-13 09:49:30

.NETQuartz庫Cron表達(dá)式

2023-09-26 11:34:56

Python

2022-04-11 15:56:51

Golang代碼框架

2021-06-28 06:00:11

systemd定時(shí)器系統(tǒng)運(yùn)維

2023-11-16 09:30:27

系統(tǒng)任務(wù)

2023-10-28 09:05:38

2024-09-09 08:11:12

2023-06-29 07:55:52

Quartz.Net開源

2023-08-08 08:35:28

web框架Hosting模塊

2023-12-11 09:50:35

Linux定時(shí)器

2009-10-28 10:05:29

Ubuntucrontab定時(shí)任務(wù)

2012-02-07 13:31:14

SpringJava

2010-03-10 15:47:58

crontab定時(shí)任務(wù)

2024-03-12 11:39:30

Python開發(fā)

2023-10-06 12:15:02

2025-06-03 08:15:00

微服務(wù)架構(gòu)異步任務(wù)隊(duì)列

2021-02-02 08:03:51

Linux圖形工具

2024-11-04 16:01:01

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

主站蜘蛛池模板: 亚洲欧美精品在线 | 亚洲免费观看视频 | 国产一区二区在线播放 | 日韩欧美中文字幕在线观看 | 国产精品国产a | 午夜电影网| 国产精品2 | 久久成人综合 | 国产综合久久 | av网站免费观看 | 亚洲第一av | 日本精品一区二区三区在线观看视频 | 久久夜视频 | 久久久网 | 国产情侣啪啪 | 精品日本中文字幕 | 播放一级黄色片 | 色婷婷综合久久久中字幕精品久久 | 色综合成人网 | 日本在线视频中文字幕 | 一级黄色片在线看 | 久久人人爽人人爽人人片av免费 | 天天射夜夜操 | 欧美嘿咻| 国产真实乱对白精彩久久小说 | 国产视频1 | 精品一二三区 | 成人免费视频 | 欧美日韩福利视频 | 欧洲亚洲一区 | 一区二区成人在线 | 视频一区二区在线观看 | 欧美色综合一区二区三区 | 精品视频一区二区 | 色片在线观看 | 99久久婷婷国产精品综合 | 成人影院在线视频 | 99pao成人国产永久免费视频 | 国产精品综合久久 | 久久99久久99精品免视看婷婷 | 视频一区二区在线观看 |