使用Spring Boot + Quartz 實現分布式定時任務平臺
本文將從項目實戰出發來介紹分布式定時任務的實現。在某些應用場景下要求任務必須具備高可用性和可擴展性,單臺服務器不能滿足業務需求,這時就需要使用Quartz實現分布式定時任務。
一、分布式任務應用場景
定時任務系統在應用平臺中的重要性不言而喻,特別是互聯網電商、金融等行業更是離不開定時任務。在任務數量不多、執行頻率不高時,單臺服務器完全能夠滿足。
但是隨著業務逐漸增加,定時任務系統必須具備高可用和水平擴展的能力,單臺服務器已經不能滿足需求。因此需要把定時任務系統部署到集群中,實現分布式定時任務系統集群。
Quartz的集群功能通過故障轉移和負載平衡功能為調度程序帶來高可用性和可擴展性。
Quartz是通過數據庫表來存儲和共享任務信息的。獨立的Quartz節點并不與另一個節點或者管理節點通信,而是通過數據庫鎖機制來調度執行定時任務。
需要注意的是,在集群環境下,時鐘必須同步,否則執行時間不一致。
二、Quartz實現分布式定時任務
1. 添加Quartz依賴
首先,引入Quartz中提供分布式處理的JAR包以及數據庫和連接相關的依賴。示例代碼如下:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
<!-- mysql -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!-- orm -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
在上面的示例中,除了添加Quartz依賴外,還需要添加mysql-connector-java和
spring-boot-starter-data-jpa兩個組件,這兩個組件主要用于JOB持久化到MySQL數據庫。
2. 初始化Quartz數據庫
分布式Quartz定時任務的配置信息存儲在數據庫中,數據庫初始化腳本可以在官方網站中查找,默認保存在quartz-2.2.3-distribution\src\org\quartz\impl\jdbcjobstore\tables-mysql.sql目錄下。首先創建quartz_jobs數據庫,然后在數據庫中執行tables-mysql.sql初始化腳本。具體示例如下:
DROP TABLE IF EXISTS QRTZ_FIRED_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_PAUSED_TRIGGER_GRPS;
DROP TABLE IF EXISTS QRTZ_SCHEDULER_STATE;
DROP TABLE IF EXISTS QRTZ_LOCKS;
DROP TABLE IF EXISTS QRTZ_SIMPLE_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_SIMPROP_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_CRON_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_BLOB_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_JOB_DETAILS;
DROP TABLE IF EXISTS QRTZ_CALENDARS;
CREATE TABLE QRTZ_JOB_DETAILS
(
SCHED_NAME VARCHAR(120) NOT NULL,
JOB_NAME VARCHAR(200) NOT NULL,
JOB_GROUP VARCHAR(200) NOT NULL,
DESCRIPTION VARCHAR(250) NULL,
JOB_CLASS_NAME VARCHAR(250) NOT NULL,
IS_DURABLE VARCHAR(1) NOT NULL,
IS_NONCONCURRENT VARCHAR(1) NOT NULL,
IS_UPDATE_DATA VARCHAR(1) NOT NULL,
REQUESTS_RECOVERY VARCHAR(1) NOT NULL,
JOB_DATA BLOB NULL,
PRIMARY KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
);
CREATE TABLE QRTZ_TRIGGERS
(
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
JOB_NAME VARCHAR(200) NOT NULL,
JOB_GROUP VARCHAR(200) NOT NULL,
DESCRIPTION VARCHAR(250) NULL,
NEXT_FIRE_TIME BIGINT(13) NULL,
PREV_FIRE_TIME BIGINT(13) NULL,
PRIORITY INTEGER NULL,
TRIGGER_STATE VARCHAR(16) NOT NULL,
TRIGGER_TYPE VARCHAR(8) NOT NULL,
START_TIME BIGINT(13) NOT NULL,
END_TIME BIGINT(13) NULL,
CALENDAR_NAME VARCHAR(200) NULL,
MISFIRE_INSTR SMALLINT(2) NULL,
JOB_DATA BLOB NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
REFERENCES QRTZ_JOB_DETAILS(SCHED_NAME,JOB_NAME,JOB_GROUP)
);
CREATE TABLE QRTZ_SIMPLE_TRIGGERS
(
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
REPEAT_COUNT BIGINT(7) NOT NULL,
REPEAT_INTERVAL BIGINT(12) NOT NULL,
TIMES_TRIGGERED BIGINT(10) NOT NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);
CREATE TABLE QRTZ_CRON_TRIGGERS
(
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
CRON_EXPRESSION VARCHAR(200) NOT NULL,
TIME_ZONE_ID VARCHAR(80),
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);
CREATE TABLE QRTZ_SIMPROP_TRIGGERS
(
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
STR_PROP_1 VARCHAR(512) NULL,
STR_PROP_2 VARCHAR(512) NULL,
STR_PROP_3 VARCHAR(512) NULL,
INT_PROP_1 INT NULL,
INT_PROP_2 INT NULL,
LONG_PROP_1 BIGINT NULL,
LONG_PROP_2 BIGINT NULL,
DEC_PROP_1 NUMERIC(13,4) NULL,
DEC_PROP_2 NUMERIC(13,4) NULL,
BOOL_PROP_1 VARCHAR(1) NULL,
BOOL_PROP_2 VARCHAR(1) NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);
CREATE TABLE QRTZ_BLOB_TRIGGERS
(
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
BLOB_DATA BLOB NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);
CREATE TABLE QRTZ_CALENDARS
(
SCHED_NAME VARCHAR(120) NOT NULL,
CALENDAR_NAME VARCHAR(200) NOT NULL,
CALENDAR BLOB NOT NULL,
PRIMARY KEY (SCHED_NAME,CALENDAR_NAME)
);
CREATE TABLE QRTZ_PAUSED_TRIGGER_GRPS
(
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_GROUP)
);
CREATE TABLE QRTZ_FIRED_TRIGGERS
(
SCHED_NAME VARCHAR(120) NOT NULL,
ENTRY_ID VARCHAR(95) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
INSTANCE_NAME VARCHAR(200) NOT NULL,
FIRED_TIME BIGINT(13) NOT NULL,
SCHED_TIME BIGINT(13) NOT NULL,
PRIORITY INTEGER NOT NULL,
STATE VARCHAR(16) NOT NULL,
JOB_NAME VARCHAR(200) NULL,
JOB_GROUP VARCHAR(200) NULL,
IS_NONCONCURRENT VARCHAR(1) NULL,
REQUESTS_RECOVERY VARCHAR(1) NULL,
PRIMARY KEY (SCHED_NAME,ENTRY_ID)
);
CREATE TABLE QRTZ_SCHEDULER_STATE
(
SCHED_NAME VARCHAR(120) NOT NULL,
INSTANCE_NAME VARCHAR(200) NOT NULL,
LAST_CHECKIN_TIME BIGINT(13) NOT NULL,
CHECKIN_INTERVAL BIGINT(13) NOT NULL,
PRIMARY KEY (SCHED_NAME,INSTANCE_NAME)
);
CREATE TABLE QRTZ_LOCKS
(
SCHED_NAME VARCHAR(120) NOT NULL,
LOCK_NAME VARCHAR(40) NOT NULL,
PRIMARY KEY (SCHED_NAME,LOCK_NAME)
);
使用tables-mysql.sql創建表的語句執行完成后,說明Quartz的數據庫和表創建成功,我們查看數據庫的ER圖,如下圖所示。
3. 配置數據庫和Quartz
修改application.properties配置文件,配置數據庫與Quartz。具體操作如下:
# server.port=8090
# Quartz 數據庫
spring.datasource.url=jdbc:mysql://localhost:3306/quartz_jobs?useSSL=false&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.max-active=1000
spring.datasource.max-idle=20
spring.datasource.min-idle=5
spring.datasource.initial-size=10
# 是否使用properties作為數據存儲
org.quartz.jobStore.useProperties=false
# 數據庫中表的命名前綴
org.quartz.jobStore.tablePrefix=QRTZ_
# 是否是一個集群,是不是分布式的任務
org.quartz.jobStore.isClustered=true
# 集群檢查周期,單位為毫秒,可以自定義縮短時間。當某一個節點宕機的時候,其他節點等待多久后開始執行任務
org.quartz.jobStore.clusterCheckinInterval=5000
# 單位為毫秒,集群中的節點退出后,再次檢查進入的時間間隔
org.quartz.jobStore.misfireThreshold=60000
# 事務隔離級別
org.quartz.jobStore.txIsolationLevelReadCommitted=true
# 存儲的事務管理類型
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
# 使用的Delegate類型
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
# 集群的命名,一個集群要有相同的命名
org.quartz.scheduler.instanceName=ClusterQuartz
# 節點的命名,可以自定義。AUTO代表自動生成
org.quartz.scheduler.instanceId=AUTO
# rmi遠程協議是否發布
org.quartz.scheduler.rmi.export=false
# rmi遠程協議代理是否創建
org.quartz.scheduler.rmi.proxy=false
# 是否使用用戶控制的事務環境觸發執行任務
org.quartz.scheduler.wrapJobExecutionInUserTransaction=false
上面的配置主要是Quartz數據庫和Quartz分布式集群相關的屬性配置。分布式定時任務的配置存儲在數據庫中,所以需要配置數據庫連接和Quartz配置信息,為Quartz提供數據庫配置信息,如數據庫、數據表的前綴之類。
4. 定義定時任務
后臺定時任務與普通Quartz任務并無差異,只是增加了@
PersistJobDataAfterExecution注解和@DisallowConcurrentExecution注解。創建QuartzJob定時任務類并實現Quartz定時任務的具體示例代碼如下:
// 持久化
@PersistJobDataAfterExecution
// 禁止并發執行
@DisallowConcurrentExecution
public class QuartzJob extends QuartzJobBean {
private static final Logger log = LoggerFactory.getLogger(QuartzJob.class);
@Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
String taskName = context.getJobDetail().getJobDataMap().getString("name");
log.info("---> Quartz job, time:{"+new Date()+"} ,name:{"+taskName+"}<----");
}
}
在上面的示例中,創建了QuartzJob定時任務類,使用@
PersistJobDataAfterExecution注解持久化任務信息。DisallowConcurrentExecution禁止并發執行,避免同一個任務被多次并發執行。
5. SchedulerConfig配置
創建SchedulerConfig配置類,初始化Quartz分布式集群相關配置,包括集群設置、數據庫等。示例代碼如下:
@Configuration
public class SchedulerConfig {
@Autowired
private DataSource dataSource;
/**
* 調度器
*
* @return
* @throws Exception
*/
@Bean
public Scheduler scheduler() throws Exception {
Scheduler scheduler = schedulerFactoryBean().getScheduler();
return scheduler;
}
/**
* Scheduler工廠類
*
* @return
* @throws IOException
*/
@Bean
public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
SchedulerFactoryBean factory = new SchedulerFactoryBean();
factory.setSchedulerName("Cluster_Scheduler");
factory.setDataSource(dataSource);
factory.setApplicationContextSchedulerContextKey("applicationContext");
factory.setTaskExecutor(schedulerThreadPool());
//factory.setQuartzProperties(quartzProperties());
factory.setStartupDelay(10);// 延遲10s執行
return factory;
}
/**
* 配置Schedule線程池
*
* @return
*/
@Bean
public Executor schedulerThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors());
executor.setQueueCapacity(Runtime.getRuntime().availableProcessors());
return executor;
}
}
在上面的示例中,主要是配置Schedule線程池、配置Quartz數據庫、創建Schedule調度器實例等初始化配置。
6. 觸發定時任務
配置完成之后,還需要觸發定時任務,創建JobStartupRunner類以便在系統啟動時觸發所有定時任務。示例代碼如下:
@Component
public class JobStartupRunner implements CommandLineRunner {
@Autowired
SchedulerConfig schedulerConfig;
private static String TRIGGER_GROUP_NAME = "test_trigger";
private static String JOB_GROUP_NAME = "test_job";
@Override
public void run(String... args) throws Exception {
Scheduler scheduler;
try {
scheduler = schedulerConfig.scheduler();
TriggerKey triggerKey = TriggerKey.triggerKey("trigger1", TRIGGER_GROUP_NAME);
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
if (null == trigger) {
Class clazz = QuartzJob.class;
JobDetail jobDetail = JobBuilder.newJob(clazz).withIdentity("job1", JOB_GROUP_NAME).usingJobData("name","weiz QuartzJob").build();
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule("0/10 * * * * ?");
trigger = TriggerBuilder.newTrigger().withIdentity("trigger1", TRIGGER_GROUP_NAME)
.withSchedule(scheduleBuilder).build();
scheduler.scheduleJob(jobDetail, trigger);
System.out.println("Quartz 創建了job:...:" + jobDetail.getKey());
} else {
System.out.println("job已存在:{}" + trigger.getKey());
}
TriggerKey triggerKey2 = TriggerKey.triggerKey("trigger2", TRIGGER_GROUP_NAME);
CronTrigger trigger2 = (CronTrigger) scheduler.getTrigger(triggerKey2);
if (null == trigger2) {
Class clazz = QuartzJob2.class;
JobDetail jobDetail2 = JobBuilder.newJob(clazz).withIdentity("job2", JOB_GROUP_NAME).usingJobData("name","weiz QuartzJob2").build();
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule("0/10 * * * * ?");
trigger2 = TriggerBuilder.newTrigger().withIdentity("trigger2", TRIGGER_GROUP_NAME)
.withSchedule(scheduleBuilder).build();
scheduler.scheduleJob(jobDetail2, trigger2);
System.out.println("Quartz 創建了job:...:{}" + jobDetail2.getKey());
} else {
System.out.println("job已存在:{}" + trigger2.getKey());
}
scheduler.start();
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
}
在上面的示例中,為了適應分布式集群,我們在系統啟動時觸發定時任務,判斷任務是否已經創建、是否正在執行。如果集群中的其他示例已經創建了任務,則啟動時無須觸發任務。
三、 驗證測試
配置完成之后,接下來啟動任務,測試分布式任務配置是否成功。啟動一個實例,可以看到定時任務執行了,然后每10秒鐘打印輸出一次,如下圖所示。
接下來,模擬分布式部署的情況。我們再啟動一個測試程序實例,這樣就有兩個后臺定時任務實例。
實例1:
實例2:
從上面的日志中可以看到,Quartz Job和Quartz Job2交替地在兩個任務實例進程中執行,同一時刻同一個任務只有一個進程在執行,這說明已經達到了分布式后臺定時任務的效果。
接下來,停止任務實例1,測試任務實例2是否會接管所有任務繼續執行。如圖10-11所示,停止任務實例1后,任務實例2接管了所有的定時任務。這樣如果集群中的某個實例異常了,其他實例能夠接管所有的定時任務,確保任務集群的穩定運行。
最后
以上,我們就把Spring Boot集成Quartz實現分布式定時任務的功能介紹完了。分布式定時任務在應用開發中非常重要的功能模塊,希望大家能夠熟練掌握。