成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

SpringBatch高階應用:大數據批處理框架實戰指南

開發 前端
配置Job,Job是封裝整個批處理流程的實體。在 Spring Batch 中,Job只是Step實例的容器。它將邏輯上屬于一個流程的多個步驟組合在一起,并允許對所有步驟的全局屬性(如可重啟性)進行配置。

本篇文章主要內容:通過Spring Batch從一個庫中讀取數據進過處理后寫入到另外一個庫中。

1. 環境準備

1.1 引入依賴

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>

2.2 配置Job

配置Job啟動器

@Bean
JobLauncher userJobLauncher(JobRepository userJobRepository) {
  SimpleJobLauncher jobLauncher = new SimpleJobLauncher() ;
  jobLauncher.setJobRepository(userJobRepository) ;
  return jobLauncher ;
}

配置任務Repository存儲元信息

@Bean
JobRepository userJobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) {
  JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean() ;
  factory.setDatabaseType("mysql") ;
  factory.setTransactionManager(transactionManager) ;
  factory.setDataSource(dataSource) ;
  try {
    factory.afterPropertiesSet() ; 
    return factory.getObject() ;
  } catch (Exception e) {
    throw new RuntimeException(e) ;
  }
}

配置ItemReader讀取器

@Bean
ItemReader<User> userReader(JobOperator jobOperator) throws Exception {
  JpaPagingItemReaderBuilder<User> builder = new JpaPagingItemReaderBuilder<>() ;
  builder.entityManagerFactory(entityManagerFactory) ;
  // 每次分頁查詢多少條數據
  builder.pageSize(10) ;
  builder.queryString("select u from User u where u.uid <= 50") ;
  builder.saveState(true) ;
  builder.name("userReader") ;
  return builder.build() ;
}

配置數據源,該數據源是用來寫入操作的

public DataSource dataSource() {
  HikariDataSource dataSource = new HikariDataSource() ;
  dataSource.setJdbcUrl("jdbc:mysql://localhost:3306/testjpa?serverTimezone=GMT%2B8&useSSL=false") ;
  dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver") ;
  dataSource.setUsername("root") ;
  dataSource.setPassword("xxxooo") ;
  return dataSource ;
}

配置ItemWriter用來寫入操作(當前庫的數據寫入到另外一個庫,上面的數據源)

@Bean
ItemWriter<User> userWriter() {
  // 通過JDBC批量處理
  JdbcBatchItemWriterBuilder<User> builder = new JdbcBatchItemWriterBuilder<>() ;
  DataSource dataSource = dataSource() ;
  builder.dataSource(dataSource) ;
  builder.sql("insert into st (id, name, sex, mobile, age, birthday) values (?, ?, ?, ?, ?, ?)") ;
  builder.itemPreparedStatementSetter(new ItemPreparedStatementSetter<User>() {
    @Override
    public void setValues(User item, PreparedStatement ps) throws SQLException {
      ps.setInt(1, item.getUid()) ;
      ps.setString(2, item.getName()) ;
      ps.setString(3, item.getSex()) ;
      ps.setString(4, item.getMobile()) ;
      ps.setInt(5, item.getAge()) ;
      ps.setObject(6, item.getBirthday()) ;
    }
  }) ;
  return builder.build() ;
}

配置ItemProcessor處理器,數據從當前庫讀取處理后經過處理后再寫入另外的庫中

@Bean
ItemProcessor<User, User> userProcessor() {
  return new ItemProcessor<User, User>() {
    @Override
    public User process(User item) throws Exception {
      System.out.printf("%s - 開始處理數據:%s%n", Thread.currentThread().getName(), item.toString()) ;
      // 模擬耗時操作
      TimeUnit.SECONDS.sleep(1) ;
      // 在這里你可以對數據進行相應的處理。
      return item ;
    }
  } ;
}

配置Step將ItemReader、ItemProcessor、ItemWriter串聯在一起。

@Bean
Step userStep1(ItemReader<User> userReader, ItemProcessor<User, User> userProcessor, ItemWriter<User> userWriter) {
  return steps.get("userStep1")
    .<User, User>chunk(5)
    .reader(userReader)
    .processor(userProcessor)
    .writer(userWriter)
    .build() ;
}

配置Job,Job是封裝整個批處理流程的實體。在 Spring Batch 中,Job只是Step實例的容器。它將邏輯上屬于一個流程的多個步驟組合在一起,并允許對所有步驟的全局屬性(如可重啟性)進行配置。作業配置包含:

  • 簡單的工作名稱。
  • Step實例的定義和排序。
  • Job是否可重新啟動。
@Bean
Job userJob(Step userStep1, Step userStep2) {
  return jobs.get("userJob").start(userStep1).build();
}

以上是Spring Batch定義配置一個Job所需的核心組件。接下來會以上面的基礎配置進行高階知識點進行介紹。

2. 高階配置管理

2.1 通過Controller接口啟動Job

@RequestMapping("/userJob")
public class UserJobController {
  @Resource
  private JobLauncher userJobLauncher ;
  @GetMapping("/start")
  public Object start() throws Exception {
    JobParameters jobParameters = new JobParameters() ;
    this.userJobLauncher.run(userJob, jobParameters) ;
    return "started" ;
  }
}

通過JobLauncher#run方法啟動Job。當你調用該接口時,你會發現接口一直不會返回,一直阻塞,下圖是Job的啟動序列

圖片圖片

根據上圖能知道,當你調用run方法后,會等待整個Job退出狀態為FINISHED或者FAILED后才能結束。所以,你需要異步完成,以便 SimpleJobLauncher 立即返回給調用者。而正確的序列應該是如下:

圖片圖片

上圖通過異步方式啟動Job序列。

2.2 異步啟動Job

