Java 8 引入的 Stream API 為開發者提供了強大且清楚易懂的方式來處理資料串流集合。然而,隨著時間的推移,開發者們發現 Stream API 在某些複雜場景下仍然有不少的局限性。本文將介紹 JDK 23 Stream API 的重大進化如何去填補這些缺失的拼圖:JEP 473 的新功能 Stream Gatherers 串流聚集器。
JEP 473 Stream Gatherers 串流聚集器目的在加強 Stream API 的功能,使其能支援自定義的中間操作。這項新特性將允許開發者以更靈活的方式轉換資料流,解決了許多現有 Stream API 難以處理的複雜場景。接下來,讓我們深入探討串流聚集器會如何改變 Java 開發者處理資料串流的方式。
目錄
前言
Java 8 新增專為 lambda 表達式設計的第一個 API:Stream API(java.util.stream
)。串流是一個惰性計算的、可能無邊界的資料序列,而 Stream API 支援循序或並行處理串流的能力。一個串流管道由三個部分組成:元素的來源、任意數量的中間操作和一個終端操作。例如:
long numberOfWords =
Stream.of("the", "", "fox", "jumps", "over", "the", "", "dog") // (1) 建立串流
.filter(Predicate.not(String::isEmpty)) // (2) 中間操作:去除空字串
.collect(Collectors.counting()); // (3) 終端操作:計算
這種程式設計風格既有表現力又高效:使用建構器風格的 API、每個中間操作都會返回新的串流,並且在呼叫終端操作時才會開始計算。以上述的例子說明:
- 首先建立新的串流,但沒有計算它
- 接著設定中間的
filter
操作,但仍然沒有計算 - 最後的終端
collect
操作才計算整個串流管道
因此自從 Stream API 推出以來,它簡潔的語法和強大的功能贏得了許多開發者的青睞。然而,隨著場景的日益複雜化,開發者逐漸發現 Stream API 存在著局限性。例如某些複雜的資料轉換操作無法直接通過現有的中間操作實現,導致開發者在某些場景時不得不放棄使用 Stream API,改回原本所使用、較為繁瑣的程式碼。
Stream API 的局限性
Stream API 提供了相當豐富的特定中間和終端操作:映射、過濾、去重、歸約、排序等等。特定的中間操作集意味著無法輕易且彈性地完成一些較複雜的任務,可能是所需的中間操作不存在,或是中間操作存在但不能直接支援該任務。
一個典型的例子是,當我們需要基於特定的元素屬性去刪除重複資料(deduplication)時,現有的 distinct
方法就顯得力不從心。因為它依據的是呼叫 equals
方法的傳回值來判斷是否應該要「去重」,無法依據物件中特殊或任意的欄位資訊條件來做比對。例如,我們想使用字串長度進行去重,使每個長度的字串最多只有一個。理想情況下,程式碼看起來像這樣:
var result = Stream.of("foo", "bar", "baz", "quux", "ab", "test", "cd", "ef")
.distinctBy(String::length) // 依字串長度去重,各長度都只留第一筆
.toList();
// result ==> [foo, quux, ab]
不幸的是,Stream API 並不支援這樣的中間操作。最接近的 distinct
是用物件相等性來比較元素,無法改用字串長度的相等性。一個可能的解法是建立 record
類別去包裝每個字串,並覆寫 equals
方法使得 distinct
間接地比對字串長度。然而,這種來回轉換的表達方式並不直觀,也使得程式碼難以維護:
record DistinctByLength(String str) {
@Override public boolean equals(Object obj) {
return obj instanceof DistinctByLength(String other)
&& str.length() == other.length();
}
@Override public int hashCode() {
return str == null ? 0 : Integer.hashCode(str.length());
}
}
var result = Stream.of("foo", "bar", "baz", "quux", "ab", "test", "cd", "ef")
.map(DistinctByLength::new)
.distinct()
.map(DistinctByLength::str)
.toList();
// result ==> [foo, quux, ab]
另一個常見的痛點是處理滑動窗口或固定大小的分組操作。假設任務是將所有元素分成三個一組,但只保留前兩組,因此 [0, 1, 2, 3, 4, 5, 6, ...]
應該輸出 [[0, 1, 2], [3, 4, 5]]
。理想情況下程式碼看起來像這樣:
var result = Stream.iterate(0, i -> i + 1)
.windowFixed(3) // 三個一組
.limit(2)
.toList();
// result ==> [[0, 1, 2], [3, 4, 5]]
然而,目前並沒有任何內建的中間操作支援此任務。其中一個解法是在 limit
操作時預先計算好要留下多少數量的元素,然後將固定窗口分組的邏輯放在終端 collect
中使用自定義的 Collector.of
去完成。此外,由於資料流是有順序的數字,無法並行處理,所以如果它的 combiner
被呼叫,那麼需要拋出異常。這樣一來使得程式碼更難以理解:
var result = Stream.iterate(0, i -> i + 1)
.limit(3 * 2) // 必須自行將元素總數計算好
.collect(Collector.of(
() -> new ArrayList<ArrayList<Integer>>(), // 建立最終結果
(groups, element) -> {
// 若最終結果為空或是其中最後一組的元素數量為3
if (groups.isEmpty() || groups.getLast().size() == 3) {
var current = new ArrayList<Integer>();
current.add(element);
groups.addLast(current); // 建立新的一組並加到最終結果裡
} else {
groups.getLast().add(element); // 否則將元素加到現有的最後一組
}
},
(left, right) -> { // combiner 不能被呼叫,因為不能並行處理
throw new UnsupportedOperationException("Cannot be parallelized");
}
));
// result ==> [[0, 1, 2], [3, 4, 5]]
這些局限性使得某些資料處理的任務變得複雜。開發人員往往需要額外撰寫程式碼處理,甚至放棄使用 Stream API,轉而採用傳統的集合迭代方式。這不僅增加了程式碼的複雜度,也降低了程式的可讀性和維護性。
JEP 473 概觀
Stream Gatherer 串流聚集器是 JEP 473 引入的核心概念:Stream::gather(Gatherer)
。它可以建構高效且並行的串流操作,並讓開發者自定義許多中間操作,實現複雜的邏輯以達成原本不易實作的資料轉換。聚集器的設計靈感來自於 Collector 收集器,但更加靈活與強大。Stream::gather(Gatherer)
之於中間操作,就像 Stream::collect(Collector)
之於終端操作。
聚集器的介面是 java.util.stream.Gatherer
。它能夠以一對一、一對多、多對一或多對多的方式轉換元素。聚集器可以記錄先前看過的元素,並用其來進行後續元素的轉換;也可以將無限串流中斷轉換為有限串流,並且使用並行處理。
優點
- 允許開發者定義幾乎任何複雜的中間操作,大幅地擴展了 Stream API 的應用範圍
- 通過使用自定義的聚集器可以讓串流管道的意圖更加清晰,並提高代碼的可讀性
- 支援並行處理,可以充分利用多核處理器的優勢,提高處理效率
缺點
- 對於不熟悉函數式程式設計或 Stream API 的開發者來說,理解並使用聚集器可能需要一定的學習成本
- 雖然它簡化了某些複雜操作,但自定義聚集器本身可能會引入新的複雜性,導致程式碼難以閱讀與維護
介紹
基礎組件與流程
JEP 473 中的聚集器由四個主要組件一起協同工作:
initializer
(可選):建立初始物件,用來在處理串流元素時維護內部狀態,例如暫存、計數、緩衝等。舉例來說,聚集器可以儲存當前的元素,用來和下一個新元素進行比較,如此一來可將兩個輸入元素轉換為一個輸出元素integrator
(必要):可以從輸入串流接收新元素、檢查或更新內部狀態、新增輸出元素到下游串流,甚至可以在輸入串流的末尾到達之前終止處理串流。例如:搜尋整數串流中最大值的聚集器,如果檢測到Integer.MAX_VALUE
時可以立即停止combiner
(可選):在輸入串流被標記為並行處理時用來檢查或合併聚集器內部狀態。即使聚集器不具備並行能力,它仍然可以成為串流管道的一部分並對串流做循序評估,這對於因循序操作而無法並行化的情況很有幫助finisher
(可選):當串流中沒有剩餘的輸入元素時做最終處理。它可以用來檢查內部狀態物件,並生成額外的輸出元素,或是拋出異常
當 Stream::gather
被呼叫時會執行下列的等效步驟:
- 創建一個
Downstream
物件,當給定聚集器的輸出類型的元素時,將其傳遞給管道中的下一個階段 - 呼叫 initializer 的
get()
方法以取得聚集器的內部狀態物件state
- 處理輸入元素並利用
integrator()
方法取得 integrator 後,呼叫其integrate(...)
方法,將state
、下一個元素和 downstream 物件傳遞給它- 如果返回
false
則終止處理
- 如果返回
- 當串流結束或終止處理時,取得 finisher 並傳遞
state
和downstream
物件給其accept()
方法
Gatherer.Downstream<? super R> downstream = ...; // 1
A state = gatherer.initializer().get(); // 2
for (T t : data) {
gatherer.integrator().integrate(state, t, downstream); // 3
}
gatherer.finisher().accept(state, downstream); // 4
目前 Stream
介面中宣告的每個中間操作都可以用聚集器來實現。例如,給定一個 T 型別的元素串流,Stream::map
可以通過某個函數將每個 T 元素轉換為 U 元素,並將 U 元素向下傳遞;它可以用一個無狀態的一對一聚集器來實作。再舉一個例子,Stream::filter
接受一個條件式並決定是否應將輸入元素向下傳遞;這也只是一個無狀態的一對多聚集器。事實上,每個串流管道在概念上都等同於下式:
source.gather(...).gather(...).gather(...).collect(...)
內建 Gatherer 聚集器
java.util.stream.Gatherers
類別中引入了以下內建的聚集器:
名稱 | 對應 | 功能 | 使用場景 |
---|---|---|---|
fold | 多對一 | 消化元素、更新內部狀態,並計算結果 | 平均值、總和、計數、最大值 |
scan | 一對一 | 消化元素、將其與內部狀態整合,並計算 累積中間值並產生新元素 | 前綴和、累積乘積、費式數列 |
mapConcurrent | 一對一 | 對每個輸入元素並行地呼叫函式, 最多達到設定的上限 | 下載、讀檔、圖片運算 |
windowFixed | 多對多 | 將元素依數量分組,在窗口滿時向下傳遞 | 批次處理、緩衝區讀取 |
windowSliding | 多對多 | 將元素依數量分組。每個後續窗口會將前 一窗口的首元素刪除後附加下個輸入元素 | 移動平均、局部最大值 |
fold
與 scan
的差別
這兩個聚集器的功能有些類似,主要差別是 fold
會將多個輸入元素整合後,產生單一輸出結果;而 scan
在處理每一個輸入元素時,會參考內部狀態,並產生新的輸出結果。以下的例子說明 fold
與 scan
的不同:
Stream.of(1, 2, 3, 4)
.gather(Gatherers.fold(()->0, (a,b)->a+b)) // 相加
.toList();
// 輸出單一結果:計算總和為 [10]
Stream.of(1, 2, 3, 4)
.gather(Gatherers.scan(()->0, (a,b)->a+b)) // 相加
.toList();
// 輸出連續結果:計算前綴和為 [1, 3, 6, 10]
windowFixed
與 windowSliding
的差別
兩個聚集器都是將輸入元素依給定的大小去分組,但是分組的邏輯有些不同。例如輸入串流是 [1, 2, 3, 4]
,窗口大小為 2 來說:windowFixed
是窗口滿額後就送出並清空,因此輸出串流為 [[1, 2], [3, 4]]
;而 windowSliding
雖然也是窗口滿額後送出,但它會滑動地刪除第一個元素後附加下一個輸入元素,因此輸出串流為 [[1, 2], [2, 3], [3, 4]]
。
Stream.of(1, 2, 3, 4)
.gather(Gatherers.scan(() -> 0, (a, b) -> a + b)) //scan: [1, 3, 6, 10]
.gather(Gatherers.windowFixed(2))
.map(w -> (w.get(0) + w.get(1)) / 2.0) // 需為偶數
.toList();
// 區間平均: [2.0, 8.0]
Stream.of(1, 2, 3, 4)
.gather(Gatherers.scan(() -> 0, (a, b) -> a + b)) //scan: [1, 3, 6, 10]
.gather(Gatherers.windowSliding(2))
.map(w -> (w.get(0) + w.get(1)) / 2.0)
.toList();
// 移動平均: [2.0, 4.5, 8.0]
聚集器組合
在上例中,我們連續使用了兩個聚集器(fold
和 windowSliding
)來完成任務。除此之外,我們也可以使用 andThen(Gatherer)
方法來連接多個聚合器,使得前一個聚集器的輸出串流成為下一個聚集器的輸入串流,以藉此建立更複雜的任務流程。
Stream.of(1, 2, 3, 4)
.gather( Gatherers.scan( () -> 0, (Integer a, Integer b) -> a + b ) )
.gather( Gatherers.windowSliding(2) )
.toList();
// 與上例等價的語法
Stream.of(1, 2, 3, 4)
.gather( Gatherers.scan( () -> 0, (Integer a, Integer b) -> a + b )
.andThen( Gatherers.windowSliding(2) )
)
.toList();
聚集器(Gatherer)與收集器(Collector)
Gatherer
介面的設計深受 Collector
設計的影響。主要區別在於:
Gatherer
使用Integrator
而不是BiConsumer
進行每個元素的處理,因為它需要一個額外的輸入參數用於Downstream
物件,並且它需要返回一個布林值來指示是否應該繼續處理。Gatherer
的finisher
使用BiConsumer
而不是Function
,因為它需要一個額外的輸入參數用於其Downstream
物件,並且它無法返回結果
擁抱串流
缺乏合適的中間操作會迫使我們無法使用 Stream API 去處理串流資料。例如我們有一個按時間順序排列的溫度讀數串流:
record Reading(Instant obtainedAt, int kelvins) { // 時間加溫度的記錄
Reading(String time, int kelvins) { this(Instant.parse(time), kelvins); }
}
// 可以從檔案、資料庫、服務或其他方式持續讀取資料
Stream<Reading> loadRecentReadings() {
return Stream.of(
new Reading("2023-09-21T10:15:33.00Z", 310)
new Reading("2023-09-21T10:15:30.00Z", 310),
new Reading("2023-09-21T10:15:31.00Z", 312),
new Reading("2023-09-21T10:15:32.00Z", 350), // 模擬異常資料
new Reading("2023-09-21T10:15:33.00Z", 310)
);
}
我們希望檢測此串流中的可疑變化。如果五秒鐘的時間窗口內,兩個連續讀數之間的溫度變化差超過 30° Kelvin 時能夠發出警示:
boolean isSuspicious(Reading previous, Reading next) {
return next.obtainedAt().isBefore(previous.obtainedAt().plusSeconds(5))
&& (next.kelvins() > previous.kelvins() + 30
|| next.kelvins() < previous.kelvins() - 30);
}
因為沒有合適的中間操作,所以我們必須放棄串流處理,改以命令式的方式實現我們的分析:
List<List<Reading>> findSuspicious(Stream<Reading> source) {
var suspicious = new ArrayList<List<Reading>>();
Reading previous = null;
for (Reading next : source.toList()) { // 必須把串流轉成 List 處理
if (previous != null && isSuspicious(previous, next)) { // 如果發現的話
suspicious.add(List.of(previous, next)); //加到結果中
}
previous = next;
}
return suspicious;
}
// 執行並取得結果
var result = findSuspicious(loadRecentReadings());
輸出結果如下:
[[Reading[obtainedAt=2023-09-21T10:15:31Z, kelvins=312],
Reading[obtainedAt=2023-09-21T10:15:32Z, kelvins=350]],
[Reading[obtainedAt=2023-09-21T10:15:32Z, kelvins=350],
Reading[obtainedAt=2023-09-21T10:15:33Z, kelvins=310]]]
然而,有了聚集器之後我們可以更簡潔且優雅地改寫原本的 findSuspicious
方法。首先使用內建的 windowSliding
建立大小為 2 的滑動窗口,然後用 filter
操作來保留滿足條件的窗口,最後將結果收集到一個 List
中。這樣的寫法更貼近串流處理的風格,也避免了前面手動迭代和狀態管理的複雜性。
List<List<Reading>> findSuspicious2(Stream<Reading> source) {
return source.gather(Gatherers.windowSliding(2)) // 建立大小為 2 的滑動窗口
.filter(window -> (window.size() == 2 // 篩選包含兩個讀數的窗口並檢查
&& isSuspicious(window.get(0), window.get(1))))
.toList(); // 將結果收集到一個 List 中
}
// 執行並取得結果
findSuspicious2(loadRecentReadings());
自定義聚集器
雖然聚集器中已經提供了幾個內建的中間操作,但仍無法覆蓋所有的使用場景。不過,依據 JEP 473 目前的規格,我們可以建立自定義的聚集器,以滿足需求並處理各式複雜的操作。讓我們回顧一下文章開頭時所提到的例子:依字串長度去重。
建立臨時性的聚集器
我們可以使用 Gatherer.of(...)
(平行處理)或 Gatherer.ofSequential(...)
(循序處理)等工廠方法去建立臨時性、一次性的聚集器。像是下面的程式碼可以快速地建立一個較為限縮、僅接受字串串流、並且僅能使用字串長度來去重的聚集器:
Gatherer<String, ?, String> deduplicatorByLength = Gatherer.ofSequential(
HashMap<Integer, String>::new, // 初始器:內部狀態,用來記錄每個字串長度所遇到的第一個字串
Gatherer.Integrator.ofGreedy((map, element, downstream) -> // 整合器
map.putIfAbsent(element.length(), element) == null // 如果該長度沒有值的話就暫存至內部狀態
? downstream.push(element) // 如果該長度沒有值的話就輸出到下游串流
: true
)
);
var result = Stream.of("foo", "bar", "baz", "quux", "ab", "test", "cd", "ef")
.gather(deduplicatorByLength) // 依字串長度來進行去重,各長度的字串都只留第一筆
.toList();
// result ==> [foo, quux, ab]
實作 Gatherer
介面的聚集器
根據上面的範例做延伸,我們可以實作 Gatherer 介面,使其可以接受更多元化型別的串流,並且能夠彈性地依據不同的條件來做去重處理:
record Deduplicator<T, U>(Function<T, U> fun) implements Gatherer<T, Map<U, T>, T> {
@Override
public Supplier<Map<U, T>> initializer() {
return HashMap<U, T>::new; // 內部狀態使用 map 儲存
}
@Override
public Gatherer.Integrator<Map<U, T>, T, T> integrator() { // 與上述程式碼相同
return Gatherer.Integrator.ofGreedy((map, element, downstream) ->
map.putIfAbsent(fun.apply(element), element) == null // 改用傳入的 lambda 判斷
? downstream.push(element)
: true
);
}
// combiner 和 finisher 已省略
// 建構器
public static <T, U> Deduplicator<T, U> of(Function<T, U> fun) {
return new Deduplicator<>(fun);
}
}
var result = Stream.of("foo", "bar", "baz", "quux", "ab", "test", "cd", "ef")
.gather(Deduplicator.of(String::length)) // 使用字串長度做去重
.toList();
// result ==> [foo, quux, ab]
var result = Stream.of("foo", "bar", "BAR", "BAZ", "baz", "Foo", "FOO", "Ab", "AB")
.gather(Deduplicator.of(String::toUpperCase)) // 轉成大寫字母後比對
.toList();
// result ==> [foo, bar, BAZ, Ab]
總結
JEP 473 引入的串流聚集器是非常重要的進展,為 Stream API 帶來了更靈活更強大的工具,讓開發者去處理複雜的資料轉換和處理邏輯,填補了現有 Stream API 的空白。聚集器對於處理串流進行精細控制和優化的場景特別有用,我們可以更優雅地實現諸如自定義去重、滑動窗口、複雜聚合等操作。這些操作在過去可能需要繁瑣的樣板程式碼,或是必須捨棄 Stream API 才能實現。
與任何強大的工具一樣,聚集器的使用需要謹慎。開發者應該權衡使用聚集器的複雜性與收益,並在適當的場景下合理使用。可以預期的是,串流聚集器將成為 Java 開發者工具箱中的重要工具,進一步提升 Java 在資料處理方面的能力和效率。雖然目前 JEP 473 仍在處於預覽階段,應該很快就會成為正式功能!
本篇文章的內容為老喬原創、二創或翻譯而來。雖已善盡校對、順稿與查核義務,但人非聖賢,多少仍會有疏漏之處難以避免。如果大家有任何問題、建議或指教,都歡迎在底下留言與老喬討論!