博客 / 詳情

返回

Java 中的結構化併發模式

併發編程長期以來一直是 Java 的阿喀琉斯之踵。儘管 ExecutorServiceFuture 為我們提供了良好的服務,但它們允許不受限制的模式,其中子任務可能比其父任務存活更久、線程可能泄漏,而取消操作則變成了一場噩夢。結構化併發通過將運行在不同線程中的相關任務組視為一個單一的工作單元,改變了這一現狀,它簡化了錯誤處理和取消操作,同時提高了可靠性和可觀測性。

非結構化併發的問題

考慮一個使用 ExecutorService 的典型模式:一個線程創建執行器,另一個線程提交工作,而執行任務的線程與前兩者都沒有關係。在一個線程提交工作之後,一個完全不同的線程可以等待結果——任何持有 Future 引用的代碼都可以連接它,甚至可以是與獲取該 Future 的線程不同的線程中的代碼。

這種非結構化方法帶來了實際問題。當父任務未能正確關閉子任務時,就會發生線程泄漏。由於沒有協調的方式來通知多個子任務,取消操作會出現延遲。並且由於任務和子任務之間的關係在運行時未被跟蹤,可觀測性會受到影響。

// 非結構化:關係是隱式且脆弱的
ExecutorService executor = Executors.newCachedThreadPool();
Future<User> userFuture = executor.submit(() -> fetchUser(id));
Future<Orders> ordersFuture = executor.submit(() -> fetchOrders(id));

// 如果 fetchUser 失敗會發生什麼?
// 誰負責關閉執行器?
// 如果我們忘記清理,線程會泄漏嗎?

引入 StructuredTaskScope

結構化併發 API 的主要類是 java.util.concurrent 包中的 StructuredTaskScope,它使您能夠將一個併發子任務組作為一個單元進行協調。使用 StructuredTaskScope,您可以在各自的線程中分叉每個子任務,然後將它們作為一個單元進行匯合,確保在主任務繼續之前子任務完成。

該 API 遵循一個清晰的模式:

  1. 使用 try-with-resources 創建一個 StructuredTaskScope
  2. 將子任務定義為 Callable 實例
  3. 在各自的線程中分叉每個子任務
  4. 匯合以等待完成
  5. 處理子任務的結果

以下是一個獲取天氣數據的真實示例:

WeatherReport getWeatherReport(String location)
        throws ExecutionException, InterruptedException {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        Supplier<Temperature> temperature =
            scope.fork(() -> getTemperature(location));
        Supplier<Humidity> humidity =
            scope.fork(() -> getHumidity(location));
        Supplier<WindSpeed> windSpeed =
            scope.fork(() -> getWindSpeed(location));

        scope.join()           // 匯合所有子任務
             .throwIfFailed(); // 如果有任何失敗,傳播錯誤

        // 全部成功,組合結果
        return new WeatherReport(
            location,
            temperature.get(),
            humidity.get(),
            windSpeed.get()
        );
    }
}

try-with-resources 代碼塊至關重要——它確保作用域被正確關閉,取消任何未完成的子任務並防止線程泄漏。

使用關閉策略實現短路

短路模式通過使主任務能夠中斷和取消那些不再需要其結果子任務,來促使子任務快速完成。兩個內置策略處理了常見場景:

ShutdownOnFailure:"調用所有"模式

當您需要所有子任務都成功時,ShutdownOnFailure 會在一個任務失敗後立即取消剩餘的任務:

Response handleRequest(String userId) throws Exception {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        Subtask<User> user = scope.fork(() -> fetchUser(userId));
        Subtask<Profile> profile = scope.fork(() -> fetchProfile(userId));
        Subtask<Settings> settings = scope.fork(() -> fetchSettings(userId));

        scope.join().throwIfFailed();

        // 如果有任何失敗,我們永遠不會到達這裏
        return new Response(user.get(), profile.get(), settings.get());
    }
}

如果 fetchUser() 拋出異常,作用域會立即取消配置文件和設置的獲取。沒有浪費的工作,沒有線程泄漏。

ShutdownOnSuccess:"調用任一"模式

有時您只需要第一個成功的結果——例如查詢多個數據中心或嘗試備用服務:

String fetchFromMultipleSources(String key) throws Exception {
    try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
        scope.fork(() -> fetchFromPrimaryDB(key));
        scope.fork(() -> fetchFromCache(key));
        scope.fork(() -> fetchFromBackup(key));

        scope.join();

        // 返回第一個成功的結果
        return scope.result();
    }
}

任何子任務成功的瞬間,作用域就會取消其他任務。這種模式非常適合對延遲敏感的操作,即您需要競速多個來源。

自定義關閉策略

在實踐中,大多數 StructuredTaskScope 的使用不會直接使用 StructuredTaskScope 類,而是使用實現了關閉策略的兩個子類之一,或者編寫自定義子類來實現自定義關閉策略。

以下是一個收集所有成功結果並忽略失敗的自定義策略:

class AllSuccessesScope<T> extends StructuredTaskScope<T> {
    private final List<T> results =
        Collections.synchronizedList(new ArrayList<>());

    @Override
    protected void handleComplete(Subtask<? extends T> subtask) {
        if (subtask.state() == Subtask.State.SUCCESS) {
            results.add(subtask.get());
        }
    }

    public List<T> getResults() {
        return List.copyOf(results);
    }
}

// 用法
List<Data> collectAll() throws InterruptedException {
    try (var scope = new AllSuccessesScope<Data>()) {
        for (String source : dataSources) {
            scope.fork(() -> fetchData(source));
        }
        scope.join();
        return scope.getResults();
    }
}

虛擬線程:完美搭檔

