Stories

Detail Return Return

Python多線程基礎(一) - Stories Detail

計算機有兩種常見的任務類型

  • 計算密集型,時間多用在I/O操作上,比如文件讀寫、網絡請求、數據庫查詢
  • I/O密集型,時間多用在計算上,如數值計算、圖像處理、排序、搜索

由於Python存在GIL(全局解釋器鎖),同一時間只有一個線程可以執行Python字節碼,使得在計算密集型任務中無法充分利用多核CPU,因此,Python的多線程一般用於I/O密集型任務。
注:上述Python指Python的官方實現CPython,一些其他的實現如PyPy、Jython都沒有GIL

使用Thread類創建線程

from time import sleep, perf_counter
def task():
    print("任務開始...")
    sleep(1)
    print("任務結束...")

if __name__ == "__main__":
    start = perf_counter()
    task()
    task()
    end = perf_counter()
    print(f"任務耗時:{end - start: 0.2f}s")

如果我們有兩個耗時1s的任務,全部放在主線程(main thread)執行

任務開始...
任務結束...
任務開始...
任務結束...
任務耗時: 2.00s

結果是大約是2s,我們可以使用多線程優化程序

from time import sleep, perf_counter
from threading import Thread
def task(task_id: int):
    print(f"任務 {task_id}開始...")
    sleep(1)
    print(f"任務 {task_id}結束...")

if __name__ == "__main__":
    threads = []
    start = perf_counter()
    for i in range(1, 6):
        t = Thread(target=task, args=(i, ))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    end = perf_counter()
    print(f"任務耗時{end - start: 0.2f}s ")

使用Thread構造一個線程,target是目標函數(注意不要寫成函數調用),args是一個tuple,tuple中的元素會依次傳遞給目標函數作為參數
start啓動線程,join阻塞調用它的線程(這裏是主線程)直到子線程任務結束。

任務 1開始...
任務 2開始...
任務 3開始...
任務 4開始...
任務 5開始...
任務 1結束...
任務 2結束...
任務 4結束...
任務 3結束...
任務 5結束...
任務耗時 1.00s 

繼承Thread類

有時我們需要自定義線程行為,或者線程邏輯較為複雜,可以繼承Thread類封裝自己的線程。

import urllib.request
import urllib.error
from threading import Thread
class HttpRequestThread(Thread):
    def __init__(self, url: str):
        super().__init__()
        self.url = url

    def run(self):
        print(f"checking {self.url}")
        try:
            response = urllib.request.urlopen(self.url)
            print(response.code)
        except urllib.error.HTTPError as e:
            print(e.code)
        except urllib.error.URLError as e:
            print(e.reason)
            
if __name__ == "__main__":
    urls = [
        'http://httpstat.us/200',
        'http://httpstat.us/400'
    ]
    threads = [HttpRequestThread(url) for url in urls]
    for thread in threads:
        thread.start() # 這裏要調用start而不是run,否則不會開啓新的線程
    for thread in threads:
        thread.join()

繼承Thread類,重寫init方法添加需要的屬性。重寫run方法,定義線程的行為。
注意,啓動線程仍要調用start,不要直接調用run方法。

從線程中返回數據

要從線程中返回數據,可以擴展線程類定義屬性來返回值

from threading import Thread
import urllib.request
import urllib.error

class HttpRequestThread(Thread):
    def __init__(self, url: str):
        super().__init__()
        self.url = url
        self.http_stat_code = None
        self.reason = None

    def run(self):
        print(f"checking {self.url}")
        try:
            response = urllib.request.urlopen(self.url)
            self.http_stat_code = response.code
        except urllib.error.HTTPError as e:
            self.http_stat_code = e.code
        except urllib.error.URLError as e:
            self.reason = e.reason

if __name__ == "__main__":
    urls = [
        'http://httpstat.us/200',
        'http://httpstat.us/400'
    ]
    threads = [HttpRequestThread(url) for url in urls]
    [t.start() for t in threads]
    [t.join() for t in threads]
    [print(f"{t.url}: {t.http_stat_code}") for t in threads]

守護線程

守護線程是在後台執行的線程,不會影響程序的退出。用於定時任務、日誌、python垃圾回收
的循環引用檢查等。

from threading import Thread
import time

def show_timer():
    count = 0
    while True:
        time.sleep(1)
        count += 1
        print(f"已執行{count}秒")

if __name__ == "__main__":
    # thread = Thread(target=show_timer) # 非守護線程
    thread = Thread(target=show_timer)
    thread.start()

    answer = input("按任意鍵退出程序\n")
    print("Done")

線程不是守護線程,main thread結束後,線程並未結束,程序不會終止

from threading import Thread
import time

def show_timer():
    count = 0
    while True:
        time.sleep(1)
        count += 1
        print(f"已執行{count}秒")

if __name__ == "__main__":
    # thread = Thread(target=show_timer) # 非守護線程
    thread = Thread(target=show_timer, daemon=True)
    thread.start()

    answer = input("按任意鍵退出程序\n")
    print("Done")

創建線程時daemon=True設置為守護線程,此時main thread結束後,無論子線程是否結束,程序終止。

線程池

線程池是用於管理和複用線程的機制。預先創建一些線程,避免頻繁創建和銷燬線程的開銷
Executor是線程池(ThreadPoolExecutor)和進程池(ProcessPoolExecutor)的基類
Executor.submit為線程池分配任務, 返回一個future對象。

from concurrent.futures import ThreadPoolExecutor, Future
from time import sleep, perf_counter
def task(task_id: int):
    print(f"任務{task_id}開始...")
    sleep(1)
    return f"任務{task_id}結束"

if __name__ == "__main__":
    start = perf_counter()
    with ThreadPoolExecutor() as executor:
        f1: Future = executor.submit(task, 1)
        f2: Future = executor.submit(task, 2)
        print(f1.result())
        print(f2.result())
    end = perf_counter()
    print(f"程序耗時{end - start: 0.2f}秒")

結果

任務1開始...
任務2開始...
任務1結束
任務2結束
程序耗時 1.00秒

Process finished with exit code 0

Add a new Comments

Some HTML is okay.