成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

開源微服務(wù)編排框架:Netflix Conductor

開源
被worker服務(wù)執(zhí)行,執(zhí)行與引擎隔離開,worker通過隊列獲取任務(wù)后,執(zhí)行并更新結(jié)果狀態(tài)到引擎。Worker的實現(xiàn)是跨語言的,其使用Http協(xié)議與Server通信。

[[438055]]

本文主要介紹netflix conductor的基本概念和主要運行機制。

一 簡介

netflix conductor是基于JAVA語言編寫的開源流程引擎,用于架構(gòu)基于微服務(wù)的流程。它具備如下特性:

  • 允許創(chuàng)建復(fù)雜的業(yè)務(wù)流程,流程中每個獨立的任務(wù)都是由一個微服務(wù)所實現(xiàn)。
  • 基于JSON DSL 創(chuàng)建工作流,對任務(wù)的執(zhí)行進行編排。
  • 工作流在執(zhí)行的過程中可見、可追溯。
  • 提供暫停、恢復(fù)、重啟等多種控制模型。
  • 提供一種簡單的方式來最大限度重用微服務(wù)。
  • 擁有擴展到百萬流程并發(fā)運行的服務(wù)能力。
  • 通過隊列服務(wù)實現(xiàn)客戶端與服務(wù)端的分離。
  • 支持 HTTP 或其他RPC協(xié)議進行數(shù)據(jù)傳送

二 基本概念

1 Task

Task是最小執(zhí)行單元,承載了一段執(zhí)行邏輯,如發(fā)送HTTP請求等。

  • System Task:被conductor服務(wù)執(zhí)行,這些任務(wù)的執(zhí)行與引擎在同一個JVM中。
  • Worker Task:被worker服務(wù)執(zhí)行,執(zhí)行與引擎隔離開,worker通過隊列獲取任務(wù)后,執(zhí)行并更新結(jié)果狀態(tài)到引擎。Worker的實現(xiàn)是跨語言的,其使用Http協(xié)議與Server通信。

conductor提供了若干內(nèi)置SystemTask:

  • 功能性Task:
    • HTTP:發(fā)送http請求
    • JSON_JQ_TRANSFORM:jq命令執(zhí)行,一般用戶json的轉(zhuǎn)換,具體可見jq官方文檔
    • KAFKA_PUBLISH: 發(fā)布kafka消息
  • 流程控制Task:
  • SWITCH(原Decision):條件判斷分支,類似于代碼中的switch case
  • FORK:啟動并行分支,用于調(diào)度并行任務(wù)
  • JOIN:匯總并行分支,用于匯總并行任務(wù)
  • DO_WHILE:循環(huán),類似于代碼中的do while
  • WAIT:一直在運行中,直到外部時間觸發(fā)更新節(jié)點狀態(tài),可用于等待外部操作
  • SUB_WORKFLOW:子流程,執(zhí)行其他的流程
  • TERMINATE:結(jié)束流程,以指定輸出提前結(jié)束流程,可以與SWITCH節(jié)點配合使用,類似代碼中的提前return語句

自定義Task:

  • 對于System Task,Conductor提供了WorkflowSystemTask 抽象類,可以自定義擴展實現(xiàn)。
  • 對于Worker Task,可以實現(xiàn)conductor的client Worker接口實現(xiàn)執(zhí)行邏輯。

2 Workflow

  • Workflow由一系列需要執(zhí)行的Task組成,conductor采用json來描述Task的流轉(zhuǎn)關(guān)系。
  • 除基本的順序流程外,借助內(nèi)置的SWITCH、FORK、JOIN、DO_WIHLE、TERMINATE任務(wù),還能實現(xiàn)分支、并行、循環(huán)、提前結(jié)束等流程控制。

3 Input&Output

Task的輸入是一種映射,其作為工作流實例化的一部分或某些其他Task的輸出。允許將來自工作流或其他Task的輸入/輸出作為隨后執(zhí)行的Task的輸入。

  • Task有自己的輸入和輸出,輸入輸出都是jsonobject類型。
  • Task可以引用其他Task的輸入輸出,使用${taskxxx.output}的方式引用。引用語法為json-path,除最基礎(chǔ)的${taskxxx.output}的值解析方式外,還支持其他復(fù)雜操作,如過濾等,具體見json-path語法。
  • 啟動Workflow時可以傳入流程的輸入數(shù)據(jù),Task可以通過${workflow.input}的方式引用。

Task實現(xiàn)原子操作的處理以及流程控制操作,Workflow定義描述Task的流轉(zhuǎn)關(guān)系,Task引用Workflow或者其它Task的輸入輸出。通過這些機制,conductor實現(xiàn)了JSON DSL對流程的描述。

