成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

項目終于用上了xxl-job,好起來了!

云計算 分布式
設計思想 是將調度行為抽象形成 調度中心 平臺,平臺本身不承擔業務邏輯,而是負責發起 調度請求 后,由 執行器 接收調度請求并執行 任務,這里的 任務 抽象為 分散的 JobHandler。通過這種方式即可實現 調度 與 任務 相互解耦,從而提高系統整體的穩定性和拓展性。

本篇文章主要記錄項目中遇到的 xxl-job 的實戰,希望能通過這篇文章告訴讀者們什么是 xxl-job 以及怎么使用 xxl-job 并分享一個實戰案例。

那么下面先說明什么是 xxl-job 以及為什么要使用它。

xxl-job 是什么?

XXL-JOB 是一個分布式任務調度平臺,其核心設計目標是開發迅速、學習簡單、輕量級、易擴展。

設計思想 是將調度行為抽象形成 調度中心 平臺,平臺本身不承擔業務邏輯,而是負責發起 調度請求 后,由 執行器 接收調度請求并執行 任務,這里的 任務 抽象為 分散的 JobHandler。通過這種方式即可實現 調度 與 任務 相互解耦,從而提高系統整體的穩定性和拓展性。

為了更好理解,這里放一張官網的架構圖:

圖片圖片

任務調度是什么?

在開發項目時大家是否也遇到過類似的場景問題:

  • 系統需要定時在每天0點進行數據備份。
  • 系統需要在活動開始前幾小時預熱執行一些前置業務。
  • 系統需要定時對 MQ 消息表的發送裝填,對發送失敗的 MQ 消息進行補償重新發送。

這些場景問題都可以通過 任務調度 來解決,任務調度指的是系統在約定的指定時間自動去執行指定的任務的過程。

單體系統 中有許多實現 任務調度 的方式,如多線程方式、Timer 類、Spring Tasks 等等。這里比較常用的是 Spring Tasks(通過 @EnableScheduling + @Scheduled 的注解可以自定義定時任務,有興趣的可以去了解一下)

為什么需要分布式任務調度平臺?

分布式下,每個服務都可以搭建為集群,這樣的好處是可以將任務切片分給每一個服務從而實現并行執行,提高任務調度的處理效率。那么為什么 分布式系統 不能使用 單體系統 的任務調度實現方式呢。

在集群服務下,如果還是使用每臺機器按照單體系統的任務調度實現方式實現的話,會出現下面這四個問題:

  1. 怎么做到對任務的控制(如何避免任務重復執行)。
  2. 如果某臺機器宕機了,會不會存在任務丟失。
  3. 如果要增加服務實例,怎么做到彈性擴容。
  4. 如何做到對任務調度的執行情況統一監測。

通過上面的問題可以了解到分布式系統下需要一個滿足高可用、容錯管理、負載均衡等功能的任務調度平臺來實現任務調度。分布式系統下,也有許多可以實現任務調度的第三方的分布式任務調度系統,如 xxl-job、Quartz、elastic-job 等等常用的分布式任務調度系統。

如何使用 xxl-job

作為開源軟件的 xxl-job,可以在 github 或 gitee上查看和下載 xxl-job 的源碼。

