博客 / 詳情

返回

Python筆記六之多進程

本文首發於公眾號:Hunter後端

原文鏈接:Python筆記六之多進程

在 Python 裏,我們使用 multiprocessing 這個模塊來進行多進程的操作。

multiprocessing 模塊通過創建子進程的方式來運行多進程,因此繞過了 Python 裏 GIL 的限制,可以充分利用機器上的多個處理器。

1、多進程使用示例

多進程的使用方式和多線程的方式類似,這裏使用到的是 multiprocessing.Process 類,下面是一個簡單的示例:

from multiprocessing import Process
import time

def f(x):
    if x % 2 == 1:
        time.sleep(x+1)
    print(x * x)
    return x * x


def test_multi_process():
    processes = []

    for i in range(5):
        processes.append(Process(target=f, args=(i,)))

    for p in processes:
        p.start()

    for p in processes:
        p.join(0.5)

    for p in processes:
        print(p, p.is_alive(), p.pid, p._parent_pid)


if __name__ == "__main__":
    test_multi_process()

在上面的示例中,test_multi_process() 函數裏使用多進程的方式調用 f 函數,和多線程的調用方式一致,通過 start() 方法啓動進程活動,使用 join() 方法阻塞調用其的進程。

接下來介紹一下 multiprocessing.Process 的一些方法和屬性。

1. run()

表示進程活動的方法,可以在子類中重載此方法,比如多線程筆記的操作裏重寫 run() 對函數執行報錯進行了處理,並返回了執行結果

2. start()

啓動進程活動,將對象的 run() 方法在一個單獨的進程中調用

3. join()

阻塞調用 join() 方法的進程,在上面的示例中也就是父進程,默認值為 None,也就表示阻塞操作。

如果設置為其他正數值,那麼則最多會阻塞多少秒,比如上面的示例為 0.5 秒,如果超時,那麼父進程則會繼續往後執行。

比如上面的示例輸出結果如下:

0
4
16
<Process name='Process-1' pid=6600 parent=24248 stopped exitcode=0> False 6600 24248
<Process name='Process-2' pid=4368 parent=24248 started> True 4368 24248
<Process name='Process-3' pid=13024 parent=24248 stopped exitcode=0> False 13024 24248
<Process name='Process-4' pid=3288 parent=24248 started> True 3288 24248
<Process name='Process-5' pid=16880 parent=24248 stopped exitcode=0> False 16880 24248
1
9

在打印每個進程的信息時,f() 函數內部進行 sleep 的進程還沒有執行結束,但是進程已經超時了,所以不再阻塞父進程向下執行。

4. is_alive()

上面有打印出信息,返回布爾值,表示該進程是否還活着。

5. pid 和 parent_pid

上面使用 .pid 和 ._parent_pid 屬性打印出了每個進程的 id 和其父進程的 id。

2、進程池

進程使用的對象是 multiprocessing.pool.Pool()。

接受 processes 參數為進程數,表示要使用的工作進程數目,如果不傳入,則默認使用 cpu 的核數,根據 os.cpu_count() 獲取。

接下來分別使用示例介紹 multiprocessing.pool 下的幾個調用方法,進程池的使用可以使用 map() 和 starmap() 兩個函數。

1. map()

map() 接受兩個參數,func 表示多進程要執行的函數,iterable 表示要執行的 func 函數輸入的參數的迭代對象。

這裏需要注意一下,map() 函數使用的 func 函數只能接受一個參數,比如我們前面定義的 f 函數,下面是其使用示例:

def f(x):
    return x * x


def test_pool_map():
    with Pool(processes=4) as pool:
        results = pool.map(func=f, iterable=range(10))

    print(results)

2. starmap()

starmap() 函數與 map() 使用方法類似,但是 iterable 迭代參數的元素是 func 函數的多個參數,比如我們想要對下面的 add() 函數使用多進程:

def f_add(x, y):
    return x + y

它的調用方式如下:

def test_pool_starmap():
    with Pool(processes=4) as pool:
        results = pool.starmap(func=f_add, iterable=zip(range(6), range(6, 12)))
    
    print(results)

這裏返回的 results 是一個列表,元素是每個進程執行的函數的返回結果。

3、進程間交換對象

前面介紹了,多進程的運行方式是通過建立子進程的形式來操作,而不同進程間數據是不共享的,這一點不同於多線程。

因為多線程的操作是在同一個進程內實現的,所以線程間數據是共享數據資源的。

接下來介紹一下如何在進程間進行對象的交換,其實進程間進行對象的交換是一個子命題,更高層級的概括是在進程間進行通信,在官方的文檔中對其進行了細分,所以這裏也對其進行分類別的介紹。

在進程間進行對象交換的方式有兩種,一種是隊列,一種是管道。

