Stream API

September 29, 2022

先來看一個程式片段:

String fileName = args[0];
String prefix = args[1];
String firstMatchdLine = "no matched line";
for (String line : Files.readAllLines(Paths.get(fileName))) {
    if(line.startsWith(prefix)) {
        firstMatchdLine = line;
        break;
    }
}
out.println(firstMatchdLine);

Files.lines

這個程式片段會讀取指定的檔案,找到第一個符合條件的行,然後顯示小寫後離開迴圈,這類的需求,其實可以用以下的程式片段來完成:

String fileName = args[0];
String prefix = args[1];
Optional<String> firstMatchdLine =
                Files.lines(Paths.get(fileName))
                     .filter(line -> line.startsWith(prefix))
                     .findFirst();
out.println(firstMatchdLine.orElse("no matched line"));

一眼可見到最大的差別是沒有使用到 for 迴圈與 if 判斷式,以及使用了管線化(Pipeline)操作風格,而效能上也有所差異,如果讀取的檔案很大,第二個程式片段會比第一個程式片段來得有效率。

java.nio.file.Fileslines 方法,會傳回 java.util.stream.Stream 實例,就這個例子來說就是 Stream<String>,使用 Streamfilter 方法會過濾留下符合條件的元素,findFirst 方法會嘗試看看留下的元素有沒有首元素,因為也可能完全沒有元素,因此傳回 Optional<String> 實例。

效能上的差異性在於,第一個程式片段的 Files.readAllLines 方法傳回的是 List<String> 實例,當中包括了檔案中所有行,如果第一行就符合指定的條件了,那後續的行讀取就是多餘的;第二個程式片段的 lines 方法實際上沒有進行任何一行的讀取,filter 也沒有作任何一行的過濾,直到呼叫 findFirst 時,filter 指定的條件才會真正去執行,而此時才會要求 lines 傳回的 Stream 進行第一行讀取,如果第一行就符合,那後續的行就不會再讀取,效率的差異性就在於此。

之所以能夠達到這類惰性求值(Lazy evaluation)的效果,也就是需要時 findFirst 要求 filter ,而 filter 再要求讀取檔案下一行這種你需要我再給的行為,功臣就是 Stream 實例。第一個程式片段要取得 List 傳回的 Iterator,以搭配for迴圈進行外部迭代(External iteration),第二個程式片段則將迭代行為隱藏在 linesfilterfindFirst 方法之中,稱為內部迭代(Internal iteration),因為內部迭代的行為是被隱藏的,因此多了很多可以實現效率的可能性。

在 Stream 的 API 文件 中談到,Stream 繼承了 AutoClosable,而 BaseStream.close 實作了 close 方法,然而基本上,絕大多數的 Stream 並不需要呼叫 close 方法,除了一些 IO 操作之外,例如 Files.linesFiles.listFiles.walk 方法,建議這類操作可以搭配 try-with-resource 語法。

管線操作

Stream API 引入了管線操作風格,一個管線基本上包括了幾個部份:

  • 來源(Source):可能是檔案、陣列、群集(Collection)、產生器(Generator)等,在這個例子就是指定的檔案。
  • 零或多個中介操作(Intermediate operation):又稱為聚合操作(Aggregate operation),這些操作呼叫時,並不會立即進行手邊的資料處理,它們很懶惰(Lazy),只會在後續中介操作要求資料時才會動手處理下一筆資料,像是第二個程式片段中的 filter 方法。
  • 一個最終操作(Terminal operation):最後真正需要結果的操作,這個操作會要求之前懶惰的中介操作開始動手。 這就是 Stream API 之所以命名為 Stream 的原因,Stream 實例銜接了來源,提到中介操作方法,每個中介操作方法都會傳回 Stream 實例,但不會實際進行資料處理,每個中介操作後的 Stream 實例會串連在一起,Stream 亦提供最終操作方法,不是傳回 Stream 而是傳回真正需要的結果,最終操作方法會引發之前串連在一起的 Stream 實例進行資料處理。

