深入淺出Java/Spring/Spring Boot異步多線程
1、Java的多線程
1.1 線程池模式
一個(gè)線程池可以維護(hù)多個(gè)線程,這些線程等待任務(wù)來進(jìn)行并發(fā)處理。線程池模式避免了頻繁創(chuàng)建和銷毀短期任務(wù)線程,復(fù)用池中的線程從而提高了性能。線程池中的線程在處理任務(wù)時(shí)是并發(fā)進(jìn)行的。
線程池(綠色方塊)/等待處理任務(wù)隊(duì)列(藍(lán)色)/處理完成任務(wù)(黃色)
該模式允許創(chuàng)建的線程數(shù)量及其生命周期。 我們還能夠安排任務(wù)的執(zhí)行并將傳入的任務(wù)保持在隊(duì)列(Task Queue)中。
線程池?cái)?shù)量的大小可根據(jù)程序可用的計(jì)算資源進(jìn)行調(diào)整,它通常是程序的可調(diào)參數(shù),經(jīng)過調(diào)整以優(yōu)化程序的性能。 確定最佳線程池大小對(duì)于優(yōu)化性能至關(guān)重要。
1.2 Java的線程池ThreadPoolExecutor
Java中的ThreadPoolExecutor是一個(gè)可擴(kuò)展的線程池的實(shí)現(xiàn),它提供了對(duì)很多參數(shù)的微調(diào)設(shè)置。這些參數(shù)包括:
ThreadPoolExecutor的構(gòu)造器:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
//...
}
示例:
ThreadPoolExecutor executorPool = new ThreadPoolExecutor(5, 10, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(50));
corePoolSize:第1個(gè)參數(shù)“5”。JVM會(huì)為前5個(gè)任務(wù)創(chuàng)建線程,后續(xù)的任務(wù)會(huì)放進(jìn)隊(duì)列里,直到隊(duì)列滿為止(第5個(gè)參數(shù)workQueue,50個(gè)任務(wù))。
maximumPoolSize:第2個(gè)參數(shù)“10”。JVM最多創(chuàng)建10個(gè)線程,這意味著當(dāng)當(dāng)前正有5個(gè)線程運(yùn)行著5個(gè)任務(wù),而這時(shí)任務(wù)隊(duì)列的50個(gè)任務(wù)已滿,此時(shí)如果有一個(gè)新的任務(wù)到達(dá)隊(duì)列,JVM將創(chuàng)建一個(gè)新的線程,最多創(chuàng)建5個(gè),直至10個(gè)。
keepAliveTime:第3個(gè)參數(shù)“3秒”,超出核心線程而小于最大線程的這些線程,在一定的空閑時(shí)間之后將被清除掉。
unit:第4個(gè)參數(shù)“秒”,keepAliveTime的時(shí)間單位。
workQueue:第5個(gè)參數(shù)“new ArrayBlockingQueue<Runnable>(50)”,任務(wù)隊(duì)列的大小。
JVM創(chuàng)建線程的規(guī)則如下:
- 如果線程數(shù)少于corePoolSize,創(chuàng)建新的線程來跑任務(wù)。
- 如何線程數(shù)等于或大于corePoolSize,將任務(wù)放進(jìn)隊(duì)列。
- 如果隊(duì)列滿了,且線程數(shù)小于maximumPoolSize,創(chuàng)建新的線程跑任務(wù)。
- 如果隊(duì)列滿了,且線程數(shù)量大于或等于maximumPoolSize,拒絕任務(wù)。
2、Spring的多線程
Spring/Spring Boot只需要在配置類上注解“@EnableAsync”,在需要使用單獨(dú)線程的方法上使用“@Async”注解即可。Spring會(huì)自動(dòng)檢索線程池的定義,可以是“org.springframework.core.task.TaskExecutor”或者是“java.util.concurrent.Executor”的名為“taskExecutor”的bean。若都未找到,則使用“org.springframework.core.task.SimpleAsyncTaskExecutor”來處理異步方法的調(diào)用。
我們最簡單可以通過自定一個(gè)名為“taskExecutor”的Bean即可。
@Configuration
@EnableAsync
class AsyncConfigurationByBean {
@Bean(name="taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(50);
executor.setThreadNamePrefix("poolThread-");
executor.initialize();
return executor;
}
}
Spring也提供了“AsyncConfigurer”接口用來定制實(shí)現(xiàn)異步多線程相關(guān)的配置。
@Configuration
@EnableAsync
class AsyncConfigurationByConfigurer implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(50);
executor.setThreadNamePrefix("poolThread-");
executor.initialize();
return executor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return AsyncConfigurer.super.getAsyncUncaughtExceptionHandler();
}
}
3、Spring Boot的多線程
在Spring Boot下,通過“TaskExecutionAutoConfiguration”自動(dòng)配置類,Spring Boot給我們已經(jīng)自動(dòng)配置好了線程池,“TaskExecutionProperties”提供了相關(guān)的屬性配置。在Spring Boot下我們只需要在配置類上“@EnableAsync”,在“application.yml”上配置即可:
spring:
task:
execution:
pool:
core-size: 5
max-size: 10
queue-capacity: 50
thread-name-prefix: poolThread-
4、多線程演示
新建一個(gè)使用異步方法的類和方法:
@Service
@Slf4j
class AsyncService {
@Async
public void doSomething(Integer i){
log.info("當(dāng)前是循環(huán):" + i);
}
}
在Spring Boot入口類調(diào)用:
@SpringBootApplication
@EnableAsync
public class SpringAsyncApplication {
public static void main(String[] args) {
SpringApplication.run(SpringAsyncApplication.class, args);
}
@Bean
CommandLineRunner commandLineRunner(AsyncService asyncService){
return p -> {
for(int i = 0 ; i < 10 ; i ++){
asyncService.doSomething(i);
}
};
}
}
運(yùn)行結(jié)果:
從圖中可以看出我們的核心線程數(shù)量是5,你可以按照上節(jié)的“JVM創(chuàng)建線程的規(guī)則”來調(diào)整核心線程、最大線程、隊(duì)列數(shù)量的來嘗試一下觀察控制臺(tái)輸出的結(jié)果。
5、多線程異步結(jié)果演示
我們有時(shí)候會(huì)在單一的請(qǐng)求中調(diào)用多個(gè)方法,在同步的方法里,我們都是順序執(zhí)行,執(zhí)行完一個(gè)再執(zhí)行下一個(gè)。我們可以通過Spring的“AsyncResult”讓多個(gè)方法并發(fā)執(zhí)行并聚合他們的結(jié)果,并提高性能。
我們先看一下,在同步的情況下是什么樣的:
@Service
class Service1 {
public Integer doSomething() throws InterruptedException {
Thread.sleep(2000);
log.info("在Service1中");
return 1;
}
}
@Service
class Service2 {
public Integer doSomething() throws InterruptedException {
Thread.sleep(2000);
log.info("在Service2中");
return 2 ;
}
}
@Bean
CommandLineRunner commandLineRunner(Service1 service1, Service2 service2){
return p -> {
long start = System.currentTimeMillis();
Integer first = service1.doSomething();
Integer second = service2.doSomething();
Integer sum = first + second;
long end = System.currentTimeMillis();
Long cost = end - start;
log.info("結(jié)果為:" + sum + ",耗時(shí)" + cost);
};
}
示例中,一個(gè)線程順序執(zhí)行兩個(gè)方法,執(zhí)行結(jié)果為:
在一個(gè)線程中
我們現(xiàn)在使用“@Async”使方法變成異步,且使用“AsyncResult”包裝異步結(jié)果返回。
“CompletableFuture”是Java8引入的,以提供一種編寫異步、非阻塞和多線程代碼的簡單方法。我們使用“AsyncResult”包裝返回值,并用它的“.completable()”方法獲得“CompletableFuture”對(duì)象。
@Service
@Slf4j
class Service1 {
@Async
public CompletableFuture<Integer> doSomething() throws InterruptedException {
Thread.sleep(2000);
log.info("在Service1中");
return new AsyncResult<Integer>(1).completable();
}
}
@Service
@Slf4j
class Service2 {
@Async
public CompletableFuture<Integer> doSomething() throws InterruptedException {
Thread.sleep(2000);
log.info("在Service2中");
return new AsyncResult<Integer>(2).completable();
}
}
@Bean
CommandLineRunner commandLineRunner(Service1 service1, Service2 service2){
return p -> {
long start = System.currentTimeMillis();
CompletableFuture<Integer> firstData = service1.doSomething();
CompletableFuture<Integer> secondData = service2.doSomething();
CompletableFuture<Integer> mergeResult = firstData.thenCompose(
firstValue -> secondData.thenApply(
secondValue -> firstValue + secondValue
)
);
long end = System.currentTimeMillis();
Long cost = end - start;
log.info("結(jié)果為:" + mergeResult.get() + ",耗時(shí)" + cost);
};
}
本文轉(zhuǎn)載自今日「愛科學(xué)的衛(wèi)斯理」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系愛科學(xué)的衛(wèi)斯理。