三 整體架構(gòu)

主要分為幾個部分:

  • Orchestrator: 負責流程的流轉(zhuǎn)調(diào)度工作;
  • Management/Execution Service: 提供流程、任務(wù)的管理更新等操作;
  • TaskQueues: 任務(wù)隊列,Orchestrator解析出來的待執(zhí)行Task會放到隊列中;
  • Worker: 任務(wù)執(zhí)行worker,從TaskQueues中獲取任務(wù),通過Execution Service更新任務(wù)狀態(tài)與結(jié)果數(shù)據(jù);
  • Database: 元數(shù)據(jù)&運行時數(shù)據(jù)庫,用于保存運行時的Workflow、Task等狀態(tài)信息,以及流程任務(wù)定義的等原信息;
  • Index: 索引數(shù)據(jù)庫,用于存儲執(zhí)行歷史;

四 運行模型

1 Task狀態(tài)轉(zhuǎn)移

  • SCHEDULED:待調(diào)度,task放到隊列中還沒有被poll出來執(zhí)行時的狀態(tài)
  • IN_PROGRESS:執(zhí)行中,被poll出來執(zhí)行但還沒有完成時的狀態(tài)
  • COMPLETED:執(zhí)行完成
  • FAILED:執(zhí)行失敗
  • CANCELLED:被中止時為此狀態(tài),一般出現(xiàn)在兩種情況:
    • 1.手動中止流程時,正在運行中的task會被置為此狀態(tài);
    • 2.多個fork分支,當某個分支的task失敗時,其它分支中正在運行的task會被置為此狀態(tài);

2 任務(wù)隊列

任務(wù)的執(zhí)行(同步的系統(tǒng)任務(wù)除外)都會先添加到任務(wù)隊列中,是典型的生產(chǎn)者消費者模式。

  • 任務(wù)隊列,是一個帶有延遲、優(yōu)先級功能的隊列;
  • 每種類型的Task是一個單獨的隊列,此外,如果配置了domain、isolationGroup,還會拆分成多個隊列實現(xiàn)執(zhí)行隔離;
  • decider service是生產(chǎn)者,其根據(jù)流程配置與當前執(zhí)行情況,解析出可執(zhí)行的task后,添加到隊列;
  • 任務(wù)執(zhí)行器(SystemTaskWorker、Worker)是消費者,其長輪詢對應(yīng)的隊列,從隊列中獲取任務(wù)執(zhí)行;

隊列接口可插拔,conductor提供了Dynomite 、MySQL、PostgreSQL的實現(xiàn)。

3 核心功能實現(xiàn)機制

conductor調(diào)度的核心是decider service,其根據(jù)當前流程運行的狀態(tài),解析出將要執(zhí)行的任務(wù)列表,將任務(wù)入隊交給worker執(zhí)行。

decide主要流程簡化如下,詳細代碼見WorkflowExecutor.java的decide方法:

其中,調(diào)度任務(wù)處理流程簡化如下,詳細代碼見WorkflowExecutor.java的scheduleTask方法:

decide的觸發(fā)時機

最主要的觸發(fā)時機:

新啟動執(zhí)行時,會觸發(fā)decide操作

系統(tǒng)任務(wù)執(zhí)行完成時,會觸發(fā)decide操作

Workder任務(wù)通過ExecutionService更新任務(wù)狀態(tài)時,會觸發(fā)decide操作

流程控制節(jié)點的實現(xiàn)機制

1)Task & TaskMapper

對于每一個Task來說,都有Task和TaskMapper兩部分:

Task:任務(wù)的執(zhí)行邏輯代碼,它的作用是Task的執(zhí)行

TaskMapper:任務(wù)的映射邏輯代碼,它通過Task的定義配置、當前實例的執(zhí)行狀態(tài)等信息,返回實際需要執(zhí)行的Task列表

對于一般的任務(wù)來說,TaskMapper返回的是就是Task本身,補充一些執(zhí)行實例的狀態(tài)信息。但是對于控制節(jié)點來說,會有不同的邏輯。

2)條件分支(SWITCH)的實現(xiàn)機制

SWITCH用于根據(jù)條件判斷,執(zhí)行不同的分支。

實際上,該節(jié)點的Task不做任何操作,TaskMapper根據(jù)分支條件,判斷出要走的分之后,返回對應(yīng)分支的第一個Task。