實際上從來源進行一些運算,以求得最終結果,正是程式設計時最常進行的動作,因此 JDK 在不少具有來源概念的 API上,都有可傳回 Stream 的方法,除了這邊看到的 BufferedReader 之外,你還可以使用 Stream 上的靜態方法來建立 Stream 實例,像是 of 方法,對於陣列,也可以使用 Arraysstream 方法來建立 Stream 實例。

Collection 也是個例子,其定義了 stream 方法會傳回 Stream 實例,只要是 Collection 都可以進行中介操作。例如,原本有個程式片段:

List<Person> persons = ...;
List<String> names = new ArrayList<>();
for(Person person : persons) {
    if(person.getAge() > 15) {
        names.add(person.getName().toUpperCase());
    }
}

可以改為以下的風格:

List<Person> persons = ...;
List<String> names = persons.stream()
                            .filter(person -> person.getAge() > 15)
                            .map(person -> person.getName().toUpperCase())
                            .collect(toList()); // 使用了 Collectors.toList() 方法

每個中介操作隱藏了細節,除了增加更多效率改進的空間之外,也鼓勵開發者多利用這類風格,來避免撰寫一些重複流程,或思考目前的複雜演算中,實際上會是由哪些小任務完成。

例如,如果你的程式在 for 迴圈中使用了 if

for(Person person : persons) {
    if(person.getAge() > 15) {
        // 這是下一個小任務
    }
}

也許就有改用 filter 方法的可能性:

persons.stream()
   .filter(person -> person.getAge() > 15);

如果你的程式在 for 迴圈中從一個型態對應至另一個型態:

for(Person person : persons) {
    ...
        ...person.getName().toUpperCase()...
    ...
}

也許就有改用 map 方法的可能性:

persons.stream()
       .map(person -> person.getName().toUpperCase());

許多時候,for 迴圈中就是滲雜了許多小任務,從而使 for 迴圈中的程式碼艱澀難懂,辨識出這些小任務,運用中介操作,形成管線化操作風格,就能增加程式碼閱讀時的流暢性。

Stream 實際上繼承自 java.util.stream.BaseStream,而 BaseStream 還有 DoubleStreamIntStreamLongStream 這三個用於基本型態操作的子介面。

Stream 只能迭代一次,重複對 Stream 進行迭代,會引發 IllegalStateException

reduce 與 collect

從一組數據依條件求得一個數,或將一組數據依條件收集至另一個容器,程式設計中不少地方都存在這類需求,使用迴圈解決這類需求,也是許多開發者最常採用的動作。舉例來說,求得一組人的男性平均年齡:

List<Person> persons = ...;
int sum = 0;
for(Person person : persons) {
    if(person.getGender() == Person.Gender.MALE) {
        sum += person.getAge();
    }
}
int average = sum / persons.size();

實際上,迴圈中進行的也是求得年齡加總,而若要求得一組人的男性最大年齡:

int max = 0;
for(Person person : persons) {
    if(person.getGender() == Person.Gender.MALE) {
        if(person.getAge() > max) {
            max = person.getAge();
        }
    }
}

實際上,你的程式中這類需求都存在著類似地流程結構,而你也不斷重複撰寫著類似結構,而且從閱讀程式碼角度來看,無法一眼察覺程式意圖,在JDK8中,可以改寫為:

int sum = persons.stream()
                 .filter(person -> person.getGender() == Person.Gender.MALE)
                 .mapToInt(Person::getAge)
                 .sum();
            
int average = (int) persons.stream()
                           .filter(person -> person.getGender() == Person.Gender.MALE)
                           .mapToInt(Person::getAge)
                           .average()
                           .getAsDouble();
            
int max = persons.stream()
                 .filter(person -> person.getGender() == Person.Gender.MALE)
                 .mapToInt(Person::getAge)
                 .max()
                 .getAsInt();

