Stories

Detail Return Return

Java 21 虛擬線程 vs 緩存線程池與固定線程池 - Stories Detail

探索 Java 併發如何從 Java 8 的增強發展到 Java 21 的虛擬線程,從而實現輕量級、可擴展且高效的多線程處理。

引言

併發編程仍然是構建可擴展、響應式 Java 應用程序的關鍵部分。多年來,Java 持續增強了其多線程編程能力。本文回顧了從 Java 8 到 Java 21 併發的演進,重點介紹了重要的改進以及 Java 21 中引入的具有重大影響的虛擬線程。

從 Java 8 開始,併發 API 出現了顯著的增強,例如原子變量、併發映射以及集成 lambda 表達式以實現更具表現力的並行編程。

Java 8 引入的關鍵改進包括:

  • 線程與執行器
  • 同步與鎖
  • 原子變量與 ConcurrentMap

Java 21 於 2023 年底發佈,帶來了虛擬線程這一重大演進,從根本上改變了 Java 應用程序處理大量併發任務的方式。虛擬線程為服務器應用程序提供了更高的可擴展性,同時保持了熟悉的"每個請求一個線程"的編程模型。

或許,Java 21 中最重要的特性就是虛擬線程。
在 Java 21 中,Java 的基本併發模型保持不變,Stream API 仍然是並行處理大型數據集的首選方式。
隨着虛擬線程的引入,併發 API 現在能提供更好的性能。在當今的微服務和可擴展服務器應用領域,線程數量必須增長以滿足需求。虛擬線程的主要目標是使服務器應用程序能夠實現高可擴展性,同時仍使用簡單的"每個請求一個線程"模型。

虛擬線程

在 Java 21 之前,JDK 的線程實現使用的是操作系統線程的薄包裝器。然而,操作系統線程代價高昂:

  • 如果每個請求在其整個持續時間內消耗一個操作系統線程,線程數量很快就會成為可擴展性的瓶頸。
  • 即使使用線程池,吞吐量仍然受到限制,因為實際線程數量是有上限的。

虛擬線程的目標是打破 Java 線程與操作系統線程之間的 1:1 關係。
虛擬線程應用了類似於虛擬內存的概念。正如虛擬內存將大的地址空間映射到較小的物理內存一樣,虛擬線程允許運行時通過將它們映射到少量操作系統線程來製造擁有許多線程的假象。

平台線程是操作系統線程的薄包裝器。
而虛擬線程並不綁定到任何特定的操作系統線程。虛擬線程可以執行平台線程可以運行的任何代碼。這是一個主要優勢——現有的 Java 代碼通常無需修改或僅需少量修改即可在虛擬線程上運行。虛擬線程由平台線程承載,這些平台線程仍然由操作系統調度。

例如,您可以像這樣創建一個使用虛擬線程的執行器:

ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();

對比示例

虛擬線程僅在主動執行 CPU 密集型任務時才消耗操作系統線程。虛擬線程在其生命週期內可以在不同的載體線程上掛載或卸載。

通常,當虛擬線程遇到阻塞操作時,它會自行卸載。一旦該阻塞任務完成,虛擬線程通過掛載到任何可用的載體線程上來恢復執行。這種掛載和卸載過程頻繁且透明地發生——不會阻塞操作系統線程。

示例 — 源代碼

Example01CachedThreadPool.java

在此示例中,使用緩存線程池創建了一個執行器:

var executor = Executors.newCachedThreadPool()
package threads;

import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

/**
 *
 * @author Milan Karajovic <milan.karajovic.rs@gmail.com>
 *
 */

public class Example01CachedThreadPool {

    public void executeTasks(final int NUMBER_OF_TASKS) {

        final int BLOCKING_CALL = 1;
        System.out.println("Number of tasks which executed using 'newCachedThreadPool()' " + NUMBER_OF_TASKS + " tasks each.");

        long startTime = System.currentTimeMillis();

        try (var executor = Executors.newCachedThreadPool()) {

            IntStream.range(0, NUMBER_OF_TASKS).forEach(i -> {
                executor.submit(() -> {
                    // 模擬阻塞調用
                    Thread.sleep(Duration.ofSeconds(BLOCKING_CALL));
                    return i;
                });
            });

        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        long endTime = System.currentTimeMillis();
        System.out.println("For executing " + NUMBER_OF_TASKS + " tasks duration is: " + (endTime - startTime) + " ms");
    }

}
package threads;

import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;

/**
 *
 * @author Milan Karajovic <milan.karajovic.rs@gmail.com>
 *
 */

@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class Example01CachedThreadPoolTest {

    @Test
    @Order(1)
    public void test_1000_tasks() {
        Example01CachedThreadPool example01CachedThreadPool = new Example01CachedThreadPool();
        example01CachedThreadPool.executeTasks(1000);
    }

    @Test
    @Order(2)
    public void test_10_000_tasks() {
        Example01CachedThreadPool example01CachedThreadPool = new Example01CachedThreadPool();
        example01CachedThreadPool.executeTasks(10_000);
    }

    @Test
    @Order(3)
    public void test_100_000_tasks() {
        Example01CachedThreadPool example01CachedThreadPool = new Example01CachedThreadPool();
        example01CachedThreadPool.executeTasks(100_000);
    }

    @Test
    @Order(4)
    public void test_1_000_000_tasks() {
        Example01CachedThreadPool example01CachedThreadPool = new Example01CachedThreadPool();
        example01CachedThreadPool.executeTasks(1_000_000);
    }

}

我 PC 上的測試結果:

Example02FixedThreadPool.java

使用固定線程池創建執行器:

var executor = Executors.newFixedThreadPool(500)
package threads;

import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

/**
 *
 * @author Milan Karajovic <milan.karajovic.rs@gmail.com>
 *
 */

public class Example02FixedThreadPool {

