博客 / 詳情

返回

會話更新的防抖進化 —— 填補“亂序”與“丟數據”的深坑

摘要:在上一篇文章中,我們設計了一個基於 Actor 模式的“寫緩衝(Write-Behind)”防抖系統,看似美好,但是還是有消息亂序與數據丟失的隱患。本文將詳細記錄 V2 版本的重構思路:通過引入 阻塞背壓 (Blocking Backpressure)延遲確認 (Deferred ACK)事件循環 (Event Loop),構建一個更加健壯、嚴謹的防抖系統。


1. 背景與挑戰:從“跑通”到“跑穩”

在構建即時通訊系統的會話列表時,核心矛盾在於海量消息吞吐數據庫有限寫入能力之間的衝突。業務要求每條消息都能更新會話的 last_active_timedigest(摘要),但直接的數據庫 UPDATE 操作會導致嚴重的寫放大(Write Amplification)。

上期的 V1 採用了“流水線 + Actor”模式:利用內存隊列暫存消息,通過 Map 進行去重合並,最後批量落庫。這種方案有三個隱患:

  1. 消息亂序風險:利用 NAK(拒絕消息)進行限流,會導致 NATS 將消息重新投遞。在重試過程中,舊消息可能排在新消息之後,破壞 FIFO 順序,導致會話狀態“時光倒流”。
  2. 數據丟失風險:消息一旦執行 ACK,若此時服務發生 OOM 或斷電,內存中未落庫的數據將永久丟失,違背了數據可靠性原則。
  3. 驅動模型冗餘:依賴定時器(ScheduledExecutor)輪詢,在低負載時存在空轉資源浪費,在高負載時又難以做到極致的“貪婪消費”。

針對上述問題,V2 版本對核心數據流轉邏輯進行了深度重構。


2. 核心重構一:流量控制 —— 棄用 NAK,擁抱阻塞 (Blocking Backpressure)

2.1 隱患分析:NAK 帶來的“亂序風暴”

初始版本為了保持消費者線程的非阻塞特性,在隊列滿時選擇直接 NAK 消息。NATS Server 收到 NAK 後會將消息重新加入待發送隊列。

這引發了一個嚴重的邏輯漏洞:重試打破了順序
假設用户先後發送了消息 A(內容:Hello)和消息 B(內容:Bye)。若 A 被 NAK 而 B 成功入隊,A 稍後被重試投遞時,將排在 B 之後。系統處理時會誤認為 A 是最新狀態,導致會話摘要錯誤地顯示為 "Hello" 而非 "Bye"。此外,頻繁的 NAK 還會增加中間件的負載,甚至觸發最大投遞次數限制導致消息被丟棄。

2.2 解決方案:基於 TCP 的自然背壓

V2 版本將接收隊列替換為阻塞隊列 (BlockingQueue),並使用 put() 操作替代 offer()

  • 機制:設置極小的隊列容量(如 Size=8)。當消費者處理速度低於生產者時,隊列瞬間填滿。此時,NATS 消費線程在嘗試 put 時被操作系統掛起(Block)。
  • 連鎖反應:消費線程停止從 Socket 讀取數據 -\> 系統的 TCP 接收窗口(Receive Window)被填滿 -\> NATS Server 感知擁塞 -\> 自動降低對該客户端的推送速率。
  • 收益

    • 嚴格順序:消息在服務端排隊等待,進入系統的順序永遠是絕對的 FIFO。
    • 零丟包:不再有 NAK,也就消除了因“重試次數超限”而被丟棄的風險。

3. 核心重構二:數據安全 —— 延遲確認 (Deferred ACK) 與原子性提交

3.1 隱患分析:過早 ACK 導致的數據“裸奔”

初始版本遵循 接收 -> 轉換 -> ACK -> 入隊 的流程。ACK 意味着“責任轉移完成”。但在 Write-Behind 模式下,數據此時僅僅存在於易失性的內存中。一旦發生服務宕機,這部分已確認但未持久化的數據將徹底丟失。

3.2 解決方案:持有句柄的事務性處理

