本文首發於公眾號:Hunter後端
原文鏈接:Python筆記六之多進程
在 Python 裏,我們使用 multiprocessing 這個模塊來進行多進程的操作。
multiprocessing 模塊通過創建子進程的方式來運行多進程,因此繞過了 Python 裏 GIL 的限制,可以充分利用機器上的多個處理器。
1、多進程使用示例
多進程的使用方式和多線程的方式類似,這裏使用到的是 multiprocessing.Process 類,下面是一個簡單的示例:
from multiprocessing import Process
import time
def f(x):
if x % 2 == 1:
time.sleep(x+1)
print(x * x)
return x * x
def test_multi_process():
processes = []
for i in range(5):
processes.append(Process(target=f, args=(i,)))
for p in processes:
p.start()
for p in processes:
p.join(0.5)
for p in processes:
print(p, p.is_alive(), p.pid, p._parent_pid)
if __name__ == "__main__":
test_multi_process()
在上面的示例中,test_multi_process() 函數裏使用多進程的方式調用 f 函數,和多線程的調用方式一致,通過 start() 方法啓動進程活動,使用 join() 方法阻塞調用其的進程。
接下來介紹一下 multiprocessing.Process 的一些方法和屬性。
1. run()
表示進程活動的方法,可以在子類中重載此方法,比如多線程筆記的操作裏重寫 run() 對函數執行報錯進行了處理,並返回了執行結果
2. start()
啓動進程活動,將對象的 run() 方法在一個單獨的進程中調用
3. join()
阻塞調用 join() 方法的進程,在上面的示例中也就是父進程,默認值為 None,也就表示阻塞操作。
如果設置為其他正數值,那麼則最多會阻塞多少秒,比如上面的示例為 0.5 秒,如果超時,那麼父進程則會繼續往後執行。
比如上面的示例輸出結果如下:
0
4
16
<Process name='Process-1' pid=6600 parent=24248 stopped exitcode=0> False 6600 24248
<Process name='Process-2' pid=4368 parent=24248 started> True 4368 24248
<Process name='Process-3' pid=13024 parent=24248 stopped exitcode=0> False 13024 24248
<Process name='Process-4' pid=3288 parent=24248 started> True 3288 24248
<Process name='Process-5' pid=16880 parent=24248 stopped exitcode=0> False 16880 24248
1
9
在打印每個進程的信息時,f() 函數內部進行 sleep 的進程還沒有執行結束,但是進程已經超時了,所以不再阻塞父進程向下執行。
4. is_alive()
上面有打印出信息,返回布爾值,表示該進程是否還活着。
5. pid 和 parent_pid
上面使用 .pid 和 ._parent_pid 屬性打印出了每個進程的 id 和其父進程的 id。
2、進程池
進程使用的對象是 multiprocessing.pool.Pool()。
接受 processes 參數為進程數,表示要使用的工作進程數目,如果不傳入,則默認使用 cpu 的核數,根據 os.cpu_count() 獲取。
接下來分別使用示例介紹 multiprocessing.pool 下的幾個調用方法,進程池的使用可以使用 map() 和 starmap() 兩個函數。
1. map()
map() 接受兩個參數,func 表示多進程要執行的函數,iterable 表示要執行的 func 函數輸入的參數的迭代對象。
這裏需要注意一下,map() 函數使用的 func 函數只能接受一個參數,比如我們前面定義的 f 函數,下面是其使用示例:
def f(x):
return x * x
def test_pool_map():
with Pool(processes=4) as pool:
results = pool.map(func=f, iterable=range(10))
print(results)
2. starmap()
starmap() 函數與 map() 使用方法類似,但是 iterable 迭代參數的元素是 func 函數的多個參數,比如我們想要對下面的 add() 函數使用多進程:
def f_add(x, y):
return x + y
它的調用方式如下:
def test_pool_starmap():
with Pool(processes=4) as pool:
results = pool.starmap(func=f_add, iterable=zip(range(6), range(6, 12)))
print(results)
這裏返回的 results 是一個列表,元素是每個進程執行的函數的返回結果。
3、進程間交換對象
前面介紹了,多進程的運行方式是通過建立子進程的形式來操作,而不同進程間數據是不共享的,這一點不同於多線程。
因為多線程的操作是在同一個進程內實現的,所以線程間數據是共享數據資源的。
接下來介紹一下如何在進程間進行對象的交換,其實進程間進行對象的交換是一個子命題,更高層級的概括是在進程間進行通信,在官方的文檔中對其進行了細分,所以這裏也對其進行分類別的介紹。
在進程間進行對象交換的方式有兩種,一種是隊列,一種是管道。
1. 隊列
1) 隊列的代碼示例
這裏的模塊的引入是 multiprocess.Queue,這個類近似於是 queue.Queue 的克隆,以下是官方文檔的一個示例,內容是在父進程中創建一個隊列,然後在子進程中寫入數據,然後再在父進程中讀取:
from multiprocessing import Process, Queue
def f(q):
q.put([42, None, 'hello'])
if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q,))
p.start()
print(q.get()) # prints "[42, None, 'hello']"
p.join()
隊列的寫入使用 put(),讀取使用 get()。
get() 還可以加上兩個參數,block 和 timeout,block 表示是否阻塞,timeout 表示獲取的超時時間。
接下來我們再實現一個功能,兩個子進程寫入數據,一個子進程讀取數據,代碼示例如下:
from multiprocessing import Queue, Process
def f_write(q, n, name):
for i in range(n):
q.put(f"{name}_{i}")
time.sleep(0.1)
def f_read(q):
while q.qsize() > 0:
print(q.get(block=False, timeout=1))
time.sleep(0.5)
def test_queue():
# 三個進程,一個寫進程,兩個讀進程
q = Queue()
q.put("origin_value")
q.put("b")
# p1 = Process(target=f_queue, args=(q, "c"))
# p2 = Process(target=f_queue, args=(q, ))
p1 = Process(target=f_write, args=(q, 5, "a"))
p2 = Process(target=f_write, args=(q, 8, "b"))
p3 = Process(target=f_read, args=(q,))
p1.start()
p2.start()
p3.start()
p1.join()
p2.join()
p3.join()
print("total: ", q.qsize())
if __name__ == "__main__":
test_queue()
2) 隊列的相關方法
關於隊列的相關函數,除了前面介紹的幾種,還有比如判斷隊列的長度,是否為空等。
a) Queue()
在定義一個隊列的時候,我們前面是直接定義 q=Queue(),不為其設置元素長度,而如果我們想要為其設置一個最大的長度,可以加上 maxsize 參數:
q = Queue(maxsize=3)
那麼隊列裏最多隻能有三個元素,而如果隊列滿了還往其中 put() 加入操作,則會阻塞,直到其他進程對其讀取其中的數據。
b) put()
put() 函數表示的是往隊列裏添加元素,元素的類型不限,添加數字,字符串,字典,列表都可以:
q = Queue()
q.put(1)
q.put({"a": 4})
q.put([1,3,4])
前面介紹了,如果隊列滿了,還往隊列裏進行 put() 操作,則會進入阻塞操作,可以通過添加 block 或者 timeout 來進行避免。
block 表示是否阻塞,為 True 的話則會進入阻塞等待狀態。False 的話則會引發異常。
timeout 表示超時,嘗試往隊列裏添加數據,超出等待時間同樣已發隊列已滿的異常。
c) get()
get() 函數表示從隊列中讀取元素,隊列的寫入和讀取的原則是先入先出,最先進去的最先出來。
而為了避免隊列為空的情況下進行 get() 進入阻塞狀態,get() 可以使用兩個參數,一個是 block,表示是否阻塞,一個是 timeout,表示超時時間。
如果隊列為空還進行 get() 操作,使用上面這兩個操作則會 raise 一個 Empty 的 error。
d) qsize()
返回隊列的長度,但由於多進程或多線程的上下文,這個數字是不可靠的。
e) empty()
如果隊列是空的,則返回 True,否則返回 False,由於多進程或多線程的環境,該狀態是不可靠的。
f) full()
如果隊列設置了 maxsize 參數,那麼如果隊列滿了,則返回 True,否則返回 False,由於多進程或多線程的環境,該狀態是不可靠的。
g) close()
關閉隊列,如果執行了 q.close(),再往裏面添加元素執行 q.put() 操作,則會引發報錯。
2. 管道
1) 管道的相關函數
管道的引入方式如下:
from multiprocessing import Pipe
管道的定義可以直接實例化 Pipe,返回管道的兩端:
conn1, conn2 = Pipe()
默認情況下,Pipe() 的參數 duplex 值為 True,表示管道是雙工的,也就是可以雙向通信的,比如 conn1 可以寫入,也可以讀出,conn2 可以寫入也可以讀出數據。
而如果手動設置 duplex 為 False,那麼管道則是單向的,conn1 只能用於接收消息,conn2 只能發送消息。
管道用於發送和接收的函數分別如下:
發送信息
conn.send(obj)
發送的對象可以是字符串,也可以是其他對象,比如列表,字典等。
接收信息
conn.recv()
關閉連接對象
我們可以使用 close() 來關閉連接對象,當連接對象被垃圾回收時會自動調用:
conn.close()
判斷連接對象中是否有可以讀取的數據
如果我們直接使用 conn.recv() 的時候,如果管道內沒有可接收的對象,會進入阻塞狀態,直到管道內傳入數據。
我們可以使用 poll() 函數判斷管道內是否有可以讀取的數據,返回的是一個布爾型數據,表示是否有數據:
has_data = conn.poll()
但是如果不設置超時時間,同樣會進入等待狀態,所以可以設置一個最大阻塞秒數:
has_data = conn.pool(timeout=3) # 等待 3 秒
2) 管道的代碼示例
接下來我們用下面的代碼來進行管道的雙工測試,即從管道的兩端分別寫入和讀取數據。
from multiprocessing import Process, Pipe
def send_info(conn, info):
conn.send(info)
conn.close()
def read_info(conn):
while conn.poll(timeout=2):
info = conn.recv()
print(info)
def test_pipe():
# 兩個 conn 分別都往裏面讀和寫
parent_conn, child_conn = Pipe()
# p1 向 child 管道寫入
print("id out of func: ", id(child_conn))
p1 = Process(target=send_info, args=(child_conn, "send_info_from_child"))
p1.start()
p1.join()
# p2 從 parent 管道讀取
p2 = Process(target=read_info, args=(parent_conn,))
p2.start()
p2.join()
# p3 向 parent 管道寫入
p3 = Process(target=send_info, args=(parent_conn, "send_info_from_parent"))
p3.start()
p3.join()
# p4 從 child 管道讀取
p4 = Process(target=read_info, args=(child_conn,))
p4.start()
p4.join()
if __name__ == "__main__":
test_pipe()
注意 :如果兩個進程(或線程)同時嘗試讀取或寫入管道的 同一 端,則管道中的數據可能會損壞。當然,在不同進程中同時使用管道的不同端的情況下不存在損壞的風險。
4、進程間同步
與多線程一樣,多進程也可以使用鎖來確保一次只有一個進程來執行一個操作,比如有一個打印到標準輸出的操作,我們需要確保其打印的日誌不紊亂,就可以使用下面的操作:
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
try:
print("hello ", i)
finally:
l.release()
if __name__ == "__main__":
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
而如果不使用鎖,我們重寫 f 函數如下:
def f(l, i):
print("hello ", i)
多執行幾次,我們可以看到控制枱的輸出會出現錯亂的情況,這樣就可能對輸出信息不能直觀查看,比如:
hello 2
hello 0
hello 4
hello hello 3
1
hello 5
hello 6
hello 8
hello 9
hello 7
5、進程間共享狀態
在併發編程的時候,應當儘量避免使用共享狀態,尤其是多進程操作時,但如果真的有這個需求,需要共享一些數據,multiprocessing 提供了兩種方法,一種是共享內存,一種是服務進程。
1. 共享內存
我們可以使用 Value 或者 Array 將數據存儲在共享內存映射中。
Value 是存儲的單個變量,Array 存儲的是數組,注意下,這裏的 Value 和 Array 在定義的時候都需要指定元素類型。
其引入及代碼示例如下:
from multiprocessing import Process, Value, Array
def f(n, a):
n.value = 5
a[0] = 100
if __name__ == "__main__":
num = Value('d', 1)
arr = Array('i', range(5))
print(num.value)
print(arr[:])
p = Process(target=f, args=(num, arr))
p.start()
p.join()
print(num.value)
print(arr[:])
其中,引入的方式可以直接從 multiprocessing 中引入,在定義 Value 和 Array 的時候,第一個參數是 'd' 和 'i',分別表示類型是雙精度浮點數和有符號整數。
這些共享對象將是進程和線程安全的。
更多的關於共享內存的信息,可以使用 multiprocessing.sharedctypes 模塊。
2. 服務進程
我們可以使用 Manager() 返回的管理對象控制一個服務進程,這個進程還可以保存 Python 對象並允許其他進程使用代理操作它們。
這個操作的意思就是使用 Manager() 會跟多進程的操作方式一樣,創建一個子進程,然後將一些需要共享的數據都放到這個子進程裏,其他子進程可以操作這個子進程的數據來達到數據共享的目的。
Manager() 支持的數據類型有:list,dict,Namespace,Lock,Value,Array 等,下面介紹一下代碼示例:
from multiprocessing import Process, Manager
def f(d, l):
d["a"] = 1
d["b"] = 2
l[0] = 100
if __name__ == "__main__":
with Manager() as manager:
d = manager.dict()
l = manager.list(range(5))
p = Process(target=f, args=(d, l))
p.start()
p.join()
print(d)
print(l)
使用服務進程的管理器比使用共享內存對象更靈活,因為它們可以支持任意對象類型。
此外,單個管理器可以通過網絡由不同計算機上的進程共享。但是,它們比使用共享內存慢。
如果想獲取更多後端相關文章,可掃碼關注閲讀: