JDK 23 功能:JEP 473 Stream Gatherer 串流聚集器

JEP 473: Stream Gatherers

Java 8 引入的 Stream API 為開發者提供了強大且清楚易懂的方式來處理資料串流集合。然而,隨著時間的推移,開發者們發現 Stream API 在某些複雜場景下仍然有不少的局限性。本文將介紹 JDK 23 Stream API 的重大進化如何去填補這些缺失的拼圖:JEP 473 的新功能 Stream Gatherers 串流聚集器

JEP 473 Stream Gatherers 串流聚集器目的在加強 Stream API 的功能,使其能支援自定義的中間操作。這項新特性將允許開發者以更靈活的方式轉換資料流,解決了許多現有 Stream API 難以處理的複雜場景。接下來,讓我們深入探討串流聚集器會如何改變 Java 開發者處理資料串流的方式。

前言

Java 8 新增專為 lambda 表達式設計的第一個 API:Stream API(java.util.stream)。串流是一個惰性計算的、可能無邊界的資料序列,而 Stream API 支援循序或並行處理串流的能力。一個串流管道由三個部分組成:元素的來源、任意數量的中間操作和一個終端操作。例如:

long numberOfWords =
    Stream.of("the", "", "fox", "jumps", "over", "the", "", "dog")  // (1) 建立串流
          .filter(Predicate.not(String::isEmpty))                   // (2) 中間操作:去除空字串
          .collect(Collectors.counting());                          // (3) 終端操作:計算

這種程式設計風格既有表現力又高效:使用建構器風格的 API、每個中間操作都會返回新的串流,並且在呼叫終端操作時才會開始計算。以上述的例子說明:

  1. 首先建立新的串流,但沒有計算它
  2. 接著設定中間的 filter 操作,但仍然沒有計算
  3. 最後的終端 collect 操作才計算整個串流管道

因此自從 Stream API 推出以來,它簡潔的語法和強大的功能贏得了許多開發者的青睞。然而,隨著場景的日益複雜化,開發者逐漸發現 Stream API 存在著局限性。例如某些複雜的資料轉換操作無法直接通過現有的中間操作實現,導致開發者在某些場景時不得不放棄使用 Stream API,改回原本所使用、較為繁瑣的程式碼。

Stream API 的局限性

Stream API 提供了相當豐富的特定中間和終端操作:映射、過濾、去重、歸約、排序等等。特定的中間操作集意味著無法輕易且彈性地完成一些較複雜的任務,可能是所需的中間操作不存在,或是中間操作存在但不能直接支援該任務。

一個典型的例子是,當我們需要基於特定的元素屬性去刪除重複資料(deduplication)時,現有的 distinct 方法就顯得力不從心。因為它依據的是呼叫 equals 方法的傳回值來判斷是否應該要「去重」,無法依據物件中特殊或任意的欄位資訊條件來做比對。例如,我們想使用字串長度進行去重,使每個長度的字串最多只有一個。理想情況下,程式碼看起來像這樣:

var result = Stream.of("foo", "bar", "baz", "quux", "ab", "test", "cd", "ef")
                   .distinctBy(String::length)   // 依字串長度去重,各長度都只留第一筆
                   .toList();
// result ==> [foo, quux, ab]

不幸的是,Stream API 並不支援這樣的中間操作。最接近的 distinct 是用物件相等性來比較元素,無法改用字串長度的相等性。一個可能的解法是建立 record 類別去包裝每個字串,並覆寫 equals 方法使得 distinct 間接地比對字串長度。然而,這種來回轉換的表達方式並不直觀,也使得程式碼難以維護:

record DistinctByLength(String str) {
    @Override public boolean equals(Object obj) {
        return obj instanceof DistinctByLength(String other)
               && str.length() == other.length();
    }

    @Override public int hashCode() {
        return str == null ? 0 : Integer.hashCode(str.length());
    }
}

var result = Stream.of("foo", "bar", "baz", "quux", "ab", "test", "cd", "ef")
                   .map(DistinctByLength::new)
                   .distinct()
                   .map(DistinctByLength::str)
                   .toList();
// result ==> [foo, quux, ab]

另一個常見的痛點是處理滑動窗口或固定大小的分組操作。假設任務是將所有元素分成三個一組,但只保留前兩組,因此 [0, 1, 2, 3, 4, 5, 6, ...] 應該輸出 [[0, 1, 2], [3, 4, 5]]。理想情況下程式碼看起來像這樣:

