动态

详情 返回 返回

Promise 與異步編程 - 动态 详情

Promise 是 JavaScript 中的一個重要概念,與前端的工作更是息息相關。因此本文將整理一下 Promise 在日常工作中的應用。


概念

從 MDN | 使用 Promise 中我們能學習到 Promise 的基礎使用與錯誤處理、組合等概念,可以將 Promise 的特點概括為:

  • Promise 對象有三種狀態,且狀態一旦改變就不會再變。其值記錄在內部屬性 [[PromiseState]] 中:

    • pending: 進行中
    • fulfilled: 已成功
    • rejected: 已失敗
  • 主要用於異步計算,並且可以將異步操作隊列化 (鏈式調用),按照期望的順序執行,返回符合預期的結果。
  • 可以在對象之間傳遞和操作 Promise,幫助我們處理隊列
  • 鏈式調用的寫法更簡潔,可以避免回調地獄

在現實工作中,當我們使用 Promise 時更多是對請求的管理,由於不同請求或任務的異步性。因此我們會根據不同的使用場景處理 Promise 的調度。

async/await

async/await 是基於 Promise 的一種語法糖,使得異步代碼的編寫和理解更加像同步代碼。

當一個函數前通過 async 關鍵字聲明,那這個函數的返回值一定會返回一個 Promise,即便函數返回的值與 Promise 無關,也會隱式包裝為 Promise 對象:

async function getAge() {
  return 18;
}

getAge().then(age => console.log(`age: ${age}`))
// age: 18

await

await 操作符通常和 async 是配套使用的,它會等待 Promise 並拆開 Promise 包裝直接取到裏面的值。當它處於 await 狀態時,Promise 還處於 ``,後續的代碼將不會被執行,因此看起來像是 “同步” 的。

function delayResolve(x, timeout = 2000) {
  return new Promise((resolve) => {
    setTimeout(() => {
      resolve(x);
    }, timeout);
  });
}

async function main() {
  const x = await delayResolve(2, 2000);
  console.log(x);

  const y = await delayResolve(1, 1000);
  console.log(y);
}

main();
// 2
// 1

錯誤處理

async/await 的錯誤處理通過是通過 try..catch 來捕獲錯誤。當然,我們也會根據實際業務的需求來 catch 我們真正需要處理的問題。

 try {
    const response = await axios.get('https://example.com/user');

    // 處理響應數據
    console.log('User data fetched:', response.data);
  } catch (error) {
    console.error('Error response:', error.response);
    // 做其他錯誤處理
  }

學習了前文的基礎概念後,我們可以更近一步的探討 Promise 的使用。

Promise 串聯

Promise 串聯一般是指多個 Promise 操作按順序執行,其中每個操作的開始通常依賴於前一個操作的完成。這種串行執行的一個典型場景是在第一個異步操作完成後,其結果被用作第二個異步操作的輸入,依此類推。

考慮以下場景:加載系統時,需要優先讀取用户數據,同時需要用户的數據去讀取用户的訂單的信息,再需要兩者信息生成用户報告。因此這是一個存在前後依賴的場景。

function fetchUserInfo(userId) {
  return axios.get(`/api/users/${userId}`);
}

function fetchUserOrders(userId) {
  return axios.get(`/api/orders/${userId}`);
}

function generateReport(userInfo, orders) {
  // 根據用户信息和訂單生成報告
  return {
    userName: userInfo.name,
    totalOrders: orders.length,
    // ...其他報告數據
  };
}

常規處理方法

處理串聯請求無非有兩種方法:

方法 1: 鏈式調用 .then()

在這種方法中,我們利用 .then() 的鏈式調用來處理每個異步任務。這種方式的優點是每個步驟都明確且連貫,但可能導致所謂的“回調地獄”,尤其是在處理多個串聯的異步操作時。

const userId = '12345'; // 假設已知的用户ID

fetchUserInfo(userId)
  .then(response => {
    const userInfo = response.data;
    return fetchUserOrders(userInfo.id); // 使用用户ID獲取訂單
  })
  .then(response => {
    const orders = response.data;
    return generateReport(userInfo, orders); // 生成報告
  })
  .then(report => {
    console.log('用户報告:', report);
  })
  .catch(error => {
    console.error('在處理請求時發生錯誤:', error);
  });

方法 2: 使用 async/await

async/await 提供了一種更加直觀、類似同步的方式來處理異步操作。它使代碼更易於閲讀和維護,特別是在處理複雜的異步邏輯時。

