Stream API
September 29, 2022先來看一個程式片段:
String fileName = args[0];
String prefix = args[1];
String firstMatchdLine = "no matched line";
for (String line : Files.readAllLines(Paths.get(fileName))) {
if(line.startsWith(prefix)) {
firstMatchdLine = line;
break;
}
}
out.println(firstMatchdLine);
Files.lines
這個程式片段會讀取指定的檔案,找到第一個符合條件的行,然後顯示小寫後離開迴圈,這類的需求,其實可以用以下的程式片段來完成:
String fileName = args[0];
String prefix = args[1];
Optional<String> firstMatchdLine =
Files.lines(Paths.get(fileName))
.filter(line -> line.startsWith(prefix))
.findFirst();
out.println(firstMatchdLine.orElse("no matched line"));
一眼可見到最大的差別是沒有使用到 for
迴圈與 if
判斷式,以及使用了管線化(Pipeline)操作風格,而效能上也有所差異,如果讀取的檔案很大,第二個程式片段會比第一個程式片段來得有效率。
java.nio.file.Files
的 lines
方法,會傳回 java.util.stream.Stream
實例,就這個例子來說就是 Stream<String>
,使用 Stream
的 filter
方法會過濾留下符合條件的元素,findFirst
方法會嘗試看看留下的元素有沒有首元素,因為也可能完全沒有元素,因此傳回 Optional<String>
實例。
效能上的差異性在於,第一個程式片段的 Files.readAllLines
方法傳回的是 List<String>
實例,當中包括了檔案中所有行,如果第一行就符合指定的條件了,那後續的行讀取就是多餘的;第二個程式片段的 lines
方法實際上沒有進行任何一行的讀取,filter
也沒有作任何一行的過濾,直到呼叫 findFirst
時,filter
指定的條件才會真正去執行,而此時才會要求 lines
傳回的 Stream
進行第一行讀取,如果第一行就符合,那後續的行就不會再讀取,效率的差異性就在於此。
之所以能夠達到這類惰性求值(Lazy evaluation)的效果,也就是需要時 findFirst
要求 filter
,而 filter
再要求讀取檔案下一行這種你需要我再給的行為,功臣就是 Stream
實例。第一個程式片段要取得 List
傳回的 Iterator
,以搭配for迴圈進行外部迭代(External iteration),第二個程式片段則將迭代行為隱藏在 lines
、filter
與 findFirst
方法之中,稱為內部迭代(Internal iteration),因為內部迭代的行為是被隱藏的,因此多了很多可以實現效率的可能性。
在 Stream 的 API 文件 中談到,Stream
繼承了 AutoClosable
,而 BaseStream.close
實作了 close
方法,然而基本上,絕大多數的 Stream
並不需要呼叫 close
方法,除了一些 IO 操作之外,例如 Files.lines
、Files.list
與 Files.walk
方法,建議這類操作可以搭配 try-with-resource 語法。
管線操作
Stream API 引入了管線操作風格,一個管線基本上包括了幾個部份:
- 來源(Source):可能是檔案、陣列、群集(Collection)、產生器(Generator)等,在這個例子就是指定的檔案。
- 零或多個中介操作(Intermediate operation):又稱為聚合操作(Aggregate operation),這些操作呼叫時,並不會立即進行手邊的資料處理,它們很懶惰(Lazy),只會在後續中介操作要求資料時才會動手處理下一筆資料,像是第二個程式片段中的
filter
方法。 - 一個最終操作(Terminal operation):最後真正需要結果的操作,這個操作會要求之前懶惰的中介操作開始動手。
這就是 Stream API 之所以命名為
Stream
的原因,Stream
實例銜接了來源,提到中介操作方法,每個中介操作方法都會傳回Stream
實例,但不會實際進行資料處理,每個中介操作後的Stream
實例會串連在一起,Stream
亦提供最終操作方法,不是傳回Stream
而是傳回真正需要的結果,最終操作方法會引發之前串連在一起的Stream
實例進行資料處理。
實際上從來源進行一些運算,以求得最終結果,正是程式設計時最常進行的動作,因此 JDK 在不少具有來源概念的 API上,都有可傳回 Stream
的方法,除了這邊看到的 BufferedReader
之外,你還可以使用 Stream
上的靜態方法來建立 Stream
實例,像是 of
方法,對於陣列,也可以使用 Arrays
的 stream
方法來建立 Stream
實例。
Collection
也是個例子,其定義了 stream
方法會傳回 Stream
實例,只要是 Collection
都可以進行中介操作。例如,原本有個程式片段:
List<Person> persons = ...;
List<String> names = new ArrayList<>();
for(Person person : persons) {
if(person.getAge() > 15) {
names.add(person.getName().toUpperCase());
}
}
可以改為以下的風格:
List<Person> persons = ...;
List<String> names = persons.stream()
.filter(person -> person.getAge() > 15)
.map(person -> person.getName().toUpperCase())
.collect(toList()); // 使用了 Collectors.toList() 方法
每個中介操作隱藏了細節,除了增加更多效率改進的空間之外,也鼓勵開發者多利用這類風格,來避免撰寫一些重複流程,或思考目前的複雜演算中,實際上會是由哪些小任務完成。
例如,如果你的程式在 for
迴圈中使用了 if
:
for(Person person : persons) {
if(person.getAge() > 15) {
// 這是下一個小任務
}
}
也許就有改用 filter
方法的可能性:
persons.stream()
.filter(person -> person.getAge() > 15);
如果你的程式在 for
迴圈中從一個型態對應至另一個型態:
for(Person person : persons) {
...
...person.getName().toUpperCase()...
...
}
也許就有改用 map
方法的可能性:
persons.stream()
.map(person -> person.getName().toUpperCase());
許多時候,for
迴圈中就是滲雜了許多小任務,從而使 for
迴圈中的程式碼艱澀難懂,辨識出這些小任務,運用中介操作,形成管線化操作風格,就能增加程式碼閱讀時的流暢性。
Stream
實際上繼承自 java.util.stream.BaseStream
,而 BaseStream
還有 DoubleStream
、IntStream
與 LongStream
這三個用於基本型態操作的子介面。
Stream
只能迭代一次,重複對 Stream
進行迭代,會引發 IllegalStateException
。
reduce 與 collect
從一組數據依條件求得一個數,或將一組數據依條件收集至另一個容器,程式設計中不少地方都存在這類需求,使用迴圈解決這類需求,也是許多開發者最常採用的動作。舉例來說,求得一組人的男性平均年齡:
List<Person> persons = ...;
int sum = 0;
for(Person person : persons) {
if(person.getGender() == Person.Gender.MALE) {
sum += person.getAge();
}
}
int average = sum / persons.size();
實際上,迴圈中進行的也是求得年齡加總,而若要求得一組人的男性最大年齡:
int max = 0;
for(Person person : persons) {
if(person.getGender() == Person.Gender.MALE) {
if(person.getAge() > max) {
max = person.getAge();
}
}
}
實際上,你的程式中這類需求都存在著類似地流程結構,而你也不斷重複撰寫著類似結構,而且從閱讀程式碼角度來看,無法一眼察覺程式意圖,在JDK8中,可以改寫為:
int sum = persons.stream()
.filter(person -> person.getGender() == Person.Gender.MALE)
.mapToInt(Person::getAge)
.sum();
int average = (int) persons.stream()
.filter(person -> person.getGender() == Person.Gender.MALE)
.mapToInt(Person::getAge)
.average()
.getAsDouble();
int max = persons.stream()
.filter(person -> person.getGender() == Person.Gender.MALE)
.mapToInt(Person::getAge)
.max()
.getAsInt();
IntStream
提供了 sum
、average
、max
、min
等方法,那麼如果有其它的計算需求呢?觀察先前的迴圈結構,實際上都是將一組數據逐步取出削減,然而透過指定運算以取得結果的結構,JDK 將這個流程結構通用化,定義了 reduce
方法來達到自訂運算需求。例如,以上三個流程,也可以使用 reduce
重新撰寫如下:
int sum = persons.stream()
.filter(person -> person.getGender() == Person.Gender.MALE)
.mapToInt(Person::getAge)
.reduce((total, age) -> total + age)
.getAsInt();
long males = persons.stream()
.filter(person -> person.getGender() == Person.Gender.MALE)
.count();
int average = persons.stream()
.filter(person -> person.getGender() == Person.Gender.MALE)
.mapToInt(Person::getAge)
.reduce((total, age) -> total + age)
.getAsInt() / males;
int max = persons.stream()
.filter(person -> person.getGender() == Person.Gender.MALE)
.mapToInt(Person::getAge)
.reduce(0, (currentMax, age) -> age > currentMax ? age : currentMax);
給 reduce
的 Lambda 表示式,必須接受兩個引數,第一個引數為走訪該組數據上一元素後的運算結果,第二個引數為目前走訪元素,Lambda 表示式本體就是你原先在迴圈中打算進行的運算;reduce
如果如上例中首兩個程式片段沒有指定初值,就會試著使用該組數據中第一個元素,作為第一次呼叫 Lambda 表示式時的第一個引數值,因為考量到數據組可能為空,因此 reduce
不指定初值的版本,會傳回 OptionalInt
(非基本型態數據組,則會是 Optional
)。
那麼!如果想將一組人的男性收集至另一個 List
呢?在 persons.stream().filter(person -> person.getGender() == Person.Gender.MALE)
之後,傳回的是 Stream<Person>
,因為 filter
是 Stream
的中介操作,不是最終操作,使用 reduce
的話,在處理完新元素後,每次都會傳回新的計算結果,作為下一次 Lambda 表示式接受的第一個引數,顯然不適合用來收集物件。
可以使用 Stream
的 collect
方法,以將一組人的男性收集至另一個 List
的需求來說,最簡單的方式就是:
List<Person> males = persons.stream()
.filter(person -> person.getGender() == Person.Gender.MALE)
.collect(toList()); // toList() 是 java.util.stream.Collectors 的靜態方法
Collectors
的 toList
方法傳回的並不是 List
,而是 java.util.stream.Collector
實例,Collector
主要的四個方法是:
suppiler
:傳回Suppiler
,定義收集結果的新容器如何建立accumulator
:傳回BiConsumer
,定義如何使用結果容器收集物件combiner
:傳回BinaryOperator
,定義若有兩個結果容器時,如何合併為一個結果容器finisher
:傳回Function
,選擇性地定義如何將結果轉換為最後的結果容器
來看看 Stream
的 collect
方法另一個版本,有助於瞭解 Collector
這幾個方法如何使用,以下的程式片段與上面的 collect
範例結果是相同的:
List<Person> males = persons.stream()
.filter(person -> person.getGender() == Person.Gender.MALE)
.collect(
() -> new ArrayList<>(),
(maleLt, person) -> maleLt.add(person),
(maleLt1, maleLt2) -> maleLt1.addAll(maleLt2)
);
當 collect
需要收集物件時,會使用第一個 Lambda 來取得容器物件,這相當於 Collector
的 suppiler
之作用,第二個 Lambda 定義了如何收集物件,也就是 Collector
的 accumulator
之作用,在使用具有平行處理能力的 Stream
時,有可能會使用多個容器對原數據組進行分而治之(Divide and conquer),當每個小任務完成時,該如何合併,就是第三個 Lambd a要定義的,喔!別忘了可以用方法參考,因此上面可以寫成以下比較簡潔:
List<Person> males = persons.stream()
.filter(person -> person.getGender() == Person.Gender.MALE)
.collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
當然,使用這個版本的 collect
需要處理比較多的細節,你可以先看看 Collectors
提供了哪些 Collector
實作。舉例來說,如果想要依性別分組,那可以使用 Collectors
的 groupingBy
方法,告訴它要用哪個當作分組的鍵(Key),最後傳回的Map結果會以List作為值(Key):
Map<Person.Gender, List<Person>> males = persons.stream()
.collect(
groupingBy(Person::getGender));
有的方法也兼具另一種流暢風格,例如,想在依性別分組之後,取得分組下的姓名,那可以如下撰寫:
Map<Person.Gender, List<String>> males = persons.stream()
.collect(
groupingBy(Person::getGender,
mapping(Person::getName,
toList())));
例如,想在依性別分組之後,分別取得男女年齡加總,那可以如下撰寫:
Map<Person.Gender, Integer> males = persons.stream()
.collect(
groupingBy(Person::getGender,
reducing(0, Person::getAge, Integer::sum))
);
要求得各性別下平均年齡的話,Collectors
也有個 averagingInt
方法可以使用:
Map<Person.Gender, Double> males = persons.stream()
.collect(
groupingBy(Person::getGender,
averagingInt(Person::getAge))
);
平行化
方才提到,Collector
的 accumulator
之作用,在使用具有平行處理能力的 Stream
時…嗯?這表示 Stream
有辦法進行平行處理?是的,想獲得平行處理能力可以說很簡單,例如這段程式碼:
List<Person> males = persons.stream()
.filter(person -> person.getGender() == Person.Gender.MALE)
.collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
只要改成以下,就可能擁有平行處理之能力:
List<Person> males = persons.parallelStream()
.filter(person -> person.getGender() == Person.Gender.MALE)
.collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
Collection
的 parallelStream
方法,傳回的 Stream
實例在實作時,會在可能的情況下進行平行處理,Java 希望你想進行平行處理時,必須有明確的語義,這也是為什麼會有 stream
與 parallelStream
兩個方法,前者代表循序(Serial)處理,後者代表平行處理,想要知道 Stream
是否為平行處理,可以呼叫 isParallel
來得知。
使用了 parallelStream
,不代表一定會平行處理而使得執行必然變快,要呼叫哪個方法,必須思考你的處理過程是否能夠分而治之(Divide and conquer)而後合併結果,在這個例子中,filter
與 collect
方法基本上都有可能。
類似地,Collectors
有 groupingBy
與 groupingByConcurrent
兩個方法,前者代表循序處理,後者代表平行處理,是否呼叫後者,同樣你得思考處理過程是否能夠分而治之而後合併結果,如果可能,方能從中獲益。例如原先有段程式:
Map<Person.Gender, List<Person>> males = persons.stream()
.collect(
groupingBy(Person::getGender));
想要在可能的情況下進行平行處理,可以改為:
Map<Person.Gender, List<Person>> males = persons.parallelStream()
.collect(
groupingByConcurrent(Person::getGender));
Stream
實例若具有平行處理能力,處理過程會分而治之,也就是將任務切割為小任務,這表示每個小任務都是一個管線化操作,因此像以下的程式片段:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream()
.forEach(out::println);
你得到的顯示順序不會是 1、2、3、4、5、6、7、8、9,而可能是任意的順序,就 forEach
這個終結操作來說,如果於平行處理時,希望最後順序是照著原來 Stream
來源的順序,那可以呼叫 forEachOrdered
。例如:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream()
.forEachOrdered(out::println);
在管線化操作時,如果 forEachOrdered
中間有其他如 filter
的中介操作,會試著平行化處理,然後最終 forEachOrdered
會以來源順序處理,因此,使用 forEachOrdered
這類的有序的處理時,可能會(或完全失去)失去平行化的一些優勢,實際上中介操作亦有可能如此,例如 sorted
方法。
使用 Stream
的 reduce
與 collect
時,平行處理時也得留意一下順序,API 文件上基本上會記載終結操作時是否依來源順序,reduce
基本上是按照來源順序,而 collect
得視給予的 Collector
而定,在以下兩個例子,collect
都是依照來源順序處理:
List<Person> males = persons.parallelStream()
.filter(person -> person.getGender() == Person.Gender.MALE)
.collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
List<Person> males = persons.parallelStream()
.filter(person -> person.getGender() == Person.Gender.MALE)
.collect(toList());
在 collect
操作時若想要有平行效果,必須符合以下三個條件:
Stream
必須有平行處理能力。- 傳入的
Collector
必須有Collector.Characteristics.CONCURRENT
特性。 Stream
是無序的或者是Collector
具有Collector.Characteristics.UNORDERED
特性。
想要知道 Collector
具有 Collector.Characteristics.UNORDERED
或 Collector.Characteristics.UNORDERED
,可以呼叫 Collector
的 characteristics
方法,平行處理的 Stream
基本上是無序的,如果不放心,可以呼叫 Stream
的 unordered
方法。
Colllector
具有 CONCURRENT
與 UNORDERED
特性的例子之一是 Collectors
的 groupingByConcurrent
方法傳回的實例,因此在最後順序不重要時,使用 groupingByConcurrent
來取代 groupingBy
方法,對效能上會有所幫助。
想要善用 JDK 提供的平行處理能力,你的資料處理過程必須能夠分而治之,而後將每個小任務的結果加以合併,這表示當 API 在處理小任務時,你不應該進行干預,例如:
numbers.parallelStream()
.filter(number -> {
numbers.add(7);
return number > 5;
})
.forEachOrdered(out::println);
無論是基於哪種理由,像這類對來源資料的干擾都令人困惑,實際上無論是否進行平行處理,這樣的程式都會引發 ConcurrentModifiedException
。
JDK 提供高階語義的管線化 API、在可能的情況下實現惰性、平行處理能力,目的之一是希望你思考處理的過程中,實際上是由哪些小任務組成,在過去,你可能基於(自我想像的)效能增進考量,在迴圈中做了很多件事,因而讓程式變得複雜,現在使用了高階 API,就要避免走回頭路。例如,過去你在寫 for
迴圈時,可能會順便做些動作,像是過濾元素做顯示的同時,將元素作個運算並收集在另一個清單中:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
List<Integer> alsoLt = new ArrayList<>();
for(Integer number : numbers) {
if(number > 5) {
alsoLt.add(number + 10);
out.println(number);
}
}
使用高階 API 時,記得一次只做一件事:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
List<Integer> biggerThan5 = numbers.stream()
.filter(number -> number > 5)
.collect(toList());
biggerThan5.forEach(out::println);
List<Integer> alsoLt = biggerThan5.stream()
.map(number -> number + 10)
.collect(toList());
避免寫出以下的程式:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
List<Integer> alsoLt = new ArrayList<>();
numbers.stream()
.filter(number -> {
boolean isBiggerThan5 = number > 5;
if(isBiggerThan5) {
alsoLt.add(number + 10);
}
return isBiggerThan5;
})
.forEach(out::println);
這樣的程式不僅不易理解,如果你試圖進行平行化處理時:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
List<Integer> alsoLt = new ArrayList<>();
numbers.parallelStream()
.filter(number -> {
boolean isBiggerThan5 = number > 5;
if(isBiggerThan5) {
alsoLt.add(number + 10);
}
return isBiggerThan5;
})
.forEachOrdered(out::println);
就會發現,alsoLt
的順序並不照著 numbers
的順序,然而上頭一次處理一個任務的版本,可以簡單地改為平行化版本:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
List<Integer> biggerThan5 = numbers.parallelStream()
.filter(number -> number > 5)
.collect(toList());
biggerThan5.forEach(out::println);
List<Integer> alsoLt = biggerThan5.parallelStream()
.map(number -> number + 10)
.collect(toList());