SpringBatch高階應用:大數據批處理框架實戰指南
本篇文章主要內容:通過Spring Batch從一個庫中讀取數據進過處理后寫入到另外一個庫中。
1. 環境準備
1.1 引入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
2.2 配置Job
配置Job啟動器
@Bean
JobLauncher userJobLauncher(JobRepository userJobRepository) {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher() ;
jobLauncher.setJobRepository(userJobRepository) ;
return jobLauncher ;
}
配置任務Repository存儲元信息
@Bean
JobRepository userJobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean() ;
factory.setDatabaseType("mysql") ;
factory.setTransactionManager(transactionManager) ;
factory.setDataSource(dataSource) ;
try {
factory.afterPropertiesSet() ;
return factory.getObject() ;
} catch (Exception e) {
throw new RuntimeException(e) ;
}
}
配置ItemReader讀取器
@Bean
ItemReader<User> userReader(JobOperator jobOperator) throws Exception {
JpaPagingItemReaderBuilder<User> builder = new JpaPagingItemReaderBuilder<>() ;
builder.entityManagerFactory(entityManagerFactory) ;
// 每次分頁查詢多少條數據
builder.pageSize(10) ;
builder.queryString("select u from User u where u.uid <= 50") ;
builder.saveState(true) ;
builder.name("userReader") ;
return builder.build() ;
}
配置數據源,該數據源是用來寫入操作的
public DataSource dataSource() {
HikariDataSource dataSource = new HikariDataSource() ;
dataSource.setJdbcUrl("jdbc:mysql://localhost:3306/testjpa?serverTimezone=GMT%2B8&useSSL=false") ;
dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver") ;
dataSource.setUsername("root") ;
dataSource.setPassword("xxxooo") ;
return dataSource ;
}
配置ItemWriter用來寫入操作(當前庫的數據寫入到另外一個庫,上面的數據源)
@Bean
ItemWriter<User> userWriter() {
// 通過JDBC批量處理
JdbcBatchItemWriterBuilder<User> builder = new JdbcBatchItemWriterBuilder<>() ;
DataSource dataSource = dataSource() ;
builder.dataSource(dataSource) ;
builder.sql("insert into st (id, name, sex, mobile, age, birthday) values (?, ?, ?, ?, ?, ?)") ;
builder.itemPreparedStatementSetter(new ItemPreparedStatementSetter<User>() {
@Override
public void setValues(User item, PreparedStatement ps) throws SQLException {
ps.setInt(1, item.getUid()) ;
ps.setString(2, item.getName()) ;
ps.setString(3, item.getSex()) ;
ps.setString(4, item.getMobile()) ;
ps.setInt(5, item.getAge()) ;
ps.setObject(6, item.getBirthday()) ;
}
}) ;
return builder.build() ;
}
配置ItemProcessor處理器,數據從當前庫讀取處理后經過處理后再寫入另外的庫中
@Bean
ItemProcessor<User, User> userProcessor() {
return new ItemProcessor<User, User>() {
@Override
public User process(User item) throws Exception {
System.out.printf("%s - 開始處理數據:%s%n", Thread.currentThread().getName(), item.toString()) ;
// 模擬耗時操作
TimeUnit.SECONDS.sleep(1) ;
// 在這里你可以對數據進行相應的處理。
return item ;
}
} ;
}
配置Step將ItemReader、ItemProcessor、ItemWriter串聯在一起。
@Bean
Step userStep1(ItemReader<User> userReader, ItemProcessor<User, User> userProcessor, ItemWriter<User> userWriter) {
return steps.get("userStep1")
.<User, User>chunk(5)
.reader(userReader)
.processor(userProcessor)
.writer(userWriter)
.build() ;
}
配置Job,Job是封裝整個批處理流程的實體。在 Spring Batch 中,Job只是Step實例的容器。它將邏輯上屬于一個流程的多個步驟組合在一起,并允許對所有步驟的全局屬性(如可重啟性)進行配置。作業配置包含:
- 簡單的工作名稱。
- Step實例的定義和排序。
- Job是否可重新啟動。
@Bean
Job userJob(Step userStep1, Step userStep2) {
return jobs.get("userJob").start(userStep1).build();
}
以上是Spring Batch定義配置一個Job所需的核心組件。接下來會以上面的基礎配置進行高階知識點進行介紹。
2. 高階配置管理
2.1 通過Controller接口啟動Job
@RequestMapping("/userJob")
public class UserJobController {
@Resource
private JobLauncher userJobLauncher ;
@GetMapping("/start")
public Object start() throws Exception {
JobParameters jobParameters = new JobParameters() ;
this.userJobLauncher.run(userJob, jobParameters) ;
return "started" ;
}
}
通過JobLauncher#run方法啟動Job。當你調用該接口時,你會發現接口一直不會返回,一直阻塞,下圖是Job的啟動序列
圖片
根據上圖能知道,當你調用run方法后,會等待整個Job退出狀態為FINISHED或者FAILED后才能結束。所以,你需要異步完成,以便 SimpleJobLauncher 立即返回給調用者。而正確的序列應該是如下:
圖片
上圖通過異步方式啟動Job序列。
2.2 異步啟動Job
@Bean
TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor() ;
taskExecutor.setThreadNamePrefix("spring_batch_launcher") ;
taskExecutor.setCorePoolSize(10) ;
taskExecutor.setMaxPoolSize(10) ;
taskExecutor.initialize() ;
return taskExecutor ;
}
@Bean
JobLauncher userJobLauncher(JobRepository userJobRepository) {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher() ;
jobLauncher.setJobRepository(userJobRepository) ;
jobLauncher.setTaskExecutor(taskExecutor()) ;
return jobLauncher ;
}
通過上面配置后,Job啟動將是異步的會直接返回JobExecution。
2.3 重啟Job
當一個Job正在執行,由于斷電或者強制終止了程序。當程序恢復后你希望能夠接著程序終止前的進度繼續執行,這時候你需要進行如下的操作(本人沒有發現有什么API能夠操作的,可能文檔沒看仔細)。
當程序非正常終止是,下面兩張表的狀態都是STARTED,END_TIME為null
batch_job_execution表
圖片
batch_step_execution表
圖片
想要重新啟動必須將上面的狀態修改為STOPPED,END_TIME字段設置上值(是什么值無所謂)。
然后我們就可以繼續使用上面的Controller接口啟動任務繼續執行了。
2.4 多線程執行Step
為了加快程序的執行,我們可以為Step配置線程池
@Bean
Step userStep1(ItemReader<User> userReader, ItemProcessor<User, User> userProcessor, ItemWriter<User> userWriter) {
return steps.get("userStep1")
.<User, User>chunk(5)
.reader(userReader)
.processor(userProcessor)
.writer(userWriter)
// 配置線程池
.taskExecutor(taskExecutor())
.build() ;
}
注意:Step中使用的任何池化資源(如數據源)都可能對并發性設置限制。請確保這些資源池至少與步驟中所需的并發線程數一樣大。
通過上面配置線程池后,你將在控制臺看到如下輸出。
圖片
默認將有4個線程同時進行處理。可以通過如下配置進行調整
@Bean
Step userStep1(ItemReader<User> userReader, ItemProcessor<User, User> userProcessor, ItemWriter<User> userWriter) {
return steps.get("userStep1")
// ...
// 節流限制10,這里配置的大小應該與你的數據庫連接池大小及使用的線程池核心線程數一致。
.throttleLimit(10)
.build() ;
}
2.5 重復啟動Job
要想重復啟動Job,我們可以在啟動Job時設置不同的JobParameters參數,只要參數不同那么就可以重復的啟動Job。如下示例:
@GetMapping("/start/{page}")
public Object start(@PathVariable("page") Long page) throws Exception {
Map<String, JobParameter> parameters = new HashMap<>() ;
// 每次設置的參數值不同即可。
parameters.put("page", new JobParameter(page)) ;
JobParameters jobParameters = new JobParameters(parameters) ;
this.userJobLauncher.run(userJob, jobParameters) ;
return "started" ;
}
以上是本篇文章的全部內容,希望對你有幫助。