一、背景在推薦系統中,樣本拼接是銜接在線服務與算法模型的重要一個環節,主要職責是樣本拼接和業務相關的ETL處理等,模塊位置如下圖紅框所示。
推薦系統通過學習埋點數據來達到個性化精準推薦的目的,因此需要知道服務端推薦下發的內容,是否有一系列的行為(曝光,點擊,播放,點贊,收藏,加購等等),把被推薦內容的埋點數據與當下的特徵拼接起來的過程,一般稱為樣本拼接,一個簡化的流程如下:
推薦的過程可以檢驗概括為以下幾點:後台服務rank 推薦內容給app客户端,同時把內容對應的特徵快照保存起來;app接收到內容後,埋點日誌被上報到消息中間件;樣本拼接負責將特徵與埋點日誌拼接起來,定義正負樣本,格式轉換;模型接收樣本訓練,將使用最新的模型做推薦。為了保證較高的拼接率和穩定性,我們的拼接架構也經過了長時間的迭代,這篇文章我將給大家介紹vivo特徵拼接架構的發展歷程、當前方案、當前方案遇到的問題和解決方案,以及未來的規劃和展望,希望能幫助到業內的同學。二、拼接方案選型2.1 小時粒度拼接小時拼接是將埋點日誌和特徵快照都保存到Hive並以小時分區,每小時調度一個Spark任務來處理兩個表相應分區的數據做拼接,由於是小時拼接,實時性較低,Spark作業本身也依賴於上游Hive表小時分區生成,每個小時末尾的請求埋點有可能是落在當前小時,也有可能落在下個小時。舉個例子:19點50分下發了一個視頻,客户端在19:59分點擊了,但是視頻播放卻是在20點03分完成的,這個時候就會存在拼接不上的問題。
2.2 基於 Redis 的流式拼接為了提升拼接率,且達到實時拼接,節點故障容災,完備監控等特性,Flink是一個很好的替代方案,也是最近幾年比較主流的實現。最初在實時推薦場景中,Kafka中的特徵快照通過Flink任務寫入到Redis,另一個Flink任務消費曝光埋點數據和點擊埋點數據並讀取存在Redis中的特徵快照數據做拼接,拼接後的數據作為拼接特徵被寫入到下游的Kafka中,提供給後續的算法做模型的訓練,架構圖如下:
經過一段時間實踐,以上的方案出現了兩個痛點:Redis中存儲了幾十T的數據,Redis的成本高;業務數據流量會波動,經常需要DBA對Redis集羣進行擴容,涉及大量數據的遷移,運維成本高。2.3 基於 RocksDB 大狀態流式拼接為了解決基於Redis的作為中間數據的存儲存在的問題,我們採用Flink狀態來存儲特徵快照,整個架構中不再需要外部的Redis,由於我們需要存儲的數據量達幾十T,這裏我們選用適合大數據量存儲的RocksDB類型的狀態後端,調整後架構更加簡潔,如下圖所示:
流程如下:首先將曝光流點點擊流以及特徵在Flink 任務中做union並做keyby;在processElement方法中如果接收到曝光流就將數據保存到state中,如果接收到曝光流就將數據保存到state中,如果接收到特徵就去state中查詢相應的曝光和點擊數據;如果能找到就發送到下游並將狀態數據清理掉,沒找到就將特徵保存到state中,並註冊一個定時器;定時器觸發時去state中查詢相應的曝光和點擊數據,如果找到就發到下游,並將狀態數據清理掉。由於RocksDB可以同時利用內存和磁盤來存儲數據,所以對於內存的使用量大幅下降,由於RocksDB是嵌入式的數據庫,每個TM上的RocksDB數據庫只存儲shuffe到該TM上的數據,無需再關注擴縮容的問題。當然隨着數據上漲,Flink流式拼接在實際的生產過程中也遇到了一系列的問題,為了保證業務的可用性,我們花了較長的時間對這些問題進行攻克,目前任務穩定性達到99.99% ,拼接率長期穩定在99%以上,對拼接效果提升較大。下面我將列舉我們遇到的問題和解決方案,希望能夠幫助到業內的其他團隊。三、問題及解決方案3.1 TM Lost問題3.1.1 現象在方案實施之初,我們發現這些特徵拼接的任務頻繁出現TM was Lost異常導致任務重啓,我們看了日誌,發現都是TM內存超出了YARN的內存限制被kill。3.1.2 問題分析那麼我們的疑問就來了,為啥這部分任務的內存很容易超出,超出的那部分內存又是誰在用呢?下面這張圖是來自Flink的官網,因為我們在平台使用Flink的時,我們只設置了總的內存,並沒有關注其他各個局部的內存,那麼這些部位的內存是如何分配的?為了搞清楚這個問題,有必要梳理一下每個模塊內存計算的邏輯。
圖片引用自FlinkFlink內存分配邏輯一般在YARN上提交的任務是含有taskmanager.memory.process.size 參數的配置的,所以Flink在分配內存時,會以調用deriveProcessSpecWithTotalProcessMemory 方法分配。通過配置參數獲得meatspace 的大小,通過jobmanager.memory.jvm-overhead.fraction 的比例計算overhead的內存,totalFlinkMemory通過總的進程的內存減去meatspace + overhead的內存得到。通過配置中的參數獲取 frameworkHeapMemory-Size、frameworkOffHeapMemorySize 、task-OffHeapMemorySize 的大小。通過managedmemory的配置獲取託管內存的值, 通過networkbuffer的配置獲取networkbuffer的值 。totalFlinkMemory 減去所有需要排除的內存,剩下的內存分配給堆。內存分配邏輯,以及每塊內存的設置方法如下圖:
到此TM的各個內存模塊的內存已經劃分完成。有上面的分析我們可以得出以下的結論:totalProcessMemorySize = totalFlinkMemorySize + JvmMetaspaceSize + JvmOverheadSize
totalFlinkMemorySize = frameworkOffHeapMemorySize + taskOffHeapMemorySize + managedMemorySize + networkMemorySize + frameworkHeapMemorySize + taskHeapMemorySize這裏重點將一下JVMOverhead,JVMOverhead並沒有具體的作用,是一個預留值,它是一個緩衝區,可以避免在Flink運行在容器中是因為短時時間的內存超出了容器的限制而被kill。frameworkOffHeapMemorySize和taskOff-HeapMemorySize 也是預留值,offheap在概念上的主要是指native內存。frameworkHeap-MemorySize 也是預留值。由此可以看出雖然Flink官方將TM的內存劃分的較細緻,但是像JvmOverheadSize frameworkOffHeap-MemorySize,taskOffHeapMemorySize,frameworkHeapMemorySize 都只是邏輯上的預留,並沒有從操作系統層面實現隔離。RocksDB內存分配邏輯因為堆內存不足時一般會報out of memory的異常,所以到這一步我們推測應該是堆外內存溢出了,而堆外內存最大的一塊就是RocksDB使用的,而從Flink的官網的介紹可以知道託管內存就是給RocksDB使用的,下面我們再看一下託管內存是如何分配給RocksDB的。cacheMemory = (1-(1/3)(writeBufferRatio)) managedMemory
bufferMemory = (2/3)(writeBufferRatio) managedMemory
讀寫緩存總內存 = bufferMemory + cacheMemory = (1 +(1/3)(writeBufferRatio)) managedMemory由上面的代碼可以看出,managed memory 是通過一定的比例給RocksDB的各個部分來分配內存的,writeBufferRatio會影響讀緩存和寫緩存的大小,理論上讀寫緩存總內存有可能會超過managedMemory的大小。通過上面的公式可以看出讀寫緩存總內存最多超出managedMemory的1/3,這裏很容易想到,那麼我們在排查overhead的時候配置大於managedMemory的1/3不就能你面內存溢出了,但是在實踐中,我們這樣配置並並沒有完全的解決物理內存溢出的問題,下面關於RocksDB內存的資料,終於找到了是還有哪部分內存容易溢出了,是因為部分區域的內存難以限制導致的。RocksDB 的內存佔用有 4 個部分:Block Cache: OS PageCache 之上的一層緩存,緩存未壓縮的數據 Block;Indexes and filter blocks: 索引及布隆過濾器,用於優化讀性能;MemTable: 類似寫緩存;Blocks pinned by Iterator: 觸發 RocksDB 遍歷操作(比如遍歷 RocksDBMapState 的所有 key)時,Iterator 在其生命週期內會阻止其引用到的 Block 和 MemTable 被釋放,導致額外的內存佔用。前三個區域的內存都是可配置的,但 Iterator 鎖定的資源則要取決於應用業務使用模式,且沒有提供一個硬限制,因此 Flink 在計算 RocksDB StateBackend 內存時沒有將這部分納入考慮,其次是 RocksDB Block Cache 的一個 bug,它會導致 Cache 大小無法嚴格控制,有可能短時間內超出設置的內存容量,相當於軟限制,原來是迭代器的內存限制的不好,導致的內存溢出。3.1.3 解決方案我們在使用Flink 的RocksDB狀態後端時,是通過managed memory來控制RocksDB各個部分的內存的,所以managed memory內存越小分配給各個部分的內存也就越小,迭代器內存越不容易溢出。到此我們對Flink的RocksDB狀態後端的內存有了一定的認知:當性能可以滿足的情況下,Flink的Manaed memory應該越小越好。但是上滿形成的經驗很難高效的在業務上落地,原因是“Flink的Manaed memory應該越小越好”很難去確定。於是我們聯想到了之前的JVMoverhead,在我們的實際實踐中過程中,我們是通過調大JVMoverhead,和jemalloc內存分配器來解決內存溢出問題的。在Flink1.12之後Flink on k8s的內存分配器已經默認改成了jemalloc,可以避免內存的分配過程中出現64M問題。但是要注意:由於我們的Java版本是JAVA8小版本是192,在最新版本的jemalloc5.3上出現了死鎖的問題,後來我們採用jemalloc4.5 如果!就沒有問題了。據瞭解業界有些公司使用的JAVA8小版本是256採用jemalloc5.3沒有遇到死鎖問題。