摘要:在上一篇文章中,我們設計了一個基於 Actor 模式的“寫緩衝(Write-Behind)”防抖系統,看似美好,但是還是有消息亂序與數據丟失的隱患。本文將詳細記錄 V2 版本的重構思路:通過引入 阻塞背壓 (Blocking Backpressure)、延遲確認 (Deferred ACK) 和 事件循環 (Event Loop),構建一個更加健壯、嚴謹的防抖系統。
1. 背景與挑戰:從“跑通”到“跑穩”
在構建即時通訊系統的會話列表時,核心矛盾在於海量消息吞吐與數據庫有限寫入能力之間的衝突。業務要求每條消息都能更新會話的 last_active_time 和 digest(摘要),但直接的數據庫 UPDATE 操作會導致嚴重的寫放大(Write Amplification)。
上期的 V1 採用了“流水線 + Actor”模式:利用內存隊列暫存消息,通過 Map 進行去重合並,最後批量落庫。這種方案有三個隱患:
- 消息亂序風險:利用
NAK(拒絕消息)進行限流,會導致 NATS 將消息重新投遞。在重試過程中,舊消息可能排在新消息之後,破壞 FIFO 順序,導致會話狀態“時光倒流”。 - 數據丟失風險:消息一旦執行
ACK,若此時服務發生 OOM 或斷電,內存中未落庫的數據將永久丟失,違背了數據可靠性原則。 - 驅動模型冗餘:依賴定時器(ScheduledExecutor)輪詢,在低負載時存在空轉資源浪費,在高負載時又難以做到極致的“貪婪消費”。
針對上述問題,V2 版本對核心數據流轉邏輯進行了深度重構。
2. 核心重構一:流量控制 —— 棄用 NAK,擁抱阻塞 (Blocking Backpressure)
2.1 隱患分析:NAK 帶來的“亂序風暴”
初始版本為了保持消費者線程的非阻塞特性,在隊列滿時選擇直接 NAK 消息。NATS Server 收到 NAK 後會將消息重新加入待發送隊列。
這引發了一個嚴重的邏輯漏洞:重試打破了順序。
假設用户先後發送了消息 A(內容:Hello)和消息 B(內容:Bye)。若 A 被 NAK 而 B 成功入隊,A 稍後被重試投遞時,將排在 B 之後。系統處理時會誤認為 A 是最新狀態,導致會話摘要錯誤地顯示為 "Hello" 而非 "Bye"。此外,頻繁的 NAK 還會增加中間件的負載,甚至觸發最大投遞次數限制導致消息被丟棄。
2.2 解決方案:基於 TCP 的自然背壓
V2 版本將接收隊列替換為阻塞隊列 (BlockingQueue),並使用 put() 操作替代 offer()。
- 機制:設置極小的隊列容量(如 Size=8)。當消費者處理速度低於生產者時,隊列瞬間填滿。此時,NATS 消費線程在嘗試
put時被操作系統掛起(Block)。 - 連鎖反應:消費線程停止從 Socket 讀取數據 -\> 系統的 TCP 接收窗口(Receive Window)被填滿 -\> NATS Server 感知擁塞 -\> 自動降低對該客户端的推送速率。
-
收益:
- 嚴格順序:消息在服務端排隊等待,進入系統的順序永遠是絕對的 FIFO。
- 零丟包:不再有 NAK,也就消除了因“重試次數超限”而被丟棄的風險。
3. 核心重構二:數據安全 —— 延遲確認 (Deferred ACK) 與原子性提交
3.1 隱患分析:過早 ACK 導致的數據“裸奔”
初始版本遵循 接收 -> 轉換 -> ACK -> 入隊 的流程。ACK 意味着“責任轉移完成”。但在 Write-Behind 模式下,數據此時僅僅存在於易失性的內存中。一旦發生服務宕機,這部分已確認但未持久化的數據將徹底丟失。
3.2 解決方案:持有句柄的事務性處理
為了實現 At-Least-Once(至少一次) 的投遞保證,ACK 的時機必須後移至數據庫事務提交之後。這要求內存中的防抖容器(Map)不僅要存儲業務數據,還必須持有原始消息的引用(Handle)。
- 數據結構升級:Map 的 Value 包裝為
MessageWrapper,包含SessionUpdateEvent(業務對象)和Message(NATS 原生句柄)。 -
智能合併策略:
- 覆蓋場景:當新消息更新了同一個 Session,舊消息的數據價值失效。此時應立即 ACK 舊消息,僅保留新消息在 Map 中。
- 持久化場景:僅當
flushAll()方法成功執行完數據庫batchUpdate後,才遍歷當前批次的所有 Message 執行ack()。 - 異常兜底:若入庫失敗,則對該批次所有 Message 執行
nak(),觸發 NATS 重發,等待下一次處理。
4. 核心重構三:驅動模型 —— 單線程事件循環 (Event Loop)
4.1 隱患分析:定時器的侷限性
使用 ScheduledExecutorService 存在天然的滯後性。即使隊列瞬間被打滿,消費者也必須等待定時器觸發。且此前嘗試的虛擬線程模型並不適合這種“單例、長期駐留、CPU 密集型(Map 操作)”的任務。
4.2 解決方案:貪婪的單線程 Loop
V2 版本採用了類似 Redis 或 Node.js 的 Single Thread Event Loop 模型。創建一個長期駐留的平台線程,運行一個永不停止的 while 循環。
- 貪婪消費:使用
queue.poll(timeout)。一旦有數據,立即喚醒處理;處理完一條後,繼續循環嘗試獲取,最大化吞吐。 - 自帶心跳:
poll的超時時間(如 500ms)即為“最大落庫延遲”。如果系統空閒,線程休眠;一旦超時醒來,強制檢查是否需要刷盤。 - 無鎖設計:由於數據讀取、Map 合併、數據庫寫入均在同一個線程內串行執行,徹底消除了併發競爭,既安全又高效。
5. 核心代碼實現 (V2)
Talk is cheap. 下面是經過 V2 重構後的核心代碼實現。請注意代碼是如何通過 BlockingQueue 和 LinkedHashMap 的組合來實現上述流控與安全邏輯的。
5.1 整體結構與雙隊列定義
我們摒棄了複雜的第三方緩存組件,直接使用 JVM 內部數據結構,減少網絡開銷。
@Slf4j
@Component
public class SessionUpdateListener implements NatsConsumer {
// 【Queue A】接收隊列:容量極小 (8),核心作用是建立"背壓"
// 當消費者處理不過來時,這個隊列會滿,從而阻塞 NATS 客户端線程
private static final int RECEIVE_QUEUE_SIZE = 8;
private final BlockingQueue<Message> receiveQueue = new ArrayBlockingQueue<>(RECEIVE_QUEUE_SIZE);
// 【Queue B】防抖容器:LRU 模式,用於合併重複的 Session 更新
// accessOrder=true 確保我們能在容量滿時擠出"最老"的數據
private static final int PENDING_MAP_SIZE = 1000;
private final LinkedHashMap<String, MessageWrapper> pendingMap =
new LinkedHashMap<>(16, 0.75f, true);
// 單線程處理器:系統的"心臟"
private final Thread processingThread;
public SessionUpdateListener(SessionUpdateService sessionUpdateService) {
this.sessionUpdateService = sessionUpdateService;
// 啓動一個守護線程,專門負責"消費 -> 合併 -> 落庫"的全流程
processingThread = new Thread(this::processLoop, "session-update-processor");
processingThread.setDaemon(true);
processingThread.start();
}
// 包裝類:持有 NATS 原始句柄,直到入庫成功才 ACK
private record MessageWrapper(Message message, SessionUpdateEvent event) {}
}
5.2 生產者:優雅的阻塞背壓
這是 V2 版本最大的改變。我們不再使用 offer + NAK,而是直接使用 put。
@SneakyThrows
@Override
public void onMessage(Message msg) {
// 【關鍵點】阻塞式寫入
// 如果 receiveQueue 滿了,當前線程(NATS Client 線程)會在此掛起 (WAITING)
// 這會導致 TCP 接收窗口填滿,進而讓 NATS Server 自動降低推送速率
// 保證了所有消息嚴格 FIFO,不亂序,不丟棄
receiveQueue.put(msg);
}
5.3 消費者:貪婪的事件循環 (Event Loop)
我們移除了 ScheduledExecutor,改用 poll(timeout) 機制。這既保證了高負載下的實時消費,又充當了低負載下的心跳機制。
private void processLoop() {
while (true) {
try {
// 1. 貪婪獲取:帶 500ms 超時
// 有數據 -> 立即返回,微秒級延遲
// 無數據 -> 等待 500ms,相當於心跳間隔
Message msg = receiveQueue.poll(CHECK_INTERVAL_MS, TimeUnit.MILLISECONDS);
if (msg != null) {
addToPendingMap(msg); // 進貨:放入 Map 合併
} else {
flushAll(); // 心跳:超時沒數據,強制刷盤,防止數據滯留
}
} catch (Exception e) {
if (e instanceof InterruptedException) {
throw (RuntimeException) e;
}
log.error("Error processing queue", e);
}
}
}
5.4 核心邏輯:智能合併與延遲 ACK
這是解決“亂序”與“丟數據”的關鍵邏輯。注意我們是如何處理 ACK 的:只 ACK 被覆蓋的舊消息,保留最新消息的句柄直到落庫。
private void addToPendingMap(Message msg) {
// ... 解析代碼略 (ConvertUtils) ...
// 智能合併邏輯 (compute)
pendingMap.compute(sessionKey, (key, existing) -> {
if (existing != null) {
if (existing.event().getTimestamp() > event.getTimestamp()) {
// 【場景 A:亂序】
// 內存裏的消息比新來的更"新",説明新消息是遲到的舊狀態
// 策略:直接 ACK 新消息(丟棄),保留內存裏的現有值
msg.ack();
return existing;
} else {
// 【場景 B:正常更新/覆蓋】
// 新消息是最新的,舊消息已經沒用了
// 策略:ACK 舊消息(它完成了歷史使命),將新消息放入 Map
existing.message().ack();
return new MessageWrapper(msg, event);
}
}
// 【場景 C:新值】直接存入
return new MessageWrapper(msg, event);
});
// 兜底策略:如果 Map 滿了,主動擠出最老的數據單獨落庫
if (pendingMap.size() >= PENDING_MAP_SIZE && !pendingMap.containsKey(sessionKey)) {
evictOldest();
}
}
5.5 提交階段:原子性批量落庫
最後,在刷盤階段,我們嚴格遵循 先寫庫,後 ACK 的原則,實現了 At-Least-Once 語義。
private void flushAll() {
if (pendingMap.isEmpty()) return;
try {
// 1. 數據庫批量寫入 (Batch Insert/Update)
int updated = sessionUpdateService.batchCreateOrUpdateSessions(requests);
// 2. 只有入庫成功,才對這批消息進行 ACK
// 如果 DB 報錯,這裏不會執行 ACK,NATS 會在超時後自動重發所有消息
for (MessageWrapper wrapper : pendingMap.values()) {
wrapper.message().ack();
}
pendingMap.clear();
log.debug("Batch updated {} sessions", updated);
} catch (Exception e) {
// 異常處理:日誌記錄
// 注意:這裏沒有顯式調用 NAK,而是依靠 NATS 的 AckWait 機制自動重試
// 或者也可以在這裏顯式調用 nak() 加速重試
log.error("Batch update failed...", e);
}
}
6. 總結
從 V1 到 V2 的演進,體現了架構設計中從“功能實現”到“生產高可用”的思維轉變:
- 一致性優先:在消息順序和數據不丟面前,非阻塞的極致吞吐量是可以被權衡的。阻塞隊列提供了系統自我保護的底線,防止了雪崩和亂序。
- 事務延伸:通過延遲 ACK,將中間件的消息生命週期與數據庫事務強綁定,實現了跨系統的最終一致性。
- 架構極簡:單線程模型在 I/O 密集型(數據庫寫)與計算密集型(Map 操作)混合的場景下,通過串行化設計去除了複雜的鎖機制,反而提升了系統的穩定性和可維護性。
通過這次優化,系統成功填補了高併發下的併發陷阱,不僅解決了寫放大問題,更確保了會話數據的精準與安全。
本文由mdnice多平台發佈