var result = Stream.iterate(0, i -> i + 1)
                   .windowFixed(3)  // 三個一組
                   .limit(2)
                   .toList();
// result ==> [[0, 1, 2], [3, 4, 5]]

然而,目前並沒有任何內建的中間操作支援此任務。其中一個解法是在 limit 操作時預先計算好要留下多少數量的元素,然後將固定窗口分組的邏輯放在終端 collect 中使用自定義的 Collector.of 去完成。此外,由於資料流是有順序的數字,無法並行處理,所以如果它的 combiner 被呼叫,那麼需要拋出異常。這樣一來使得程式碼更難以理解:

var result = Stream.iterate(0, i -> i + 1)
            .limit(3 * 2)    // 必須自行將元素總數計算好
            .collect(Collector.of(
                () -> new ArrayList<ArrayList<Integer>>(),    // 建立最終結果
                (groups, element) -> {
                    // 若最終結果為空或是其中最後一組的元素數量為3
                    if (groups.isEmpty() || groups.getLast().size() == 3) {
                         var current = new ArrayList<Integer>();
                         current.add(element);
                         groups.addLast(current);    // 建立新的一組並加到最終結果裡
                     } else {
                         groups.getLast().add(element);  // 否則將元素加到現有的最後一組
                     }
                },
                (left, right) -> {  // combiner 不能被呼叫,因為不能並行處理
                    throw new UnsupportedOperationException("Cannot be parallelized");
                }
            ));
// result ==> [[0, 1, 2], [3, 4, 5]]

這些局限性使得某些資料處理的任務變得複雜。開發人員往往需要額外撰寫程式碼處理,甚至放棄使用 Stream API,轉而採用傳統的集合迭代方式。這不僅增加了程式碼的複雜度,也降低了程式的可讀性和維護性。

JEP 473 概觀

Stream Gatherer 串流聚集器是 JEP 473 引入的核心概念:Stream::gather(Gatherer)。它可以建構高效且並行的串流操作,並讓開發者自定義許多中間操作,實現複雜的邏輯以達成原本不易實作的資料轉換。聚集器的設計靈感來自於 Collector 收集器,但更加靈活與強大。Stream::gather(Gatherer) 之於中間操作,就像 Stream::collect(Collector) 之於終端操作。

聚集器的介面是 java.util.stream.Gatherer。它能夠以一對一、一對多、多對一或多對多的方式轉換元素。聚集器可以記錄先前看過的元素,並用其來進行後續元素的轉換;也可以將無限串流中斷轉換為有限串流,並且使用並行處理。

優點

  • 允許開發者定義幾乎任何複雜的中間操作,大幅地擴展了 Stream API 的應用範圍
  • 通過使用自定義的聚集器可以讓串流管道的意圖更加清晰,並提高代碼的可讀性
  • 支援並行處理,可以充分利用多核處理器的優勢,提高處理效率

缺點

  • 對於不熟悉函數式程式設計或 Stream API 的開發者來說,理解並使用聚集器可能需要一定的學習成本
  • 雖然它簡化了某些複雜操作,但自定義聚集器本身可能會引入新的複雜性,導致程式碼難以閱讀與維護

介紹

基礎組件與流程

JEP 473 中的聚集器由四個主要組件一起協同工作:

  • initializer(可選):建立初始物件,用來在處理串流元素時維護內部狀態,例如暫存、計數、緩衝等。舉例來說,聚集器可以儲存當前的元素,用來和下一個新元素進行比較,如此一來可將兩個輸入元素轉換為一個輸出元素
  • integrator必要):可以從輸入串流接收新元素、檢查或更新內部狀態、新增輸出元素到下游串流,甚至可以在輸入串流的末尾到達之前終止處理串流。例如:搜尋整數串流中最大值的聚集器,如果檢測到 Integer.MAX_VALUE 時可以立即停止
  • combiner(可選):在輸入串流被標記為並行處理時用來檢查或合併聚集器內部狀態。即使聚集器不具備並行能力,它仍然可以成為串流管道的一部分並對串流做循序評估,這對於因循序操作而無法並行化的情況很有幫助
  • finisher(可選):當串流中沒有剩餘的輸入元素時做最終處理。它可以用來檢查內部狀態物件,並生成額外的輸出元素,或是拋出異常