    public void executeTasks(final int NUMBER_OF_TASKS) {

        final int BLOCKING_CALL = 1;
        System.out.println("Number of tasks which executed using 'newFixedThreadPool(500)' " + NUMBER_OF_TASKS + " tasks each.");

        long startTime = System.currentTimeMillis();

        try (var executor = Executors.newFixedThreadPool(500)) {

            IntStream.range(0, NUMBER_OF_TASKS).forEach(i -> {
               executor.submit(() -> {
                   // 模擬阻塞調用
                  Thread.sleep(Duration.ofSeconds(BLOCKING_CALL));
                  return i;
               });
            });

        }   catch (Exception e) {
            throw new RuntimeException(e);
        }

        long endTime = System.currentTimeMillis();
        System.out.println("For executing " + NUMBER_OF_TASKS + " tasks duration is: " + (endTime - startTime) + " ms");
    }

}
package threads;

import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;

/**
 *
 * @author Milan Karajovic <milan.karajovic.rs@gmail.com>
 *
 */

@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class Example02FixedThreadPoolTest {

    @Test
    @Order(1)
    public void test_1000_tasks() {
        Example02FixedThreadPool example02FixedThreadPool = new Example02FixedThreadPool();
        example02FixedThreadPool.executeTasks(1000);
    }

    @Test
    @Order(2)
    public void test_10_000_tasks() {
        Example02FixedThreadPool example02FixedThreadPool = new Example02FixedThreadPool();
        example02FixedThreadPool.executeTasks(10_000);
    }

    @Test
    @Order(3)
    public void test_100_000_tasks() {
        Example02FixedThreadPool example02FixedThreadPool = new Example02FixedThreadPool();
        example02FixedThreadPool.executeTasks(100_000);
    }

    @Test
    @Order(4)
    public void test_1_000_000_tasks() {
        Example02FixedThreadPool example02FixedThreadPool = new Example02FixedThreadPool();
        example02FixedThreadPool.executeTasks(1_000_000);
    }

}

我 PC 上的測試結果:

Example03VirtualThread.java

使用虛擬線程每任務執行器創建執行器:

var executor = Executors.newVirtualThreadPerTaskExecutor()
package threads;

import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

/**
 *
 * @author Milan Karajovic <milan.karajovic.rs@gmail.com>
 *
 */

public class Example03VirtualThread {

    public void executeTasks(final int NUMBER_OF_TASKS) {

        final int BLOCKING_CALL = 1;
        System.out.println("Number of tasks which executed using 'newVirtualThreadPerTaskExecutor()' " + NUMBER_OF_TASKS + " tasks each.");

        long startTime = System.currentTimeMillis();

        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {

            IntStream.range(0, NUMBER_OF_TASKS).forEach(i -> {
               executor.submit(() -> {
                   // 模擬阻塞調用
                  Thread.sleep(Duration.ofSeconds(BLOCKING_CALL));
                  return i;
               });
            });

        }   catch (Exception e) {
            throw new RuntimeException(e);
        }

        long endTime = System.currentTimeMillis();
        System.out.println("For executing " + NUMBER_OF_TASKS + " tasks duration is: " + (endTime - startTime) + " ms");
    }

}
package threads;

import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;

/**
 *
 * @author Milan Karajovic <milan.karajovic.rs@gmail.com>
 *
 */

@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class Example03VirtualThreadTest {

    @Test
    @Order(1)
    public void test_1000_tasks() {
        Example03VirtualThread example03VirtualThread = new Example03VirtualThread();
        example03VirtualThread.executeTasks(1000);
    }

    @Test
    @Order(2)
    public void test_10_000_tasks() {
        Example03VirtualThread example03VirtualThread = new Example03VirtualThread();
        example03VirtualThread.executeTasks(10_000);
    }

    @Test
    @Order(3)
    public void test_100_000_tasks() {
        Example03VirtualThread example03VirtualThread = new Example03VirtualThread();
        example03VirtualThread.executeTasks(100_000);
    }

    @Test
    @Order(4)
    public void test_1_000_000_tasks() {
        Example03VirtualThread example03VirtualThread = new Example03VirtualThread();
        example03VirtualThread.executeTasks(1_000_000);
    }

    @Test
    @Order(5)
    public void test_2_000_000_tasks() {
        Example03VirtualThread example03VirtualThread = new Example03VirtualThread();
        example03VirtualThread.executeTasks(2_000_000);
    }

}

我 PC 上的測試結果:

結論

您可以清楚地看到用於處理所有 NUMBER_OF_TASKS 的不同執行器實現之間的執行時間差異。值得嘗試不同的 NUMBER_OF_TASKS 值以觀察性能變化。

虛擬線程的優勢在處理大量任務時變得尤其明顯。當 NUMBER_OF_TASKS 設置為較高的數值時——例如 1,000,000——性能差距是顯著的。如下表所示,虛擬線程在處理大量任務時效率要高得多:

我確信,在澄清這一點之後,如果您的應用程序使用併發 API 處理大量任務,您會認真考慮遷移到 Java 21 並利用虛擬線程。在許多情況下,這種轉變可以顯著提高應用程序的性能和可擴展性。

源代碼:GitHub Repository – Comparing Threads in Java 21


【注】本文譯自:Java 21 Virtual Threads vs Cached and Fixed Threads

user avatar u_16297326 Avatar seazhan Avatar jkdataapi Avatar devlive Avatar qqxx6661 Avatar beiyinglunkuo Avatar sevencode Avatar nathannie Avatar entropy_adding Avatar chaokunyang Avatar jinjiedefarmer Avatar njwutong Avatar
Favorites 16 users favorite the story!
Favorites

Add a new Comments

Some HTML is okay.