下面將介紹我使用 xxl-job 的流程(如果有操作不當的,可以查看官方的中文文檔:https://www.xuxueli.com/xxl-job)

dokcer 下安裝 xxl-job:

1、docker 下拉取 xxl-job 的鏡像(這里使用 2.3.1 版本)

docker pull xuxueli/xxl-job-admin:2.3.1

2、創建映射容器的文件目錄

mkdir -p -m 777 /mydata/xxl-job/data/applogs

3、在 /mydata/xxl-job 的目錄下創建 application.properties 文件

由于 application.properties 的代碼過長,這里就不展示了,需要的可以去 gitee 上獲取,具體路徑如圖:

圖片圖片

這里需要注意數據庫位置的填寫:

圖片圖片

如果還需要更改端口的可以更改這里:

圖片圖片

這里還需要注意告警郵箱和訪問口令(后續Spring Boot配置用到):

圖片圖片

4、將 tables_xxl-job.sql 文件導入上面步驟3指定的數據庫(自己填寫的那個數據庫)

同樣由于文件代碼過長,這里展示 gitee 上獲取的路徑圖:

圖片圖片

5、執行 docker 命令

注意這里的 -p 8088:8088 是因為我更改了前面 application.porperties 文件的端口號為 8088,所以這里我執行的 docker 命令為 -p 8088:8088,如果沒有更改的這里一定要改為 -p 8080:8080。

docker run  -p 8088:8088 \
-d --name=xxl-job-admin --restart=always \
-v /mydata/xxl-job/application.properties:/application.properties \
-v /mydata/xxl-job/data/applogs:/data/applogs \
-e PARAMS='--spring.config.locatinotallow=/application.properties' xuxueli/xxl-job-admin:2.3.1

執行后通過 docker ps 查看是否成功運行,如果失敗可以通過 docker logs xxl-job-admin 查看具體錯誤日志。

6、通過 http://192.168.101.25:8088/xxl-job-admin/ 訪問(這里ip和端口是自己的)

圖片圖片

賬號:admin 密碼:123456

到這里就算是完成了 xxl-job 在 docker 的搭建。

Spring Boot 項目集成 xxl-job

xxl-job 由 調度中心 和 執行器 組成,上面已經完成了在 docker 上部署調度中心了,接下來介紹怎么配置部署執行器項目。

1、在 Spring Boot 項目中導入 maven 依賴

<dependency>
    <groupId>com.xuxueli</groupId>
    <artifactId>xxl-job-core</artifactId>
    <version>2.3.1</version>
</dependency>

這里需要注意版本號與 xxl-job 版本需要一致,這里我配置的都是 2.3.1 版本。

2、在 Spring Boot 項目中配置 application.yml 文件

xxl:
  job:
    admin:
      addresses: http://192.168.101.25:8088/xxl-job-admin
    executor:
      appname: media-process-service
      address:
      ip:
      port: 9999
      logpath: /data/applogs/xxl-job/jobhandler
      logretentiondays: 30
    accessToken: default_token
  • 這里的 xxl.job.admin.addresses 用于指定調度中心的地址。
  • 這里的 xxl.job.accessToken 用于指定訪問口令(也就是前面搭建 xxl-job 中步驟3指定的)。
  • 這里的 xxl.job.executor.appname 用于指定執行器的名稱(需要與后續配置執行器的名稱一致)。關注工眾號:碼猿技術專欄,回復關鍵詞:1111 獲取阿里內部Java性能調優手冊!
  • 這里的 xxl.job.executor.port 用于指定執行器的端口(執行器實際上是一個內嵌的 Server,默認端口為9999,配置多個同一服務實例時需要指定不同的執行器端口,否則會端口沖突)。
  • 其他屬性只需要照著配置即可(想要了解屬性的具體含義可以查看中文文檔中的2.4配置部署執行器項目章節)。

3、編寫配置類

/**
 * XXL-JOB配置類
 *
 * @author 公眾號:碼猿技術專欄
 */
@Slf4j
@Configuration
public class XxlJobConfig {
    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    @Value("${xxl.job.accessToken}")
    private String accessToken;

    @Value("${xxl.job.executor.appname}")
    private String appname;

    @Value("${xxl.job.executor.address}")
    private String address;

    @Value("${xxl.job.executor.ip}")
    private String ip;

    @Value("${xxl.job.executor.port}")
    private int port;

    @Value("${xxl.job.executor.logpath}")
    private String logPath;

    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;

    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        log.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
        return xxlJobSpringExecutor;
    }
}

4、調度中心中新增執行器

圖片圖片