async function getUserReport(userId) {
  try {
    const userInfoResponse = await fetchUserInfo(userId);
    const userInfo = userInfoResponse.data;

    const userOrdersResponse = await fetchUserOrders(userInfo.id);
    const orders = userOrdersResponse.data;

    const report = generateReport(userInfo, orders);
    console.log('用户報告:', report);
  } catch (error) {
    console.error('在處理請求時發生錯誤:', error);
  }
}

const userId = '12345'; // 假設已知的用户ID
getUserReport(userId);

在這個示例中,使用 async/await 使得代碼的邏輯更加清晰和直觀,減少了代碼的嵌套深度,使錯誤處理變得簡單。

串聯自動化

以上是日常工作中最常見的需求.但這裏我們還可以發散一下思維,考慮更復雜的情況:

現在有一個數組,數組內有 10 個或更多的異步函數,每個函數都依賴前一個異步函數的返回值需要做處理。在這種請求多了的特殊情況下我們手動維護會顯得很冗餘,因此可以通過循環來簡化邏輯:

方法 1: 通過數組方法 reduce 組合

const processFunctions = [processStep1, processStep2, processStep3, ...];

processFunctions.reduce((previousPromise, currentFunction) => {
  return previousPromise.then(result => currentFunction(result));
}, Promise.resolve(initialValue))
.then(finalResult => {
  console.log('最終結果:', finalResult);
})
.catch(error => {
  console.error('處理過程中發生錯誤:', error);
});

方法 2: 循環體和 async/await 的結合

async function handleSequentialTasks(tasks, initialResult) {
  let result = initialResult;

  try {
    for (const task of tasks) {
      result = await task(result);
    }
    console.log('最終結果:', result);
  } catch (error) {
    console.error('處理過程中發生錯誤:', error);
  }
}

const tasks = [task1, task2, task3, ...];
handleSequentialTasks(tasks, initialValue);

Promise 併發

併發(Concurrency)在編程中是指多個任務在同一時間段內啓動、執行,但不一定同時完成。在 JavaScript 的 Promise 中,併發通常涉及同時開始多個異步操作,並根據它們何時解決(fulfilled)或被拒絕(rejected)來進行相應的處理。

Promise 的併發會比串聯的場景更復雜。Promise 對象提供了幾個靜態方法來處理併發情況,讓開發者可以根據不同的使用場景選擇合適的方法:

Promise.all(iterable)

Promise.all 靜態方法接受一個 Promise 可迭代對象作為輸入,當傳入的數組中每個都被 resolve 後返回一個 Promise。若任意一個 Promise 被 reject 後就 reject。

const promise1 = fetch('https://example.com/api/data1');
const promise2 = fetch('https://example.com/api/data2');

Promise.all([promise1, promise2])
  .then(([data1, data2]) => {
    console.log('所有數據已加載:', data1, data2);
  })
  .catch(error => {
    console.error('加載數據時發生錯誤:', error);
  });

Promise.allSettled(iterable)

Promise.allSettled 方法同樣接受一個 Promise 的可迭代對象。不同於 Promise.all,這個方法等待所有傳入的 Promise 都被解決(無論是 fulfilled 或 rejected),然後返回一個 Promise,它解決為一個數組,每個數組元素代表對應的 Promise 的結果。這使得無論成功還是失敗,你都可以得到每個 Promise 的結果。

Promise.allSettled([
  Promise.resolve(33),
  new Promise((resolve) => setTimeout(() => resolve(66), 0)),
  99,
  Promise.reject(new Error("an error")),
]).then((values) => console.log(values));

// [
//   { status: 'fulfilled', value: 33 },
//   { status: 'fulfilled', value: 66 },
//   { status: 'fulfilled', value: 99 },
//   { status: 'rejected', reason: Error: an error }
// ]

Promise.race(iterable)

Promise.race 方法接受一個 Promise 的可迭代對象,但與 Promise.allPromise.allSettled 不同,它不等待所有的 Promise 都被解決。相反,Promise.race 返回一個 Promise,它解決或被拒絕取決於傳入的迭代對象中哪個 Promise 最先解決或被拒絕。

function sleep(time, value, state) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      if (state === "fulfill") {
        return resolve(value);
      } else {
        return reject(new Error(value));
      }
    }, time);
  });
}

const p1 = sleep(500, "one", "fulfill");
const p2 = sleep(100, "two", "fulfill");

Promise.race([p1, p2]).then((value) => {
  console.log(value); // "two"
  // Both fulfill, but p2 is faster
});

const p3 = sleep(100, "three", "fulfill");
const p4 = sleep(500, "four", "reject");