為了實現 At-Least-Once(至少一次) 的投遞保證,ACK 的時機必須後移至數據庫事務提交之後。這要求內存中的防抖容器(Map)不僅要存儲業務數據,還必須持有原始消息的引用(Handle)。

  • 數據結構升級:Map 的 Value 包裝為 MessageWrapper,包含 SessionUpdateEvent(業務對象)和 Message(NATS 原生句柄)。
  • 智能合併策略

    • 覆蓋場景:當新消息更新了同一個 Session,舊消息的數據價值失效。此時應立即 ACK 舊消息,僅保留新消息在 Map 中。
    • 持久化場景:僅當 flushAll() 方法成功執行完數據庫 batchUpdate 後,才遍歷當前批次的所有 Message 執行 ack()
    • 異常兜底:若入庫失敗,則對該批次所有 Message 執行 nak(),觸發 NATS 重發,等待下一次處理。

4. 核心重構三:驅動模型 —— 單線程事件循環 (Event Loop)

4.1 隱患分析:定時器的侷限性

使用 ScheduledExecutorService 存在天然的滯後性。即使隊列瞬間被打滿,消費者也必須等待定時器觸發。且此前嘗試的虛擬線程模型並不適合這種“單例、長期駐留、CPU 密集型(Map 操作)”的任務。

4.2 解決方案:貪婪的單線程 Loop

V2 版本採用了類似 Redis 或 Node.js 的 Single Thread Event Loop 模型。創建一個長期駐留的平台線程,運行一個永不停止的 while 循環。

  • 貪婪消費:使用 queue.poll(timeout)。一旦有數據,立即喚醒處理;處理完一條後,繼續循環嘗試獲取,最大化吞吐。
  • 自帶心跳poll 的超時時間(如 500ms)即為“最大落庫延遲”。如果系統空閒,線程休眠;一旦超時醒來,強制檢查是否需要刷盤。
  • 無鎖設計:由於數據讀取、Map 合併、數據庫寫入均在同一個線程內串行執行,徹底消除了併發競爭,既安全又高效。

5. 核心代碼實現 (V2)

Talk is cheap. 下面是經過 V2 重構後的核心代碼實現。請注意代碼是如何通過 BlockingQueueLinkedHashMap 的組合來實現上述流控與安全邏輯的。

5.1 整體結構與雙隊列定義

我們摒棄了複雜的第三方緩存組件,直接使用 JVM 內部數據結構,減少網絡開銷。

@Slf4j
@Component
public class SessionUpdateListener implements NatsConsumer {

  // 【Queue A】接收隊列:容量極小 (8),核心作用是建立"背壓"
  // 當消費者處理不過來時,這個隊列會滿,從而阻塞 NATS 客户端線程
  private static final int RECEIVE_QUEUE_SIZE = 8;
  private final BlockingQueue<Message> receiveQueue = new ArrayBlockingQueue<>(RECEIVE_QUEUE_SIZE);

  // 【Queue B】防抖容器:LRU 模式,用於合併重複的 Session 更新
  // accessOrder=true 確保我們能在容量滿時擠出"最老"的數據
  private static final int PENDING_MAP_SIZE = 1000;
  private final LinkedHashMap<String, MessageWrapper> pendingMap =
    new LinkedHashMap<>(16, 0.75f, true);

  // 單線程處理器:系統的"心臟"
  private final Thread processingThread;

  public SessionUpdateListener(SessionUpdateService sessionUpdateService) {
    this.sessionUpdateService = sessionUpdateService;
    // 啓動一個守護線程,專門負責"消費 -> 合併 -> 落庫"的全流程
    processingThread = new Thread(this::processLoop, "session-update-processor");
    processingThread.setDaemon(true);
    processingThread.start();
  }
  
  // 包裝類:持有 NATS 原始句柄,直到入庫成功才 ACK
  private record MessageWrapper(Message message, SessionUpdateEvent event) {}
}

5.2 生產者:優雅的阻塞背壓

這是 V2 版本最大的改變。我們不再使用 offer + NAK,而是直接使用 put

  @SneakyThrows
  @Override
  public void onMessage(Message msg) {
    // 【關鍵點】阻塞式寫入
    // 如果 receiveQueue 滿了,當前線程(NATS Client 線程)會在此掛起 (WAITING)
    // 這會導致 TCP 接收窗口填滿,進而讓 NATS Server 自動降低推送速率
    // 保證了所有消息嚴格 FIFO,不亂序,不丟棄
    receiveQueue.put(msg);
  }

5.3 消費者:貪婪的事件循環 (Event Loop)

