动态

详情 返回 返回

Python 異步協程:從 async/await 到 asyncio 再到 async with - 动态 详情

在 Python 3.8 以後的版本中,異步編程變得越來越重要。本文將系統介紹 Python 標準庫中的異步編程工具,帶領大家掌握 async/await 語法和 asyncio 的使用。

從一個簡單的場景開始

假設我們在處理一些耗時的 I/O 操作,比如讀取多個文件或處理多個數據。為了模擬這種場景,我們先用 time.sleep() 來代表耗時操作:

import time
import random

def process_item(item):
    # 模擬耗時操作
    print(f"處理中:{item}")
    process_time = random.uniform(0.5, 2.0)
    time.sleep(process_time)
    return f"處理完成:{item},耗時 {process_time:.2f} 秒"

def process_all_items():
    items = ["任務A", "任務B", "任務C", "任務D"]
    results = []
    for item in items:
        result = process_item(item)
        results.append(result)
    return results

if __name__ == "__main__":
    start = time.time()
    results = process_all_items()
    end = time.time()
    
    print("\n".join(results))
    print(f"總耗時:{end - start:.2f} 秒")
處理中:任務A
處理中:任務B
處理中:任務C
處理中:任務D
處理完成:任務A,耗時 1.97 秒
處理完成:任務B,耗時 1.28 秒
處理完成:任務C,耗時 0.66 秒
處理完成:任務D,耗時 1.80 秒
總耗時:5.72 秒

這段代碼的問題很明顯:每個任務都必須等待前一個任務完成才能開始。如果有4個任務,每個任務平均耗時1秒,那麼總耗時就接近4秒。

認識 async/await

Python 引入了 async/await 語法來支持異步編程。當我們在函數定義前加上 async 關鍵字時,這個函數就變成了一個"協程"(coroutine)。而 await 關鍵字則用於等待一個協程完成。讓我們改寫上面的代碼:

import asyncio
import random
import time

async def process_item(item):
    print(f"處理中:{item}")
    # async 定義的函數變成了協程
    process_time = random.uniform(0.5, 2.0)
    # time.sleep() 換成 asyncio.sleep()
    await asyncio.sleep(process_time)  # await 等待異步操作完成
    return f"處理完成:{item},耗時 {process_time:.2f} 秒"

async def process_all_items():
    items = ["任務A", "任務B", "任務C", "任務D"]
    # 創建任務列表
    tasks = [
        asyncio.create_task(process_item(item))
        for item in items
    ]
    print("開始處理")
    results = await asyncio.gather(*tasks)
    return results

async def main():
    start = time.time()
    results = await process_all_items()
    end = time.time()
    
    print("\n".join(results))
    print(f"總耗時:{end - start:.2f} 秒")

if __name__ == "__main__":
    asyncio.run(main())
開始處理
處理中:任務A
處理中:任務B
處理中:任務C
處理中:任務D
處理完成:任務A,耗時 1.97 秒
處理完成:任務B,耗時 0.80 秒
處理完成:任務C,耗時 0.83 秒
處理完成:任務D,耗時 1.46 秒
總耗時:1.97 秒

讓我們詳細解釋這段代碼的執行過程:

  1. 當函數被 async 關鍵字修飾後,調用該函數不會直接執行函數體,而是返回一個協程對象
  2. await 關鍵字只能在 async 函數內使用,它表示"等待這個操作完成後再繼續"
  3. asyncio.create_task() 將協程包裝成一個任務,該任務會被事件循環調度執行
  4. asyncio.gather() 併發運行多個任務,並等待它們全部完成
  5. asyncio.run() 創建事件循環,運行 main() 協程,直到它完成

使用 asyncio.wait_for 添加超時控制

在實際應用中,我們往往需要為異步操作設置超時時間:

import asyncio
import random
import time

async def process_item(item):
    process_time = random.uniform(0.5, 2.0)
    try:
        # 設置1秒超時
        await asyncio.wait_for(
            asyncio.sleep(process_time),
            timeout=1.0
        )
        return f"處理完成:{item},耗時 {process_time:.2f} 秒"
    except asyncio.TimeoutError:
        return f"處理超時:{item}"

async def main():
    items = ["任務A", "任務B", "任務C", "任務D"]
    tasks = [
        asyncio.create_task(process_item(item))
        for item in items
    ]
    
    start = time.time()
    results = await asyncio.gather(*tasks, return_exceptions=True)
    end = time.time()
    
    print("\n".join(results))
    print(f"總耗時:{end - start:.2f} 秒")

if __name__ == "__main__":
    asyncio.run(main())
處理超時:任務A
處理完成:任務B,耗時 0.94 秒
處理超時:任務C
處理完成:任務D,耗時 0.78 秒
總耗時:1.00 秒

使用異步上下文管理器

Python 中的 with 語句可以用於資源管理,類似地,異步編程中我們可以使用 async with 。一個類要支持異步上下文管理,需要實現 __aenter____aexit__ 方法:

import asyncio
import random

