成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

分布式任務(wù)隊(duì)列 Celery 的實(shí)踐

開發(fā) 前端 分布式
筆者在近期工作中有接觸到 Celery,這是一個(gè)開源的分布式任務(wù)隊(duì)列(Distributed Task Queue),在 Github 上現(xiàn)有 18k star,主要可以用于實(shí)現(xiàn)應(yīng)用中的異步任務(wù)和定時(shí)任務(wù),雖然是用 Python 編寫,但協(xié)議可以用任何語(yǔ)言實(shí)現(xiàn),現(xiàn)已有 gocelery、nodecelery 和 celery-php 等。

[[432209]]

筆者在近期工作中有接觸到 Celery,這是一個(gè)開源的分布式任務(wù)隊(duì)列(Distributed Task Queue),在 Github 上現(xiàn)有 18k star,主要可以用于實(shí)現(xiàn)應(yīng)用中的異步任務(wù)和定時(shí)任務(wù),雖然是用 Python 編寫,但協(xié)議可以用任何語(yǔ)言實(shí)現(xiàn),現(xiàn)已有 gocelery、nodecelery 和 celery-php 等。

筆者寫下此文總結(jié)對(duì) Celery 的了解和在工作中的使用。本文的大概內(nèi)容如下:

  • 任務(wù)隊(duì)列是什么;
  • Celery 做了什么;
  • Celery 在工作中的實(shí)踐。

任務(wù)隊(duì)列是什么

“消息隊(duì)列(Message Queue)”,后端同學(xué)應(yīng)該都有了解,常見的有 RabbitMQ、RocketMQ、Kafka。而“任務(wù)隊(duì)列(Task Queue)”,筆者在接觸 Celery 之前是沒有聽過的。任務(wù)隊(duì)列是什么,而任務(wù)隊(duì)列和消息隊(duì)列,這兩者之間有何關(guān)系。帶著問題,先看看 Celery 的架構(gòu):

Celery

在 Celery 的架構(gòu)中,可看出由多臺(tái) Server 發(fā)起異步任務(wù)(Async Task),發(fā)送任務(wù)到 Broker 的隊(duì)列中,其中的 Celery Beat 進(jìn)程可負(fù)責(zé)發(fā)起定時(shí)任務(wù)。當(dāng) Task 到達(dá) Broker 后,會(huì)將其分發(fā)給相應(yīng)的 Celery Worker 進(jìn)行處理。當(dāng) Task 處理完成后,其結(jié)果存儲(chǔ)至 Backend。

在上述過程中的 Broker 和 Backend,Celery 沒有實(shí)現(xiàn),而是使用了現(xiàn)有開源實(shí)現(xiàn),例如 RabbitMQ 作為 Broker 提供消息隊(duì)列服務(wù),Redis 作為 Backend 提供結(jié)果存儲(chǔ)服務(wù)。Celery 就像是抽象了消息隊(duì)列架構(gòu)中 Producer、Consumer 的實(shí)現(xiàn),將消息隊(duì)列中基本單位“消息”抽象成了任務(wù)隊(duì)列中的“任務(wù)”,并將異步、定時(shí)任務(wù)的發(fā)起和結(jié)果存儲(chǔ)等操作進(jìn)行了封裝,讓開發(fā)者可以忽略 AMQP、RabbitMQ 等實(shí)現(xiàn)細(xì)節(jié),為開發(fā)帶來便利。

綜上所述,Celery 作為任務(wù)隊(duì)列是基于消息隊(duì)列的進(jìn)一步封裝,其實(shí)現(xiàn)依賴消息隊(duì)列。

接下來,通過一個(gè)簡(jiǎn)單的應(yīng)用來具體了解 Celery 做了什么。

Celery 做了什么

在應(yīng)用開發(fā)中,為了保證響應(yīng)速度,耗時(shí)且不影響流程的操作通常被做異步處理。例如在用戶注冊(cè)的處理過程中,通常會(huì)異步發(fā)送郵件通知用戶,下面看看 Celery 是如何實(shí)現(xiàn)該異步操作。