Stream::gather 被呼叫時會執行下列的等效步驟:

  1. 創建一個 Downstream 物件,當給定聚集器的輸出類型的元素時,將其傳遞給管道中的下一個階段
  2. 呼叫 initializer 的 get() 方法以取得聚集器的內部狀態物件 state
  3. 處理輸入元素並利用 integrator() 方法取得 integrator 後,呼叫其 integrate(...) 方法,將 state、下一個元素和 downstream 物件傳遞給它
    • 如果返回 false 則終止處理
  4. 當串流結束或終止處理時,取得 finisher 並傳遞 statedownstream 物件給其 accept() 方法
Gatherer.Downstream<? super R> downstream = ...;  // 1
A state = gatherer.initializer().get();  // 2
for (T t : data) {
    gatherer.integrator().integrate(state, t, downstream);  // 3
}
gatherer.finisher().accept(state, downstream);  // 4

目前 Stream 介面中宣告的每個中間操作都可以用聚集器來實現。例如,給定一個 T 型別的元素串流,Stream::map 可以通過某個函數將每個 T 元素轉換為 U 元素,並將 U 元素向下傳遞;它可以用一個無狀態的一對一聚集器來實作。再舉一個例子,Stream::filter 接受一個條件式並決定是否應將輸入元素向下傳遞;這也只是一個無狀態的一對多聚集器。事實上,每個串流管道在概念上都等同於下式:

source.gather(...).gather(...).gather(...).collect(...)

內建 Gatherer 聚集器

java.util.stream.Gatherers 類別中引入了以下內建的聚集器:

名稱對應功能使用場景
fold多對一消化元素、更新內部狀態,並計算結果平均值、總和、計數、最大值
scan一對一消化元素、將其與內部狀態整合,並計算
累積中間值並產生新元素
前綴和、累積乘積、費式數列
mapConcurrent一對一對每個輸入元素並行地呼叫函式,
最多達到設定的上限
下載、讀檔、圖片運算
windowFixed多對多將元素依數量分組,在窗口滿時向下傳遞批次處理、緩衝區讀取
windowSliding多對多將元素依數量分組。每個後續窗口會將前
一窗口的首元素刪除後附加下個輸入元素
移動平均、局部最大值

foldscan 的差別

這兩個聚集器的功能有些類似,主要差別是 fold 會將多個輸入元素整合後,產生單一輸出結果;而 scan 在處理每一個輸入元素時,會參考內部狀態,並產生新的輸出結果。以下的例子說明 foldscan 的不同:

Stream.of(1, 2, 3, 4)
      .gather(Gatherers.fold(()->0, (a,b)->a+b))  // 相加
      .toList();
// 輸出單一結果:計算總和為 [10]

Stream.of(1, 2, 3, 4)   
      .gather(Gatherers.scan(()->0, (a,b)->a+b))  // 相加
      .toList();
// 輸出連續結果:計算前綴和為 [1, 3, 6, 10]

windowFixedwindowSliding 的差別

兩個聚集器都是將輸入元素依給定的大小去分組,但是分組的邏輯有些不同。例如輸入串流是 [1, 2, 3, 4],窗口大小為 2 來說:windowFixed 是窗口滿額後就送出並清空,因此輸出串流為 [[1, 2], [3, 4]];而 windowSliding 雖然也是窗口滿額後送出,但它會滑動地刪除第一個元素後附加下一個輸入元素,因此輸出串流為 [[1, 2], [2, 3], [3, 4]]

Stream.of(1, 2, 3, 4)
      .gather(Gatherers.scan(() -> 0, (a, b) -> a + b))  //scan: [1, 3, 6, 10]
      .gather(Gatherers.windowFixed(2))
      .map(w -> (w.get(0) + w.get(1)) / 2.0) // 需為偶數
      .toList();
// 區間平均: [2.0, 8.0]

Stream.of(1, 2, 3, 4)
      .gather(Gatherers.scan(() -> 0, (a, b) -> a + b))  //scan: [1, 3, 6, 10]
      .gather(Gatherers.windowSliding(2))
      .map(w -> (w.get(0) + w.get(1)) / 2.0)
      .toList();
// 移動平均: [2.0, 4.5, 8.0]

聚集器組合