class AsyncResource:
    async def __aenter__(self):
        # 異步初始化資源
        print("正在初始化資源...")
        await asyncio.sleep(0.1)
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # 異步清理資源
        print("正在清理資源...")
        await asyncio.sleep(0.1)
    
    async def process(self, item):
        # 異步處理任務
        print(f"正在處理任務:{item}")
        process_time = random.uniform(0.5, 2.0)
        await asyncio.sleep(process_time)
        return f"處理完成:{item},耗時 {process_time:.2f} 秒"

async def main():
    items = ["任務A", "任務B", "任務C"]
    
    async with AsyncResource() as resource:
        tasks = [
            asyncio.create_task(resource.process(item))
            for item in items
        ]
        results = await asyncio.gather(*tasks)
    
    print("\n".join(results))

if __name__ == "__main__":
    asyncio.run(main())
正在初始化資源...
正在處理任務:任務A
正在處理任務:任務B
正在處理任務:任務C
正在清理資源...
處理完成:任務A,耗時 1.31 秒
處理完成:任務B,耗時 0.77 秒
處理完成:任務C,耗時 0.84 秒

使用事件循環執行阻塞操作 run_in_executor

在異步編程中,我們可能會遇到一些無法避免的阻塞操作(比如調用傳統的同步API)。這時,asyncio.get_running_loop()run_in_executor 就顯得特別重要:

import asyncio
import time
import requests  # 一個同步的HTTP客户端庫

async def blocking_operation():
    # 獲取當前事件循環
    loop = asyncio.get_running_loop()

    # 在線程池中執行阻塞操作
    result = await loop.run_in_executor(
        None,  # 使用默認的線程池執行器
        requests.get,  # 要執行的阻塞函數
        'http://httpbin.org/delay/1'  # 函數參數
    )
    return result.status_code

async def non_blocking_operation():
    await asyncio.sleep(1)
    return "非阻塞操作完成"

async def main():
    # 同時執行阻塞和非阻塞操作
    tasks = [
        asyncio.create_task(blocking_operation()),
        asyncio.create_task(non_blocking_operation())
    ]
    
    start = time.time()
    results = await asyncio.gather(*tasks)
    end = time.time()
    
    print(f"操作結果:{results}")
    print(f"總耗時:{end - start:.2f} 秒")

if __name__ == "__main__":
    asyncio.run(main())

輸出:

操作結果:[200, '非阻塞操作完成']
總耗時:1.99 秒

這個例子展示瞭如何在異步程序中優雅地處理同步操作。如果不使用 run_in_executor,阻塞操作會阻塞整個事件循環,導致其他任務無法執行:

  • requests.get() 是同步操作,會阻塞當前線程
  • 事件循環運行在主線程上
  • 如果直接在協程中調用 requests.get() ,整個事件循環都會被阻塞
  • 其他任務無法在這期間執行
  • run_in_executor 會將阻塞操作放到另一個線程中執行
  • 主線程的事件循環可以繼續處理其他任務
  • 當線程池中的操作完成時,結果會被返回給事件循環

最佳實踐是:

  • 儘量使用原生支持異步的庫(如 aiohttp)
  • 如果必須使用同步庫,就用 run_in_executor
  • 對於 CPU 密集型任務也可以用 run_in_executor 放到進程池中執行

任務取消:優雅地終止異步操作

有時我們需要取消正在執行的異步任務,比如用户中斷操作或超時處理:

import asyncio
import random

async def long_operation(name):
    try:
        print(f"{name} 開始執行")
        while True:  # 模擬一個持續運行的操作
            await asyncio.sleep(0.5)
            print(f"{name} 正在執行...")
    except asyncio.CancelledError:
        print(f"{name} 被取消了")
        raise  # 重要:繼續傳播取消信號

async def main():
    # 創建三個任務
    task1 = asyncio.create_task(long_operation("任務1"))
    task2 = asyncio.create_task(long_operation("任務2"))
    task3 = asyncio.create_task(long_operation("任務3"))
    
    # 等待1秒後取消task1
    await asyncio.sleep(1)
    task1.cancel()
    
    # 等待2秒後取消其餘任務
    await asyncio.sleep(1)
    task2.cancel()
    task3.cancel()
    
    try:
        # 等待所有任務完成或被取消
        await asyncio.gather(task1, task2, task3, return_exceptions=True)
    except asyncio.CancelledError:
        print("某個任務被取消了")

if __name__ == "__main__":
    asyncio.run(main())

輸出:

任務1 開始執行
任務2 開始執行
任務3 開始執行
任務1 正在執行...
任務2 正在執行...
任務3 正在執行...
任務1 被取消了
任務2 正在執行...
任務3 正在執行...
任務2 正在執行...
任務3 正在執行...
任務2 被取消了
任務3 被取消了

這個例子展示瞭如何正確處理任務取消:

  1. 任務可以在執行過程中被取消
  2. 被取消的任務會拋出 CancelledError
  3. 我們應該適當處理取消信號,確保資源被正確清理

深入理解協程:為什麼需要 async/await?

協程(Coroutine)是一種特殊的函數,它可以在執行過程中暫停,並在之後從暫停的地方繼續執行。當我們使用 async 定義一個函數時,我們實際上是在定義一個協程:

