Java新的結構化并行模式入門指南
譯文譯者 | 布加迪
審校 | 重樓
結構化并發是Java中使用多線程的一種新方式。它允許開發人員在充分利用傳統線程和虛擬線程的同時考慮邏輯組中的工作。結構化并發出現在Java 21的預覽版中,它是決定Java未來的一個關鍵方面,所以現在是開始使用它的好時機。
為什么我們需要結構化并發?
編寫并發軟件是軟件開發者面臨的最大挑戰之一。Java的線程模式使其成為并發語言中的有力競爭者,但是多線程一直天生很棘手。結構化并發允許您使用具有結構化編程語法的多線程。實質上,它提供了一種使用熟悉的程序流程和構件編寫并發軟件的方法。這讓開發者可以專注于手頭的事務,而不是線程編排。正如結構化并發性的JEP所說:“如果一個任務分成并發子任務,它們都回到相同的位置,即任務的代碼塊。”
虛擬線程現在是Java的一項正式特性,它可以低成本生成線程,從而獲得并發性能。結構化并發提供了這么做的簡單語法。因此,Java現在有了一個獨特的、高度優化的線程系統,而且易于理解。
新的StructuredTaskScope類
結構化并發中的主要類是java.util.concurrent.StructuredTaskScope。Java 21文檔包含如何使用結構化并發的示例。截止本文發稿時,您需要使用--enable-preview和--source 21或--source 22來啟用Java程序中的結構化并發。我的$java --version是openjdk 22-ea,所以我們使用Maven的示例將為編譯步驟指定--enable-preview --source 22,為執行步驟指定--enable-preview。注意,SDKMan對于管理多個JDK安裝是一個很好的選擇。
您可以在本文的GitHub代碼存儲庫中找到示例代碼。注意為執行設置—enable-preview的.mvn/jvm.config文件。若要運行代碼,使用$mvn clean compile exec:java。
具有結構化并發的多線程
就本文示例而言,我們將向Star Wars API(SWAPI)發出幾個請求,通過行星的ID獲取有關行星的信息。如果我們在標準的同步Java中執行此操作,可能會使用Apache HTTPClient執行類似代碼片段1的操作。
代碼片段1. 類似傳統方法的多API調用
package com.infoworld;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
public class App {
public String getPlanet(int planetId) throws Exception {
System.out.println("BEGIN getPlanet()");
String url = "https://swapi.dev/api/planets/" + planetId + "/";
String ret = "?";
CloseableHttpClient httpClient = HttpClients.createDefault();
HttpGet request = new HttpGet(url);
CloseableHttpResponse response = httpClient.execute(request);
// Check the response status code
if (response.getStatusLine().getStatusCode() != 200) {
System.err.println("Error fetching planet information for ID: " + planetId);
throw new RuntimeException("Error fetching planet information for ID: " + planetId);
} else {
// Parse the JSON response and extract planet information
ret = EntityUtils.toString(response.getEntity());
System.out.println("Got a Planet: " + ret);
}
// Close the HTTP response and client
response.close();
httpClient.close();
return ret;
}
void sync() throws Exception {
int[] planetIds = {1,2,3,4,5};
for (int planetId : planetIds) {
getPlanet(planetId);
}
}
public static void main(String[] args) {
var myApp = new App();
System.out.println("\n\r-- BEGIN Sync");
try {
myApp.sync();
} catch (Exception e){
System.err.println("Error: " + e);
}
}
}
在代碼片段1中,我們有一個調用sync()方法的主方法,該方法在調用“https://swapi.dev/api/planets/”+ planetId端點時,只是對一組ID進行迭代處理。這些調用通過getPlanet()方法發出,該方法使用Apache HTTP庫來處理樣板請求、響應和錯誤處理。實際上,該方法接收每個響應,如果正確(200),輸出到控制臺,否則拋出錯誤。(這些示例使用了最少的錯誤,所以在這種情況下我們只拋出RuntimeException。)
輸出是這樣的:
-- BEGIN Sync
BEGIN getPlanet()
Got a Planet: {"name":"Tatooine"}
BEGIN getPlanet()
Got a Planet: {"name":"Alderaan"}
BEGIN getPlanet()
Got a Planet: {"name":"Yavin”}
BEGIN getPlanet()
Got a Planet: {"name":"Hoth"}
BEGIN getPlanet()
Got a Planet: {"name":"Dagobah"}
現在不妨使用結構化并發嘗試同一個示例。如代碼片段2所示,結構化并發允許我們將調用分解成并發請求,并將所有內容放在相同的代碼空間中。在代碼片段2中,我們添加了必要的StructuredTaskScope導入,然后使用其核心方法fork()和join(),將每個請求分解成各自的線程,然后等待它們全部完成。
代碼片段2. 使用StructuredTaskScopeNow的多API調用
package com.infoworld;
import java.util.concurrent.*;
import java.util.concurrent.StructuredTaskScope.*;
//...
public class App {
public String getPlanet(int planetId) throws Exception {
// ... same ...
}
void sync() throws Exception {
int[] planetIds = {1,2,3,4,5};
for (int planetId : planetIds) {
getPlanet(planetId);
}
}
void sc() throws Exception {
int[] planetIds = {1,2,3,4,5};
try (var scope = new StructuredTaskScope<Object>()) {
for (int planetId : planetIds) {
scope.fork(() -> getPlanet(planetId));
}
scope.join();
}catch (Exception e){
System.out.println("Error: " + e);
}
}
public static void main(String[] args) {
var myApp = new App();
// ...
System.out.println("\n\r-- BEGIN Structured Concurrency");
try {
myApp.sc();
} catch (Exception e){
System.err.println("Error: " + e);
}
}
}
如果我們運行代碼片段2,將得到類似的輸出,但速度要快不少,這是由于請求是同時發出、并發進行的。不妨考慮sc()方法(使用多線程)與sync()方法(使用同步代碼)之間的區別。結構化并發方法沒有想象的那么難,提供結果的速度卻快得多。
處理任務和子任務
默認情況下,StructuredTaskScope被創建時,它使用虛擬線程,所以我們實際上并沒有在這里配置操作系統線程;相反,我們告訴JVM以最有效的方式編排請求。(StructuredTaskScope的構造函數也接受ThreadFactory。)
在代碼片段2中,我們在try-with-resource塊中創建StructuredTaskScope對象,這是它原本的使用方式。我們可以使用fork()創建任意數量的作業。fork()方法接受任何實現Callable的程序,也就是說,任何方法或函數。這里,我們將getPlanet()方法包裝在一個匿名函數中:()-> getPlanet(planetId)——這是一種向目標函數傳遞參數的實用語法。
當我們調用join()時,我們告訴作用域等待所有被分叉的作業。實質上,join()將我們帶回到同步模式。分叉的作業將按照TaskScope的配置進行處理。
關閉任務作用域
由于我們在try-with-resource塊中創建了TaskScope,因此當該塊結束時,作用域將自動關閉。這為作用域調用shutdown()進程,作用域可以定制,以便根據需要來處理運行中線程的處置。如果需要在作用域關閉之前關閉它,也可以手動調用shutdown()方法。
StructuredTaskScope包括兩個實現內置關閉策略的類:ShutDownOnSuccess和ShutDownOnFailure。這些類監視成功或出錯的子任務,然后取消其余運行中的線程。使用目前的設置,我們可以這樣使用這些類:
代碼片段3. 內置關閉策略
void failFast() throws ExecutionException, InterruptedException {
int[] planetIds = {1,2,3,-1,4};
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
for (int planetId : planetIds) {
scope.fork(() -> getPlanet(planetId));
}
scope.join();
}
}
void succeedFast() throws ExecutionException, InterruptedException {
int[] planetIds = {1,2};
try (var scope = new StructuredTaskScope.ShutdownOnSuccess()) {
for (int planetId : planetIds) {
scope.fork(() -> getPlanet(planetId));
}
scope.join();
} catch (Exception e){
System.out.println("Error: " + e);
}
}
public static void main(String[] args) {
var myApp = new App();
System.out.println("\n\r-- BEGIN succeedFast");
try {
myApp. succeedFast();
} catch (Exception e) {
System.out.println(e.getMessage());
}
System.out.println("\n\r-- BEGIN failFast");
try {
myApp.failFast();
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
這些策略將給出類似以下的輸出:
-- BEGIN succeedFast
BEGIN getPlanet()
BEGIN getPlanet()
Got a Planet: {"name":"Alderaan"}
org.apache.http.impl.execchain.RetryExec execute
INFO: I/O exception (java.net.SocketException) caught when processing request to {s}->https://swapi.dev:443: Closed by interrupt
-- BEGIN failFast
BEGIN getPlanet()
BEGIN getPlanet()
BEGIN getPlanet()
BEGIN getPlanet()
BEGIN getPlanet()
Got a Planet: {"name":"Hoth"}
Got a Planet: {"name":"Tatooine"}
Error fetching planet information for ID: -1
org.apache.http.impl.execchain.RetryExec execute
INFO: I/O exception (java.net.SocketException) caught when processing request to {s}->https://swapi.dev:443: Closed by interrupt
因此,我們擁有的是一種簡單的機制,可以并發啟動所有請求,然后在一個請求成功或失敗時取消其余的請求。這里,可以進行任何定制。結構化并發文檔包括一個示例,在子任務成功或失敗時收集子任務結果,然后返回結果。這很容易完成,只需通過覆蓋join()方法,并觀察每個任務的結果。
StructuredTaskScope.Subtask
在我們的示例中沒有看到的一件事是觀察子任務的返回值。每次StructuredTaskScope.fork()被調用時,就返回StructuredTaskScope.SubTask對象。我們可以利用它來觀察任務的狀態。比如在sc()方法中,我們可以這么做:
代碼片段4. 使用StructuredTaskScope.Subtask觀察狀態
import java.util.concurrent.StructuredTaskScope.Subtask;
import java.util.ArrayList;
void sc() throws Exception {
int[] planetIds = {1,2,3,4,5};
ArrayList<Subtask> tasks = new ArrayList<Subtask>(planetIds.length);
try (var scope = new StructuredTaskScope<Object>()) {
for (int planetId : planetIds) {
tasks.add(scope.fork(() -> getPlanet(planetId)));
}
scope.join();
}catch (Exception e){
System.out.println("Error: " + e);
}
for (Subtask t : tasks){
System.out.println("Task: " + t.state());
}
}
在這個示例中,我們將每個任務保存在ArrayList中,然后在進行join()操作之后輸出它們的狀態。注意,Subtask的可用狀態被定義為enum。這個新方法將輸出類似以下的內容:
-- BEGIN Structured Concurrency
BEGIN getPlanet()
BEGIN getPlanet()
BEGIN getPlanet()
BEGIN getPlanet()
BEGIN getPlanet()
Got a Planet: {"name":"Dagobah"}
Got a Planet: {"name":"Hoth"}
Got a Planet: {"name":"Tatooine"}
Got a Planet: {"name":"Yavin IV"}
Got a Planet: {"name":"Alderaan"}
Task: SUCCESS
Task: SUCCESS
Task: SUCCESS
Task: SUCCESS
Task: SUCCESS
結論
在虛擬線程和結構化并發之間,Java開發者擁有一種引人注目的新機制,可以將幾乎所有代碼分解成并發任務,不會有太大的開銷。上下文和需求很重要,所以不要僅僅因為存在這些新的并發工具就使用它們。與此同時,這種組合確實提供了一些強大的力量。一旦您遇到出現許多任務的瓶頸時,您可以輕松地將它們全部交給虛擬線程引擎,該引擎將找到編排它們的最佳方法。具有結構化并發的新線程模式還使您易于定制和微調這種行為。
至于開發者將來如何在我們的應用程序、框架和服務器中使用這些新的并發功能,值得我們拭目以待。
小知識:結構化并發中的線程樹
結構化并發包括對調試和理解線程之間關系的支持。特別是,結構化并發將所有線程關聯到樹結構中,作用域位于根。這樣一來,查看線程之間的關系就變得很簡單,即便使用嵌套作用域也是如此。說明文檔提供了一個好的示例,表明如何使用Java診斷命令(jcmd)實用程序,將線程的運行時布局轉儲到控制臺。
原文標題:Get started with Java's new structured concurrency model,作者:Matthew Tyson