在 JavaScript 日常開發中,經常會遇到這樣的情況:多個異步任務需要同時訪問或修改同一個資源。這個資源可能很簡單,比如內存中的一個變量;也可能複雜一些,比如一份配置文件、一條數據庫記錄或者是某個外部服務的接口調用,只要它是共享的,就有可能被不同的任務同時操作。
問題在於,異步任務不像同步代碼那樣一行一行地按順序執行,而是可能同時進行。如果沒有任何機制來協調它們的先後順序,就會出現混亂:有的任務可能會覆蓋掉別的任務的修改,有的任務可能會在讀取舊數據的基礎上做出錯誤的決策,最終導致整個系統狀態不一致。
舉一個生活中常見的例子,假設有一個簡單的餘額變量 balance 和一個取款函數 withdraw,如下所示:
let balance = 100;
async function withdraw(amount) {
const current = balance;
// 模擬異步延遲,比如數據庫寫入
await sleep(300);
if (current >= amount) {
balance = current - amount;
console.log(`取款 ${amount} 成功,剩餘餘額: ${balance}`);
} else {
console.log(`取款 ${amount} 失敗,餘額不足`);
}
}
// 同時觸發兩個取款操作
withdraw(30);
withdraw(80);
執行結果如下:
取款 30 成功,剩餘餘額: 70
取款 80 成功,剩餘餘額: 20
可以看到結果明顯不正確,問題在於兩次操作幾乎同時讀到 balance 為 100。
- 第 1 次操作,餘額 100 扣 30 → balance = 70
- 第 2 次操作,程序也以為餘額有 100 扣 80 → balance = 20
看到這裏,有的同學可能會説:“你用
await啊,這麼簡單的問題!”。await withdraw(30); await withdraw(80);但是,這位同學請你先別急,在同一個作用域下使用
await確實可以輕鬆解決問題,可是很多時候情況並不是這麼簡單。
假設有一個取款按鈕,用户在第一次操作還沒有完成時又請求了第二次,這樣的連續調用是不是就不能使用await處理了呢?async function onWithdrawClick() { // 即使添加了 await,不同的點擊事件仍會併發執行各自的 withdraw 調用 await withdraw(); }看到點擊事件,又有的同學可能會説:“那你加個防抖啊,這不是有手就行嗎?”
這位同學請你也先別急,請注意我們兩次調用
withdraw的參數並不相同,難道説要捨棄最後一次之前的調用嗎?
假設使用防抖,用户先取款 30 元再取款 80 元,最終的結果是 80 元取款成功,30 元的取款卻沒有響應,這顯然不符合預期。而且有可能並不是在同一個終端發起的
withdraw,所以暫不考慮以上兩種方案。簡單起見,後續代碼中也是直接連續兩次調用
withdraw模擬操作,就不再贅述原因。
那麼這個問題應該如何解決呢?
大家都知道,在支持多線程的語言中,如果有多個線程共享一個變量,通常使用同步鎖(synchronized)來保證同一時間只有一個線程可以修改共享變量。以 Java 為例,對 withdraw 進行改造如下:
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class BankAccount {
private final Lock lock = new ReentrantLock(true);
private int balance = 100;
public void withdraw(int amount) {
// 獲取鎖,保證臨界區安全
lock.lock();
try {
// 讀取餘額
int current = balance;
// 模擬異步操作的延遲
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (current >= amount) {
balance = current - amount;
System.out.println("取款 " + amount + " 成功,剩餘餘額: " + balance);
} else {
System.out.println("取款 " + amount + " 失敗,餘額不足");
}
} finally {
// 釋放鎖
lock.unlock();
}
}
}
public class Main {
public static void main(String[] args) {
BankAccount account = new BankAccount();
// 兩個線程幾乎同時執行取款操作
new Thread(() -> account.withdraw(30)).start();
new Thread(() -> account.withdraw(80)).start();
}
}
執行結果如下:
取款 30 成功,剩餘餘額: 70
取款 80 失敗,餘額不足
可以發現執行結果穩定正確,達到了預期的效果。
可是在 JavaScript 中,並沒有像 Java 那樣的 ReentrantLock 類,那麼能不能編寫一個類似於 ReentrantLock 的類來實現這麼一個互斥鎖的功能呢?
答案是肯定的,藉助 Promise 的特性,完全可以自己實現一個輕量級的、可用於異步任務的互斥鎖(目前只考慮類公平鎖)。
接下來,就使用 Promise 來實現一個迷你版的 Mutex,從而在異步環境下安全地操作共享資源。
class Mutex {
_locked = false;
_queue = [];
lock() {
if (this._locked) {
return new Promise((resolve) => {
this._queue.push(resolve);
});
}
this._locked = true;
return Promise.resolve();
}
unlock() {
if (this._queue.length > 0) {
const next = this._queue.shift();
if (next) {
next();
}
return;
}
this._locked = false;
}
}
_locked 標記鎖是否被佔用,_queue 存儲等待鎖的任務。
lock 方法嘗試獲取鎖,如果鎖被佔用就返回一個 Promise 並加入隊列等待,否則立即獲得鎖。
unlock 方法釋放鎖,如果隊列中有等待任務,就依次喚醒下一個,否則將鎖標記為空閒,從而保證異步操作按順序執行臨界區代碼。
現在使用這個迷你版的 Mutex 來改造代碼:
const mutex = new Mutex();
let balance = 100;
async function withdraw(amount) {
await mutex.lock();
try {
const current = balance;
await sleep(300);
if (current >= amount) {
balance = current - amount;
console.log(`取款 ${amount} 成功,剩餘餘額: ${balance}`);
} else {
console.log(`取款 ${amount} 失敗,餘額不足`);
}
} finally {
mutex.unlock();
}
}
withdraw(30);
withdraw(80);
執行結果如下:
取款 30 成功,剩餘餘額: 70
取款 80 失敗,餘額不足
結果正確!
即使在多個異步操作幾乎同時執行的情況下,這個迷你版的 Mutex 也能夠保證共享資源的訪問是互斥的,避免了數據競爭和狀態混亂。
當然,這個迷你版的 Mutex 只是一個最小實現,僅用於驗證思路的可行性。在實際的生產環境中,通常不建議使用自制的迷你版 Mutex,因為它缺乏完整的邊界檢查、等待超時、錯誤處理以及更復雜場景下的健壯性保證。
推薦使用社區成熟的庫,例如 async-mutex,它封裝了完整的互斥鎖功能,提供了可靠的方法來保證異步任務安全訪問共享資源,同時代碼簡潔、易讀,並經過社區驗證,能夠應對更復雜的併發場景。
使用 async-mutex 改造示例如下:
import { Mutex } from "async-mutex";
const mutex = new Mutex();
let balance = 100;
async function withdraw(amount) {
await mutex.acquire();
try {
const current = balance;
await sleep(300);
if (current >= amount) {
balance = current - amount;
console.log(`取款 ${amount} 成功,剩餘餘額: ${balance}`);
} else {
console.log(`取款 ${amount} 失敗,餘額不足`);
}
} finally {
mutex.release();
}
}
async-mutex
Github: https://github.com/DirtyHairy/async-mutex
Mutex 通常指的是一種用於同步併發進程的數據結構。
例如,在訪問一個非線程安全資源之前,線程會先鎖定 mutex,這會確保該線程在其他線程持有鎖期間被阻塞,從而強制實現對資源的獨佔訪問。操作完成後,線程釋放鎖,其他線程才可以繼續獲取鎖並訪問該資源。
儘管 JavaScript 嚴格來説是單線程的,但由於它的異步執行模型,依然可能出現需要同步原語的競態條件。
比如,與 Web Worker 通信時,需要連續發送多個消息才能完成某個任務。在消息異步交換的過程中,完全可能再次調用。如果在異步過程中狀態處理不當,就會出現難以修復、甚至更難追蹤的競態問題。
這個庫的作用就是將 Mutex 的概念引入到 JavaScript 中,鎖定 mutex 會返回一個 Promise,該 Promise 會在 mutex 可用時才 resolve。當異步流程完成後(通常會經歷多次事件循環),調用者需要調用釋放函數來釋放 mutex,以便下一個等待中的任務繼續執行。
安裝
# npm
npm install async-mutex
# pnpm
pnpm add async-mutex
# yarn
yarn add async-mutex
該庫由 TypeScript 編寫,可以在任何支持 ES5、ES6 Promise 和 Array.isArray 的環境中使用。在較老的瀏覽器中,可以使用 shim 支持(例如 core-js)。
創建 mutex
import { Mutex } from "async-mutex";
const mutex = new Mutex();
同步執行代碼
- Promise
mutex.runExclusive(() => {
// ...
}).then((result) => {
// ...
});
- async / await
await mutex.runExclusive(async () => {
// ...
});
runExclusive 會在 mutex 解鎖後執行提供的回調函數,該函數可以返回一個 Promise,當 Promise 被 resolve 或 reject(或者是一個普通函數執行完成)後 mutex 會被釋放。
runExclusive 返回一個 Promise,其狀態與回調函數的返回結果一致。如果在回調函數執行過程中拋出異常,mutex 也會被釋放,並且返回結果會被 reject。
手動加鎖 / 釋放
- Promise
mutex.acquire().then((release) => {
// ...
release();
});
- async / await
const release = await mutex.acquire();
try {
// ...
} finally {
release();
}
acquire 會返回一個 Promise,當 mutex 可用時 resolve。該 Promise 會解析為一個 release 函數,必須調用該函數來釋放鎖。
release 函數是冪等的,多次調用不會產生副作用。
重要提示: 如果未調用 release,mutex 會一直保持鎖定狀態,應用可能會死鎖。請確保在任何情況下都調用 release,並妥善處理異常。
無作用域的釋放
除了調用 acquire 返回的 release 回調,還可以直接在 mutex 上調用 release。
mutex.release();
檢查 mutex 是否被鎖定
mutex.isLocked();
取消等待中的鎖請求
可以調用 cancel 來取消所有等待中的鎖請求,被取消的請求會以 E_CANCELED reject。
- Promise
import { E_CANCELED } from "async-mutex";
mutex.runExclusive(() => {
// ...
}).then(() => {
// ...
}).catch(e => {
if (e === E_CANCELED) {
// ...
}
});
- async / await
import { E_CANCELED } from "async-mutex";
try {
await mutex.runExclusive(() => {
// ...
});
} catch (e) {
if (e === E_CANCELED) {
// ...
}
}
這對 acquire 同樣適用,如果 acquire 被取消,返回的 Promise 也會被 reject,錯誤為 E_CANCELED。
你可以通過向 Mutex 構造函數傳入自定義錯誤來修改拋出的錯誤。
const mutex = new Mutex(new Error("fancy custom error"));
注意:取消操作只會影響等待中的鎖請求,不會強制釋放當前已經持有的鎖。因此,即使調用了 cancel,mutex 也可能依舊不可用。
等待 mutex 可用
如果只是想等待 mutex 可用(而不是立刻獲取鎖),可以調用 waitForUnlock,它會返回一個 Promise,當 mutex 再次可獲取時 resolve。
但是,這個操作本身不會鎖定 mutex,因此一旦經過異步邊界,不保證 mutex 依然可用。
- Promise
mutex.waitForUnlock().then(() => {
// ...
});
- async / await
await mutex.waitForUnlock();
// ...
限制等待 mutex 的時間
有時需要限制程序等待 mutex 可用的時間,可以使用 withTimeout 裝飾器,它會修改 acquire 和 runExclusive 的行為。
import { withTimeout, E_TIMEOUT } from "async-mutex";
const mutexWithTimeout = withTimeout(new Mutex(), 100);
被裝飾後的 mutex API 不變。
withTimeout 的第二個參數是超時時間(毫秒),超過時間後,acquire 和 runExclusive 返回的 Promise 會被 reject,錯誤為 E_TIMEOUT。
在超時的情況下,runExclusive 不會執行回調。
withTimeout 的第三個參數可選,可以自定義拋出的錯誤。
const mutexWithTimeout = withTimeout(new Mutex(), 100, new Error("new fancy error"));
如果 mutex 不可用則立即失敗
如果不希望等待鎖,可以使用 tryAcquire 裝飾器。
它會修改 acquire 和 runExclusive 的行為,在 mutex 不可用時立即拋出 E_ALREADY_LOCKED。
- Promise
import { tryAcquire, E_ALREADY_LOCKED } from "async-mutex";
tryAcquire(mutex).runExclusive(() => {
// ...
}).then(() => {
// ...
}).catch(e => {
if (e === E_ALREADY_LOCKED) {
// ...
}
});
- async / await
import { tryAcquire, E_ALREADY_LOCKED } from "async-mutex";
try {
await tryAcquire(mutex).runExclusive(() => {
// ...
});
} catch (e) {
if (e === E_ALREADY_LOCKED) {
// ...
}
}
同樣,你可以傳入自定義錯誤作為第二個參數。
tryAcquire(mutex, new Error("new fancy error"))
.runExclusive(() => {
// ...
});
結語
本次插件分享到此結束,感謝你的閲讀,如果對你有幫助請給我一個 👍 支持吧!
🙂 我是 xiaohe0601,關注我瞭解更多實用的開源插件!