动态

详情 返回 返回

# Python多進程深度解析:Windows下的進程創建與數據傳遞機制

前言

在使用 Python 的 multiprocessing 模塊時,你是否遇到過這些困惑:

  • 為什麼子進程能訪問 self.urlself.headers 這些實例屬性?
  • 子進程是如何獲取類方法、類屬性和模塊函數的?
  • 為什麼數據庫連接不能放在 __init__ 中?
  • if __name__ == '__main__': 到底保護了什麼?

本文將深入剖析 Windows 平台下 Python 多進程的底層機制,徹底解答這些疑問。


一、核心機制:spawn 方式創建進程

1.1 Windows 的特殊性

Windows 不支持 fork(),因此使用 spawn 方式創建子進程:

# 子進程啓動 = 重新運行一遍 Python 腳本
# 本質上執行: python your_script.py

關鍵特點

  • 子進程會重新導入所有模塊
  • 重新執行模塊級別的代碼
  • 跳過 if __name__ == '__main__': 保護的代碼塊

1.2 驗證實驗

import os

# 模塊級別代碼 - 每個進程都會執行
print(f"[PID {os.getpid()}] 模塊正在加載...")

global_var = f"進程 {os.getpid()} 的全局變量"

class Demo:
    class_var = "類屬性"
    
    def worker(self):
        print(f"[PID {os.getpid()}] Worker 執行")

if __name__ == '__main__':
    from multiprocessing import Process
    
    print(f"[主進程 {os.getpid()}] 啓動中")
    
    obj = Demo()
    p = Process(target=obj.worker)
    p.start()
    p.join()

輸出

[PID 12345] 模塊正在加載...        # 主進程
[主進程 12345] 啓動中

[PID 12346] 模塊正在加載...        # 子進程重新加載!
[PID 12346] Worker 執行

二、數據傳遞的兩種方式

2.1 方式一:通過類定義加載(不序列化)

以下內容通過重新加載模塊獲得,不進行序列化:

元素 示例 子進程獲取方式
import 的包 import requests 重新 import
全局變量 global_var = "value" 重新執行模塊代碼創建
模塊級函數 def func(): pass 重新加載函數定義
類定義 class MyClass: pass 重新加載類定義
類屬性 class_var = "value" 從類定義獲取
類方法/靜態方法 @classmethod / @staticmethod 從類定義獲取
實例方法 def method(self): pass 從類定義獲取

特點

  • ✅ 使用原始定義的值
  • ✅ 在新的內存地址
  • ✅ 不受主進程運行時修改影響

2.2 方式二:通過 pickle 序列化傳遞

只有實例屬性通過 pickle 序列化傳遞:

class Spider:
    def __init__(self):
        # 這些會被序列化
        self.url = "https://example.com"
        self.headers = {"User-Agent": "..."}
        self.data_list = []

序列化時機:在 p.start() 時,而不是 Process() 創建時!


三、完整的子進程啓動流程

3.1 時間線詳解

from multiprocessing import Process

class MyClass:
    class_var = "原始值"
    
    def __init__(self):
        self.instance_var = "原始值"
    
    def worker(self):
        pass

if __name__ == '__main__':
    obj = MyClass()
    obj.instance_var = "修改後"
    
    # 階段1: 創建進程對象
    p = Process(target=obj.worker)  # 只保存引用,不序列化
    
    MyClass.class_var = "主進程修改"  # 修改類屬性
    
    # 階段2: 啓動子進程 ← 關鍵時刻!
    p.start()

階段1:Process(target=obj.worker)

  • 僅保存對 obj.worker 的引用
  • 不進行任何序列化操作

階段2:p.start() - 主進程

# 主進程執行:
serialized_data = pickle.dumps(obj)  # 序列化實例對象
# 包含: self.instance_var = "修改後"

階段2:p.start() - 子進程

# ① 重新執行模塊
import requests
import pymongo

# ② 執行模塊級別代碼
global_var = "value"           # 重新創建
mongo_client = MongoClient()   # 重新創建

# ③ 重新加載類定義
class MyClass:
    class_var = "原始值"       # 原始定義!
    
    def worker(self):
        pass

# ④ 反序列化實例對象
obj = pickle.loads(serialized_data)
# obj.instance_var = "修改後"  # 來自主進程