在上例中,我們連續使用了兩個聚集器(foldwindowSliding)來完成任務。除此之外,我們也可以使用 andThen(Gatherer) 方法來連接多個聚合器,使得前一個聚集器的輸出串流成為下一個聚集器的輸入串流,以藉此建立更複雜的任務流程。

Stream.of(1, 2, 3, 4)
      .gather( Gatherers.scan( () -> 0, (Integer a, Integer b) -> a + b ) )
      .gather( Gatherers.windowSliding(2) )
      .toList();

// 與上例等價的語法
Stream.of(1, 2, 3, 4)
      .gather( Gatherers.scan( () -> 0, (Integer a, Integer b) -> a + b )
                        .andThen( Gatherers.windowSliding(2) )
      )
      .toList();

聚集器(Gatherer)與收集器(Collector)

Gatherer 介面的設計深受 Collector 設計的影響。主要區別在於:

  • Gatherer 使用 Integrator 而不是 BiConsumer 進行每個元素的處理,因為它需要一個額外的輸入參數用於 Downstream 物件,並且它需要返回一個布林值來指示是否應該繼續處理。
  • Gathererfinisher 使用 BiConsumer 而不是 Function,因為它需要一個額外的輸入參數用於其 Downstream 物件,並且它無法返回結果

擁抱串流

缺乏合適的中間操作會迫使我們無法使用 Stream API 去處理串流資料。例如我們有一個按時間順序排列的溫度讀數串流:

record Reading(Instant obtainedAt, int kelvins) {  // 時間加溫度的記錄
    Reading(String time, int kelvins) { this(Instant.parse(time), kelvins); }
}

// 可以從檔案、資料庫、服務或其他方式持續讀取資料
Stream<Reading> loadRecentReadings() {
    return Stream.of(
            new Reading("2023-09-21T10:15:33.00Z", 310)
            new Reading("2023-09-21T10:15:30.00Z", 310),
            new Reading("2023-09-21T10:15:31.00Z", 312),
            new Reading("2023-09-21T10:15:32.00Z", 350),  // 模擬異常資料
            new Reading("2023-09-21T10:15:33.00Z", 310)
    );
}

我們希望檢測此串流中的可疑變化。如果五秒鐘的時間窗口內,兩個連續讀數之間的溫度變化差超過 30° Kelvin 時能夠發出警示:

boolean isSuspicious(Reading previous, Reading next) {
    return next.obtainedAt().isBefore(previous.obtainedAt().plusSeconds(5))
           && (next.kelvins() > previous.kelvins() + 30
               || next.kelvins() < previous.kelvins() - 30);
}

因為沒有合適的中間操作,所以我們必須放棄串流處理,改以命令式的方式實現我們的分析:

List<List<Reading>> findSuspicious(Stream<Reading> source) {
    var suspicious = new ArrayList<List<Reading>>();
    Reading previous = null;
    for (Reading next : source.toList()) {  // 必須把串流轉成 List 處理
        if (previous != null && isSuspicious(previous, next)) {  // 如果發現的話
            suspicious.add(List.of(previous, next));  //加到結果中
        }
        previous = next;
    }
    return suspicious;
}

// 執行並取得結果
var result = findSuspicious(loadRecentReadings());

輸出結果如下:

[[Reading[obtainedAt=2023-09-21T10:15:31Z, kelvins=312],
  Reading[obtainedAt=2023-09-21T10:15:32Z, kelvins=350]],
 [Reading[obtainedAt=2023-09-21T10:15:32Z, kelvins=350],
  Reading[obtainedAt=2023-09-21T10:15:33Z, kelvins=310]]]

然而,有了聚集器之後我們可以更簡潔且優雅地改寫原本的 findSuspicious 方法。首先使用內建的 windowSliding 建立大小為 2 的滑動窗口,然後用 filter 操作來保留滿足條件的窗口,最後將結果收集到一個 List 中。這樣的寫法更貼近串流處理的風格,也避免了前面手動迭代和狀態管理的複雜性。

List<List<Reading>> findSuspicious2(Stream<Reading> source) {
    return source.gather(Gatherers.windowSliding(2)) // 建立大小為 2 的滑動窗口
                 .filter(window -> (window.size() == 2 // 篩選包含兩個讀數的窗口並檢查
                                    && isSuspicious(window.get(0), window.get(1))))
                 .toList(); // 將結果收集到一個 List 中
}