執行器的配置屬性:

  • AppName: 每個執行器集群的唯一標示 AppName,執行器會周期性以 AppName 為對象進行自動注冊。可通過該配置自動發現注冊成功的執行器,供任務調度時使用。
  • 名稱: 執行器的名稱(可以使用中文更好地體現該執行器是用來干嘛的)。
  • 注冊方式:調度中心獲取執行器地址的方式(一般為了方便可以選用自動注冊即可)。
  • 自動注冊:執行器自動進行執行器注冊,調度中心通過底層注冊表可以動態發現執行器機器地址。
  • 手動錄入:人工手動錄入執行器的地址信息,多地址逗號分隔,供調度中心使用。
  • 機器地址:"注冊方式"為"手動錄入"時有效,支持人工維護執行器的地址信息。

5、配置自定義任務

配置自定義任務有許多種模式,如 Bean模式(基于方法)、Bean模式(基于類)、GLUE模式等等。這里介紹通過 Bean模式(基于方法) 是如何自定義任務的(對于其余的模式可以參考官方文檔)。

Bean模式(基于方法)也就是每個任務對應一個方法,通過添加 @XxLJob(value="自定義JobHandler名稱", init = "JobHandler初始化方法", destroy = "JobHandler銷毀方法") 注解即可完成定義。

/**
 * 任務處理類
 *
 * @author 公眾號:碼猿技術專欄
 */
@Component
public class TestJob {
    /**
     * 測試任務
     */
    @XxlJob("testHandler")
    public void testHandler() {
        XxlJobHelper.handleSuccess("本次測試任務調度成功");
    }
}
  • 通過注解也可以指定 初始化方法和銷毀方法,如果不填寫可以直接寫一個 自定義的JobHandler名稱 用于后面在調度中心中配置任務時對應任務的 JobHandler 屬性值。
  • 可以通過 XxlJobHelper.log 來打印日志,通過調度中心可以查看執行日志的情況。
  • 可以通過 XxlJobHelper.handleFail 或 XxlJobHelper.handleSuccess 手動設置任務調度的結果(不設置時默認結果為成功狀態,除非任務執行時出現異常)。

6、調度中心中新增任務

圖片圖片

這里主要注意 Cron 表達式的時間配置以及 JobHandler 的值需要與自定義任務方法的注解上的 value 屬性值一致即可。

關于高級配置這里放一張中文文檔的詳細說明(也可以直接去看文檔):

圖片圖片

需要搭建集群或過期策略等高級玩法時可以進行配置。

到這里就完成了 SpringBoot 集成 xxl-job 實現分布式任務調度的全過程了,接下來會通過一個實戰案例來具體看看 xxl-job 的用處。

xxl-job 實戰

下面通過一個最近自己在跟著做的學習項目中使用到 xxl-job 的場景案例來具體了解一下如何利用 xxl-job 來實現任務調度。

實戰背景

當前項目需要對上傳到分布式文件系統 minio 中的視頻文件進行統一格式的視頻轉碼操作,由于本身視頻轉碼操作會帶了很大的時間消耗以及 CPU 的開銷,所以考慮集群服務下使用 xxl-job 的方式以任務調度的方式定時處理視頻轉碼操作。

這樣可以帶來兩個好處:① 以任務調度的方式,可以使得視頻轉碼操作不會阻塞主線程,避免影響主要業務的吞吐量;關注工眾號:碼猿技術專欄,回復關鍵詞:1111 獲取阿里內部Java性能調優手冊!② 以集群服務分片接收任務的方式,可以將任務均分給每個機器使得任務調度可以并行執行,提高總任務處理時間以及降低單臺機器 CPU 的開銷;

xxl-job 執行流程圖

xxl-job實戰.pngxxl-job實戰.png

怎么將任務均分給每臺服務器?

由于任務執行時間過長,需要搭建集群服務來做到并行任務調度,從而減小 CPU 的開銷,那么怎么均分任務呢?