# ⑤ 執行 target 方法
obj.worker()

3.2 流程圖

主進程                           子進程
  │                               │
  ├─ 實例化對象                    │
  │  obj = MyClass()              │
  │                               │
  ├─ 修改實例屬性                  │
  │  obj.attr = "new"             │
  │                               │
  ├─ Process(target=obj.method)   │
  │  (僅保存引用)                  │
  │                               │
  ├─ p.start() ──────────────────>├─ ① 重新執行腳本
  │  (pickle序列化)                ├─ ② 重新import包
  │                               ├─ ③ 重新創建全局變量
  │                               ├─ ④ 重新加載類定義
  │                               ├─ ⑤ 反序列化實例對象
  │                               └─ ⑥ 執行target方法

四、對比實驗:類屬性 vs 實例屬性

4.1 實驗代碼

import os
from multiprocessing import Process

class Demo:
    class_var = "類屬性原始值"
    
    def __init__(self):
        self.instance_var = "實例屬性原始值"
    
    def worker(self):
        print(f"[PID {os.getpid()}]")
        print(f"  類屬性: {Demo.class_var}")
        print(f"  實例屬性: {self.instance_var}")

if __name__ == '__main__':
    # 主進程修改
    Demo.class_var = "主進程修改類屬性"
    
    obj = Demo()
    obj.instance_var = "主進程修改實例屬性"
    
    print(f"[主進程 {os.getpid()}]")
    print(f"  類屬性: {Demo.class_var}")
    print(f"  實例屬性: {obj.instance_var}")
    print("-" * 50)
    
    p = Process(target=obj.worker)
    p.start()
    p.join()

4.2 輸出結果

[主進程 12345]
  類屬性: 主進程修改類屬性
  實例屬性: 主進程修改實例屬性
--------------------------------------------------
[PID 12346]
  類屬性: 類屬性原始值          ← 恢復成原始定義!
  實例屬性: 主進程修改實例屬性  ← 保持主進程的修改!

4.3 原因分析

# 子進程看到的值:
┌─────────────────────────────────────────┐
│ 元素          │ 來源           │ 值      │
├─────────────────────────────────────────┤
│ 類屬性        │ 重新加載類定義  │ 原始值  │
│ 類方法        │ 重新加載類定義  │ 原始定義│
│ 實例方法      │ 重新加載類定義  │ 原始定義│
│ 實例屬性      │ pickle序列化   │ 修改值  │
└─────────────────────────────────────────┘

五、實戰案例:爬蟲類的多進程設計

5.1 典型問題代碼

class Spider:
    def __init__(self):
        # ❌ 錯誤:數據庫連接無法序列化!
        self.mongo_client = pymongo.MongoClient()
        self.redis_client = redis.Redis()
        
        # ✅ 正確:簡單數據可以序列化
        self.url = "https://example.com"
        self.headers = {"User-Agent": "..."}

問題

  • 數據庫連接、文件句柄等資源無法被 pickle 序列化
  • 即使能序列化,連接在子進程中也是無效的

5.2 正確的設計模式

方案一:模塊級別創建連接

import pymongo
import redis

# 模塊級別 - 每個進程獨立創建
mongo_client = pymongo.MongoClient()
db = mongo_client['database']['collection']
redis_client = redis.Redis()

class Spider:
    def __init__(self):
        # 可序列化的配置
        self.url = "https://example.com"
        self.headers = {"User-Agent": "..."}
        self.queue = JoinableQueue()
    
    def worker(self):
        # 使用模塊級別的連接
        data = self.fetch_data()
        db.insert_one(data)
        redis_client.sadd('keys', data['id'])

方案二:懶加載模式

class Spider:
    _db = None
    _redis = None
    
    @property
    def db(self):
        if self._db is None:
            # 每個進程首次使用時創建
            self._db = pymongo.MongoClient()['db']['collection']
        return self._db
    
    def worker(self):
        self.db.insert_one({"data": "value"})

5.3 完整示例

import requests
from multiprocessing import Process, JoinableQueue
import pymongo

# 模塊級別資源
mongo_client = pymongo.MongoClient()
db = mongo_client['spider']['data']

