多線程問題

對於mysql裏面上萬條信息,我開多線程,比如10個,要是完成第一個線程處理前k條,後面的依次處理後k條,這怎麼建立?要是第3個線程處理k到k+n條數據出問題的時候,我應該記錄這批次信息的id到一個單獨的地方,我覺得這數據量小,可以存在List裏面,然後該批次(批次大小200)的所有數據都進行事務回滾,不提交。最後全部都處理完了,再去處理這些異常的list列表裏面的素材,

數據分片和多線程分配

我希望10個線程去處理不同的數據段,比如每個線程處理1萬條數據,這是數據分片或者説範圍劃分

實現方式:

  1. 主線程預先分片:在主線程中,首先確定總數據量(比如一萬條)和分片規則,一個簡單的奏效的方法就是基於主鍵ID進行範圍劃分(假設ID是連續的)
  • 查詢出最小和最大的ID
  • 計算每個線程需要處理的數據量sliceSize = totalRecords / numThreads
  • 為每個線程分配一個ID範圍Thread 1: [minId, minId + sliceSize),Thread 2: [minId + sliceSize, minId + 2 * sliceSize)
  1. 每個線程內部分批:每個線程負責處理分配給它的哪個大範圍,在線程內部,為了控制事務大小和內存運用,會為這個大範圍的數據按照批次大小(比如200條)進行處理
  • 線程3會從數據庫分頁查詢WHERE id >= start_id AND id<end_id LIMIT 200 OFFSET 0,處理第一批200條
  • 然後OFFSET 200 處理下一批,直到處理完

互斥的。就是這樣做的好處就是避免了線程間對同一條數據的競爭,因為每個線程處理的數據範圍

異常處理、回滾、重試

  1. 事務邊界:每個批次200條作為一個獨立的事務
  • 開始處理批次前,開啓一個數據庫事務。
  • 如果這200條中任何一條處理失敗(拋出異常),整個事務會立即回滾,數據庫會撤銷該批次內所有已執行的操作,保證數據一致性
  • commit事務就是如果全部成功就
  1. 異常捕獲和記錄
  • 當第三個線程處理範圍的某個批次失敗時,會被try-catch捕獲
  • 在catch塊中,首先執行事務回滾,確保數據庫乾淨
  • 批次的偏移量信息(threadId: 3, batchIndex: 5)就是然後不是記錄單個失敗的id,而是記錄整個失敗批次的標識信息,這個標識可以是起始ID和結束ID(failedRange: {start: 3000, end: 3200),或者
  • 將這個標識符添加到一個全局的線程安全的異常批次列表中,比如ConcurrentLinkedQueue或者Collections.synchronizedList(new ArrayList<>)中,對於數據量小的,用List足夠。
  1. 後續處理
  • 主線程利用ExecutorService和countDownLatch等工具,等待所有10個工作線程達成他們各自的材料範圍
  • 所有正常批次處理完畢後,程序開始檢查哪個全局的異常批次列表
  • 然後,許可啓動新的線程或者複用原來的線程,專門對這些之前失效的批次進行重試,重試邏輯和之前一樣,每個批次一個事務,成功則提交,失敗則回滾然後再次記錄,定義一個retryMaxCount,超過了就人工重試吧。

線程池

從疑問入手,線程池拒絕策略怎麼選才不會丟失任務?

線程池拒絕策略的觸發,本質是:“任務提交速度超過了線程池的處理能力”

  1. 核心線程全忙
  2. 任務隊列已存滿待執行任務
  3. 線程數以達到最大線程數,無法再創建新線程。

JDK獻出了4種拒絕策略,實現RejectedExecutionHandler接口,

AbortPolicy(默認策略):直接拋異常,任務必丟

  • 核心邏輯
    一旦觸發拒絕,直接拋出RejectedExecutionException異常,提交的任務不會被執行。
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5,10,60L,TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(20),
  new ThreadPoolExecutor.AbortPolicy()//默認策略,可省略
  )
  • 適用場景
    適合任務丟失會直接導致業務異常,需立即感知的場景,比如金融轉賬(丟了轉賬任務必須馬上報錯,不能默默吞掉)
  • 風險點
    假如沒有捕獲異常,會導致提交任務的線程崩潰;如果捕獲了異常卻不處理,任務還是會丟。