利用 xxl-job 在集群部署時,配置路由策略中選擇 分片廣播 的方式,可以使一次任務調度會廣播觸發集群中所有的執行器執行一次任務,并且可以向系統傳遞分片參數。

圖片圖片

利用這一特性可以根據 當前執行器的分片序號和分片總數 來獲取對應的任務記錄。

先來看看 Bean 模式下怎么獲取分片序號和分片總數:

// 分片序號(當前執行器序號)
int shardIndex = XxlJobHelper.getShardIndex();
// 分片總數(執行器總數)
int shardTotal = XxlJobHelper.getShardTotal();

有了這兩個屬性,當執行器掃描數據庫獲取記錄時,可以根據 取模 的方式獲取屬于當前執行器的任務,可以這樣編寫 sql 獲取任務記錄:

select * from media_process m
where m.id % #{shareTotal} = #{shareIndex}  
  and (m.status = '1' or m.status = '3')
  and m.fail_count < 3
limit #{count}

掃描任務表,根據任務 id 對分片總數 取模 來實現對所有分片的均分任務,通過判斷是否是當前分片序號,并且當前任務狀態為 1(未處理)或 3(處理失敗)并且當前任務失敗次數小于3次時可以取得當前任務。每次掃描只取出 count 個任務數(批量處理)。

圖片圖片

因此通過 xxl-job 的分片廣播 + 取模 的方式即可實現對集群服務均分任務的操作。

怎么確保任務不會被重復消費?

由于視頻轉碼本身處理時間就會比較長,所以更不允許服務重復執行,雖然上面通過分片廣播+取模的方式提高了任務不會被重復執行的機率,但是依舊存在如下情況:

如下圖,有三臺集群機器和六個任務,剛開始分配好了每臺機器兩個任務,執行器0正準備執行任務3時,剛好執行器2宕機了,此時執行器1剛好執行一次任務,因為分片總數減小,導致執行器1重新分配到需要執行的任務正好也是任務3,那么此時就會出現執行器0和執行器1都在執行任務3的情況。

圖片圖片

那么這種情況就需要實現冪等性了,冪等性有很多種實現方法,有興趣了解的可以參考:接口冪等性的實現方案

這里使用樂觀鎖的方式實現冪等性,具體 sql 如下:

update media_process m
set m.status = '2'
where (m.status = '1' or m.status = '3')
  and m.fail_count < 3
  and m.id = #{id}

這里只需要依靠任務的狀態即可實現(未處理1;處理中2;處理失敗3;處理成功4),可以看到這里類似于 CAS 的方式通過比較和設置的方式只有在狀態為未處理或處理失敗時才能設置為處理中。這樣在并發場景下,即使多個執行器同時處理該任務,也只有一個任務可以設置成功進入處理任務階段。

為了真正達到冪等性,還需要設置一下 xxl-job 的調度過期策略和阻塞處理策略來保證真正的冪等性。分別設置為 忽略(調度過期后,忽略過期的任務,從當前時間開始重新計算下次觸發時間) 和 丟棄后續調度(調度請求進入單機執行器后,發現執行器存在運行的調度任務,本次請求將會被丟棄并標記為失敗)。

圖片圖片

編寫完成該功能所需的所有任務

1、分片視頻轉碼處理

代碼(這里的代碼只展示部分核心步驟代碼):

/**
 * 視頻轉碼處理任務
 */