class Spider:
    def __init__(self):
        # 可序列化的配置和隊列
        self.url = 'https://api.example.com'
        self.headers = {'User-Agent': 'Mozilla/5.0'}
        self.task_queue = JoinableQueue()
        self.result_queue = JoinableQueue()
    
    def producer(self):
        """生產者任務"""
        for page in range(1, 10):
            params = {'page': page}
            self.task_queue.put(params)
    
    def worker(self):
        """消費者任務"""
        while True:
            params = self.task_queue.get()
            response = requests.get(
                self.url, 
                headers=self.headers,  # 實例屬性可用
                params=params
            ).json()
            self.result_queue.put(response)
            self.task_queue.task_done()
    
    def saver(self):
        """保存任務"""
        while True:
            data = self.result_queue.get()
            db.insert_one(data)  # 使用模塊級連接
            self.result_queue.task_done()
    
    def run(self):
        # 先生產任務
        self.producer()
        
        # 啓動多進程
        processes = []
        
        # 3個worker進程
        for _ in range(3):
            p = Process(target=self.worker)
            p.daemon = True
            processes.append(p)
            p.start()
        
        # 1個saver進程
        p = Process(target=self.saver)
        p.daemon = True
        processes.append(p)
        p.start()
        
        # 等待任務完成
        self.task_queue.join()
        self.result_queue.join()

if __name__ == '__main__':
    spider = Spider()
    spider.run()

六、可序列化性檢查

6.1 可以序列化的類型

# ✅ 基本類型
self.number = 42
self.text = "hello"
self.flag = True

# ✅ 容器類型
self.list_data = [1, 2, 3]
self.dict_data = {'key': 'value'}
self.tuple_data = (1, 2, 3)
self.set_data = {1, 2, 3}

# ✅ multiprocessing 的隊列
self.queue = JoinableQueue()
self.queue = Queue()

# ✅ 自定義類實例(如果屬性都可序列化)
self.config = Config(url="...", timeout=30)

6.2 不能序列化的類型

# ❌ 文件操作
self.file = open('data.txt', 'w')

# ❌ 線程相關(非multiprocessing的)
self.lock = threading.Lock()
self.event = threading.Event()

# ❌ 網絡連接
self.socket = socket.socket()

# ❌ 數據庫連接
self.mongo = pymongo.MongoClient()
self.redis = redis.Redis()
self.mysql = pymysql.connect(...)

# ❌ lambda 函數
self.func = lambda x: x + 1

# ❌ 局部函數
def inner():
    pass
self.func = inner

6.3 測試序列化

import pickle

class TestClass:
    def __init__(self):
        self.data = "test"
        self.file = open('test.txt', 'w')  # 不可序列化

# 測試
try:
    obj = TestClass()
    pickle.dumps(obj)
    print("✅ 可以序列化")
except Exception as e:
    print(f"❌ 序列化失敗: {e}")
    # TypeError: cannot pickle '_io.TextIOWrapper' object

七、常見陷阱與解決方案

7.1 陷阱1:忘記 if __name__ == '__main__':

from multiprocessing import Process

def worker():
    print("Worker")

# ❌ 錯誤:沒有保護
p = Process(target=worker)
p.start()

後果:在 Windows 上會無限遞歸創建進程!

原因

  1. 主進程執行到 Process(),啓動子進程
  2. 子進程重新執行腳本
  3. 子進程也執行到 Process(),又啓動子子進程
  4. 無限循環...

正確做法

if __name__ == '__main__':
    p = Process(target=worker)
    p.start()

7.2 陷阱2:在實例屬性中存儲不可序列化對象

class Spider:
    def __init__(self):
        # ❌ 這會導致序列化失敗
        self.db = pymongo.MongoClient()
    
    def worker(self):
        self.db.insert_one({})

# 啓動時報錯: TypeError: cannot pickle ...

解決方案

# 方案1: 模塊級別
db = pymongo.MongoClient()['database']

class Spider:
    def worker(self):
        db.insert_one({})

# 方案2: 懶加載
class Spider:
    def __init__(self):
        self._db = None
    
    @property
    def db(self):
        if self._db is None:
            self._db = pymongo.MongoClient()['database']
        return self._db

7.3 陷阱3:期望共享類屬性

class Counter:
    count = 0  # 類屬性
    
    def __init__(self):
        self.local_count = 0  # 實例屬性
    
    def increment(self):
        Counter.count += 1
        self.local_count += 1
        print(f"類屬性: {Counter.count}, 實例屬性: {self.local_count}")

