我靠(call) ,我的未來(Future)在哪里???
本篇文章是讀者投稿,來和你一起聊一聊 Future ~
我們大家都知道,在 Java 中創建線程主要有三種方式:
- 繼承 Thread 類;
- 實現 Runnable 接口;
- 實現 Callable 接口。
而后兩者的區別在于 Callable 接口中的 call() 方法可以異步地返回一個計算結果 Future,并且一般需要配合 ExecutorService 來執行。這一套操作在代碼實現上似乎也并不難,可是對于call()方法具體怎么(被ExecutorService)執行的,以及 Future 這個結果是怎么獲取的,卻又不是很清楚了。
那么本篇文章,我們就一起來學習下 Callable 接口以及 Future 的使用,主要面向兩個問題:
- 承載著具體任務的 call() 方法如何被執行的?
- 任務的執行結果如何得到?
你可能會說,這兩個難道不是一個問題嗎?任務執行了就會有返回結果,而返回結果也一定是任務執行了才返回的,難道還能返回一個其他任務的結果么??不要著急,耐心的看下去,你就會發現,這兩個還真的就是一個問題。
本文將分為兩個部分,第一部分分別介紹 任務、執行、以及結果這三個概念在 Java API 中的實體和各自的繼承關系,第二部分通過一個簡單的例子回顧他們的用法,再理解下這兩個問題的答案。
Callable、Executor 與 Future
既然是一個任務被執行并返回結果,那么我們先來看看具體的任務,也就是 Callable 接口。
任務:Callable
非常簡單,只包含一個有泛型「返回值」的 call() 方法,需要在最后返回定義類型的結果。如果任務沒有需要返回的結果,那么將泛型 V 設為 void 并return null;就可以了。對比的是 Runnable,另一個明顯的區別則是 Callable可以拋出異常。
- public interface Callable<V> {
- V call() throws Exception;
- }
- public interface Runnable {
- public abstract void run();
- }
執行:ExecutorService
說到線程就少不了線程池,而說到線程池肯定離不開 Executor 接口。下面這幅圖是 Executor 的框架,我們常用的是其中的兩個具體實現類 ThreadPoolExecutor 以及 ScheduledThreadPoolExecutor,在 Executors 類中通過靜態方法獲取。Executors 中包含了線程池以及線程工廠的構造,與 Executor 接口的關系類似于 Collection 接口和 Collections 類的關系。
那么我們自頂向下,從源碼上了解一下 Executor 框架,學習學習任務是如何被執行的。首先是 Executor 接口,其中只定義了 execute() 方法。
- public interface Executor {
- void execute(Runnable command);
- }
ExecutorService 接口繼承了 Executor 接口,主要擴展了一系列的 submit() 方法以及對 executor 的終止和判斷狀態。以第一個 Future submit(Callable task);為例,其中 task 為用戶定義的執行的異步任務,Future 表示了任務的執行結果,泛型 T 代表任務結果的類型。
- public interface ExecutorService extends Executor {
- void shutdown(); // 現有任務完成后停止線程池
- List<Runnable> shutdownNow(); // 立即停止線程池
- boolean isShutdown(); // 判斷是否已停止
- boolean isTerminated();
- <T> Future<T> submit(Callable<T> task); // 提交Callale任務
- <T> Future<T> submit(Runnable task, T result);
- Future<?> submit(Runnable task);
- // 針對Callable集合的invokeAll()等方法
- }
抽象類AbstractExecutorService 是 ThreadPoolExecutor 的基類,在下面的代碼中,它實現了ExecutorService 接口中的 submit() 方法。注釋中是對應的 newTaskFor() 方法的代碼,非常簡單,就是將傳入的Callable 或 Runnable 參數封裝成一個 FutureTask 對象。
- // 1.第一個重載方法,參數為Callable
- public <T> Future<T> submit(Callable<T> task) {
- if (task == null) throw new NullPointerException();
- RunnableFuture<T> ftask = newTaskFor(task);
- // return new FutureTask<T>(callable);
- execute(ftask);
- return ftask;
- }
- // 2.第二個重載方法,參數為Runnable
- public Future<?> submit(Runnable task) {
- if (task == null) throw new NullPointerException();
- RunnableFuture<Void> ftask = newTaskFor(task, null);
- // return new FutureTask<T>(task, null);
- execute(ftask);
- return ftask;
- }
- // 3.第三個重載方法,參數為Runnable + 返回對象
- public <T> Future<T> submit(Runnable task, T result) {
- if (task == null) throw new NullPointerException();
- RunnableFuture<T> ftask = newTaskFor(task, result);
- // return new FutureTask<T>(task, result);
- execute(ftask);
- return ftask;
- }
那么也就是說,無論傳入的是 Callable 還是 Runnable,submit() 方法其實就做了三件事
具體來說,submit() 中首先生成了一個 RunnableFuture 引用的 FutureTask 實例,然后調用 execute() 方法來執行它,那么我們可以推測 FutureTask 繼承自 RunnableFuture,而 RunnableFuture 又實現了 Runnable,因為execute() 的參數應為 Runnable 類型。上面還涉及到了 FutureTask 的構造函數,也來看一下。
- public FutureTask(Callable<V> callable) {
- this.callable = callable;
- this.state = NEW;
- }
- public FutureTask(Runnable runnable, V result) {
- this.callable = Executors.callable(runnable, result); // 通過適配器將runnable在call()中執行并返回result
- this.state = NEW;
- }
FutureTask 共有兩個構造方法。第一個構造方法比較簡單,對應上面的第一個 submit(),采用組合的方式封裝Callable 并將狀態設為NEW;而第二個構造方法對應上面的后兩個 submit() 重載,不同之處是首先使用了Executors.callable來將 Runnable 和 result 組合成 Callable,這里采用了適配器RunnableAdapter implements Callable,巧妙地在 call() 中執行 Runnable 并返回結果。
- static final class RunnableAdapter<T> implements Callable<T> {
- final Runnable task;
- final T result; // 返回的結果;顯然:需要在run()中賦值
- RunnableAdapter(Runnable task, T result) {
- this.task = task;
- this.result = result;
- }
- public T call() {
- task.run();
- return result;
- }
- }
在適配器設計模式中,通常包含目標接口 Target、適配器 Adapter 和被適配者 Adaptee 三類角色,其中目標接口代表客戶端(當前業務系統)所需要的功能,通常為借口或抽象類;被適配者為現存的不能滿足使用需求的類;適配器是一個轉換器,也稱 wrapper,用于給被適配者添加目標功能,使得客戶端可以按照目標接口的格式正確訪問。對于 RunnableAdapter 來說,Callable 是其目標接口,而 Runnable 則是被適配者。RunnableAdapter 通過覆蓋 call() 方法使其可按照 Callable 的要求來使用,同時其構造方法中接收被適配者和目標對象,滿足了 call() 方法有返回值的要求。
那么總結一下 submit() 方法執行的流程,就是:「Callable 被封裝在 Runnable 的子類中傳入 execute() 得以執行」。
結果:Future
要說 Future 就是異步任務的執行結果其實并不準確,因為它代表了一個任務的執行過程,有狀態、可以被取消,而 get() 方法的返回值才是任務的結果。
- public interface Future<V> {
- boolean cancel(boolean mayInterruptIfRunning);
- boolean isCancelled();
- boolean isDone();
- V get() throws InterruptedException, ExecutionException;
- V get(long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException;
- }
我們在上面中還提到了 RuunableFuture 和 FutureTask。從官方的注釋來看,RuunableFuture 就是一個可以 run的 future,實現了 Runnable 和 Future 兩個接口,在 run() 方法中執行完計算時應該將結果保存起來以便通過 get()獲取。
- public interface RunnableFuture<V> extends Runnable, Future<V> {
- /**
- * Sets this Future to the result of its computation unless it has been cancelled.
- */
- void run();
- }
FutureTask 直接實現了 RunnableFuture 接口,作為執行過程,共有下面這幾種狀態,其中 COMPLETING 為一個暫時狀態,表示正在設置結果或異常,對應的,設置完成后狀態變為 NORMAL 或 EXCEPTIONAL;CANCELLED、INTERRUPTED 表示任務被取消或中斷。在上面的構造方法中,將 state 初始化為 NEW。
- private volatile int state;
- private static final int NEW = 0;
- private static final int COMPLETING = 1;
- private static final int NORMAL = 2;
- private static final int EXCEPTIONAL = 3;
- private static final int CANCELLED = 4;
- private static final int INTERRUPTING = 5;
- private static final int INTERRUPTED = 6;
然后是 FutureTask 的主要內容,主要是 run() 和 get()。注意 outcome 的注釋,無論是否發生異常返回的都是這個 outcome,因為在執行中如果執行成功就將結果設置給了它(set()),而發生異常時將異常賦給了他(setException()),而在獲取結果時也都返回了 outcome(通過report())。
- public class FutureTask<V> implements RunnableFuture<V> {
- private Callable<V> callable; // target,待執行的任務
- /** 保存執行結果或異常,在get()方法中返回/拋出 */
- private Object outcome; // 非volatile,通過CAS保證線程安全
- public void run() {
- ......
- Callable<V> c = callable;
- if (c != null && state == NEW) {
- V result;
- boolean ran;
- try {
- result = c.call(); // 調用call()執行用戶任務并獲取結果
- ran = true; // 執行完成,ran置為true
- } catch (Throwable ex) { // 調用call()出現異常,而run()方法繼續執行
- result = null;
- ran = false;
- setException(ex);
- // setException(Throwable t): compareAndSwapInt(NEW, COMPLETING); outcome = t;
- }
- if (ran)
- set(result);
- // set(V v): compareAndSwapInt(NEW, COMPLETING); outcome = v;
- }
- }
- public V get() throws InterruptedException, ExecutionException {
- int s = state;
- if (s <= COMPLETING)
- s = awaitDone(false, 0L); // 加入隊列等待COMPLETING完成,可響應超時、中斷
- return report(s);
- }
- public V get(long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException {
- // 超時等待
- }
- private V report(int s) throws ExecutionException {
- Object x = outcome;
- if (s == NORMAL) // 將outcome作為執行結果返回
- return (V)x;
- if (s >= CANCELLED)
- throw new CancellationException();
- throw new ExecutionException((Throwable)x); // 將outcome作為捕獲的返回
- }
- }
FutureTask 實現了 RunnableFuture 接口,所以有兩方面的作用。
第一,作為 Runnable 傳入 execute() 方法來執行,同時封裝 Callable 對象并在 run() 中調用其 call() 方法;
第二,作為 Future 管理任務的執行狀態,將 call() 的返回值保存在 outcome 中以通過 get() 獲取。這似乎就能回答開頭的兩個問題,并且渾然天成,就好像是一個問題,除非發生異常的時候返回的不是任務的結果而是異常對象。
總結一下繼承關系:
二、使用舉例
文章的標題有點唬人,說到底還是講 Callable 的用法。現在我們知道了 Future 代表了任務執行的過程和結果,作為 call() 方法的返回值來獲取執行結果;而 FutureTask 是一個 Runnable 的 Future,既是任務執行的過程和結果,又是 call 方法最終執行的載體。下面通過一個例子看看他們在使用上的區別。
首先創建一個任務,即定義一個任務類實現 Callable 接口,在 call() 方法里添加我們的操作,這里用耗時三秒然后返回 100 模擬計算過程。
- class MyTask implements Callable<Integer> {
- @Override
- public Integer call() throws Exception {
- System.out.println("子線程開始計算...");
- for (int i=0;i<3;++i){
- Thread.sleep(1000);
- System.out.println("子線程計算中,用時 "+(i+1)+" 秒");
- }
- System.out.println("子線程計算完成,返回:100");
- return 100;
- }
- }
然后呢,創建一個線程池,并實例化一個 MyTask 備用。
- ExecutorService executor = Executors.newCachedThreadPool();
- MyTask task = new MyTask();
現在,分別使用 Future 和 FutureTask 來獲取執行結果,看看他們有什么區別。
使用Future
Future 一般作為 submit() 的返回值使用,并在主線程中以阻塞的方式獲取異步任務的執行結果。
- System.out.println("主線程啟動線程池");
- Future<Integer> future = executor.submit(task);
- System.out.println("主線程得到返回結果:"+future.get());
- executor.shutdown();
看看輸出結果:
- 主線程啟動線程池
- 子線程開始計算...
- 子線程計算中,用時 1 秒
- 子線程計算中,用時 2 秒
- 子線程計算中,用時 3 秒
- 子線程計算完成,返回:100
- 主線程得到返回結果:100
主線程啟動線程池子線程開始計算...子線程計算中,用時 1 秒子線程計算中,用時 2 秒子線程計算中,用時 3 秒子線程計算完成,返回:100主線程得到返回結果:100
由于 get() 方法阻塞獲取結果,所以輸出順序為子線程計算完成后主線程輸出結果。
使用FutureTask
由于 FutureTask 集「任務與結果」于一身,所以我們可以使用 FutureTask 自身而非返回值來管理任務,這需要首先利用 Callable 對象來構造 FutureTask,并調用不同的submit()重載方法。
- System.out.println("主線程啟動線程池");
- FutureTask<Integer> futureTask = new FutureTask<>(task);
- executor.submit(futureTask); // 作為Ruunable傳入submit()中
- System.out.println("主線程得到返回結果:"+futureTask.get()); // 作為Future獲取結果
- executor.shutdown();
這段程序的輸出與上面中完全相同,其實兩者在實際執行中的區別也不大,雖然前者調用了submit(Callable task)而后者調用了submit(Runnable task),但最終都通過execute(futuretask)來把任務加入線程池中。
總結
上面大費周章其實只是盡可能細致地講清楚了 Callable 中的任務是如何執行的,總結起來就是:
線程池中,submit() 方法實際上將 Callable 封裝在 FutureTask 中,將其作為 Runnable 的子類傳給 execute()真正執行;
FutureTask 在 run() 中調用 Callable 對象的 call() 方法并接收返回值或捕獲異常保存在Object outcome中,同時管理執行過程中的狀態state;
FutureTask 同時作為 Future 的子類,通過 get() 返回任務的執行結果,若未執行完成則通過等待隊列進行阻塞等待完成;
FutureTask 作為一個 Runnable 的 Future,其中最重要的兩個方法如下。
本文轉載自微信公眾號「Java建設者」,可以通過以下二維碼關注。轉載本文請聯系Java建設者公眾號。