@XxlJob("videoTranscodingHandler")
public void videoTranscodingHandler() throws InterruptedException {
    // 1. 分片獲取當前執行器需要執行的所有任務
    List<MediaProcess> mediaProcessList = mediaProcessService.getMediaProcessList(shardIndex, shardTotal, count);
    // 通過JUC工具類阻塞直到所有任務執行完
    CountDownLatch countDownLatch = new CountDownLatch(mediaProcessList.size());
    // 遍歷所有任務
    mediaProcessList.forEach(mediaProcess -> {
        // 以多線程的方式執行所有任務
        executor.execute(() -> {
            try {
                // 2. 嘗試搶占任務(通過樂觀鎖實現)
                boolean res = mediaProcessService.startTask(id);
                if (!res) {
                    XxlJobHelper.log("任務搶占失敗,任務id{}", id);
                    return;
                }
                
                // 3. 從minio中下載視頻到本地
                File file = mediaFileService.downloadFileFromMinIO(bucket, objectName);
                // 下載失敗
                if (file == null) {
                    XxlJobHelper.log("下載視頻出錯,任務id:{},bucket:{},objectName:{}", id, bucket, objectName);
                    // 出現異常重置任務狀態為處理失敗等待下一次處理
                    mediaProcessService.saveProcessFinishStatus(id, Constants.MediaProcessCode.FAIL.getValue(), fileId, null, "下載視頻到本地失敗");
                    return;
                }
                
                // 4. 視頻轉碼
                String result = videoUtil.generateMp4();
                if (!result.equals("success")) {
                    XxlJobHelper.log("視頻轉碼失敗,原因:{},bucket:{},objectName:{},", result, bucket, objectName);
                    // 出現異常重置任務狀態為處理失敗等待下一次處理
                    mediaProcessService.saveProcessFinishStatus(id, Constants.MediaProcessCode.FAIL.getValue(), fileId, null, "視頻轉碼失敗");
                    return;
                }
                
                // 5. 上傳轉碼后的文件
                boolean b1 = mediaFileService.addMediaFilesToMinIO(new_File.getAbsolutePath(), "video/mp4", bucket, objectNameMp4);
                if (!b1) {
                    XxlJobHelper.log("上傳 mp4 到 minio 失敗,任務id:{}", id);
                    // 出現異常重置任務狀態為處理失敗等待下一次處理
                    mediaProcessService.saveProcessFinishStatus(id, Constants.MediaProcessCode.FAIL.getValue(), fileId, null, "上傳 mp4 文件到 minio 失敗");
                    return;
                }

                // 6. 更新任務狀態為成功
                mediaProcessService.saveProcessFinishStatus(id, Constants.MediaProcessCode.SUCCESS.getValue(), fileId, url, "創建臨時文件異常");
                
            } finally {
                countDownLatch.countDown();
            }
        });
    });
    // 阻塞直到所有方法執行完成(30min后不再等待)
    countDownLatch.await(30, TimeUnit.MINUTES);
}

核心任務 - 分片獲取任務后執行視頻轉碼任務,步驟如下:

  • 通過 分片廣播拿到的參數以取模的方式 獲取當前執行器所屬的任務記錄集合
  • 遍歷集合,以 多線程的方式 并發地執行任務
  • 每次執行任務前需要先通過 數據庫樂觀鎖的方式 搶占當前任務,搶占到才能執行;關注工眾號:碼猿技術專欄,回復關鍵詞:1111 獲取阿里內部Java性能調優手冊!
  • 執行任務過程分為 分布式文件系統下載需要轉碼的視頻文件 -> 視頻轉碼 -> 上傳轉碼后的視頻 -> 更新任務狀態(處理成功)
  • 使用JUC工具類 CountDownLatch 實現所有任務執行完后才退出方法
  • 中間使用 xxl-job 的日志記錄錯誤信息和執行結果

2、清理任務表中轉碼成功的任務的記錄并將其插入任務歷史表

由于任務表處理完任務后只是更新任務狀態,這樣隨著任務增多會導致檢索起來時間消耗過大,所以使用任務調度的方式定期掃描任務表,將任務狀態為處理成功的任務刪除并重新插入任務歷史表中留存(由于代碼過于簡單,這里就不做展示了)。

主要實現兩個功能:① 清理任務表中已成功處理的任務;② 將處理成功的任務記錄插入歷史表中;

3、視頻補償機制