IntStream 提供了 sumaveragemaxmin 等方法,那麼如果有其它的計算需求呢?觀察先前的迴圈結構,實際上都是將一組數據逐步取出削減,然而透過指定運算以取得結果的結構,JDK 將這個流程結構通用化,定義了 reduce 方法來達到自訂運算需求。例如,以上三個流程,也可以使用 reduce 重新撰寫如下:

int sum = persons.stream()
                 .filter(person -> person.getGender() == Person.Gender.MALE)
                 .mapToInt(Person::getAge)
                 .reduce((total, age) ->  total + age)
                 .getAsInt();

long males = persons.stream()
                .filter(person -> person.getGender() == Person.Gender.MALE)
                .count();

int average = persons.stream()
                     .filter(person -> person.getGender() == Person.Gender.MALE)
                     .mapToInt(Person::getAge)
                     .reduce((total, age) ->  total + age)
                     .getAsInt() / males;
           
int max = persons.stream()
                 .filter(person -> person.getGender() == Person.Gender.MALE)
                 .mapToInt(Person::getAge)
                 .reduce(0, (currentMax, age) -> age > currentMax ? age : currentMax);

reduce 的 Lambda 表示式,必須接受兩個引數,第一個引數為走訪該組數據上一元素後的運算結果,第二個引數為目前走訪元素,Lambda 表示式本體就是你原先在迴圈中打算進行的運算;reduce 如果如上例中首兩個程式片段沒有指定初值,就會試著使用該組數據中第一個元素,作為第一次呼叫 Lambda 表示式時的第一個引數值,因為考量到數據組可能為空,因此 reduce 不指定初值的版本,會傳回 OptionalInt(非基本型態數據組,則會是 Optional)。

那麼!如果想將一組人的男性收集至另一個 List 呢?在 persons.stream().filter(person -> person.getGender() == Person.Gender.MALE) 之後,傳回的是 Stream<Person>,因為 filterStream 的中介操作,不是最終操作,使用 reduce 的話,在處理完新元素後,每次都會傳回新的計算結果,作為下一次 Lambda 表示式接受的第一個引數,顯然不適合用來收集物件。

可以使用 Streamcollect 方法,以將一組人的男性收集至另一個 List 的需求來說,最簡單的方式就是:

List<Person> males = persons.stream()
                            .filter(person -> person.getGender() == Person.Gender.MALE)
                            .collect(toList()); // toList() 是 java.util.stream.Collectors 的靜態方法

CollectorstoList 方法傳回的並不是 List,而是 java.util.stream.Collector 實例,Collector 主要的四個方法是:

  • suppiler:傳回 Suppiler,定義收集結果的新容器如何建立
  • accumulator:傳回 BiConsumer,定義如何使用結果容器收集物件
  • combiner:傳回 BinaryOperator,定義若有兩個結果容器時,如何合併為一個結果容器
  • finisher:傳回 Function,選擇性地定義如何將結果轉換為最後的結果容器

來看看 Streamcollect 方法另一個版本,有助於瞭解 Collector 這幾個方法如何使用,以下的程式片段與上面的 collect 範例結果是相同的:

List<Person> males = persons.stream()
                            .filter(person -> person.getGender() == Person.Gender.MALE)
                            .collect(
                                 () -> new ArrayList<>(),
                                 (maleLt, person) -> maleLt.add(person),
                                 (maleLt1, maleLt2) -> maleLt1.addAll(maleLt2)
                            );

collect 需要收集物件時,會使用第一個 Lambda 來取得容器物件,這相當於 Collectorsuppiler 之作用,第二個 Lambda 定義了如何收集物件,也就是 Collectoraccumulator 之作用,在使用具有平行處理能力的 Stream 時,有可能會使用多個容器對原數據組進行分而治之(Divide and conquer),當每個小任務完成時,該如何合併,就是第三個 Lambd a要定義的,喔!別忘了可以用方法參考,因此上面可以寫成以下比較簡潔:

List<Person> males = persons.stream()
                            .filter(person -> person.getGender() == Person.Gender.MALE)
                            .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);