虛擬線程提供了大量的線程——結構化併發可以正確且健壯地協調它們,並使可觀測性工具能夠按開發人員理解的方式顯示線程。這種組合非常強大,因為虛擬線程使得創建數百萬個線程的成本很低,而結構化併發則確保您能安全地管理它們。

// 現在啓動 10,000 個併發任務是可行的
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    for (int i = 0; i < 10_000; i++) {
        final int taskId = i;
        scope.fork(() -> processTask(taskId));
    }
    scope.join().throwIfFailed();
}

使用平台線程,這將是災難性的。但使用虛擬線程和結構化併發,這變得簡單而安全。

模塊系統考量

在使用結構化併發構建模塊化應用程序時,理解 Java 的模塊系統變得很重要。對於模塊,反射失去了其"超能力",並且受限於與編譯代碼完全相同的可訪問性規則——它只能訪問導出包中公共類的公共成員。

默認情況下,只有 module-info.java 中顯式導出的包是可見的。如果您使用的是依賴反射的框架(如 Spring 或 Hibernate),您將需要額外的聲明:

module com.example.app {
    // 用於編譯時訪問的常規導出
    exports com.example.api;

    // 為運行時反射訪問開放
    opens com.example.entities to org.hibernate.orm.core;

    requires java.base;
    requires org.hibernate.orm.core;
}

在編譯時,開放的包完全被封裝,就像該指令不存在一樣,但在運行時,包的類型可用於反射,自由地與所有類型和成員(無論公開與否)交互。

為了在所有包上獲得完整的反射訪問權限,您可以聲明一個開放模塊:

open module com.example.app {
    exports com.example.api;
    requires java.base;
}

開放模塊會開放其包含的所有包,就像每個包都單獨在 opens 指令中使用一樣,這很方便但降低了封裝性。

可觀測性和調試

結構化併發顯著提高了可觀測性。線程轉儲現在顯示了清晰的父子關係:

jcmd <pid> Thread.dump_to_file -format=json output.json

JSON 輸出揭示了 StructuredTaskScope 及其在數組中的分叉子任務,使得理解正在運行的內容及其原因變得容易。這與關係隱式的扁平線程轉儲相比,是一種變革。

當前狀態與演進

結構化併發由 JEP 428 提出,並在 JDK 19 中作為孵化 API 交付,在 JDK 20 中重新孵化,通過 JEP 453 在 JDK 21 中首次預覽,並在 JDK 22 和 23 中重新預覽。截至 JDK 25,該 API 已經演進,使用靜態工廠方法替代了公共構造函數。

要在當前 JDK 版本中使用結構化併發,需啓用預覽特性:

# 編譯
javac --release 21 --enable-preview MyApp.java

# 運行
java --enable-preview MyApp

基於真實世界的反饋,該 API 正在穩定下來。結構化併發已被證明是一種安全、富有表現力且易於理解的併發方法,Python 庫率先開創了這一領域,隨後是 Kotlin 等語言。

最佳實踐

  • 始終使用 Try-With-Resources:必須關閉作用域以防止線程泄漏。切勿手動管理 StructuredTaskScope 的生命週期。
  • 選擇正確的策略:當所有結果都重要時使用 ShutdownOnFailure,在競速場景中使用 ShutdownOnSuccess,或者為特定需求實現自定義策略。
  • 與虛擬線程結合使用:結構化併發與虛擬線程結合時效果最佳,能夠通過簡單的代碼實現大規模併發。
  • 避免共享可變狀態:雖然結構化併發處理協調,但您仍然需要對共享數據的線程安全負責。
  • 考慮作用域值:為了在任務層次結構中傳遞上下文,作用域值(JEP 481)提供了比 ThreadLocal 更好的替代方案。

真實示例:聚合用户數據

讓我們構建一個從多個來源聚合數據的完整示例:

public class UserAggregator {
    record UserData(User user, List<Order> orders,
                    Stats stats, Recommendations recs) {}

    public UserData aggregate(String userId)
            throws ExecutionException, InterruptedException {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            Supplier<User> user =
                scope.fork(() -> userService.fetch(userId));
            Supplier<List<Order>> orders =
                scope.fork(() -> orderService.fetch(userId));
            Supplier<Stats> stats =
                scope.fork(() -> statsService.compute(userId));
            Supplier<Recommendations> recs =
                scope.fork(() -> mlService.recommend(userId));

            scope.join().throwIfFailed();

            return new UserData(
                user.get(),
                orders.get(),
                stats.get(),
                recs.get()
            );
        }
    }
}

這種模式簡潔、安全且高效。如果任何服務失敗,所有其他服務會立即被取消。作用域確保適當的清理。並且藉助虛擬線程,這可以擴展到數千個併發請求。

開發者觀點

Java 架構師決定不從 fork 方法返回 Future 實例,以避免與非結構化計算混淆,並與舊的併發模型進行清晰切割。這一設計決策強調了結構化併發是一種新的範式,而不僅僅是漸進式改進。

Rock the JVM 教程指出,結構化併發最終為 Java 帶來了其他 JVM 語言通過 Kotlin 協程和 Scala Cats Effects Fibers 等庫所提供的功能,但擁有官方的平台支持。

展望未來

結構化併發代表了我們對併發編程思考方式的根本轉變。我們不是管理單個線程和 Future,而是按層次結構組織併發工作——就像我們用方法和循環組織順序代碼一樣。

好處是顯而易見的:沒有線程泄漏、正確的錯誤傳播、協調的取消以及增強的可觀測性。結合虛擬線程,Java 現在提供了一個既強大又易於使用的併發模型。

隨着該 API 走向最終化,預計將在框架和庫中得到更廣泛的採用。Spring、Hibernate 及其他生態系統項目已經在考慮如何利用結構化併發來編寫更清晰、更可靠的併發代碼。


【注】本文譯自:Structured Concurrency Patterns in Java

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.