Promise.race([p3, p4]).then(
  (value) => {
    console.log(value); // "three"
    // p3 is faster, so it fulfills
  },
  (error) => {
    // Not called
  },
);

const p5 = sleep(500, "five", "fulfill");
const p6 = sleep(100, "six", "reject");

Promise.race([p5, p6]).then(
  (value) => {
    // Not called
  },
  (error) => {
    console.error(error.message); // "six"
    // p6 is faster, so it rejects
  },
);

Promise.any(iterable)

Promise.any 接受一個 Promise 的可迭代對象,並返回一個 Promise。它解決為迭代對象中第一個被解決的 Promise 的結果。如果所有的 Promise 都被拒絕,Promise.any 會返回一個 AggregateError 實例。

const promise1 = new Promise((resolve, reject) => {
  setTimeout(resolve, 500, "one");
});

const promise2 = new Promise((resolve, reject) => {
  setTimeout(reject, 100, "two");
});

Promise.race([promise1, promise2])
  .then((value) => {
    console.log("succeeded with value:", value);
  })
  .catch((reason) => {
    // Only promise1 is fulfilled, but promise2 is faster
    console.error("failed with reason:", reason);
  });
// failed with reason: two

控制批次

JavaScript 默認提供的併發處理函數很方便我們根據業務場景的不同來處理請求,但顯然我們工作中所遇到的需求得考慮更復雜的情況,還需要進一步的封裝和擴展我們的 API。

在服務器端編程,我們經常遇到需要批量處理數據的場景。例如,批量修改數據庫中的用户數據。在這種情況下,由於數據庫操作的性能限制或者 API 調用限制,我們不能直接一口氣修改全部,因為短時間內發出太多的請求數據庫也會處理不來導致應用性能下降。因此,我們需要一種方法來限制同時進行的操作任務數,以保證程序的效率和穩定性。

我們代入實際業務場景:假設有一個社區組織了一次大型户外活動,活動吸引了大量參與者進行在線報名和付費。由於突發情況(比如惡劣天氣或其他不可抗力因素),活動不得不取消。這時,組織者需要對所有已付費的參與者進行退款。

活動組織者發起「解散活動」後,服務端接收到請求後當然也不能一次性全部執行退款的操作啦,畢竟一場活動説不定有上千人。因此我們需要分批次去處理。

在上述社區活動退款的例子中,服務器端處理退款請求的一個有效方法是實施分批次併發控制。這種方法不僅保護了後端服務免受過載,還確保了整個退款過程的可管理性和可靠性。

分批次處理時有以下關鍵問題需要考慮:

  1. 批次大小:確定每個批次中處理的退款請求數量。這個數字應基於服務器的處理能力和支付網關的限制來確定。
  2. 批次間隔:設置每個批次之間的時間間隔。這有助於避免短時間內發出過多請求,從而減輕對數據庫和支付網關的壓力。
  3. 錯誤處理:在處理退款請求時,應妥善處理可能發生的錯誤,並確保能夠重新嘗試失敗的退款操作。

簡易版併發控制

將所有待處理的異步任務(如退款請求)存放在一個 tasks 數組中,在調用併發請求前將 tasks 數組分割成多個小批次,每個批次包含固定數量的請求。每當前一個批次處理完後,才處理下一個批次的請求,直到所有批次的請求都被處理完畢:

// 假設這些是返回 Promise 的函數
const tasks = [task1, task2, task3, ...];

// 分割任務數組為批次
function splitIntoBatches(tasks, batchSize) {
  let batches = [];
  for (let i = 0; i < tasks.length; i += batchSize) {
    batches.push(tasks.slice(i, i + batchSize));
  }
  return batches;
}

// 處理單個批次的函數
function processBatch(batch) {
  return Promise.all(batch.map(task => task()));
}

// 控制併發的主函數
async function processTasksInBatches(tasks, batchSize) {
  const batches = splitIntoBatches(tasks, batchSize);

  for (const batch of batches) {
    await processBatch(batch);
    // 可以在這裏加入日誌記錄或其他處理
    console.log('一個批次處理完畢');
  }

  console.log('所有批次處理完畢');
}

// 調用主函數,假設每批次處理 10 個任務
processTasksInBatches(tasks, 10);

這種寫法實現的併發簡單易懂,也易於維護,在一些併發壓力不大,比較簡單的業務場景來看是足夠了。

但如果我們將這種處理方式放在時序圖上進行分析,就能發現服務器可能有能力處理更多的併發任務,而這種方法可能沒有充分利用可用資源。每個批次開始前會依賴於上一個批次中請求響應時間最慢的那一個,因此我們還可以進一步考慮優化併發實現方案。