1. 隊列

1) 隊列的代碼示例

這裏的模塊的引入是 multiprocess.Queue,這個類近似於是 queue.Queue 的克隆,以下是官方文檔的一個示例,內容是在父進程中創建一個隊列,然後在子進程中寫入數據,然後再在父進程中讀取:


from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()

隊列的寫入使用 put(),讀取使用 get()。

get() 還可以加上兩個參數,block 和 timeout,block 表示是否阻塞,timeout 表示獲取的超時時間。

接下來我們再實現一個功能,兩個子進程寫入數據,一個子進程讀取數據,代碼示例如下:

from multiprocessing import Queue, Process


def f_write(q, n, name):
    for i in range(n):
        q.put(f"{name}_{i}")
        time.sleep(0.1)


def f_read(q):
    while q.qsize() > 0:
        print(q.get(block=False, timeout=1))
        time.sleep(0.5)


def test_queue():
    # 三個進程,一個寫進程,兩個讀進程
    q = Queue()
    q.put("origin_value")
    q.put("b")

    # p1 = Process(target=f_queue, args=(q, "c"))
    # p2 = Process(target=f_queue, args=(q, ))
    p1 = Process(target=f_write, args=(q, 5, "a"))
    p2 = Process(target=f_write, args=(q, 8, "b"))
    p3 = Process(target=f_read, args=(q,))

    p1.start()
    p2.start()
    p3.start()

    p1.join()
    p2.join()
    p3.join()

    print("total: ", q.qsize())

if __name__ == "__main__":
    test_queue()

2) 隊列的相關方法

關於隊列的相關函數,除了前面介紹的幾種,還有比如判斷隊列的長度,是否為空等。

a) Queue()

在定義一個隊列的時候,我們前面是直接定義 q=Queue(),不為其設置元素長度,而如果我們想要為其設置一個最大的長度,可以加上 maxsize 參數:

q = Queue(maxsize=3)

那麼隊列裏最多隻能有三個元素,而如果隊列滿了還往其中 put() 加入操作,則會阻塞,直到其他進程對其讀取其中的數據。

b) put()

put() 函數表示的是往隊列裏添加元素,元素的類型不限,添加數字,字符串,字典,列表都可以:

q = Queue()
q.put(1)
q.put({"a": 4})
q.put([1,3,4])

前面介紹了,如果隊列滿了,還往隊列裏進行 put() 操作,則會進入阻塞操作,可以通過添加 block 或者 timeout 來進行避免。

block 表示是否阻塞,為 True 的話則會進入阻塞等待狀態。False 的話則會引發異常。

timeout 表示超時,嘗試往隊列裏添加數據,超出等待時間同樣已發隊列已滿的異常。

c) get()

get() 函數表示從隊列中讀取元素,隊列的寫入和讀取的原則是先入先出,最先進去的最先出來。

而為了避免隊列為空的情況下進行 get() 進入阻塞狀態,get() 可以使用兩個參數,一個是 block,表示是否阻塞,一個是 timeout,表示超時時間。

如果隊列為空還進行 get() 操作,使用上面這兩個操作則會 raise 一個 Empty 的 error。

d) qsize()

返回隊列的長度,但由於多進程或多線程的上下文,這個數字是不可靠的。

e) empty()

如果隊列是空的,則返回 True,否則返回 False,由於多進程或多線程的環境,該狀態是不可靠的。

f) full()

如果隊列設置了 maxsize 參數,那麼如果隊列滿了,則返回 True,否則返回 False,由於多進程或多線程的環境,該狀態是不可靠的。

g) close()

關閉隊列,如果執行了 q.close(),再往裏面添加元素執行 q.put() 操作,則會引發報錯。

2. 管道

1) 管道的相關函數

管道的引入方式如下:

from multiprocessing import Pipe

管道的定義可以直接實例化 Pipe,返回管道的兩端:

conn1, conn2 = Pipe()

默認情況下,Pipe() 的參數 duplex 值為 True,表示管道是雙工的,也就是可以雙向通信的,比如 conn1 可以寫入,也可以讀出,conn2 可以寫入也可以讀出數據。

而如果手動設置 duplex 為 False,那麼管道則是單向的,conn1 只能用於接收消息,conn2 只能發送消息。

管道用於發送和接收的函數分別如下:

發送信息

conn.send(obj)

發送的對象可以是字符串,也可以是其他對象,比如列表,字典等。

接收信息

conn.recv()

關閉連接對象

我們可以使用 close() 來關閉連接對象,當連接對象被垃圾回收時會自動調用:

conn.close()

判斷連接對象中是否有可以讀取的數據

如果我們直接使用 conn.recv() 的時候,如果管道內沒有可接收的對象,會進入阻塞狀態,直到管道內傳入數據。