import asyncio

# 這是一個普通函數
def normal_function():
    return "Hello"

# 這是一個協程
async def coroutine_function():
    await asyncio.sleep(1)
    return "Hello"

# 讓我們看看它們的區別
print(normal_function)      # <function normal_function at 0x1052cc040>
print(coroutine_function)   # <function coroutine_function at 0x1054b9790>

# 調用它們的結果不同
print(normal_function())    # 直接返回: "Hello"
print(coroutine_function()) # RuntimeWarning: coroutine 'coroutine_function' was never awaited
# <coroutine object coroutine_function at 0x105962e40>

await 如何與事件循環協作

協程(Coroutine)的核心在於它可以在執行過程中主動交出控制權,讓其他代碼有機會執行。讓我們通過一個詳細的例子來理解這個過程:

import asyncio

async def task1():
    print("任務1:開始")
    print("任務1:準備休眠")
    await asyncio.sleep(2)  # 關鍵點1:交出控制權
    print("任務1:休眠結束")

async def task2():
    print("任務2:開始")
    print("任務2:準備休眠")
    await asyncio.sleep(1)  # 關鍵點2:交出控制權
    print("任務2:休眠結束")

async def main():
    # 同時執行兩個任務
    await asyncio.gather(task1(), task2())

asyncio.run(main())

這段代碼的輸出會是:

任務1:開始
任務1:準備休眠
任務2:開始
任務2:準備休眠
任務2:休眠結束    # 1秒後
任務1:休眠結束    # 2秒後

讓我們詳細解釋執行過程:

  1. 當程序遇到 await asyncio.sleep(2) 時:

    • 這個 sleep 操作被註冊到事件循環中
    • Python 記錄當前的執行位置
    • task1 主動交出控制權
    • 重要:task1 並沒有停止運行,而是被暫停了,等待之後恢復
  2. 事件循環接管控制權後:

    • 尋找其他可以執行的協程(這裏是 task2)
    • 開始執行 task2,直到遇到 await asyncio.sleep(1)
    • task2 也交出控制權,被暫停
  3. 事件循環繼續工作:

    • 管理一個計時器,追蹤這兩個 sleep 操作
    • 1秒後,發現 task2 的 sleep 時間到了
    • 恢復 task2 的執行,打印"任務2:休眠結束"
    • 2秒到時,恢復 task1 的執行,打印"任務1:休眠結束"

這就像是一個指揮家(事件循環)在指揮一個管絃樂隊(多個協程):

  • 當某個樂器(協程)需要休息時,它舉手示意(await)
  • 指揮家看到後,立即指揮其他樂器演奏
  • 當休息時間到了,指揮家會示意這個樂器繼續演奏

代碼驗證:

import asyncio
import time

async def report_time(name, sleep_time):
    print(f"{time.strftime('%H:%M:%S')} - {name}開始")
    await asyncio.sleep(sleep_time)
    print(f"{time.strftime('%H:%M:%S')} - {name}結束")

async def main():
    # 同時執行多個任務
    await asyncio.gather(
        report_time("任務A", 2),
        report_time("任務B", 1),
        report_time("任務C", 3)
    )

asyncio.run(main())

輸出:

00:19:26 - 任務A開始
00:19:26 - 任務B開始
00:19:26 - 任務C開始
00:19:27 - 任務B結束
00:19:28 - 任務A結束
00:19:29 - 任務C結束

這種機制的優勢在於:

  1. 單線程執行,沒有線程切換開銷
  2. 協程主動交出控制權,而不是被操作系統強制切換
  3. 比起回調地獄,代碼更清晰易讀
  4. 錯誤處理更直觀,可以使用普通的 try/except

理解了這個機制,我們就能更好地使用異步編程:

  • await 的時候,其他協程有機會執行
  • 耗時操作應該是真正的異步操作(比如 asyncio.sleep
  • 不要在協程中使用阻塞操作,那樣會卡住整個事件循環

小結

Python 的異步編程主要依賴以下概念:

  1. async/await 語法:定義和等待協程
  2. asyncio 模塊:提供事件循環和任務調度
  3. Task 對象:表示待執行的工作單元
  4. 異步上下文管理器:管理異步資源

使用異步編程的關鍵點:

  1. I/O 密集型任務最適合使用異步編程
  2. 所有耗時操作都應該是真正的異步操作
  3. 注意處理超時和異常情況
  4. 合理使用 asyncio.gather() 和 asyncio.wait_for()

異步編程不是萬能的,但在處理 I/O 密集型任務時確實能帶來顯著的性能提升。合理使用這些工具,能讓我們的程序更高效、更優雅。

user avatar soroqer 头像 crossoverjie 头像 runyubingxue 头像 mangrandechangjinglu 头像 anonymous_5f6b14f11289a 头像 vistart 头像 tyltr 头像 liberhome 头像 5e4jkgqh 头像 changhao_flag 头像 phpnan 头像 hanhoudeniupai 头像
点赞 13 用户, 点赞了这篇动态!
点赞

Add a new 评论

Some HTML is okay.