動態任務隊列

在之前的 “控制批次” 方法中,我們發現固定處理批次的侷限性,尤其是在併發任務數量較大時可能導致的資源利用不足。為了解決這個問題,我們可以考慮採用一種更靈活的方法:維護一個動態的任務隊列來處理異步請求:

  • 任務隊列:創建一個任務隊列,其中包含所有待處理的異步任務。
  • 動態出隊和入隊:當隊列中的任務完成時,它會被移出隊列,同時根據當前的系統負載和任務處理能力,從待處理任務列表中拉取新的任務進入隊列。
  • 併發數控制:設置一個最大併發數,確保任何時候處理中的任務數量不會超過這個限制。

我們封裝一個函數,提供 concurrency 參數作為併發限制:

function parallelLimit(tasks, {concurrency = 10}) {
  const results = [];
  const executing = new Set();

  let currentlyRunning = 0;
  let currentIndex = 0;

  return new Promise((resolve) => {
    const next = () => {
      if (currentIndex < tasks.length) {
        // 取出記錄數,準備執行
        const index = currentIndex;
        const task = tasks[index];

        currentIndex += 1
        currentlyRunning += 1;

        const resultPromise = task().then((result) => {
          // 任務執行完畢,更新運行數、保存結果
          currentlyRunning -= 1;
          results[index] = result;
          executing.delete(resultPromise);

          // 開啓下一個任務
          next();
        });

        executing.add(resultPromise);

        // 當前運行的任務數小於限制並且還有任務未開始時,繼續添加任務
        if (currentlyRunning < concurrency && currentIndex < tasks.length) {
          next();
        }
      } else if (currentlyRunning === 0) {
        // 所有任務都已完成
        resolve(results);
      }
    };

    // 初始化
    for (let i = 0; i < Math.min(concurrency, tasks.length); i += 1) {
      next();
    }
  });
}

該函數會在初始階段會按照併發數先同步執行指定任務數,若某個任務執行完畢後,在執行完畢的回調中會喚醒下一個任務,直至任務隊列執行完畢。

以下添加一些測試數據用於測試:

function asyncTask(id) {
  return new Promise((resolve) => {
    setTimeout(() => {
      console.log(`任務 ${id} 完成`);
      resolve(`結果 ${id}`);
    }, Math.random() * 2000);
  });
}
const taskArray = Array.from({ length: 10 }, (_, i) => () => asyncTask(i + 1));

parallelLimit(taskArray, {concurrency: 3}).then((results) => {
  console.log('所有任務完成:', results);
});

第三方庫

在實際的項目開發中,特別是面臨複雜的併發處理需求時,我們更多會考慮使用成熟的第三庫來處理業務問題,它們具備更完善的測試用例來檢驗邊界情況。

處理併發的熱門庫有 RxJSp-mapasync.js

  • RxJS 是一個以響應式編程為核心的庫,竟然搭配 Angular 在網頁端搭配使用,提供了豐富的操作符和方法來處理異步事件和數據流。
  • p-mapasync.js 包的體積更小,更適合在服務端中使用。p-map 專注於提供併發控制功能,而 async.js 提供包括併發控制、隊列管理等廣泛的異步處理模式,功能會更全。

筆者在 Node.js 環境下只需要處理併發問題,故用的 p-map 會更多一些。下面簡要介紹 p-map 的使用:

import pMap from 'p-map';

const list = Array.from({ length: 10 }, (_, i) => i)

pMap(list, asyncTask, { concurrency: 3 })
  .then((results) => {
    console.log('所有任務完成:', results);
  });

p-map 的源碼實現很精簡,建議想深入複習併發的同學去閲讀其底層代碼的實現作為參考思路。

總結

在本文中,我們首先回顧了 Promise 的基本概念及其在 JavaScript 異步編程中的常用方法。通過這個基礎,我們能夠更好地理解如何有效地處理和組織異步代碼。

隨後,我們深入到併發處理的實際應用場景,探討了如何根據具體需求選擇合適的併發實現策略。我們討論了從簡單的批次控制到更復雜的動態任務隊列的不同方法,展示了在不同場景下優化異步任務處理的多種可能性。

但值得注意的是,我們自行實現的併發控制工具在沒有做足測試用例測試時,可能不適合直接應用於生產環境。在實際的項目開發中,選擇成熟且持續維護的第三方庫往往是更安全和高效的選擇。比如筆者選擇的 p-map 穩定性和可靠性相比上文簡單實現的版本將會更好。


參考資料

  • MDN | async 函數
  • GitHub | p-map

Add a new 评论

Some HTML is okay.