DiscardPolicy:默默丟棄任務

  • 核心邏輯
    觸發拒絕時,直接丟棄任務,不拋出任何異常

DiscardOldestPolicy:丟棄隊列最老任務,保留新任務

  • 核心邏輯
    觸發拒絕時,先把任務隊列種“最老的任務(隊列頭部任務)”丟棄,再把新提交的任務加入隊列。
  • 使用場景
    適合新任務比老任務更核心的場景,比如實時數據統計(用户最新的行為數據比10秒前的更有價值,丟老數據保留新數據)。
  • 風險點
    會丟老任務,且如果隊列種是“必須執行的核心任務”(比如訂單創建),丟老任務會導致業務異常 。

CallerRunsPolicy: 讓提交任務的線程自己執行,不丟任務

  • 核心邏輯
    觸發拒絕時,不丟任務也不拋異常,而是讓“提交任務的線程”(比如主線程)自己執行這個任務。
  • 使用場景
    不會丟失就是比如秒殺下單,用户註冊—即使線程池滿了,也能讓提交任務的線程執行任務,雖然會慢,可
  • 關鍵優勢
    任務無丟失,只要提交線程不崩潰。任務一定可以執行。
    自帶限流效果:提交線程執行任務時,無法再提交新任務,相當於“減緩任務提交速度”,間接幫線程池減輕壓力
  • 風險點
    假如提交任務的線程是“核心線程(如主線程)”,執行任務時會阻塞主線程,導致其他業務受影響。

自定義拒絕策略:MQ

  • 核心邏輯
    把任務丟進消息隊列種,然後由專門的消費者線程從MQ中拉取任務,重新提交到線程池,直到執行成功就是觸發拒絕時,不直接處理任務,而
  • 使用場景
    絕對不能丟任務的核心任務,比如訂單支付、轉賬、庫存扣減—即使線程池長期滿負荷,任務也能在MQ中暫存,重試後執行。
  • 關鍵優勢
    無任務丟失風險,且不阻塞提交線程

1. CallerRunsPolicy會阻塞提交線程,怎麼避免?

CallerRunsPolicy確實可能阻塞線程,所以實際用的時候要加兩個限制:第一,判斷提交線程是否是核心線程,如果是核心線程,就不用CallerRunsPolicy,
改用MQ重試策略,避免阻塞核心業務;第二,給CallerRunsPolicy加超時控制,比如用Future.get(timeout),超時則丟棄任務。

2. 用MQ重試策略,怎麼避免任務重複執行

要避免任務重複執行,需要考慮以下難題:第一,任務要帶唯一標識(比如訂單ID),提交到MQ時把唯一標識作為消息ID;第二,執行任務前先檢查這個唯一標識的任務是否已經執行過(查mysql或者redis)
,假如已經執行過就直接返回成功,沒執行過再執行—比如下單任務,用訂單ID作為唯一標識,執行前查Redis是否有order:123:executed的key,有就跳過,沒有就執行扣減庫存,執行完再存key。

3. 線程池滿了,除了拒絕策略,還有什麼辦法避免丟任務

除了拒絕策略,還要從源頭減少拒絕的發生:第一:提前預熱核心線程,避免首次任務延遲導致隊列堆積;第二,合理設置線程池參數(核心線程數=業務峯值QPS/單線程QPS,隊列容量 = 峯值持續時間*單線程QPS),比如秒殺峯值1000QPS,單線程每秒處理10個任務,核心線程數設置為100,
隊列容量設置為500,基本能應對峯值;第三,在提交任務前檢查線程池狀態,如果隊列滿了,就提前返回系統繁忙稍後再試。

總結

核心業務,希望不丟任務,用MQ重試策略,非核心業務,實時場景,用DiscardOldestPolicy,輕鬆場景(不阻塞核心線程),用CallerRunsPolicy,絕對不推薦AbortPolicy和DiscardPolicy

