動態

詳情 返回 返回

C#.NET SemaphoreSlim 深入解析:輕量級異步鎖與併發控制 - 動態 詳情

簡介

SemaphoreSlim.NETSystem.Threading 命名空間提供的一個輕量級同步原語,用於限制對共享資源的併發訪問。它是傳統 Semaphore 類的輕量替代,專為高性能、異步場景設計,特別適合結合 async/await 的現代 .NET 應用(如 ASP.NET Core)。

在多線程或高併發應用中,共享資源的訪問需要同步以避免競爭條件。傳統的 Semaphore 類基於內核對象,性能開銷較高,而 SemaphoreSlim 是一個用户態同步原語,專為以下場景設計:

  • 限制併發訪問:控制同時訪問共享資源的線程數(如數據庫連接、文件寫入)。
  • 異步友好:通過 WaitAsync 方法支持 async/await,適合異步編程。
  • 輕量高效:相比 Semaphore,內存和性能開銷更低。
  • 跨平台支持:內置於 .NET,兼容所有 .NET 運行時。

解決的問題

問題 傳統 Semaphore SemaphoreSlim
線程阻塞開銷 高(內核模式) 低(用户模式旋轉等待)
異步支持 ❌ 不支持 ✅ 原生支持 async/await
內存佔用 ~1KB 每實例 ~24 bytes
跨進程同步 ✅ 支持 ❌ 僅限進程內

核心功能

  • 信號量計數:維護一個計數器,表示可用資源數。
  • 限制併發:通過 WaitWaitAsync 限制線程訪問,Release 釋放資源。
  • 異步支持:提供 WaitAsync 方法,適合異步任務。
  • 可取消性:通過 CancellationToken 支持取消等待。
  • 動態調整:支持初始化和動態調整信號量計數。
  • 線程安全:內置線程安全,適合多線程環境。

核心 API

SemaphoreSlim 位於 System.Threading 命名空間,核心 API 如下:

  1. 構造函數
// 初始化信號量,初始計數為 0,最大計數為 2
SemaphoreSlim semaphore = new SemaphoreSlim(0, 2);
  • SemaphoreSlim(int initialCount, int maxCount)

    • initialCount:信號量的初始計數。
    • maxCount:信號量的最大計數。
  1. Wait() 方法

Wait() 會請求一個信號量。如果當前沒有可用的信號量,則線程會被阻塞直到信號量可用。

semaphore.Wait();  // 請求信號量
// 執行受限資源的操作
  1. WaitAsync() 方法

WaitAsync() 是異步方法,用於在異步上下文中請求信號量。

await semaphore.WaitAsync();  // 異步請求信號量
// 執行異步操作
  1. Release() 方法

Release() 用於釋放一個信號量,增加信號量計數。通常在完成共享資源的訪問後調用它。

semaphore.Release();  // 釋放一個信號量

semaphore.Release(3);  // 釋放3個信號量

如果信號量的計數達到了最大值,Release() 會導致 SemaphoreSlim 的計數不會再增加。

  1. CurrentCount 屬性

CurrentCount 屬性用於查看當前信號量的計數。

int currentCount = semaphore.CurrentCount;  // 獲取當前可用的信號量計數
  1. 異常與超時處理

在信號量等待時,如果指定了超時,Wait(int milliseconds)WaitAsync(TimeSpan timeout) 可以避免無限期阻塞:

if (await semaphore.WaitAsync(TimeSpan.FromSeconds(5)))
{
    try
    {
        // 執行操作
    }
    finally
    {
        semaphore.Release();
    }
}
else
{
    // 超時邏輯
}

使用示例

基本的線程限制

public class SemaphoreExample
{
    private static SemaphoreSlim semaphore = new SemaphoreSlim(3, 3);  // 最多允許 3 個線程同時執行

    public static async Task Main(string[] args)
    {
        var tasks = new List<Task>();

        for (int i = 0; i < 10; i++)  // 10 個任務,但最多 3 個同時執行
        {
            int taskId = i;
            tasks.Add(Task.Run(async () =>
            {
                await semaphore.WaitAsync();  // 請求信號量
                try
                {
                    Console.WriteLine($"Task {taskId} started.");
                    await Task.Delay(1000);  // 模擬任務執行
                    Console.WriteLine($"Task {taskId} completed.");
                }
                finally
                {
                    semaphore.Release();  // 釋放信號量
                }
            }));
        }

        await Task.WhenAll(tasks);  // 等待所有任務完成
    }
}

在此示例中,最多隻有 3 個任務可以同時執行。SemaphoreSlim 控制併發任務的數量。

處理數據庫連接池

public class DatabaseConnectionPool
{
    private static SemaphoreSlim semaphore = new SemaphoreSlim(5, 5);  // 最大允許 5 個併發連接

    public static async Task AccessDatabaseAsync()
    {
        await semaphore.WaitAsync();  // 請求連接
        try
        {
            Console.WriteLine($"Accessing database on thread {Thread.CurrentThread.ManagedThreadId}");
            await Task.Delay(2000);  // 模擬數據庫操作
        }
        finally
        {
            semaphore.Release();  // 釋放連接
        }
    }
}

這裏,SemaphoreSlim 控制對數據庫連接的併發訪問,最多允許 5 個線程同時訪問數據庫。

API 請求限流

class RateLimitedHttpClient
{
    private readonly HttpClient _client = new();
    private readonly SemaphoreSlim _throttle = new(5); // 最大5個併發請求