當然,使用這個版本的 collect 需要處理比較多的細節,你可以先看看 Collectors 提供了哪些 Collector 實作。舉例來說,如果想要依性別分組,那可以使用 CollectorsgroupingBy 方法,告訴它要用哪個當作分組的鍵(Key),最後傳回的Map結果會以List作為值(Key):

Map<Person.Gender, List<Person>> males = persons.stream()
                  .collect(
                      groupingBy(Person::getGender));

有的方法也兼具另一種流暢風格,例如,想在依性別分組之後,取得分組下的姓名,那可以如下撰寫:

Map<Person.Gender, List<String>> males = persons.stream()
                  .collect(
                      groupingBy(Person::getGender,
                      mapping(Person::getName,
                  toList())));

例如,想在依性別分組之後,分別取得男女年齡加總,那可以如下撰寫:

Map<Person.Gender, Integer> males = persons.stream()
                  .collect(
                          groupingBy(Person::getGender,
                               reducing(0, Person::getAge, Integer::sum))
                  );

要求得各性別下平均年齡的話,Collectors 也有個 averagingInt 方法可以使用:

Map<Person.Gender, Double> males = persons.stream()
                  .collect(
                          groupingBy(Person::getGender,
                               averagingInt(Person::getAge))
                  );

平行化

方才提到,Collectoraccumulator 之作用,在使用具有平行處理能力的 Stream 時…嗯?這表示 Stream 有辦法進行平行處理?是的,想獲得平行處理能力可以說很簡單,例如這段程式碼:

List<Person> males = persons.stream()
                            .filter(person -> person.getGender() == Person.Gender.MALE)
                            .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);

只要改成以下,就可能擁有平行處理之能力:

List<Person> males = persons.parallelStream()
                            .filter(person -> person.getGender() == Person.Gender.MALE)
                            .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);

CollectionparallelStream 方法,傳回的 Stream 實例在實作時,會在可能的情況下進行平行處理,Java 希望你想進行平行處理時,必須有明確的語義,這也是為什麼會有 streamparallelStream 兩個方法,前者代表循序(Serial)處理,後者代表平行處理,想要知道 Stream 是否為平行處理,可以呼叫 isParallel 來得知。

使用了 parallelStream,不代表一定會平行處理而使得執行必然變快,要呼叫哪個方法,必須思考你的處理過程是否能夠分而治之(Divide and conquer)而後合併結果,在這個例子中,filtercollect 方法基本上都有可能。

類似地,CollectorsgroupingBygroupingByConcurrent 兩個方法,前者代表循序處理,後者代表平行處理,是否呼叫後者,同樣你得思考處理過程是否能夠分而治之而後合併結果,如果可能,方能從中獲益。例如原先有段程式:

Map<Person.Gender, List<Person>> males = persons.stream()
                  .collect(
                      groupingBy(Person::getGender));

想要在可能的情況下進行平行處理,可以改為:

Map<Person.Gender, List<Person>> males = persons.parallelStream()
                  .collect(
                      groupingByConcurrent(Person::getGender));

Stream 實例若具有平行處理能力,處理過程會分而治之,也就是將任務切割為小任務,這表示每個小任務都是一個管線化操作,因此像以下的程式片段:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream()
       .forEach(out::println);

你得到的顯示順序不會是 1、2、3、4、5、6、7、8、9,而可能是任意的順序,就 forEach 這個終結操作來說,如果於平行處理時,希望最後順序是照著原來 Stream 來源的順序,那可以呼叫 forEachOrdered。例如:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream()
       .forEachOrdered(out::println);

在管線化操作時,如果 forEachOrdered 中間有其他如 filter 的中介操作,會試著平行化處理,然後最終 forEachOrdered 會以來源順序處理,因此,使用 forEachOrdered 這類的有序的處理時,可能會(或完全失去)失去平行化的一些優勢,實際上中介操作亦有可能如此,例如 sorted 方法。

使用 Streamreducecollect 時,平行處理時也得留意一下順序,API 文件上基本上會記載終結操作時是否依來源順序,reduce 基本上是按照來源順序,而 collect 得視給予的 Collector 而定,在以下兩個例子,collect都是依照來源順序處理:

