Java 異步編程本應(yīng)更簡(jiǎn)單才對(duì)
在過(guò)去的好多年里,多線程和異步一直作為 Java 技術(shù)里的高級(jí)部分,在技術(shù)序列中,一個(gè)語(yǔ)言分為入門部分、進(jìn)階部分和高級(jí)部分,所以,異步是作為其中的高級(jí)技術(shù)部分存在的。
關(guān)于異步和多線程這部分吧,常常存在于面試題、八股文當(dāng)中,但是在大多數(shù)的項(xiàng)目代碼中你根本看不著它。神奇嗎,就是這么神奇。
糾其原因可能有兩個(gè):
- 本身大多數(shù)項(xiàng)目就很簡(jiǎn)單,根本就用不著多線程和異步,畢竟平庸屬于大多數(shù);
- Java 中關(guān)于多線程和異步的部分確實(shí)對(duì)于新手不太友好,涉及到的類且多且亂,而且不符合我們正常的思考方式;
我就見過(guò)很多同學(xué),多次想入門多線程和異步,但是多次被勸退,或者在大門口反復(fù)橫跳。
一旁的 Node.js 、Go 憋了一眼:哼,不就會(huì)異步嗎,有那么難嗎?
在 Java 中實(shí)現(xiàn)異步編程有什么方式呢?
異步回調(diào)函數(shù)
最開始寫前端的時(shí)候最常用這種回調(diào)函數(shù)的方法,在 JavaScript 中,函數(shù)是一等公民,用法非常靈活。但是在 Java 中,回調(diào)方式并不常用。
在異步調(diào)用結(jié)束或者發(fā)生異常的時(shí)候主動(dòng)的調(diào)用回調(diào)方法,以此來(lái)達(dá)到異步通知的目的。首先定義一個(gè)回調(diào)接口,如下:
public interface ICallBackService {
/**
* 回調(diào)方法
* @param args 參數(shù)
*/
void callback(String ...args) throws InterruptedException;
}
然后在你的異步方法中加一個(gè)回調(diào)參數(shù),參數(shù)類型就是上面的 ICallBackService接口類型。
public class Work {
/**
* 業(yè)務(wù)邏輯
* @param callBackService
*/
public void doWork(ICallBackService callBackService) throws InterruptedException {
System.out.println("開始回調(diào)");
callBackService.callback("第一個(gè)參數(shù)","第二個(gè)參數(shù)");
System.out.println("回調(diào)結(jié)束");
}
}
之后在調(diào)用端調(diào)用doWork方法執(zhí)行異步調(diào)用。
public static void main(String[] args) throws InterruptedException {
System.out.println("準(zhǔn)備發(fā)起異步調(diào)用");
Thread thread = new Thread(() -> {
Work work = new Work();
try {
work.doWork(new ICallBackService() {
@Override
public void callback(String... args) throws InterruptedException {
Thread.sleep(1000);
System.out.printf("正在執(zhí)行回調(diào)動(dòng)作:%s%n",args==null?"無(wú)參數(shù)":String.join(",", args));
}
});
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
thread.start();
System.out.println("繼續(xù)干其他事兒");
}
執(zhí)行后,打印的結(jié)果,主線程該干什么干什么,異步方法執(zhí)行后,主動(dòng)調(diào)用回調(diào)方法。
準(zhǔn)備發(fā)起異步調(diào)用 繼續(xù)干其他事兒 異步執(zhí)行 正在執(zhí)行回調(diào)動(dòng)作:第一個(gè)參數(shù),第二個(gè)參數(shù) 回調(diào)結(jié)束
回調(diào)這種機(jī)制有個(gè)最要命的問題,它會(huì)導(dǎo)致代碼邏輯的割裂,本來(lái)是一個(gè)從開始到結(jié)束的完整執(zhí)行過(guò)程,但是回調(diào)方法脫離了代碼主流程,導(dǎo)致我們看代碼的時(shí)候產(chǎn)生跳躍感。
CompletableFuture 異步
自從 Java 8 出現(xiàn) Future 之后,異步編程就變得簡(jiǎn)單多了,回調(diào)函數(shù)完全可以不用了。再遇到需要異步的場(chǎng)景時(shí),可以直接祭出 CompletableFuture,CompletableFuture 除了有最基礎(chǔ)的異步調(diào)用功能外,還支持異步任務(wù)鏈、組合任務(wù)等等。
異步編程最繁雜的地方就是流程控制,對(duì)于 NodeJS 那種天生就是異步的語(yǔ)言來(lái)說(shuō),有豐富的第三方框架,而對(duì)于 Java 來(lái)說(shuō),到現(xiàn)在都比較少。
在不借助第三方框架的情況下,CompletableFuture 應(yīng)該是最優(yōu)解了。
下面這段代碼展示了異步調(diào)用兩個(gè)任務(wù),然后將兩個(gè)任務(wù)的返回結(jié)果合并到一起,用到了 CompletableFuture 的組合任務(wù)功能。
public static void main(String[] args) throws ExecutionException, InterruptedException {
//異步發(fā)起第一個(gè)任務(wù)
CompletableFuture<String> firstTask = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "第一個(gè)任務(wù)的結(jié)果";
}
);
//異步發(fā)起第二個(gè)任務(wù)
CompletableFuture<String> secondTask = CompletableFuture.supplyAsync(() -> "第二個(gè)任務(wù)的結(jié)果");
// 合并兩個(gè)任務(wù)的結(jié)果
CompletableFuture<String> combineTask = firstTask.thenCombineAsync(secondTask, (firstResult, secondResult) -> {
return firstResult + "&&" + secondResult;
});
combineTask.thenAccept((result) -> {
System.out.println("最終結(jié)果:" + result);
});
System.out.println("其他任務(wù),該干嘛干嘛");
combineTask.join();
}
}
Reactor 響應(yīng)式編程
用過(guò) Spring Boot 的同學(xué)一定看到過(guò) webFlux 這個(gè)東西,其實(shí)它就是 Reactor 中的功能。Reactor 的核心包是 reactor-core ,專為異步編程而生,已經(jīng)是 Spring Boot 的內(nèi)置框架了。
Reactor 是一個(gè)完全非阻塞的JVM響應(yīng)式編程框架。響應(yīng)式編程是一種涉及數(shù)據(jù)流和變化傳播的異步編程范式。這意味著可以通過(guò)編程語(yǔ)言輕松地表示靜態(tài)(如數(shù)組)或動(dòng)態(tài)(如事件發(fā)射器)數(shù)據(jù)流。
事件發(fā)射器可以理解為事件驅(qū)動(dòng),如果做過(guò) GUI 或客戶端開發(fā)的肯定對(duì)事件驅(qū)動(dòng)非常熟悉,事件驅(qū)動(dòng)其實(shí)就是順著人的思考模式來(lái)的,進(jìn)行什么操作就觸發(fā)什么事件。
下面是用 Reactor 實(shí)現(xiàn)的一個(gè)簡(jiǎn)單異步任務(wù),其中subscribe 方法可以理解為一個(gè)事件訂閱器,在里面可以訂閱 onNext (也就是正常執(zhí)行)、onError (發(fā)生錯(cuò)誤是執(zhí)行)以及onComplete(執(zhí)行完成)等事件。每命中一個(gè)事件,就可以驅(qū)動(dòng)這個(gè)事件做一些事情。
就是以順序?qū)懘a的方式,實(shí)現(xiàn)異步的邏輯。
public static void main(String[] args) {
Mono<String> asyncTask = Mono.fromCallable(() -> {
// 模擬異步操作
Thread.sleep(1000);
// 返回結(jié)果
return "任務(wù)執(zhí)行成功";
});
// 訂閱事件
asyncTask.subscribe(
result -> {
// onNext 事件,處理任務(wù)成功的情況
System.out.println("任務(wù)成功,結(jié)果:" + result);
},
error -> {
// onError 事件,處理任務(wù)出錯(cuò)的情況
System.err.println("任務(wù)出錯(cuò):" + error.getMessage());
},
() -> {
// onComplete 事件,處理任務(wù)完成的情況
System.out.println("任務(wù)完成");
}
);
// 使用 block 方法等待異步任務(wù)完成
String result = asyncTask.block();
System.out.println("主線程等待結(jié)果:" + result);
}
執(zhí)行以上代碼前,需要引入 reactor-core 依賴包。
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.6.0</version>
</dependency>
與 Reactor 類似的還有 RxJava,在 Android 開發(fā)上用的最多。