在 task.py 中聲明了發(fā)送郵件的方法 send_mail,并為其加上 Celery 提供的 @app.task 裝飾器。通過該裝飾器,可以將 send_mail 函數(shù)變成一個(gè) celery.app.task:Task 實(shí)例對(duì)象。而該 Task 實(shí)例可提供了兩個(gè)核心功能:

  • 將消息發(fā)送給隊(duì)列;
  • 聲明 Worker 接收到消息后需要執(zhí)行的具體函數(shù)。
  1. from celery import Celery 
  2.  
  3. app = Celery('tasks', broker='amqp://guest@localhost//'
  4.  
  5. @app.task 
  6. def send_mail(email): 
  7.     print("send mail to ", email) 
  8.     import time 
  9.     time.sleep(5) 
  10.     return "success" 

Task 已經(jīng)定義完成,若要發(fā)起異步任務(wù),可通過調(diào)用 Task 的 delay 方法,該方法會(huì)將消息發(fā)送至隊(duì)列,例如在用戶注冊(cè)完成時(shí),發(fā)起發(fā)郵件的異步任務(wù):

  1. user.py 
  2. from tasks import send_mail 
  3.  
  4. def register(): 
  5.   print("1. 插入記錄到數(shù)據(jù)庫(kù)"
  6.   print("2. 通過celery異步發(fā)郵件"
  7.   send_mail.delay("chaycao@gmail.com"
  8.   print("3. 告訴用戶注冊(cè)成功"
  9.  
  10. if __name__ =='__main__'
  11.     register() 

運(yùn)行以上程序后,消息已經(jīng)發(fā)送至 RabbitMQ 的隊(duì)列中,可觀察到其消息格式如下:

Task in RabbitMQ

可看出 Celery 封裝后的消息包含了 task 標(biāo)識(shí)和運(yùn)行參數(shù)等內(nèi)容。

接著,啟動(dòng) Worker 消費(fèi) RabbitMQ 中的消息:

  1. celery -A tasks worker --loglevel=info 

Worker 啟動(dòng)后,可以看到下面打印信息:

Worker Start

首先是 Worker 的配置信息,然后是 Worker 所執(zhí)行的 Task 列表,接著是從 RabbitMQ 中成功獲取消息并執(zhí)行相應(yīng)的 Task。

通過以上示例,可以進(jìn)一步明白 Celery 作為任務(wù)隊(duì)列框架所做的工作,而“分布式任務(wù)隊(duì)列”中的”分布式“指的則是 Producer、Consumer 可以有多個(gè),即多個(gè)進(jìn)程向 Broker 發(fā)送任務(wù),多個(gè) Worker 從 Broker 中獲取 Task 并執(zhí)行。

以上只是一個(gè)簡(jiǎn)單的示例,接著再看下筆者在工作中所接觸到的關(guān)于 Celery 使用的一些實(shí)踐經(jīng)驗(yàn)。

Celery 在工作中的實(shí)踐

根據(jù)業(yè)務(wù)場(chǎng)景劃分隊(duì)列

在筆者所工作的項(xiàng)目中,Celery 用于處理下單、解析軌跡、推送上游等異步任務(wù)和定時(shí)任務(wù)。根據(jù)每個(gè) Task 的業(yè)務(wù)場(chǎng)景,可為其指定對(duì)應(yīng)的隊(duì)列,例如:

  1. DEFAULT_CELERY_ROUTES = { 
  2.  'celery_task.pending_create': {'queue''create'}, 
  3.  'celery_task.multi_create': {'queue''create'}, 
  4.  'celery_task.pull_tracking': {'queue''pull'}, 
  5.  'celery_task.pull_branch': {'queue''pull'}, 
  6.  'celery_task.push_tracking': {'queue''push'}, 
  7.  'celery_task.push_weight': {'queue''push'}, 
  8.  
  9. CELERY_ROUTES = { 
  10.  DEFAULT_CELERY_ROUTES 

根據(jù)業(yè)務(wù)場(chǎng)景,在 DEFAULT_CELERY_ROUTES 配置中指定 6 個(gè) Task 對(duì)應(yīng)的 Queue,共有 3 個(gè)隊(duì)列 create、pull、push,并將該路由規(guī)則加入到 CELERY_ROUTES 中以生效。這樣設(shè)計(jì)的目的是為了不同場(chǎng)景彼此之間互不影響,例如解析任務(wù)阻塞不應(yīng)該影響下單任務(wù)。

進(jìn)一步劃分隊(duì)列

在根據(jù)業(yè)務(wù)場(chǎng)景粗略劃分后,對(duì)于某個(gè)場(chǎng)景,可能需要更細(xì)致的劃分,例如在向上游推送時(shí),為了避免一個(gè)上游的阻塞影響向其他上游推送,需要做到不同上游彼此之間互不影響。所以需要針對(duì)不同上游使用不同隊(duì)列,例如:

  1. CLIENT_CELERY_ROUTES = { 
  2.   # {0} 為 client 的占位符,在 ClientRouter 中進(jìn)行格式化 
  3.  'celery_task.push_tracking_retry': {'queue''push_tracking_retry_{0}'}, 
  4.  'celery_task.push_weight_retry': {'queue''push_weight_retry_{0}'}, 
  5.  
  6. class ClientRouter(object): 
  7.  
  8.  def route_for_task(self, task, args=None, kwargs=None): 
  9.    if task not in CLIENT_CELERY_ROUTES: 
  10.     return None 
  11.    client_id = kwargs('client_id'
  12.       # 根據(jù) client_id 獲取隊(duì)列名 
  13.    queue_name = CLIENT_CELERY_ROUTES[task]['queue'].format(client_id) 
  14.    return {'queue': queue_name} 
  15.  
  16. CELERY_ROUTES = { 
  17.  'ClientRouter' 
  18.  DEFAULT_CELERY_ROUTES, 

在 CLIENT_CELERY_ROUTES 中指定了需要根據(jù) Client 隔離隊(duì)列的 Task 和其對(duì)應(yīng)的 Queue 名稱格式,隊(duì)列名中含有一個(gè)占位符,為的是根據(jù)不同 Client 得到不同的隊(duì)列名。

接著實(shí)現(xiàn)了一個(gè)路由器 ClientRouter ,其中定義了 router_for_task 方法,其作用是為 task 指定對(duì)應(yīng)的隊(duì)列名。可看出其中的邏輯是如果 task 在 CLIENT_CELERY_ROUTES 中,將會(huì)用 kwargs 中的 client_id 格式化隊(duì)列名,得到最終發(fā)送消息的隊(duì)列名,達(dá)到根據(jù)入?yún)?client_id 來決定具體使用的隊(duì)列,從而起到隔離不同 Client 使用不同隊(duì)列的效果。

除了在 Client 的維度上劃分,若需要在其他維度進(jìn)一步劃分隊(duì)列以達(dá)到隔離的效果,也可參考該方法來設(shè)計(jì)路由規(guī)則。

動(dòng)態(tài)隊(duì)列

再來說說動(dòng)態(tài)隊(duì)列,其本質(zhì)是預(yù)備隊(duì)列,其目的是為了在線上環(huán)境減輕某些隊(duì)列消息堆積的壓力,起到快速支援的作用。通過配置來定義動(dòng)態(tài)隊(duì)列需要支援哪些隊(duì)列,例如當(dāng) push 隊(duì)列的壓力較大,可配置 json 如下,將 push_tracking 和 push_weight 兩個(gè) Task 路由到預(yù)備的動(dòng)態(tài)隊(duì)列中。

  1. celery_dynamic_router 配置 
  2.  
  3.  "celery_task.push_tracking": { 
  4.   "dynamic_queue": [1,2], 
  5.   "dynamic_percentage": 0.7, 
  6.  }, 
  7.  "celery_task.push_weight": { 
  8.   "dynamic_queue": [3,4], 
  9.   "dynamic_percentage": 0.7, 
  10.  }  

上述配置的作用是將 70% 的 celery_task.push_tracking Task 路由到動(dòng)態(tài)隊(duì)列 1、2 上,70% 的 celery_task.push_weight Task 路由到動(dòng)態(tài)隊(duì)列 3、4 上。

動(dòng)態(tài)隊(duì)列的路由器 DynamicRouter 大致實(shí)現(xiàn)如下:

  1. class DynamicRouter(object): 
  2.  
  3.  def route_for_task(self, task, args=None, kwargs=None): 
  4.   # 獲取配置 
  5.   task_config = get_conf_dict('celery_dynamic_router').get(task, None) 
  6.   # task如果沒在配置中,則直接返回 
  7.   if not task_config: 
  8.    return None 
  9.   # 獲取task對(duì)應(yīng)的動(dòng)態(tài)隊(duì)列配置 
  10.   dynamic_queue = task_config.get('dynamic_queue', []) 
  11.   dynamic_percentage = task_config.get('dynamic_percentage', 0.0) 
  12.   # 將一定比例的task路由到動(dòng)態(tài)隊(duì)列中 
  13.   if random.random() <= dynamic_percentage: 
  14.    # 決定使用哪個(gè)動(dòng)態(tài)隊(duì)列 
  15.    queue_name = router_load_balance(dynamic_queue, task_name) 
  16.    log.data('get_router| task_name:%s, queue:%s', task_name, queue_name) 
  17.    return {'queue': queue_name} 
  18.   else
  19.    return None 

動(dòng)態(tài)配置的定時(shí)任務(wù)

前文提到 Celery 不僅能實(shí)現(xiàn)異步任務(wù),還能通過 Celery Beat 實(shí)現(xiàn)定時(shí)任務(wù),首先看一個(gè)例子:

  1. from celery.schedules import crontab 
  2.  
  3. app.conf.beat_schedule = { 
  4.    # 每30秒發(fā)送一次郵件 
  5.     'sendmail-every-30-seconds': { 
  6.         'task''asks.send_mail'
  7.         'schedule': 30.0, 
  8.         'args': ['chaycao@gmail.com'
  9.     }, 

完成上述配置后,執(zhí)行 Celery Beat 命令:

celery beat

即根據(jù)配置每 30 秒執(zhí)行一次 send_email 任務(wù)。

上述示例是在代碼中配置定時(shí)任務(wù)。而在筆者的工作中使用了 djcelery 提供的數(shù)據(jù)庫(kù)調(diào)度模型,通過結(jié)合 django 提供的 ORM 功能來動(dòng)態(tài)設(shè)置,更為方便。下面敘述如何實(shí)現(xiàn),首先在 Celery 配置中新增:

  1. CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'  

設(shè)置使用 DatabaseScheduler,然后再生成定時(shí)任務(wù)的配置表:

  1. python manage.py migrate 

可以看到數(shù)據(jù)庫(kù)中多出了以下表:

  1. | celery_taskmeta            | 
  2. | celery_tasksetmeta         | 
  3. | djcelery_crontabschedule   | 
  4. | djcelery_intervalschedule  | 
  5. | djcelery_periodictask      | 
  6. | djcelery_periodictasks     | 
  7. | djcelery_taskstate         | 
  8. | djcelery_workerstate       | 

完成以上操作,最后只用執(zhí)行 Celery Beat 命令,則會(huì)去數(shù)據(jù)庫(kù)中讀取配置發(fā)起定時(shí)任務(wù)。這樣的好處是可以通過修改數(shù)據(jù)庫(kù)中的記錄來實(shí)現(xiàn)動(dòng)態(tài)配置定時(shí)任務(wù),例如調(diào)整任務(wù)的周期或者參數(shù)。

以上便是筆者在工作中接觸到 Celery 所收獲的內(nèi)容,如果有需要實(shí)現(xiàn)異步任務(wù)、定時(shí)任務(wù)的場(chǎng)景,可以考慮使用 Celery。

我是草捏子,一只熱愛技術(shù)和生活的草魚,我們下期見!

參考

Message Queue vs Task Queue difference (https://newbedev.com/message-queue-vs-task-queue-difference)

高性能異步框架Celery入坑指南 (https://juejin.cn/post/6844903689103081480)

 

分布式任務(wù)隊(duì)列 Celery—深入 Task (https://www.cnblogs.com/jmilkfan-fanguiju/p/10589779.html)

 

責(zé)任編輯:武曉燕 來源: 草捏子
相關(guān)推薦

2021-03-05 08:52:00

Celery在Windows分布式

2021-04-14 13:32:50

Redis輕量級(jí)分布式

2020-09-29 19:20:05

鴻蒙

2023-06-26 00:14:28

Openjob分布式任務(wù)

2023-02-28 07:01:11

分布式緩存平臺(tái)

2024-09-27 09:19:30

2022-03-21 19:44:30

CitusPostgreSQ執(zhí)行器

2023-11-14 08:36:15

Celery工具

2024-11-14 11:56:45

2024-09-12 14:50:08

2022-06-28 08:37:07

分布式服務(wù)器WebSocket

2013-03-22 14:44:52

大規(guī)模分布式系統(tǒng)飛天開放平臺(tái)

2024-04-08 11:04:03

2019-06-19 15:40:06

分布式鎖RedisJava

2022-12-13 09:19:26

分布式消息隊(duì)列

2020-11-06 12:12:35

HarmonyOS

2016-09-23 10:51:23

騰訊云

2022-09-07 08:18:26

分布式灰度方案分支號(hào)

2024-01-05 07:28:50

分布式事務(wù)框架

2019-11-15 10:16:27

分布式任務(wù)框架
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

主站蜘蛛池模板: 91中文字幕 | 久久99蜜桃综合影院免费观看 | 免费看一区二区三区 | 亚洲国产免费 | 69精品久久久久久 | 狠狠躁天天躁夜夜躁婷婷老牛影视 | 免费在线视频a | 黄篇网址 | 日韩精品1区2区3区 成人黄页在线观看 | 国产午夜精品久久久久免费视高清 | 国产在线精品一区二区 | 亚洲一区二区三区四区五区中文 | 国产精品久久午夜夜伦鲁鲁 | 中文在线一区二区 | 免费午夜视频在线观看 | 精品96久久久久久中文字幕无 | 91色视频在线观看 | 欧美一区二区在线看 | 亚洲人成在线播放 | 亚洲精品一区二区在线观看 | 欧美在线观看黄色 | 国产成人免费视频网站高清观看视频 | 一区免费看| 成年网站在线观看 | 国产成人精品区一区二区不卡 | 超碰精品在线 | 黄色片网站国产 | 欧美日韩亚洲国产 | 精品欧美一区二区三区免费观看 | 在线亚洲免费视频 | 99色综合| 国产高清在线 | 国产在线观看一区二区 | 盗摄精品av一区二区三区 | 久久久无码精品亚洲日韩按摩 | 久久不射电影网 | 日韩国产三区 | 久久亚洲经典 | 国产精品s色 | 欧美日韩一区二区三区四区五区 | 黄色一级片视频 |