List<Person> males = persons.parallelStream()
                            .filter(person -> person.getGender() == Person.Gender.MALE)
                            .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);

List<Person> males = persons.parallelStream()
                            .filter(person -> person.getGender() == Person.Gender.MALE)
                            .collect(toList());

collect 操作時若想要有平行效果,必須符合以下三個條件:

  • Stream 必須有平行處理能力。
  • 傳入的 Collector必須有 Collector.Characteristics.CONCURRENT 特性。
  • Stream 是無序的或者是 Collector 具有 Collector.Characteristics.UNORDERED 特性。

想要知道 Collector 具有 Collector.Characteristics.UNORDEREDCollector.Characteristics.UNORDERED,可以呼叫 Collectorcharacteristics 方法,平行處理的 Stream 基本上是無序的,如果不放心,可以呼叫 Streamunordered 方法。

Colllector 具有 CONCURRENTUNORDERED 特性的例子之一是 CollectorsgroupingByConcurrent 方法傳回的實例,因此在最後順序不重要時,使用 groupingByConcurrent 來取代 groupingBy 方法,對效能上會有所幫助。

想要善用 JDK 提供的平行處理能力,你的資料處理過程必須能夠分而治之,而後將每個小任務的結果加以合併,這表示當 API 在處理小任務時,你不應該進行干預,例如:

numbers.parallelStream()
       .filter(number -> {
            numbers.add(7);
            return number > 5;
       })
       .forEachOrdered(out::println);

無論是基於哪種理由,像這類對來源資料的干擾都令人困惑,實際上無論是否進行平行處理,這樣的程式都會引發 ConcurrentModifiedException

JDK 提供高階語義的管線化 API、在可能的情況下實現惰性、平行處理能力,目的之一是希望你思考處理的過程中,實際上是由哪些小任務組成,在過去,你可能基於(自我想像的)效能增進考量,在迴圈中做了很多件事,因而讓程式變得複雜,現在使用了高階 API,就要避免走回頭路。例如,過去你在寫 for 迴圈時,可能會順便做些動作,像是過濾元素做顯示的同時,將元素作個運算並收集在另一個清單中:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
List<Integer> alsoLt = new ArrayList<>();
       
for(Integer number : numbers) {
    if(number > 5) {
        alsoLt.add(number + 10);
        out.println(number);
    }
}

使用高階 API 時,記得一次只做一件事:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);

List<Integer> biggerThan5 = numbers.stream()
                 .filter(number -> number > 5)
                 .collect(toList());
       
biggerThan5.forEach(out::println);
       
List<Integer> alsoLt = biggerThan5.stream()
                .map(number -> number + 10)
                .collect(toList());

避免寫出以下的程式:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
List<Integer> alsoLt = new ArrayList<>();
       
numbers.stream()
       .filter(number -> {
           boolean isBiggerThan5 = number > 5;
           if(isBiggerThan5) {
               alsoLt.add(number + 10);
           }
           return isBiggerThan5;
        })
        .forEach(out::println);

這樣的程式不僅不易理解,如果你試圖進行平行化處理時:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
List<Integer> alsoLt = new ArrayList<>();

numbers.parallelStream()
        .filter(number -> {
            boolean isBiggerThan5 = number > 5;
            if(isBiggerThan5) {
                 alsoLt.add(number + 10);
            }
            return isBiggerThan5;
        })
        .forEachOrdered(out::println);

就會發現,alsoLt 的順序並不照著 numbers 的順序,然而上頭一次處理一個任務的版本,可以簡單地改為平行化版本:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);

List<Integer> biggerThan5 = numbers.parallelStream()
                 .filter(number -> number > 5)
                 .collect(toList());
       
biggerThan5.forEach(out::println);
       
List<Integer> alsoLt = biggerThan5.parallelStream()
                .map(number -> number + 10)
                .collect(toList());

分享到 LinkedIn 分享到 Facebook 分享到 Twitter