如果你要非同步(Asyncronous)讀取文字檔案,在檔案讀取完後做某些事,可以使用 ExecutorService 來 submit 一個 Runnable 物件,像是類似以下的流程:
public static Future readFileAsync(String file, Consumer<String> success,
Consumer<IOException> fail, ExecutorService service) {
return service.submit(() -> {
try {
success.accept(new String(Files.readAllBytes(Paths.get(file))));
} catch (IOException ex) {
fail.accept(ex);
}
});
}
這麼一來,你就可使用以下非同步的風格來讀取一個文字檔案:
readFileAsync(args[0],
content -> out.println(content), // 成功處理
ex -> ex.printStackTrace(), // 失敗處理
Executors.newFixedThreadPool(10)
);
out.println(content) 與 ex.printStackTrace() 會在與讀取檔案的同一個執行緒中進行,如果你想要在不同執行緒中進行,得再額外作些設計;另一方面,這種非同步操作使用的回呼(Callback)風格,在每次回呼中若又再度進行非同步操作及回呼,很容易寫出回呼地獄(Callback hell),造成可讀性不佳。例如若有個類似 readFileAsync 風格的非同步 processContentAsync 方法,用來再繼續處理 readFileAsync 讀取的檔案內容,那麼可能撰寫出以下的程式碼:
readFileAsync(args[0],
content -> processContentAsync(content,
processedContent -> out.println(processedContent) ,
ex -> ex.printStackTrace(), service),
ex -> ex.printStackTrace(), service);
實際上非同步處理的組合需求很多,為此,JDK8 新增了 java.util.concurrent.CompletableFuture,你可以使用它來改寫 readFileAsync,例如:
package cc.openhome;
import java.io.IOException;
import static java.lang.System.out;
import java.nio.file.*;
import java.util.concurrent.*;
public class Async {
public static CompletableFuture<String> readFileAsync(
String file, ExecutorService service) {
return CompletableFuture.supplyAsync(() -> {
try {
return new String(Files.readAllBytes(Paths.get(file)));
} catch(IOException ex) {
throw new RuntimeException(ex);
}
}, service);
}
public static void main(String[] args) throws Exception {
ExecutorService poolService = Executors.newFixedThreadPool(10);
readFileAsync(args[0], poolService).whenComplete((ok, ex) -> {
if(ex == null) {
out.println(ok);
} else {
ex.printStackTrace();
}
}).join(); // join 是為了避免 main 執行緒在任務完成前就關閉ExecutorService
poolService.shutdown();
}
}
CompletableFuture 的靜態方法 supplyAsync 接受 Supplier 實例,可指定非同步執行任務,它會傳回 CompletableFuture 實例,你可以呼叫 whenComplete 以 BiConsumer 實例指定任務完成如何處理,第一個參數是 Supplier 的傳回值,若有例外發生則會指定給第二個參數,想要在任務完成後繼續非同步地處理,則可以使用 whenCompleteAsync 方法。
如果第一個 CompletableFuture 任務完成後,想要繼續以非同步方式來處理結果,可以使用 thenApplyAsync。例如:
readFileAsync(args[0], poolService)
.thenApplyAsync(String::toUpperCase)
.whenComplete((ok, ex) -> {
if(ex == null) {
out.println(ok);
} else {
ex.printStackTrace();
}
});
CompletableFuture 實例的方法,基本上都會有同步與非同步兩個版本,可以用 Async 後置名稱來區分,例如,thenApplyAsync 的同步版本就是 thenApply 方法。
〈Optional 與 Stream 的 flatMap〉中談到,Optional 與 Stream 中各定義有 map 方法,可讓你指定 Optional 或 Stream 中的值 T 如何映射為值 U,然後傳回新的 Optional 或 Stream,CompletableFuture 的 thenApply(以及非同步的 thenApply 版本)其實就類似 Optional 或 Stream 的 map,可讓你指定前一個 CompletableFuture 處理後的結果 T 如何映射為值 U,然後傳回新的 CompletableFuture。
該份文件中也談到,Optional 與 Stream 中也各定義有 flatMap 方法,可讓你指定 Optional 或 Stream 中的值 T 與 Optional<U>、Stream<U> 之間的關係,CompletableFuture 也有個 thenCompose(以及非同步的 thenComposeAsnyc 版本),作用就類似 flatMap,可以讓你指定前一個 CompletableFuture 處理後的結果 T 如何映射為值 CompleteableFuture<U>,舉例來說,你想在 readFileAsync 傳回的 CompletableFuture<String> 處理完後,繼續組合 processContentAsync 方法傳回 CompletableFuture<String>,就可以如下撰寫:
readFileAsync(args[0], poolService)
.thenCompose(content -> processContentAsync(content, poolService))
.whenComplete((ok, ex) -> {
if (ex == null) {
out.println(ok);
} else {
ex.printStackTrace();
}
});
CompletableFuture 上還有許多方法可以使用,詳情除了參考 API 文件之中,還可以看看〈Java 8: Definitive guide to CompletableFuture〉這篇文章,有 JDK8 之前,可以使用 guava-libraries 的 ListenableFuture,有興趣的話可以參考〈ListenableFuture 聽取未來需求〉,其他各技術生態中的類似產物,可以參考〈Composable Future API〉的介紹。