@Bean
TaskExecutor taskExecutor() {
  ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor() ;
  taskExecutor.setThreadNamePrefix("spring_batch_launcher") ;
  taskExecutor.setCorePoolSize(10) ;
  taskExecutor.setMaxPoolSize(10) ;
  taskExecutor.initialize() ; 
  return taskExecutor ;
}
@Bean
JobLauncher userJobLauncher(JobRepository userJobRepository) {
  SimpleJobLauncher jobLauncher = new SimpleJobLauncher() ;
  jobLauncher.setJobRepository(userJobRepository) ;
  jobLauncher.setTaskExecutor(taskExecutor()) ;
  return jobLauncher ;
}

通過上面配置后,Job啟動將是異步的會直接返回JobExecution。

2.3 重啟Job

當一個Job正在執行,由于斷電或者強制終止了程序。當程序恢復后你希望能夠接著程序終止前的進度繼續執行,這時候你需要進行如下的操作(本人沒有發現有什么API能夠操作的,可能文檔沒看仔細)。

當程序非正常終止是,下面兩張表的狀態都是STARTED,END_TIME為null

batch_job_execution表

圖片圖片

batch_step_execution表

圖片圖片

想要重新啟動必須將上面的狀態修改為STOPPED,END_TIME字段設置上值(是什么值無所謂)。

然后我們就可以繼續使用上面的Controller接口啟動任務繼續執行了。

2.4 多線程執行Step

為了加快程序的執行,我們可以為Step配置線程池

@Bean
Step userStep1(ItemReader<User> userReader, ItemProcessor<User, User> userProcessor, ItemWriter<User> userWriter) {
  return steps.get("userStep1")
    .<User, User>chunk(5)
    .reader(userReader)
    .processor(userProcessor)
    .writer(userWriter)
    // 配置線程池
    .taskExecutor(taskExecutor())
    .build() ;
}

注意:Step中使用的任何池化資源(如數據源)都可能對并發性設置限制。請確保這些資源池至少與步驟中所需的并發線程數一樣大。

通過上面配置線程池后,你將在控制臺看到如下輸出。

圖片圖片

默認將有4個線程同時進行處理。可以通過如下配置進行調整

@Bean
Step userStep1(ItemReader<User> userReader, ItemProcessor<User, User> userProcessor, ItemWriter<User> userWriter) {
  return steps.get("userStep1")
      // ...
      // 節流限制10,這里配置的大小應該與你的數據庫連接池大小及使用的線程池核心線程數一致。
      .throttleLimit(10)
      .build() ;
}

2.5 重復啟動Job

要想重復啟動Job,我們可以在啟動Job時設置不同的JobParameters參數,只要參數不同那么就可以重復的啟動Job。如下示例:

@GetMapping("/start/{page}")
public Object start(@PathVariable("page") Long page) throws Exception {
  Map<String, JobParameter> parameters = new HashMap<>() ;
  // 每次設置的參數值不同即可。
  parameters.put("page", new JobParameter(page)) ;
  JobParameters jobParameters = new JobParameters(parameters) ;
  this.userJobLauncher.run(userJob, jobParameters) ;
  return "started" ;
}

以上是本篇文章的全部內容,希望對你有幫助。

責任編輯:武曉燕 來源: Spring全家桶實戰案例源碼
相關推薦

2024-12-27 14:45:59

2023-08-22 08:01:42

SpringBatch事務管理

2017-01-12 14:50:15

大數據Spring Batc框架

2012-02-20 09:49:42

ibmdw

2016-11-15 09:44:21

大數據批處理流處理

2022-08-02 20:47:38

Spring框架應用程序

2015-06-25 13:06:48

大數據從選擇到應用

2016-12-18 15:03:57

Python Scikit Lea數據

2020-10-26 07:05:02

大數據管道編排編排框架

2022-03-07 14:39:01

前端框架批處理

2016-12-20 16:07:13

Python數據預處理

2018-04-03 10:33:15

大數據

2022-03-01 08:40:34

StormHadoop批處理

2017-09-06 17:05:54

大數據處理流程處理框架

2019-05-29 10:42:06

大數據IT人工智能

2018-12-04 15:32:09

數據處理大數據數據分析

2018-11-05 15:15:38

大數據流式數據互聯網

2016-12-04 16:46:51

大數據架構機器學習

2017-09-18 17:59:23

Hadoop數據分析

2018-04-10 14:25:30

大數據高速公路數據存儲
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 久久久久无码国产精品一区 | 免费在线看a | 日本h片在线观看 | 伊人狠狠 | 粉嫩一区二区三区性色av | 日韩av一二三区 | 国产精品久久久久久久久久久免费看 | 欧美成视频 | 亚洲欧美日韩系列 | 成人国产精品久久 | 成人免费看片 | 日韩在线一区二区三区 | 欧美久久久久久久久 | 国产精品久久久久一区二区三区 | 久久一区二区三区四区 | 婷婷久久网 | 亚洲精品中文字幕在线观看 | 国产欧美一区二区三区在线看 | 欧美激情在线观看一区二区三区 | 乱一性一乱一交一视频a∨ 色爱av | 日韩美女一区二区三区在线观看 | 在线免费看黄 | 欧美片网站免费 | 亚洲国内精品 | 国产午夜亚洲精品不卡 | 日韩a视频 | 9191av| 久久伊人青青草 | 自拍偷拍欧美 | 亚洲美女一区 | 国产一在线 | 黑人性hd | 中文在线观看视频 | 国产精品久久久久久久免费观看 | 在线午夜 | 女同久久另类99精品国产 | 久久9999久久| 在线黄| 爱高潮www亚洲精品 中文字幕免费视频 | 男人天堂国产 | 日韩午夜 |