// 執行並取得結果
findSuspicious2(loadRecentReadings());

自定義聚集器

雖然聚集器中已經提供了幾個內建的中間操作,但仍無法覆蓋所有的使用場景。不過,依據 JEP 473 目前的規格,我們可以建立自定義的聚集器,以滿足需求並處理各式複雜的操作。讓我們回顧一下文章開頭時所提到的例子:依字串長度去重

建立臨時性的聚集器

我們可以使用 Gatherer.of(...) (平行處理)或 Gatherer.ofSequential(...) (循序處理)等工廠方法去建立臨時性、一次性的聚集器。像是下面的程式碼可以快速地建立一個較為限縮、僅接受字串串流、並且僅能使用字串長度來去重的聚集器:

Gatherer<String, ?, String> deduplicatorByLength = Gatherer.ofSequential(
	HashMap<Integer, String>::new,  // 初始器:內部狀態,用來記錄每個字串長度所遇到的第一個字串
    Gatherer.Integrator.ofGreedy((map, element, downstream) ->  // 整合器
        	map.putIfAbsent(element.length(), element) == null  // 如果該長度沒有值的話就暫存至內部狀態
        		? downstream.push(element)  // 如果該長度沒有值的話就輸出到下游串流
        		: true
    )
);

var result = Stream.of("foo", "bar", "baz", "quux", "ab", "test", "cd", "ef")
                   .gather(deduplicatorByLength)      // 依字串長度來進行去重,各長度的字串都只留第一筆
                   .toList();
// result ==> [foo, quux, ab]

實作 Gatherer 介面的聚集器

根據上面的範例做延伸,我們可以實作 Gatherer 介面,使其可以接受更多元化型別的串流,並且能夠彈性地依據不同的條件來做去重處理:

record Deduplicator<T, U>(Function<T, U> fun) implements Gatherer<T, Map<U, T>, T> {
    @Override
    public Supplier<Map<U, T>> initializer() {
        return HashMap<U, T>::new;  // 內部狀態使用 map 儲存
    }

    @Override
    public Gatherer.Integrator<Map<U, T>, T, T> integrator() {  // 與上述程式碼相同
	    return Gatherer.Integrator.ofGreedy((map, element, downstream) ->
	        	map.putIfAbsent(fun.apply(element), element) == null   // 改用傳入的 lambda 判斷
	        		? downstream.push(element)
	        		: true
        );
    }

    // combiner 和 finisher 已省略

    // 建構器
    public static <T, U> Deduplicator<T, U> of(Function<T, U> fun) {
    	return new Deduplicator<>(fun);
    }
}

var result = Stream.of("foo", "bar", "baz", "quux", "ab", "test", "cd", "ef")
                   .gather(Deduplicator.of(String::length))  // 使用字串長度做去重
                   .toList();
// result ==> [foo, quux, ab]

var result = Stream.of("foo", "bar", "BAR", "BAZ", "baz", "Foo", "FOO", "Ab", "AB")
                   .gather(Deduplicator.of(String::toUpperCase))  // 轉成大寫字母後比對
                   .toList();
// result ==> [foo, bar, BAZ, Ab]

總結

JEP 473 引入的串流聚集器是非常重要的進展,為 Stream API 帶來了更靈活更強大的工具,讓開發者去處理複雜的資料轉換和處理邏輯,填補了現有 Stream API 的空白。聚集器對於處理串流進行精細控制和優化的場景特別有用,我們可以更優雅地實現諸如自定義去重、滑動窗口、複雜聚合等操作。這些操作在過去可能需要繁瑣的樣板程式碼,或是必須捨棄 Stream API 才能實現。

與任何強大的工具一樣,聚集器的使用需要謹慎。開發者應該權衡使用聚集器的複雜性與收益,並在適當的場景下合理使用。可以預期的是,串流聚集器將成為 Java 開發者工具箱中的重要工具,進一步提升 Java 在資料處理方面的能力和效率。雖然目前 JEP 473 仍在處於預覽階段,應該很快就會成為正式功能!

本篇文章的內容為老喬原創、二創或翻譯而來。雖已善盡校對、順稿與查核義務,但人非聖賢,多少仍會有疏漏之處難以避免。如果大家有任何問題、建議或指教,都歡迎在底下留言與老喬討論!

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *

20 − fifteen =

返回頂端