    public async Task<string> GetAsync(string url)
    {
        await _throttle.WaitAsync();
        try
        {
            return await _client.GetStringAsync(url);
        }
        finally
        {
            _throttle.Release();
        }
    }
}

高級功能

超時與取消控制

async Task<bool> TryAccessAsync(TimeSpan timeout, CancellationToken ct)
{
    // 帶超時和取消的等待
    if (await semaphore.WaitAsync(timeout, ct))
    {
        try { /* 操作資源 */ }
        finally { semaphore.Release(); }
        return true;
    }
    return false; // 超時未獲取
}

批量獲取與釋放

// 批量獲取3個許可
await semaphore.WaitAsync(3); 

try
{
    // 執行需要多個許可的操作
    ProcessBulkData();
}
finally
{
    // 批量釋放3個許可
    semaphore.Release(3); 
}

資源池實現

class ResourcePool<T> where T : IDisposable
{
    private readonly ConcurrentQueue<T> _resources = new();
    private readonly SemaphoreSlim _semaphore;

    public ResourcePool(IEnumerable<T> resources)
    {
        foreach (var res in resources) _resources.Enqueue(res);
        _semaphore = new SemaphoreSlim(_resources.Count, _resources.Count);
    }

    public async Task<ResourceLease> AcquireAsync(CancellationToken ct = default)
    {
        await _semaphore.WaitAsync(ct);
        _resources.TryDequeue(out var resource);
        return new ResourceLease(resource, this);
    }

    private void Release(T resource)
    {
        _resources.Enqueue(resource);
        _semaphore.Release();
    }

    public readonly struct ResourceLease : IDisposable
    {
        private readonly ResourcePool<T> _pool;
        public T Resource { get; }

        public ResourceLease(T resource, ResourcePool<T> pool)
        {
            Resource = resource;
            _pool = pool;
        }

        public void Dispose() => _pool.Release(Resource);
    }
}

避免常見陷阱

// ❌ 錯誤:忘記釋放
semaphore.Wait();
DoWork(); // 若異常則永遠不釋放

// ✅ 正確:使用using模式
public struct SemaphoreGuard : IDisposable
{
    private SemaphoreSlim _semaphore;
    public SemaphoreGuard(SemaphoreSlim semaphore)
    {
        _semaphore = semaphore;
        semaphore.Wait();
    }
    public void Dispose() => _semaphore?.Release();
}

using (new SemaphoreGuard(semaphore))
{
    // 受保護操作
}

與Channel結合實現生產者-消費者

var channel = Channel.CreateBounded<int>(capacity: 100);
var semaphore = new SemaphoreSlim(0, 100); // 初始無許可

// 生產者
public async Task ProduceAsync(int item)
{
    await channel.Writer.WriteAsync(item);
    semaphore.Release(); // 增加可用許可
}

// 消費者
public async Task ConsumeAsync()
{
    await semaphore.WaitAsync(); // 等待數據
    var item = await channel.Reader.ReadAsync();
    ProcessItem(item);
}

性能優化

  • 適合線程內同步

SemaphoreSlim 是輕量級的,僅在同一個進程內工作,如果需要跨進程的同步,可以使用 Semaphore 類。

  • 最小化信號量請求與釋放的頻率

每次調用 Wait()Release() 都有一定的開銷。在高併發場景中,頻繁地請求和釋放信號量可能會影響性能,因此儘量將資源使用放在一個操作內進行。

  • 避免死鎖

使用 SemaphoreSlim 時,要確保所有 Wait() 操作都有對應的 Release() 調用,避免死鎖。

  • 超時機制

為了避免某些線程永遠等待信號量,考慮為 WaitAsyncWait 設置超時,或者在等待時使用 CancellationToken 來支持取消操作。

常見使用場景

  • 數據庫連接池
    控制併發訪問數據庫的連接數,防止連接池耗盡。
  • 併發訪問共享資源
    限制同時訪問某些資源(例如文件、緩存、API)的線程數。
  • 限制高併發請求
    在高併發場景下,限制客户端請求數,防止系統過載。
  • 任務調度與執行
    控制任務的併發執行數量,保證系統負載均衡。
  • 信號量與生產者消費者模型
    使用 SemaphoreSlim 配合隊列實現生產者消費者模型,限制隊列中同時處理的任務數。

image.png

與相關同步原語對比

特性 SemaphoreSlim Semaphore Mutex Monitor ReaderWriterLockSlim
跨進程
遞歸獲取
異步支持
讀寫分離
輕量級
超時支持

總結

SemaphoreSlim 是一種高效的線程同步原語,能夠有效控制併發線程數,廣泛應用於限制資源訪問、調度任務執行等場景。相比於 Semaphore,它在線程內的同步場景中更加輕量、性能更好。在使用時,應注意避免死鎖、合理設置超時以及優化信號量的請求與釋放頻率,以確保系統性能與穩定性。

資源和文檔

  • 官方文檔:

    • Microsoft Learn:https://learn.microsoft.com/en-us/dotnet/api/system.threading...
    • .NET Threading:https://learn.microsoft.com/en-us/dotnet/standard/threading
  • GitHub:https://github.com/dotnet/runtime
user avatar xvrzhao 頭像 l7luo 頭像 huifeideniao 頭像 shenchendebanma 頭像 yayahonghong 頭像
點贊 5 用戶, 點贊了這篇動態!
點贊

Add a new 評論

Some HTML is okay.