由于使用樂觀鎖會將任務狀態更新為處理中,如果此時執行任務的執行器(服務)宕機了,會導致該任務記錄一直存在,因為樂觀鎖的原因別的執行器也無法獲取,這個時候同樣需要使用任務調度的方式,定期掃描任務表,判斷任務是否處于處理中狀態并且任務創建時間遠大于30分鐘,則說明任務超時了,則是使用任務調度的方式重新更新任務的狀態為未處理,等待下一次視頻轉碼任務的調度處理。此外視頻補償機制任務調度還需要檢查是否存在任務最大次數已經大于3次的,如果存在則交付給人工處理(由于代碼過于簡單,這里就不做展示了)。

主要實現兩個功能:① 處理任務超時情況下的任務,做出補償;② 處理失敗次數大于3次的任務,做出補償;

測試并查看日志

準備好的任務表記錄:

圖片圖片

啟動三臺媒資服務器,并開啟任務:

圖片圖片

可以單獨查看每個任務的日志:

圖片圖片

通過日志中的執行日志查看具體日志信息:

圖片圖片

可以看到直接為了測試改錯的路徑導致下載視頻出錯:

圖片圖片

查看數據庫表的變化:

圖片圖片

到這里可以看到核心的視頻轉碼任務執行成功,并且邏輯正確,能夠起到分布式任務調度的作用。

總結

這就是本次 xxl-job 實戰的全部內容了,寫這篇文章主要是為了記錄一下項目中是如何使用 xxl-job 的,并且提供一種分片廣播均分任務的思路以及冪等性問題如何處理,具體使用 xxl-job 還需根據自己項目的需求,遇到問題可以參考官網。

責任編輯:武曉燕 來源: 碼猿技術專欄
相關推薦

2025-05-26 09:31:23

2024-08-27 09:34:24

2025-06-03 08:20:00

Feign微服務

2025-06-27 09:31:25

2022-12-19 08:32:57

項目Feign框架

2022-03-26 17:13:22

ElasticJobxxl-job分布式

2022-12-13 08:29:13

項目插入式注解

2024-09-14 09:59:04

2020-07-17 09:33:39

CPU內存調度

2022-09-23 13:57:11

xxl-job任務調度中間件

2023-01-04 09:23:58

2024-09-09 08:11:12

2023-11-30 22:06:43

2024-07-31 08:18:40

2024-12-04 10:47:26

2021-12-26 00:03:27

響應式編程異步

2022-01-27 08:44:58

調度系統開源

2020-07-23 10:51:29

NginxWebApache

2021-12-26 19:07:51

MySQL存儲容器

2022-12-29 08:32:50

xxl-job緩存Schedule
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 综合久久久 | 北条麻妃一区二区三区在线视频 | 欧美一区在线看 | 国产精品一区二区av | 久久精品久久久久久 | 男人天堂国产 | 日日操视频 | 狠狠久久 | 欧产日产国产精品视频 | 我想看一级黄色毛片 | 91九色视频 | 国产一级片免费看 | 国产探花在线精品一区二区 | 国产精品久久久久久一区二区三区 | 狠狠的日 | 成人高潮片免费视频欧美 | 国产高清一区二区三区 | 99这里只有精品视频 | 免费在线观看91 | 欧美日韩一区二区三区视频 | 精品少妇v888av | 亚洲免费观看视频网站 | 久久伊人免费视频 | 国产成人免费视频网站高清观看视频 | 国产精品久久久久久52avav | 日日操操操 | 日韩一区二区三区四区五区六区 | 一级毛片在线看 | 最新中文在线视频 | 国产在线观看一区二区三区 | 午夜影院在线观看免费 | 亚洲综合二区 | 97免费视频在线观看 | 亚洲一区免费视频 | 国产精品美女在线观看 | 国产精品视频网 | www.久草.com | 欧美激情精品久久久久久 | 亚洲精品在线视频 | 色综合一区 | 中文字幕乱码亚洲精品一区 |