我們可以使用 poll() 函數判斷管道內是否有可以讀取的數據,返回的是一個布爾型數據,表示是否有數據:

has_data = conn.poll()

但是如果不設置超時時間,同樣會進入等待狀態,所以可以設置一個最大阻塞秒數:

has_data = conn.pool(timeout=3)  # 等待 3 秒

2) 管道的代碼示例

接下來我們用下面的代碼來進行管道的雙工測試,即從管道的兩端分別寫入和讀取數據。

from multiprocessing import Process, Pipe


def send_info(conn, info):
    conn.send(info)
    conn.close()


def read_info(conn):
    while conn.poll(timeout=2):
        info = conn.recv()
        print(info)


def test_pipe():
    # 兩個 conn 分別都往裏面讀和寫
    parent_conn, child_conn = Pipe()

    # p1 向 child 管道寫入
    print("id out of func: ", id(child_conn))
    p1 = Process(target=send_info, args=(child_conn, "send_info_from_child"))
    p1.start()
    p1.join()

    # p2 從 parent 管道讀取
    p2 = Process(target=read_info, args=(parent_conn,))
    p2.start()
    p2.join()

    # p3 向 parent 管道寫入
    p3 = Process(target=send_info, args=(parent_conn, "send_info_from_parent"))
    p3.start()
    p3.join()

    # p4 從 child 管道讀取
    p4 = Process(target=read_info, args=(child_conn,))
    p4.start()
    p4.join()


if __name__ == "__main__":
    test_pipe()

注意 :如果兩個進程(或線程)同時嘗試讀取或寫入管道的 同一 端,則管道中的數據可能會損壞。當然,在不同進程中同時使用管道的不同端的情況下不存在損壞的風險。

4、進程間同步

與多線程一樣,多進程也可以使用鎖來確保一次只有一個進程來執行一個操作,比如有一個打印到標準輸出的操作,我們需要確保其打印的日誌不紊亂,就可以使用下面的操作:

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print("hello ", i)
    finally:
        l.release()

if __name__ == "__main__":
    lock = Lock()
    for num in range(10):
        Process(target=f, args=(lock, num)).start()

而如果不使用鎖,我們重寫 f 函數如下:

def f(l, i):
    print("hello ", i)

多執行幾次,我們可以看到控制枱的輸出會出現錯亂的情況,這樣就可能對輸出信息不能直觀查看,比如:

hello  2
hello  0
hello  4
hello hello  3
 1
hello  5
hello  6
hello  8
hello  9
hello  7

5、進程間共享狀態

在併發編程的時候,應當儘量避免使用共享狀態,尤其是多進程操作時,但如果真的有這個需求,需要共享一些數據,multiprocessing 提供了兩種方法,一種是共享內存,一種是服務進程。

1. 共享內存

我們可以使用 Value 或者 Array 將數據存儲在共享內存映射中。

Value 是存儲的單個變量,Array 存儲的是數組,注意下,這裏的 Value 和 Array 在定義的時候都需要指定元素類型。

其引入及代碼示例如下:

from multiprocessing import Process, Value, Array


def f(n, a):
    n.value = 5
    a[0] = 100


if __name__ == "__main__":
    num = Value('d', 1)
    arr = Array('i', range(5))
    print(num.value)
    print(arr[:])

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

其中,引入的方式可以直接從 multiprocessing 中引入,在定義 Value 和 Array 的時候,第一個參數是 'd' 和 'i',分別表示類型是雙精度浮點數和有符號整數。

這些共享對象將是進程和線程安全的。

更多的關於共享內存的信息,可以使用 multiprocessing.sharedctypes 模塊。

2. 服務進程

我們可以使用 Manager() 返回的管理對象控制一個服務進程,這個進程還可以保存 Python 對象並允許其他進程使用代理操作它們。

這個操作的意思就是使用 Manager() 會跟多進程的操作方式一樣,創建一個子進程,然後將一些需要共享的數據都放到這個子進程裏,其他子進程可以操作這個子進程的數據來達到數據共享的目的。

Manager() 支持的數據類型有:list,dict,Namespace,Lock,Value,Array 等,下面介紹一下代碼示例:

from multiprocessing import Process, Manager


def f(d, l):
    d["a"] = 1
    d["b"] = 2
    l[0] = 100


if __name__ == "__main__":
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(5))

        p = Process(target=f, args=(d, l))
        p.start()
        p.join()

        print(d)
        print(l)

使用服務進程的管理器比使用共享內存對象更靈活,因為它們可以支持任意對象類型。

此外,單個管理器可以通過網絡由不同計算機上的進程共享。但是,它們比使用共享內存慢。

如果想獲取更多後端相關文章,可掃碼關注閲讀:

user avatar aipaobudehoutao 頭像
1 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.