1. 線程角色與全局結構
  • WalSender(主)
    源碼:src/gausskernel/process/replication/walsender.cpp
    主結構:knl_t_walsender_context(單線程上下文)
    全局槽位:WalSndCtlData walsndctl(共享內存,最多 MAX_REPLICATION 個槽位)
    關鍵成員:
    walsnds[i].pid:sender 後端 pid
    sendFile/sendSegNo/sendOff:當前正讀的 WAL 文件/段/偏移
    reply_message:對端備機回寫的 LSN、flush/apply 時間戳
    walSndCaughtUp:bool,標記是否已追平
  • WalReceiver(備)
    源碼:src/gausskernel/process/replication/walreceiver.cpp
    主結構:knl_t_walreceiver_context
    共享結構:WalRcvCtlBlock *walRcvCtlBlock(與 startup 進程交互)
    關鍵成員:
    receivedUpto:已寫入本地 WAL 段的 LSN
    receiveStart:本次連接期望從主庫拿到的起始 LSN
    feedback_message:熱備反饋,用於主庫回收舊 WAL
  • Startup/Recovery(備)
    源碼:src/gausskernel/process/postmaster/startup.cppStartupXLOG()
    負責解析 WAL 記錄並調用對應 redo 函數;openGauss 裏增加 ParallelRedoWorker 支持並行回放。

  1. 建連與握手(源碼級流程)
    ① 備庫啓動 → postmaster fork startupstartup fork walreceiver
    WalReceiverMain() 通過 libpq 連接主庫,發送 IDENTIFY_SYSTEM 命令
    ③ 主庫後端進入 WalSenderMain()WalSndHandshake()
    – 校驗協議版本、系統標識、時間線
    – 協商起始 LSN(receiveStart
    – 若啓用了 SSL、壓縮、quorum,這裏一併協商標誌位
    ④ 握手成功 → 進入 WalSndLoop(),開始實時流式複製

  1. 主庫發送端(XLog → Socket)
    核心函數:XLogSend()(在 walsender.cpp
  • 循環體內
  1. pgstat_report_activity() 更新進程狀態
  2. SendXlog() → 從 sendFile 當前偏移讀取 8 KB 塊
  3. pq_putmessage('d', ...) 封包寫入 TCP 發送緩衝區
  4. 更新 sentPtr(已發送 LSN)
  • sentPtr >= flushPtrWalSndCaughtUp=true 時進入 WalSndWaitForWal(),等待主庫產生新 WAL(epoll 阻塞在 WalSenderMain()select()

  1. 備庫接收端(Socket → XLog)
    核心函數:WalReceiverMain()WalReceiverReceive()
  • pq_getmessage() 收包 → 寫入 walRcvCtlBlock->receiveBuf
  • 緩衝區滿或收到完整記錄 → XLogWalRcvWrite()write() 到本地 ${PGDATA}/pg_xlog/(或 pg_wal
  • 每 4 KB 或超時觸發 XLogWalRcvFlush()fsync()
  • 更新共享內存 receivedUpto,並通過 WalReceiverSendReply()received/flush/apply LSN 回寫給主庫

  1. 一致性協議(Quorum / 同步備)
    主結構:WalSndCtlData 裏的
  • SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]:後端事務等待隊列
  • lsn[NUM_SYNC_REP_WAIT_MODE]:隊列頭線程要等待的 LSN

關鍵代碼:SyncRepWaitForLSN()src/gausskernel/replication/syncrep.cpp

  • 事務提交 → XLogFlush()SyncRepWaitForLSN(XactCommitLSN)
  • 把自身掛到 SyncRepQueue[mode],並睡在 latch 上
  • WalSender 收到備機 ACK 後,SyncRepReleaseWaiters() 喚醒滿足 ackLsn >= commitLsn 的等待者
  • Quorum 邏輯:配置 synchronous_standby_names = 'ANY 1 (dn1_2,dn2_2)' 時,只要 N/2 個備機 ACK 即可放行。

  1. 並行回放(openGauss 擴展)
    新增文件:
  • src/gausskernel/process/replication/parallel_rewind.cpp
  • src/gausskernel/storage/buffer/parallel_redo.cpp

關鍵結構:ParallelRedoCtx(共享內存)

  • redoWorkers[]:回放工作線程池(默認 4 個,可配 recovery_parallelism
  • availableMsg:生產者-消費者環形隊列,槽位大小 = 1 條 WAL 記錄

流程:

  1. Startup 進程解析 WAL → 把可並行記錄(heap、btree、undo 等)塞進 ParallelRedoCtx
  2. 空閒 worker 搶到槽位 → 調用對應 rmgr 的 redo 函數
  3. 不可並行記錄(如 checkpoint、twophase)由 startup 自己串行執行
  4. 所有 worker 完成當前 LSN 後,更新 lastAppliedLSN → 向主庫反饋 apply 進度

  1. 級聯備與壓縮
  • 級聯備:中間備庫同時扮演 “WalSender + WalReceiver” 角色,代碼路徑同上述,僅增加 AmWalReceiverForStandby 標誌。
  • 壓縮:握手階段協商 compression = zlib/lz4XLogSend() 裏先 deflate()pq_putmessage(),節省跨機房帶寬。

  1. 快速定位的斷點清單 | 場景 | 函數文件 | 行區 | |---|---|---| | 主庫事務等待同步 | SyncRepWaitForLSN() | syncrep.cpp | | 主庫發送 WAL | XLogSend() | walsender.cpp | | 備庫寫 WAL | XLogWalRcvWrite() | walreceiver.cpp | | 備庫反饋 ACK | WalReceiverSendReply() | walreceiver.cpp | | 並行回放入口 | ParallelRedoMain() | parallel_redo.cpp | | 級聯備身份切換 | WalReceiverMain() 判斷 AmWalReceiverForStandby | walreceiver.cpp |

把以上關鍵點串起來,就能在源碼層面清晰看到:
“事務提交 → 主庫 flush → Sender 發送 → Receiver 寫盤 → ACK → 喚醒等待隊列 → Startup/Worker 並行回放”
的完整數據鏈。後續二次開發(例如新增壓縮算法、自定義同步級別)只需在對應結構體里加字段、在對應函數里加分支即可。