用CallerRunsPolicy別阻塞核心線程,用MQ重試別忘用冪等機制,用動態擴容別超過資源上限。

拒絕策略只是兜底方案,更重要的是,提前優化線程池參數+源頭控制流量,從根本上減少拒絕的發生。

Kafka障礙

基礎

首先,kafka包含生產者集羣、broker集羣、ZK集羣、消費者集羣,生產者集羣發送消息到broker集羣,broker集羣收集後由消費者集羣消費,broker集羣會上報ZK信息,維持心跳。

broker可以理解為kafka本身,接收生產者消息,持久化到磁盤,然後處理消費者的一個拉取的請求。

ZK就是一個註冊中心,用於管理broker集羣,保存topic和partition的一個路由信息。

從消息的角度來看,broker會通過topic去分類。生產者呢,把不同類型的消息發送給對應的topic然後訂閲者去訂閲對應的topic,然後進行消費topic。

從存儲的角度來看,topic內部其實是分區,也就是partition。

這個partition就是消息隊列。

消費

一條消息可以被多個消費者消費,那麼問題就來了,我怎麼知道每個消費者的消費到哪裏了呢,於是,partition中每個消息都有一個唯一標識,叫做偏移量offset。

記錄offset就知道消費到哪了。就是 也就是個單調遞增的整數,於

消息的順序

如果一個topic下有多個partition,每個partition都只保存一部分的數據。broker是做集羣的,所以一個topic下的不同的partition可以分佈到不同的broker節點上,這就解決了單機的性能瓶頸。

只是kafka只能保證同一個partition內部的消息是有序的,也就是説,單個隊列內的消息是有序的,不同的partition之間,也就是不同的隊列之間,消息的順序性無法保證。

高可用/主從/數據備份

那麼難題來了,每個partition都只存一部分數據,那若是broker掛了,那存的這部分數據不就丟失了?kafka是如何保證高可用的呢?

kafka提供了一個多副本機制,就是每個partition的數據都會同步到其他的broker節點上,每一個partition在其他的broker上就有了多個副本,然後這些副本會選擇一個leader出來,讓leader去和生產者消費者去交互,然後leader收到了寫請求,他就會把數據同步給所有的副本,其他副本都是備份。

這裏就是一個主從的概念,嚴格來説是主備,只不過kafka這裏做的是partition層面的主備,而不是broker層面的主備。

那為什麼不讀副本呢,如果説要讀這個副本的partition的話,那你上次讀這個partition,下次讀哪個partition,offset就很難管理了。

消費者組

那一個topic下面有多個分佈到不同broker節點上的partition,那麼多的partition,假設只有一個消費者消費太慢了吧。

怎麼辦呢?

多個消費者可以組成一個消費者組,隨後消費者組內的多個消費者就可以並行的消費topic中不同的partition。

注意,topic中的一個partition只能被一個消費者組的一個消費者消費。

消息的分發和獲取

那麼説消費者組內的數量超過了topic的數量,那麼多餘的消費者就空閒了,就不能消費了,那麼障礙又來了,多個broker,生產者發消息的時候怎麼知道應該發給哪個broker呢?

消費者獲取消息的時候怎麼知道應該從哪個broker中獲取消息呢?

首先broker集羣定時向zk發送心跳包,上報自身信息,那zk就掌管了所有的broker topic和partition的信息。

broker呢,會選舉出一個大哥叫controller,他會監聽zk中的一個topic的一個變化,一旦topic變化,那麼controller就會從zk裏面拉取最新信息。之後廣播給其他的broker,於是每個broker中其實都會存儲最新的集羣信息和路由表。

生產者通過訪問broker其實就能獲取到路由信息,接着根據部署的一個分區策略把消息直接發送給目標topic對應的那個leader partition對應的broker,然後broker收到消息之後寫到partition末尾,然後分配一個offset,接着leader會同步數據給自己的副本 。

消息的一個消費者呢,通過訪問broker就能獲取到訂閲該topic的一個路由信息,就能知道應該去哪個broker中去拉取消息了,隨後利用自己記錄最新的offset就可以去讀取信息了。