SwitchTaskMapper.java getMappedTasks方法關(guān)鍵代碼:

  1. // 待調(diào)度的Task list,最終返回結(jié)果 
  2. List<Task> tasksToBeScheduled = new LinkedList<>(); 
  3. // evalResult是分支條件變量的值(case
  4. // decisionCases是一個Map結(jié)構(gòu),key為分支的case值,value為對應(yīng)分支的任務(wù)定義list(分支內(nèi)的任務(wù)定義會有多個) 
  5. // 根據(jù)分支變量的實際值,獲取對應(yīng)分支的任務(wù)定義list 
  6. List<WorkflowTask> selectedTasks = taskToSchedule.getDecisionCases().get(evalResult); 
  7. // default的邏輯:如果獲取不到對應(yīng)的分支或者分支為空,則用默認的分支 
  8. if (selectedTasks == null || selectedTasks.isEmpty()) { 
  9.   selectedTasks = taskToSchedule.getDefaultCase(); 
  10. if (selectedTasks != null && !selectedTasks.isEmpty()) { 
  11.   // 獲取分支的第一個(下標0)task,返回給decider service去做調(diào)度(decider會把任務(wù)添加到隊列里,交給worker去執(zhí)行) 
  12.   WorkflowTask selectedTask = selectedTasks.get(0); 
  13.   // 調(diào)用了deciderService的getTasksToBeScheduled方法,此方法里又獲取到TaskMapper調(diào)用了getMappedTasks。這里采用了遞歸調(diào)用的方式,解析嵌套的Task 
  14.   List<Task> caseTasks = taskMapperContext.getDeciderService() 
  15.     .getTasksToBeScheduled(workflowInstance, selectedTask, retryCount, taskMapperContext.getRetryTaskId()); 
  16.   tasksToBeScheduled.addAll(caseTasks); 
  17.   switchTask.getInputData().put("hasChildren""true"); 
  18. return tasksToBeScheduled; 

3)并行(FORK)的實現(xiàn)機制

FORK用于開啟多個并行分支。

實際上,該節(jié)點的Task不做任何操作,TaskMapper返回所有并行分支的第一個Task。

ForkJoinTaskMapper.java getMappedTasks關(guān)鍵代碼:

  1. // 待調(diào)度的Task list,最終返回結(jié)果 
  2. List<Task> tasksToBeScheduled = new LinkedList<>(); 
  3. // 配置中的所有fork分支 
  4. List<List<WorkflowTask>> forkTasks = taskToSchedule.getForkTasks(); 
  5. for (List<WorkflowTask> wfts : forkTasks) { 
  6.   // 每個分支取第一個Task 
  7.   WorkflowTask wft = wfts.get(0); 
  8.   // 調(diào)用了deciderService的getTasksToBeScheduled方法,此方法里又獲取到TaskMapper調(diào)用了getMappedTasks。這里采用了遞歸調(diào)用的方式,解析嵌套的Task 
  9.   List<Task> tasks2 = taskMapperContext.getDeciderService() 
  10.     .getTasksToBeScheduled(workflowInstance, wft, retryCount); 
  11.   tasksToBeScheduled.addAll(tasks2); 
  12. return tasksToBeScheduled; 

總的來說,分支(SWITCH)、并行(FORK)節(jié)點本身沒有執(zhí)行邏輯,其通過TaskMapper返回到實際要執(zhí)行的Task,然后交給Decider Service處理。

重試的實現(xiàn)機制

重試和其延遲時間設(shè)置,都是借助任務(wù)隊列的功能實現(xiàn)的。

重試:將任務(wù)重新添加到任務(wù)隊列

重試的延遲時間:添加到任務(wù)隊列時設(shè)置延遲時間,延遲時間過后,任務(wù)才能在隊列中被poll出來執(zhí)行

五 完整性保障機制

由于調(diào)度過程中可能會出現(xiàn)因機器重啟、網(wǎng)絡(luò)異常、JVM崩潰等偶發(fā)情況,這些會導(dǎo)致的decide過程意外終止,流程執(zhí)行不完整,展現(xiàn)出如流程一直運行中(實際已經(jīng)沒有在調(diào)度),或者其它狀態(tài)錯誤等異常現(xiàn)象。

1 WorkflowReconciler

針對這種情況,conductor有一個WorkflowReconciler,會定期嘗試decide所有正在運行中的流程,修復(fù)流程執(zhí)行的一致性。此外,它還有一個作用是校驗流程超時時間。

2 decideQueue

那么WorkflowReconciler是如何獲取到當前運行中的流程呢,答案是decideQueue。

decideQueue和任務(wù)隊列相同,也是一個具有延遲功能的隊列,其存放的是正在執(zhí)行中的流程的實例id。在任務(wù)開始執(zhí)行時(包括新啟動執(zhí)行、重試執(zhí)行、恢復(fù)執(zhí)行、重跑執(zhí)行等),會將實例id push到decideQueue中;在執(zhí)行結(jié)束(成功、失敗)時,會從decideQueue中刪除實例id。

3 ExecutionLockService

WorkflowReconciler會定期嘗試decide所有正在運行中的流程用于超時判斷、維護流程一致性。但是流程本身正常執(zhí)行也會觸發(fā)decide,如果同一個執(zhí)行同時觸發(fā)兩個decide,可能會導(dǎo)致狀態(tài)混亂,執(zhí)行卡住等問題。

conductor采用了鎖來解決這個問題,其提供了單機LocalOnlyLock(基于信號量實現(xiàn))、redis分布式鎖(基于redission實現(xiàn))、zookeeper分布式鎖三種實現(xiàn)。

decide方法中最開始會嘗試獲取鎖,如果獲取失敗則直接返回。通過鎖來保障不會對同一個流程實例并發(fā)執(zhí)行decide。

  1. if (!executionLockService.acquireLock(workflowId)) { 
  2.  
  3. return false
  4.  

由于鎖是可配置的,可能會導(dǎo)致一個誤區(qū):單臺機器的話不用配置鎖。其實單機也是需要配置鎖的,因為WorkflowReconciler和流程正常執(zhí)行會產(chǎn)生沖突,可能會導(dǎo)致偶發(fā)的流程狀態(tài)混亂問題。

參考:

Github: https://github.com/Netflix/conductor

官方文檔:https://netflix.github.io/conductor/

WorkflowReconciler:https://github.com/Netflix/conductor/blob/main/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowReconciler.java

WorkflowSystemTask:https://github.com/Netflix/conductor/blob/main/core/src/main/java/com/netflix/conductor/core/execution/tasks/WorkflowSystemTask.java?spm=ata.21736010.0.0.2b501a3cYnrSfT&file=WorkflowSystemTask.java

 

責任編輯:武曉燕 來源: 阿里技術(shù)
相關(guān)推薦

2024-01-05 16:46:26

2022-10-08 07:31:26

微服務(wù)編排體系

2019-07-12 08:45:07

開源微服務(wù)框架

2024-07-09 10:57:29

2021-02-20 17:25:18

Netflix開源GraphQL

2024-10-15 17:28:05

2022-06-08 08:52:04

Tars微服務(wù)開發(fā)C++開發(fā)

2023-10-26 23:35:02

SSH登錄部署

2020-09-19 17:54:04

Netflix

2022-07-01 08:36:44

流編排主流框架

2023-10-27 18:47:35

微服務(wù)底層機制

2017-01-19 11:10:20

5G網(wǎng)絡(luò)Open Baton虛擬網(wǎng)絡(luò)

2021-01-12 09:38:02

微服務(wù)服務(wù)組合編排

2020-10-26 07:05:02

大數(shù)據(jù)管道編排編排框架

2021-03-02 16:25:52

微服務(wù)架構(gòu)技術(shù)

2023-02-17 18:06:33

微服務(wù)架構(gòu)

2020-11-27 10:50:06

微服務(wù)架構(gòu)框架

2017-06-08 13:31:40

NetflixEureka服務(wù)器

2021-01-08 10:45:40

框架微服務(wù)架構(gòu)

2022-02-20 22:10:20

微服務(wù)框架gRPC
點贊
收藏

51CTO技術(shù)棧公眾號

主站蜘蛛池模板: 91xxx在线观看| 国内久久精品 | 国产精品99视频 | 成人午夜视频在线观看 | 国产激情视频在线 | 亚洲一区久久 | 国产精品一区二区不卡 | 国产乱码精品一区二区三区五月婷 | 一区视频在线 | 亚洲精品99 | 一级毛片在线看 | av片网站 | 亚洲成人免费视频 | 欧美日韩精品 | 91久久精品国产 | 91在线视频观看 | 亚洲福利一区二区 | 色姑娘综合网 | 天堂av免费观看 | 卡通动漫第一页 | 亚洲综合色视频在线观看 | 黄色福利 | 91成人免费电影 | 国产精品免费一区二区三区四区 | 欧美精品一二区 | 欧美日韩免费一区二区三区 | 国产精品亚洲综合 | 欧美日本一区二区 | 国产精品视频偷伦精品视频 | 精品综合 | 成人福利电影 | 99精品视频在线观看 | 黄色片大全在线观看 | 精品91久久| 亚洲精品久久久一区二区三区 | 日本一区二区三区四区 | 午夜成人免费视频 | 自拍视频一区二区三区 | 精品成人免费一区二区在线播放 | 国产精品美女www爽爽爽 | 色一情一乱一伦一区二区三区 |