Spark Cluster模式下DDP網絡配置解析

問題的核心

在Spark cluster模式下,executor是動態分配的,這引發了一個問題:

  • DDP需要master_addr和master_port
  • 但我們怎麼知道executor的IP?
  • 端口會不會衝突?

關鍵理解:DDP進程都在同一個Executor上

Spark Executor架構

Spark Cluster
├── Executor 1 (隨機分配,IP未知)
│   ├── Spark Task 1 → 運行spark_train_ddp_wrapper.py
│   │   ├── Process 0 (DDP rank 0)
│   │   ├── Process 1 (DDP rank 1)
│   │   ├── Process 2 (DDP rank 2)
│   │   └── Process 3 (DDP rank 3)
│   └── 所有進程都在同一executor上
│
├── Executor 2 (隨機分配,IP未知)
│   └── Spark Task 2 → 運行spark_train_ddp_wrapper.py
│       ├── Process 0 (DDP rank 0)
│       ├── Process 1 (DDP rank 1)
│       ├── Process 2 (DDP rank 2)
│       └── Process 3 (DDP rank 3)
│
└── Executor 3 ...

關鍵點:每個executor上的DDP進程都是獨立的訓練實例,它們不需要相互通信。

為什麼可以使用localhost?

單Executor內的DDP通信

單個executor內部,所有DDP進程:

  1. 運行在同一台機器上(同一個executor)
  2. 通過本地迴環接口(127.0.0.1 / localhost)通信
  3. 不需要知道executor的外部IP
Executor內部(IP=10.0.0.5,但我們不需要知道)
├── Process 0 → 連接 localhost:23456
├── Process 1 → 連接 localhost:23456
├── Process 2 → 連接 localhost:23456
└── Process 3 → 連接 localhost:23456
            ↑
        通過本地迴環接口通信
        (127.0.0.1)

端口選擇策略

雖然executor是動態分配的,但:

  1. 端口範圍衝突概率低
  • 選擇非常用端口(23456)
  • executor在隔離環境運行
  1. 每個executor獨立訓練
  • Executor 1運行訓練A(端口23456)
  • Executor 2運行訓練B(端口23456)
  • 它們互不干擾(不同容器)
  1. 隔離性保證
  • 每個executor有獨立網絡命名空間
  • localhost:23456只在executor內部有效
  • 不會衝突

工作原理詳解

啓動流程

# spark_train_ddp_wrapper.py 在executor上運行
def main():
    # 1. Spark將這個腳本提交到某個executor
    # 2. Executor的IP是什麼?我們不知道,也不需要知道
    
    torchrun_cmd = [
        sys.executable,
        '-m', 'torch.distributed.run',
        '--nproc_per_node', '4',        # 在同一executor上啓動4個進程
        '--nnodes', '1',                 # 只有1個節點(這個executor)
        '--node_rank', '0',              # 節點rank=0
        '--master_addr', 'localhost',    # 本地迴環接口
        '--master_port', '23456',       # 固定端口
        'spark_train.py'                 # 實際的訓練腳本
    ]
    
    # 3. torchrun在executor上執行
    subprocess.run(torchrun_cmd)

torchrun的工作機制

當torchrun啓動時:

# torchrun在executor內部執行
# Executor IP = 10.0.0.5 (假設,但我們不需要知道)

# 第1步:torchrun啓動master進程
# Process 0 (rank 0) 啓動,監聽 localhost:23456

# 第2步:torchrun啓動其他進程
# Process 1 (rank 1) 連接 localhost:23456
# Process 2 (rank 2) 連接 localhost:23456
# Process 3 (rank 3) 連接 localhost:23456

# 所有進程通過localhost通信
# ✅ 不需要知道executor的外部IP
# ✅ 端口只在executor內部使用

實際網絡拓撲

┌─────────────────────────────────────────┐
│ Executor Container (動態分配)            │
│ IP: 10.0.0.5 (我們不需要知道)           │
│                                         │
│  ┌───────────────────────────────────┐ │
│  │ localhost:23456                   │ │
│  │                                   │ │
│  │  Process 0 (rank 0) ←─┐         │ │
│  │  Process 1 (rank 1) ←─┼──→ 通信  │ │
│  │  Process 2 (rank 2) ←─┤         │ │
│  │  Process 3 (rank 3) ←─┘         │ │
│  └───────────────────────────────────┘ │
│                                         │
│  所有通信都在容器內部進行               │
│  不涉及外部網絡                         │
└─────────────────────────────────────────┘

多個Executor的隔離性

場景:有3個Executor同時運行訓練

Spark Cluster
│
├─ Executor 1 (隨機IP,如10.0.0.5)
│  └─ Training A
│     ├─ Process 0 連接 localhost:23456
│     ├─ Process 1 連接 localhost:23456
│     ├─ Process 2 連接 localhost:23456
│     └─ Process 3 連接 localhost:23456
│     ✅ 端口23456只在Executor 1內部使用
│
├─ Executor 2 (隨機IP,如10.0.0.6)
│  └─ Training B
│     ├─ Process 0 連接 localhost:23456
│     ├─ Process 1 連接 localhost:23456
│     ├─ Process 2 連接 localhost:23456
│     └─ Process 3 連接 localhost:23456
│     ✅ 端口23456只在Executor 2內部使用
│
└─ Executor 3 (隨機IP,如10.0.0.7)
   └─ Training C
      ├─ Process 0 連接 localhost:23456
      ├─ Process 1 連接 localhost:23456
      ├─ Process 2 連接 localhost:23456
      └─ Process 3 連接 localhost:23456
      ✅ 端口23456只在Executor 3內部使用

