如果你要非同步(Asyncronous)讀取文字檔案,在檔案讀取完後做某些事,可以使用 ExecutorService 來 submit 一個 Runnable 物件,像是類似以下的流程:
public static Future readFileAsync(String file, Consumer<String> success,
                        Consumer<IOException> fail, ExecutorService service) {
    return service.submit(() -> {
        try {
            success.accept(new String(Files.readAllBytes(Paths.get(file))));
        } catch (IOException ex) {
            fail.accept(ex);
        }
    });
}
這麼一來,你就可使用以下非同步的風格來讀取一個文字檔案:
readFileAsync(args[0], 
    content -> out.println(content),  // 成功處理
    ex -> ex.printStackTrace(),       // 失敗處理
    Executors.newFixedThreadPool(10)
);
out.println(content) 與 ex.printStackTrace() 會在與讀取檔案的同一個執行緒中進行,如果你想要在不同執行緒中進行,得再額外作些設計;另一方面,這種非同步操作使用的回呼(Callback)風格,在每次回呼中若又再度進行非同步操作及回呼,很容易寫出回呼地獄(Callback hell),造成可讀性不佳。例如若有個類似 readFileAsync 風格的非同步 processContentAsync 方法,用來再繼續處理 readFileAsync 讀取的檔案內容,那麼可能撰寫出以下的程式碼:
readFileAsync(args[0],
    content -> processContentAsync(content,
                 processedContent -> out.println(processedContent) ,
                 ex -> ex.printStackTrace(), service),
    ex -> ex.printStackTrace(), service);
實際上非同步處理的組合需求很多,為此,JDK8 新增了 java.util.concurrent.CompletableFuture,你可以使用它來改寫 readFileAsync,例如:
package cc.openhome;
import java.io.IOException;
import static java.lang.System.out;
import java.nio.file.*;
import java.util.concurrent.*;
public class Async {
    public static CompletableFuture<String> readFileAsync(
                       String file, ExecutorService service) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return new String(Files.readAllBytes(Paths.get(file)));
            } catch(IOException ex) {
                throw new RuntimeException(ex);
            }
        }, service);
    }
    public static void main(String[] args) throws Exception {
        ExecutorService poolService = Executors.newFixedThreadPool(10);
        readFileAsync(args[0], poolService).whenComplete((ok, ex) -> {
            if(ex == null) {
                out.println(ok);
            } else {
                ex.printStackTrace();
            }
        }).join(); // join 是為了避免 main 執行緒在任務完成前就關閉ExecutorService
        poolService.shutdown();
    }
}
CompletableFuture 的靜態方法 supplyAsync 接受 Supplier 實例,可指定非同步執行任務,它會傳回 CompletableFuture 實例,你可以呼叫 whenComplete 以 BiConsumer 實例指定任務完成如何處理,第一個參數是 Supplier 的傳回值,若有例外發生則會指定給第二個參數,想要在任務完成後繼續非同步地處理,則可以使用 whenCompleteAsync 方法。
如果第一個 CompletableFuture 任務完成後,想要繼續以非同步方式來處理結果,可以使用 thenApplyAsync。例如:
readFileAsync(args[0], poolService)
     .thenApplyAsync(String::toUpperCase)
     .whenComplete((ok, ex) -> {
         if(ex == null) {
            out.println(ok);
         } else {
            ex.printStackTrace();
         }
     });
CompletableFuture 實例的方法,基本上都會有同步與非同步兩個版本,可以用 Async 後置名稱來區分,例如,thenApplyAsync 的同步版本就是 thenApply 方法。
〈Optional 與 Stream 的 flatMap〉中談到,Optional 與 Stream 中各定義有 map 方法,可讓你指定 Optional 或 Stream 中的值 T 如何映射為值 U,然後傳回新的 Optional 或 Stream,CompletableFuture 的 thenApply(以及非同步的 thenApply 版本)其實就類似 Optional 或 Stream 的 map,可讓你指定前一個 CompletableFuture 處理後的結果 T 如何映射為值 U,然後傳回新的 CompletableFuture。
該份文件中也談到,Optional 與 Stream 中也各定義有 flatMap 方法,可讓你指定 Optional 或 Stream 中的值 T 與 Optional<U>、Stream<U> 之間的關係,CompletableFuture 也有個 thenCompose(以及非同步的 thenComposeAsnyc 版本),作用就類似 flatMap,可以讓你指定前一個 CompletableFuture 處理後的結果 T 如何映射為值 CompleteableFuture<U>,舉例來說,你想在 readFileAsync 傳回的 CompletableFuture<String> 處理完後,繼續組合 processContentAsync 方法傳回 CompletableFuture<String>,就可以如下撰寫:
readFileAsync(args[0], poolService)
        .thenCompose(content -> processContentAsync(content, poolService))
        .whenComplete((ok, ex) -> {
            if (ex == null) {
                out.println(ok);
            } else {
                ex.printStackTrace();
            }
        });
CompletableFuture 上還有許多方法可以使用,詳情除了參考 API 文件之中,還可以看看〈Java 8: Definitive guide to CompletableFuture〉這篇文章,有 JDK8 之前,可以使用 guava-libraries 的 ListenableFuture,有興趣的話可以參考〈ListenableFuture 聽取未來需求〉,其他各技術生態中的類似產物,可以參考〈Composable Future API〉的介紹。

