前言
在使用 Python 的 multiprocessing 模塊時,你是否遇到過這些困惑:
- 為什麼子進程能訪問
self.url和self.headers這些實例屬性? - 子進程是如何獲取類方法、類屬性和模塊函數的?
- 為什麼數據庫連接不能放在
__init__中? if __name__ == '__main__':到底保護了什麼?
本文將深入剖析 Windows 平台下 Python 多進程的底層機制,徹底解答這些疑問。
一、核心機制:spawn 方式創建進程
1.1 Windows 的特殊性
Windows 不支持 fork(),因此使用 spawn 方式創建子進程:
# 子進程啓動 = 重新運行一遍 Python 腳本
# 本質上執行: python your_script.py
關鍵特點:
- 子進程會重新導入所有模塊
- 重新執行模塊級別的代碼
- 跳過
if __name__ == '__main__':保護的代碼塊
1.2 驗證實驗
import os
# 模塊級別代碼 - 每個進程都會執行
print(f"[PID {os.getpid()}] 模塊正在加載...")
global_var = f"進程 {os.getpid()} 的全局變量"
class Demo:
class_var = "類屬性"
def worker(self):
print(f"[PID {os.getpid()}] Worker 執行")
if __name__ == '__main__':
from multiprocessing import Process
print(f"[主進程 {os.getpid()}] 啓動中")
obj = Demo()
p = Process(target=obj.worker)
p.start()
p.join()
輸出:
[PID 12345] 模塊正在加載... # 主進程
[主進程 12345] 啓動中
[PID 12346] 模塊正在加載... # 子進程重新加載!
[PID 12346] Worker 執行
二、數據傳遞的兩種方式
2.1 方式一:通過類定義加載(不序列化)
以下內容通過重新加載模塊獲得,不進行序列化:
| 元素 | 示例 | 子進程獲取方式 |
|---|---|---|
| import 的包 | import requests |
重新 import |
| 全局變量 | global_var = "value" |
重新執行模塊代碼創建 |
| 模塊級函數 | def func(): pass |
重新加載函數定義 |
| 類定義 | class MyClass: pass |
重新加載類定義 |
| 類屬性 | class_var = "value" |
從類定義獲取 |
| 類方法/靜態方法 | @classmethod / @staticmethod |
從類定義獲取 |
| 實例方法 | def method(self): pass |
從類定義獲取 |
特點:
- ✅ 使用原始定義的值
- ✅ 在新的內存地址
- ✅ 不受主進程運行時修改影響
2.2 方式二:通過 pickle 序列化傳遞
只有實例屬性通過 pickle 序列化傳遞:
class Spider:
def __init__(self):
# 這些會被序列化
self.url = "https://example.com"
self.headers = {"User-Agent": "..."}
self.data_list = []
序列化時機:在 p.start() 時,而不是 Process() 創建時!
三、完整的子進程啓動流程
3.1 時間線詳解
from multiprocessing import Process
class MyClass:
class_var = "原始值"
def __init__(self):
self.instance_var = "原始值"
def worker(self):
pass
if __name__ == '__main__':
obj = MyClass()
obj.instance_var = "修改後"
# 階段1: 創建進程對象
p = Process(target=obj.worker) # 只保存引用,不序列化
MyClass.class_var = "主進程修改" # 修改類屬性
# 階段2: 啓動子進程 ← 關鍵時刻!
p.start()
階段1:Process(target=obj.worker)
- 僅保存對
obj.worker的引用 - 不進行任何序列化操作
階段2:p.start() - 主進程
# 主進程執行:
serialized_data = pickle.dumps(obj) # 序列化實例對象
# 包含: self.instance_var = "修改後"
階段2:p.start() - 子進程
# ① 重新執行模塊
import requests
import pymongo
# ② 執行模塊級別代碼
global_var = "value" # 重新創建
mongo_client = MongoClient() # 重新創建
# ③ 重新加載類定義
class MyClass:
class_var = "原始值" # 原始定義!
def worker(self):
pass
# ④ 反序列化實例對象
obj = pickle.loads(serialized_data)
# obj.instance_var = "修改後" # 來自主進程
# ⑤ 執行 target 方法
obj.worker()
3.2 流程圖
主進程 子進程
│ │
├─ 實例化對象 │
│ obj = MyClass() │
│ │
├─ 修改實例屬性 │
│ obj.attr = "new" │
│ │
├─ Process(target=obj.method) │
│ (僅保存引用) │
│ │
├─ p.start() ──────────────────>├─ ① 重新執行腳本
│ (pickle序列化) ├─ ② 重新import包
│ ├─ ③ 重新創建全局變量
│ ├─ ④ 重新加載類定義
│ ├─ ⑤ 反序列化實例對象
│ └─ ⑥ 執行target方法
四、對比實驗:類屬性 vs 實例屬性
4.1 實驗代碼
import os
from multiprocessing import Process
class Demo:
class_var = "類屬性原始值"
def __init__(self):
self.instance_var = "實例屬性原始值"
def worker(self):
print(f"[PID {os.getpid()}]")
print(f" 類屬性: {Demo.class_var}")
print(f" 實例屬性: {self.instance_var}")
if __name__ == '__main__':
# 主進程修改
Demo.class_var = "主進程修改類屬性"
obj = Demo()
obj.instance_var = "主進程修改實例屬性"
print(f"[主進程 {os.getpid()}]")
print(f" 類屬性: {Demo.class_var}")
print(f" 實例屬性: {obj.instance_var}")
print("-" * 50)
p = Process(target=obj.worker)
p.start()
p.join()
4.2 輸出結果
[主進程 12345]
類屬性: 主進程修改類屬性
實例屬性: 主進程修改實例屬性
--------------------------------------------------
[PID 12346]
類屬性: 類屬性原始值 ← 恢復成原始定義!
實例屬性: 主進程修改實例屬性 ← 保持主進程的修改!
4.3 原因分析
# 子進程看到的值:
┌─────────────────────────────────────────┐
│ 元素 │ 來源 │ 值 │
├─────────────────────────────────────────┤
│ 類屬性 │ 重新加載類定義 │ 原始值 │
│ 類方法 │ 重新加載類定義 │ 原始定義│
│ 實例方法 │ 重新加載類定義 │ 原始定義│
│ 實例屬性 │ pickle序列化 │ 修改值 │
└─────────────────────────────────────────┘
五、實戰案例:爬蟲類的多進程設計
5.1 典型問題代碼
class Spider:
def __init__(self):
# ❌ 錯誤:數據庫連接無法序列化!
self.mongo_client = pymongo.MongoClient()
self.redis_client = redis.Redis()
# ✅ 正確:簡單數據可以序列化
self.url = "https://example.com"
self.headers = {"User-Agent": "..."}
問題:
- 數據庫連接、文件句柄等資源無法被 pickle 序列化
- 即使能序列化,連接在子進程中也是無效的
5.2 正確的設計模式
方案一:模塊級別創建連接
import pymongo
import redis
# 模塊級別 - 每個進程獨立創建
mongo_client = pymongo.MongoClient()
db = mongo_client['database']['collection']
redis_client = redis.Redis()
class Spider:
def __init__(self):
# 可序列化的配置
self.url = "https://example.com"
self.headers = {"User-Agent": "..."}
self.queue = JoinableQueue()
def worker(self):
# 使用模塊級別的連接
data = self.fetch_data()
db.insert_one(data)
redis_client.sadd('keys', data['id'])
方案二:懶加載模式
class Spider:
_db = None
_redis = None
@property
def db(self):
if self._db is None:
# 每個進程首次使用時創建
self._db = pymongo.MongoClient()['db']['collection']
return self._db
def worker(self):
self.db.insert_one({"data": "value"})
5.3 完整示例
import requests
from multiprocessing import Process, JoinableQueue
import pymongo
# 模塊級別資源
mongo_client = pymongo.MongoClient()
db = mongo_client['spider']['data']
class Spider:
def __init__(self):
# 可序列化的配置和隊列
self.url = 'https://api.example.com'
self.headers = {'User-Agent': 'Mozilla/5.0'}
self.task_queue = JoinableQueue()
self.result_queue = JoinableQueue()
def producer(self):
"""生產者任務"""
for page in range(1, 10):
params = {'page': page}
self.task_queue.put(params)
def worker(self):
"""消費者任務"""
while True:
params = self.task_queue.get()
response = requests.get(
self.url,
headers=self.headers, # 實例屬性可用
params=params
).json()
self.result_queue.put(response)
self.task_queue.task_done()
def saver(self):
"""保存任務"""
while True:
data = self.result_queue.get()
db.insert_one(data) # 使用模塊級連接
self.result_queue.task_done()
def run(self):
# 先生產任務
self.producer()
# 啓動多進程
processes = []
# 3個worker進程
for _ in range(3):
p = Process(target=self.worker)
p.daemon = True
processes.append(p)
p.start()
# 1個saver進程
p = Process(target=self.saver)
p.daemon = True
processes.append(p)
p.start()
# 等待任務完成
self.task_queue.join()
self.result_queue.join()
if __name__ == '__main__':
spider = Spider()
spider.run()
六、可序列化性檢查
6.1 可以序列化的類型
# ✅ 基本類型
self.number = 42
self.text = "hello"
self.flag = True
# ✅ 容器類型
self.list_data = [1, 2, 3]
self.dict_data = {'key': 'value'}
self.tuple_data = (1, 2, 3)
self.set_data = {1, 2, 3}
# ✅ multiprocessing 的隊列
self.queue = JoinableQueue()
self.queue = Queue()
# ✅ 自定義類實例(如果屬性都可序列化)
self.config = Config(url="...", timeout=30)
6.2 不能序列化的類型
# ❌ 文件操作
self.file = open('data.txt', 'w')
# ❌ 線程相關(非multiprocessing的)
self.lock = threading.Lock()
self.event = threading.Event()
# ❌ 網絡連接
self.socket = socket.socket()
# ❌ 數據庫連接
self.mongo = pymongo.MongoClient()
self.redis = redis.Redis()
self.mysql = pymysql.connect(...)
# ❌ lambda 函數
self.func = lambda x: x + 1
# ❌ 局部函數
def inner():
pass
self.func = inner
6.3 測試序列化
import pickle
class TestClass:
def __init__(self):
self.data = "test"
self.file = open('test.txt', 'w') # 不可序列化
# 測試
try:
obj = TestClass()
pickle.dumps(obj)
print("✅ 可以序列化")
except Exception as e:
print(f"❌ 序列化失敗: {e}")
# TypeError: cannot pickle '_io.TextIOWrapper' object
七、常見陷阱與解決方案
7.1 陷阱1:忘記 if __name__ == '__main__':
from multiprocessing import Process
def worker():
print("Worker")
# ❌ 錯誤:沒有保護
p = Process(target=worker)
p.start()
後果:在 Windows 上會無限遞歸創建進程!
原因:
- 主進程執行到
Process(),啓動子進程 - 子進程重新執行腳本
- 子進程也執行到
Process(),又啓動子子進程 - 無限循環...
正確做法:
if __name__ == '__main__':
p = Process(target=worker)
p.start()
7.2 陷阱2:在實例屬性中存儲不可序列化對象
class Spider:
def __init__(self):
# ❌ 這會導致序列化失敗
self.db = pymongo.MongoClient()
def worker(self):
self.db.insert_one({})
# 啓動時報錯: TypeError: cannot pickle ...
解決方案:
# 方案1: 模塊級別
db = pymongo.MongoClient()['database']
class Spider:
def worker(self):
db.insert_one({})
# 方案2: 懶加載
class Spider:
def __init__(self):
self._db = None
@property
def db(self):
if self._db is None:
self._db = pymongo.MongoClient()['database']
return self._db
7.3 陷阱3:期望共享類屬性
class Counter:
count = 0 # 類屬性
def __init__(self):
self.local_count = 0 # 實例屬性
def increment(self):
Counter.count += 1
self.local_count += 1
print(f"類屬性: {Counter.count}, 實例屬性: {self.local_count}")
if __name__ == '__main__':
from multiprocessing import Process
obj = Counter()
processes = [Process(target=obj.increment) for _ in range(5)]
for p in processes:
p.start()
for p in processes:
p.join()
# 期望: count = 5
# 實際: 每個進程都顯示 count = 1 (各自獨立的類定義)
解決方案:使用共享內存
from multiprocessing import Process, Value
class Counter:
def __init__(self):
self.count = Value('i', 0) # 共享整數
def increment(self):
with self.count.get_lock():
self.count.value += 1
7.4 陷阱4:修改全局變量
result_list = []
def worker(data):
result_list.append(data) # 只修改了子進程的副本!
if __name__ == '__main__':
processes = [Process(target=worker, args=(i,)) for i in range(5)]
for p in processes:
p.start()
for p in processes:
p.join()
print(result_list) # 輸出: [] (主進程的列表沒有變化)
解決方案:使用 Manager
from multiprocessing import Process, Manager
def worker(data, shared_list):
shared_list.append(data)
if __name__ == '__main__':
manager = Manager()
result_list = manager.list()
processes = [Process(target=worker, args=(i, result_list)) for i in range(5)]
for p in processes:
p.start()
for p in processes:
p.join()
print(list(result_list)) # 輸出: [0, 1, 2, 3, 4]
八、最佳實踐總結
8.1 設計原則
# ✅ 好的設計
class Worker:
def __init__(self):
# 配置數據(可序列化)
self.config = {...}
self.url = "..."
# 進程間通信(可序列化)
self.queue = JoinableQueue()
def run(self):
# 資源在使用時創建
db = pymongo.MongoClient()
# 工作邏輯
db.close()
# ❌ 不好的設計
class Worker:
def __init__(self):
# 不可序列化的資源
self.db = pymongo.MongoClient()
self.file = open('data.txt')
8.2 檢查清單
創建多進程類時,檢查:
- [ ] 所有實例屬性都是可序列化的
- [ ] 數據庫/網絡/文件資源放在模塊級別或懶加載
- [ ] 使用
if __name__ == '__main__':保護啓動代碼 - [ ] 理解類屬性在子進程中會重置為原始值
- [ ] 需要共享狀態時使用 Queue、Pipe、Manager 等
8.3 調試技巧
import os
class Debug:
def __init__(self):
print(f"[PID {os.getpid()}] __init__ 被調用")
self.data = "test"
def worker(self):
print(f"[PID {os.getpid()}] worker 執行")
print(f"[PID {os.getpid()}] self對象地址: {id(self)}")
print(f"[PID {os.getpid()}] self.data: {self.data}")
if __name__ == '__main__':
from multiprocessing import Process
print(f"[主進程 {os.getpid()}] 開始")
obj = Debug()
print(f"[主進程] obj地址: {id(obj)}")
p = Process(target=obj.worker)
p.start()
p.join()
九、總結
9.1 核心要點
- Windows 使用 spawn 方式:子進程 = 重新運行腳本
-
兩種數據獲取方式:
- 通過重新加載模塊:類定義、全局變量、模塊函數
- 通過 pickle 序列化:實例屬性
- 序列化時機:
p.start()時,而非Process()時 - 不可序列化資源:放在模塊級別或懶加載
9.2 記憶口訣
子進程啓動像重生,
重新加載整個程;
類屬性、方法和全局,
都從定義重新生;
唯有實例屬性特殊,
pickle序列化傳送;
資源連接不能傳,
模塊級別或懶生成。
9.3 快速參考表
| 數據類型 | 子進程獲取方式 | 內存地址 | 值來源 |
|---|---|---|---|
| import包 | 重新import | 新地址 | 重新導入 |
| 全局變量 | 重新創建 | 新地址 | 原始定義 |
| 模塊函數 | 重新加載 | 新地址 | 原始定義 |
| 類屬性 | 重新加載 | 新地址 | 原始定義 |
| 類方法 | 重新加載 | 新地址 | 原始定義 |
| 實例方法 | 重新加載 | 新地址 | 原始定義 |
| 實例屬性 | pickle序列化 | 新地址 | 主進程修改值 |
參考資料
- Python官方文檔 - multiprocessing
- pickle模塊文檔
- multiprocessing編程指南
本文原創,歡迎轉載,轉載請註明出處。
如有疑問或發現錯誤,歡迎評論指出!