if __name__ == '__main__':
    from multiprocessing import Process
    
    obj = Counter()
    processes = [Process(target=obj.increment) for _ in range(5)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    
    # 期望: count = 5
    # 實際: 每個進程都顯示 count = 1 (各自獨立的類定義)

解決方案:使用共享內存

from multiprocessing import Process, Value

class Counter:
    def __init__(self):
        self.count = Value('i', 0)  # 共享整數
    
    def increment(self):
        with self.count.get_lock():
            self.count.value += 1

7.4 陷阱4:修改全局變量

result_list = []

def worker(data):
    result_list.append(data)  # 只修改了子進程的副本!

if __name__ == '__main__':
    processes = [Process(target=worker, args=(i,)) for i in range(5)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    
    print(result_list)  # 輸出: [] (主進程的列表沒有變化)

解決方案:使用 Manager

from multiprocessing import Process, Manager

def worker(data, shared_list):
    shared_list.append(data)

if __name__ == '__main__':
    manager = Manager()
    result_list = manager.list()
    
    processes = [Process(target=worker, args=(i, result_list)) for i in range(5)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    
    print(list(result_list))  # 輸出: [0, 1, 2, 3, 4]

八、最佳實踐總結

8.1 設計原則

# ✅ 好的設計
class Worker:
    def __init__(self):
        # 配置數據(可序列化)
        self.config = {...}
        self.url = "..."
        
        # 進程間通信(可序列化)
        self.queue = JoinableQueue()
    
    def run(self):
        # 資源在使用時創建
        db = pymongo.MongoClient()
        # 工作邏輯
        db.close()

# ❌ 不好的設計
class Worker:
    def __init__(self):
        # 不可序列化的資源
        self.db = pymongo.MongoClient()
        self.file = open('data.txt')

8.2 檢查清單

創建多進程類時,檢查:

  • [ ] 所有實例屬性都是可序列化的
  • [ ] 數據庫/網絡/文件資源放在模塊級別或懶加載
  • [ ] 使用 if __name__ == '__main__': 保護啓動代碼
  • [ ] 理解類屬性在子進程中會重置為原始值
  • [ ] 需要共享狀態時使用 Queue、Pipe、Manager 等

8.3 調試技巧

import os

class Debug:
    def __init__(self):
        print(f"[PID {os.getpid()}] __init__ 被調用")
        self.data = "test"
    
    def worker(self):
        print(f"[PID {os.getpid()}] worker 執行")
        print(f"[PID {os.getpid()}] self對象地址: {id(self)}")
        print(f"[PID {os.getpid()}] self.data: {self.data}")

if __name__ == '__main__':
    from multiprocessing import Process
    
    print(f"[主進程 {os.getpid()}] 開始")
    obj = Debug()
    print(f"[主進程] obj地址: {id(obj)}")
    
    p = Process(target=obj.worker)
    p.start()
    p.join()

九、總結

9.1 核心要點

  1. Windows 使用 spawn 方式:子進程 = 重新運行腳本
  2. 兩種數據獲取方式

    • 通過重新加載模塊:類定義、全局變量、模塊函數
    • 通過 pickle 序列化:實例屬性
  3. 序列化時機p.start() 時,而非 Process()
  4. 不可序列化資源:放在模塊級別或懶加載

9.2 記憶口訣

子進程啓動像重生,
重新加載整個程;
類屬性、方法和全局,
都從定義重新生;
唯有實例屬性特殊,
pickle序列化傳送;
資源連接不能傳,
模塊級別或懶生成。

9.3 快速參考表

數據類型 子進程獲取方式 內存地址 值來源
import包 重新import 新地址 重新導入
全局變量 重新創建 新地址 原始定義
模塊函數 重新加載 新地址 原始定義
類屬性 重新加載 新地址 原始定義
類方法 重新加載 新地址 原始定義
實例方法 重新加載 新地址 原始定義
實例屬性 pickle序列化 新地址 主進程修改值

參考資料

  • Python官方文檔 - multiprocessing
  • pickle模塊文檔
  • multiprocessing編程指南

本文原創,歡迎轉載,轉載請註明出處。

如有疑問或發現錯誤,歡迎評論指出!

user avatar
0 用户, 点赞了这篇动态!

发布 评论

Some HTML is okay.