- 線程角色與全局結構
- WalSender(主)
源碼:src/gausskernel/process/replication/walsender.cpp
主結構:knl_t_walsender_context(單線程上下文)
全局槽位:WalSndCtlData walsndctl(共享內存,最多MAX_REPLICATION個槽位)
關鍵成員:
–walsnds[i].pid:sender 後端 pid
–sendFile/sendSegNo/sendOff:當前正讀的 WAL 文件/段/偏移
–reply_message:對端備機回寫的 LSN、flush/apply 時間戳
–walSndCaughtUp:bool,標記是否已追平 - WalReceiver(備)
源碼:src/gausskernel/process/replication/walreceiver.cpp
主結構:knl_t_walreceiver_context
共享結構:WalRcvCtlBlock *walRcvCtlBlock(與 startup 進程交互)
關鍵成員:
–receivedUpto:已寫入本地 WAL 段的 LSN
–receiveStart:本次連接期望從主庫拿到的起始 LSN
–feedback_message:熱備反饋,用於主庫回收舊 WAL - Startup/Recovery(備)
源碼:src/gausskernel/process/postmaster/startup.cpp→StartupXLOG()
負責解析 WAL 記錄並調用對應 redo 函數;openGauss 裏增加ParallelRedoWorker支持並行回放。
- 建連與握手(源碼級流程)
① 備庫啓動 →postmasterforkstartup→startupforkwalreceiver
②WalReceiverMain()通過 libpq 連接主庫,發送IDENTIFY_SYSTEM命令
③ 主庫後端進入WalSenderMain()→WalSndHandshake()
– 校驗協議版本、系統標識、時間線
– 協商起始 LSN(receiveStart)
– 若啓用了 SSL、壓縮、quorum,這裏一併協商標誌位
④ 握手成功 → 進入WalSndLoop(),開始實時流式複製
- 主庫發送端(XLog → Socket)
核心函數:XLogSend()(在walsender.cpp)
- 循環體內
pgstat_report_activity()更新進程狀態SendXlog()→ 從sendFile當前偏移讀取 8 KB 塊pq_putmessage('d', ...)封包寫入 TCP 發送緩衝區- 更新
sentPtr(已發送 LSN)
- 當
sentPtr >= flushPtr且WalSndCaughtUp=true時進入WalSndWaitForWal(),等待主庫產生新 WAL(epoll 阻塞在WalSenderMain()的select())
- 備庫接收端(Socket → XLog)
核心函數:WalReceiverMain()→WalReceiverReceive()
pq_getmessage()收包 → 寫入walRcvCtlBlock->receiveBuf- 緩衝區滿或收到完整記錄 →
XLogWalRcvWrite()→write()到本地${PGDATA}/pg_xlog/(或pg_wal) - 每 4 KB 或超時觸發
XLogWalRcvFlush()→fsync() - 更新共享內存
receivedUpto,並通過WalReceiverSendReply()把received/flush/applyLSN 回寫給主庫
- 一致性協議(Quorum / 同步備)
主結構:WalSndCtlData裏的
SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]:後端事務等待隊列lsn[NUM_SYNC_REP_WAIT_MODE]:隊列頭線程要等待的 LSN
關鍵代碼:SyncRepWaitForLSN()(src/gausskernel/replication/syncrep.cpp)
- 事務提交 →
XLogFlush()→SyncRepWaitForLSN(XactCommitLSN) - 把自身掛到
SyncRepQueue[mode],並睡在 latch 上 - WalSender 收到備機 ACK 後,
SyncRepReleaseWaiters()喚醒滿足ackLsn >= commitLsn的等待者 - Quorum 邏輯:配置
synchronous_standby_names = 'ANY 1 (dn1_2,dn2_2)'時,只要 N/2 個備機 ACK 即可放行。
- 並行回放(openGauss 擴展)
新增文件:
src/gausskernel/process/replication/parallel_rewind.cppsrc/gausskernel/storage/buffer/parallel_redo.cpp
關鍵結構:ParallelRedoCtx(共享內存)
redoWorkers[]:回放工作線程池(默認 4 個,可配recovery_parallelism)availableMsg:生產者-消費者環形隊列,槽位大小 = 1 條 WAL 記錄
流程:
- Startup 進程解析 WAL → 把可並行記錄(heap、btree、undo 等)塞進
ParallelRedoCtx - 空閒 worker 搶到槽位 → 調用對應 rmgr 的
redo函數 - 不可並行記錄(如 checkpoint、twophase)由 startup 自己串行執行
- 所有 worker 完成當前 LSN 後,更新
lastAppliedLSN→ 向主庫反饋 apply 進度
- 級聯備與壓縮
- 級聯備:中間備庫同時扮演 “WalSender + WalReceiver” 角色,代碼路徑同上述,僅增加
AmWalReceiverForStandby標誌。 - 壓縮:握手階段協商
compression = zlib/lz4,XLogSend()裏先deflate()再pq_putmessage(),節省跨機房帶寬。
- 快速定位的斷點清單 | 場景 | 函數文件 | 行區 | |---|---|---| | 主庫事務等待同步 |
SyncRepWaitForLSN()| syncrep.cpp | | 主庫發送 WAL |XLogSend()| walsender.cpp | | 備庫寫 WAL |XLogWalRcvWrite()| walreceiver.cpp | | 備庫反饋 ACK |WalReceiverSendReply()| walreceiver.cpp | | 並行回放入口 |ParallelRedoMain()| parallel_redo.cpp | | 級聯備身份切換 |WalReceiverMain()判斷AmWalReceiverForStandby| walreceiver.cpp |
把以上關鍵點串起來,就能在源碼層面清晰看到:
“事務提交 → 主庫 flush → Sender 發送 → Receiver 寫盤 → ACK → 喚醒等待隊列 → Startup/Worker 並行回放”
的完整數據鏈。後續二次開發(例如新增壓縮算法、自定義同步級別)只需在對應結構體里加字段、在對應函數里加分支即可。