我們移除了 ScheduledExecutor,改用 poll(timeout) 機制。這既保證了高負載下的實時消費,又充當了低負載下的心跳機制。

  private void processLoop() {
    while (true) {
      try {
        // 1. 貪婪獲取:帶 500ms 超時
        // 有數據 -> 立即返回,微秒級延遲
        // 無數據 -> 等待 500ms,相當於心跳間隔
        Message msg = receiveQueue.poll(CHECK_INTERVAL_MS, TimeUnit.MILLISECONDS);
        
        if (msg != null) {
          addToPendingMap(msg); // 進貨:放入 Map 合併
        } else {
          flushAll(); // 心跳:超時沒數據,強制刷盤,防止數據滯留
        }
      } catch (Exception e) {
         if (e instanceof InterruptedException) {
          throw (RuntimeException) e;
        }
        log.error("Error processing queue", e);
      }
    }
  }

5.4 核心邏輯:智能合併與延遲 ACK

這是解決“亂序”與“丟數據”的關鍵邏輯。注意我們是如何處理 ACK 的:只 ACK 被覆蓋的舊消息,保留最新消息的句柄直到落庫。

  private void addToPendingMap(Message msg) {
    // ... 解析代碼略 (ConvertUtils) ...

    // 智能合併邏輯 (compute)
    pendingMap.compute(sessionKey, (key, existing) -> {
      if (existing != null) {
        if (existing.event().getTimestamp() > event.getTimestamp()) {
          // 【場景 A:亂序】
          // 內存裏的消息比新來的更"新",説明新消息是遲到的舊狀態
          // 策略:直接 ACK 新消息(丟棄),保留內存裏的現有值
          msg.ack();
          return existing;
        } else {
          // 【場景 B:正常更新/覆蓋】
          // 新消息是最新的,舊消息已經沒用了
          // 策略:ACK 舊消息(它完成了歷史使命),將新消息放入 Map
          existing.message().ack();
          return new MessageWrapper(msg, event);
        }
      }
      // 【場景 C:新值】直接存入
      return new MessageWrapper(msg, event);
    });
    
    // 兜底策略:如果 Map 滿了,主動擠出最老的數據單獨落庫
    if (pendingMap.size() >= PENDING_MAP_SIZE && !pendingMap.containsKey(sessionKey)) {
        evictOldest();
    }
  }

5.5 提交階段:原子性批量落庫

最後,在刷盤階段,我們嚴格遵循 先寫庫,後 ACK 的原則,實現了 At-Least-Once 語義。

  private void flushAll() {
    if (pendingMap.isEmpty()) return;

    try {
      // 1. 數據庫批量寫入 (Batch Insert/Update)
      int updated = sessionUpdateService.batchCreateOrUpdateSessions(requests);
      
      // 2. 只有入庫成功,才對這批消息進行 ACK
      // 如果 DB 報錯,這裏不會執行 ACK,NATS 會在超時後自動重發所有消息
      for (MessageWrapper wrapper : pendingMap.values()) {
        wrapper.message().ack();
      }
      pendingMap.clear();
      log.debug("Batch updated {} sessions", updated);
    } catch (Exception e) {
      // 異常處理:日誌記錄
      // 注意:這裏沒有顯式調用 NAK,而是依靠 NATS 的 AckWait 機制自動重試
      // 或者也可以在這裏顯式調用 nak() 加速重試
      log.error("Batch update failed...", e);
    }
  }

6. 總結

從 V1 到 V2 的演進,體現了架構設計中從“功能實現”到“生產高可用”的思維轉變:

  1. 一致性優先:在消息順序和數據不丟面前,非阻塞的極致吞吐量是可以被權衡的。阻塞隊列提供了系統自我保護的底線,防止了雪崩和亂序。
  2. 事務延伸:通過延遲 ACK,將中間件的消息生命週期與數據庫事務強綁定,實現了跨系統的最終一致性。
  3. 架構極簡單線程模型在 I/O 密集型(數據庫寫)與計算密集型(Map 操作)混合的場景下,通過串行化設計去除了複雜的鎖機制,反而提升了系統的穩定性和可維護性。

通過這次優化,系統成功填補了高併發下的併發陷阱,不僅解決了寫放大問題,更確保了會話數據的精準與安全。

本文由mdnice多平台發佈

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.