為什麼不會衝突?

  1. 網絡隔離:每個executor有獨立的網絡命名空間
  2. localhost的作用域:localhost只在executor內部有效
  3. 端口獨立性:不同executor的23456端口互不干擾

與多節點訓練的區別

多節點訓練(需要知道Master IP)

# Node 0: Master節點
torchrun_cmd = [
    '--master_addr', '10.0.0.100',  # Master節點的實際IP
    '--master_port', '23456',
]

# Node 1: Worker節點
torchrun_cmd = [
    '--master_addr', '10.0.0.100',  # 連接到Master節點
    '--master_port', '23456',
]

為什麼需要知道IP?

  • 節點在不同的機器上
  • 需要通過網絡連接
  • 必須知道Master的IP地址

單節點(我們的場景)

torchrun_cmd = [
    '--master_addr', 'localhost',  # 本地迴環
    '--master_port', '23456',
]

為什麼不需要知道IP?

  • 所有進程在同一台機器(executor)上
  • 通過本地迴環接口通信
  • 不需要外部IP地址

端口衝突的實際情況

可能發生的情況

雖然理論上有衝突風險,但實際:

情況1:同一Executor內
# 不會衝突:同一個進程中
python train.py  # 用端口23456
情況2:不同Executor
# 不會衝突:不同的容器
Executor A: 端口23456  # 在容器A內部
Executor B: 端口23456  # 在容器B內部,互不干擾
情況3:同一機器上的不同進程
# 可能衝突:在同一台機器的不同進程中
Process A: 使用端口23456
Process B: 使用端口23456  # ❌ 衝突

解決方案:讓torchrun自動分配端口

# 不指定固定端口,讓torchrun自動選擇
torchrun_cmd = [
    sys.executable,
    '-m', 'torch.distributed.run',
    '--nproc_per_node', str(num_processes),
    '--nnodes', '1',
    '--node_rank', '0',
    # 不指定master_port,讓torchrun自動分配
    spark_train_script
]

最佳實踐

方案1:固定端口(當前實現)

'--master_addr', 'localhost',
'--master_port', '23456',

優點

  • 簡單明瞭
  • 容易調試
  • 日誌清晰

缺點

  • 理論上可能端口衝突
  • 需要確保端口可用

方案2:自動端口(推薦)

import socket

def find_available_port(start=23456):
    """自動查找可用端口"""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(1)
    for port in range(start, start + 100):
        try:
            result = sock.bind(('', port))
            sock.close()
            return port
        except:
            continue
    return None

# 使用
port = find_available_port(23456)
torchrun_cmd = [
    '--master_port', str(port) if port else '23456',
]

方案3:讓torchrun處理(最簡單)

# 不指定master_port,讓torchrun自動選擇
torchrun_cmd = [
    '--master_addr', 'localhost',
    # 不指定master_port
    spark_train_script
]

總結

你不需要知道Executor的IP!

原因

  1. ✅ 所有DDP進程在同一executor上運行
  2. ✅ 使用localhost通信(本地迴環)
  3. ✅ executor的IP無關緊要
  4. ✅ 每個executor的localhost是獨立的

端口選擇

當前配置

'--master_addr', 'localhost',  # ✅ 正確
'--master_port', '23456',      # ✅ 通常可用

為什麼工作

  • localhost在executor內部
  • 23456端口在executor內部使用
  • 不同executor之間互不干擾

如果端口衝突

處理方式:

# 改端口
'--master_port', '23457'

# 或讓torchrun自動分配
# 不指定--master_port參數

實際操作

當前代碼(spark_train_ddp_wrapper.py)

torchrun_cmd = [
    sys.executable,
    '-m', 'torch.distributed.run',
    '--nproc_per_node', str(num_processes),
    '--nnodes', '1',
    '--node_rank', '0',
    '--master_addr', 'localhost',    # ✅ 保持這個
    '--master_port', '23456',       # ✅ 保持這個(通常可用)
    spark_train_script
]

這是正確的配置,因為:

  • ✅ 所有進程在同一executor上
  • ✅ 通過localhost通信
  • ✅ 不需要知道executor的IP
  • ✅ 端口在executor內部使用,不會衝突

如果確實遇到端口衝突

修改為:

'--master_port', '23457',  # 或其他端口

或讓系統自動分配:

# 移除--master_port參數
torchrun_cmd = [
    sys.executable,
    '-m', 'torch.distributed.run',
    '--nproc_per_node', str(num_processes),
    '--nnodes', '1',
    '--node_rank', '0',
    '--master_addr', 'localhost',
    # 不指定master_port
    spark_train_script
]

關鍵要點

  1. 不需要知道executor IP:使用localhost即可
  2. 端口獨立性:不同executor的端口互不干擾
  3. 本地通信:所有DDP通信在executor內部進行
  4. 配置簡單:localhost + 固定端口即可工作