作者:張富春(ahfuzhang),轉載時請註明作者和引用鏈接,謝謝!
- cnblogs博客
- zhihu
- Github
- 公眾號:一本正經的瞎扯
![]()
以最經典的計算 qps 的曲線為例,vmselect 內部是如何計算的?
1 grafana 通過 query_range 接口發起請求
通常會在 grafana 中配置一個 line chart,然後使用以下的 promql 表達式來計算每分鐘的請求量:
sum by (path) (increase(http_request_total{job="myApp"}[1m]))
grafana 會向所配置數據源的 vmselect 發送類似的請求:
POST /select/0/prometheus/api/v1/query_range HTTP/1.1
Host: xxx
Content-Type: application/x-www-form-urlencoded
start=${開始時間}&end=${結束時間}&step=15s&query=sum by (path) (increase(http_request_total{job="myApp"}[1m]))
- queuy_range 的 API 格式請看:https://prometheus.io/docs/prometheus/latest/querying/api/#range-queries
2. vmselect 中的處理流程
2.1 函數調用過程:
TL;DR
可以直接跳到下一節看源碼分析
| 文件 | 函數 | 調用代碼 | 説明 |
|---|---|---|---|
| app/vmselect/main.go | func main() | Main 函數 | |
| go httpserver.Serve(listenAddrs, requestHandler | 啓動http 服務 | ||
| func requestHandler | http的callback 函數 | ||
| return selectHandler(qt, startTime, w, r, p, at) | 執行 http://vmselect:8481/select/ 這個路徑 | ||
| func selectHandler | 查詢的處理函數 | ||
| prometheus.QueryRangeHandler(qt, startTime, at, w, r) | promql 查詢的處理函數
/query_range 這條API 的處理代碼 |
||
| app/vmselect/prometheus/
prometheus.go |
func QueryRangeHandler | ||
| queryRangeHandler(qt, startTime, at, w, query | 從http協議中取出參數,執行範圍查詢 | ||
| func queryRangeHandler | |||
| result, err := promql.Exec(qt, ec, query, false) | 組織好 promql.EvalConfig 對象 | ||
| app/vmselect/promql/exec.go | func Exec | 執行查詢表達式的函數 | |
| e, err := parsePromQLWithCache(q) | 解析查詢表達式 | ||
| qid := activeQueriesV.Add(ec, q) | 記錄當前正在查詢哪個表達式 | ||
| rv, err := evalExpr(qt, ec, e) | 執行解析後的表達式, metricsql.Expr對象 | ||
| app/vmselect/promql/eval.go | func evalExpr | evalExpr會根據 promql的結構嵌套執行,直到葉子節點。 | |
| rv, err := evalExprInternal(qt, ec, e) | |||
| func evalExprInternal | 逐個種類判斷,一共八個種類。 是哪種表達式,就執行對應的分支 | ||
| rv, err := evalAggrFunc(qtChild, ec, ae) | 執行聚合表達式。(選擇最常見的一種表達式來分析) | ||
| func evalAggrFunc | |||
| callbacks := getIncrementalAggrFuncCallbacks(ae.Name) | 根據表達式中的聚合函數名,找到對應的執行代碼。 例如:函數 sum() 對應着一個 golang 的 func | ||
| fe, nrf := tryGetArgRollupFuncWithMetricExpr(ae) | 如果 sum() 裏面還有類似 increase() 這樣的 rollup 函數,則執行這一步 | ||
| args, re, err := evalRollupFuncArgs(qt, ec, fe) | 先執行 rollup() 函數裏面的表達式 | ||
| rf, err := nrf(args) | 得到表達式的結果後,再執行 rollup() 函數 | ||
| func evalRollupFuncArgs | |||
| ts, err := evalExpr(qt, ec, arg) | 嵌套執行表達式,又回到函數 func evalExpr | ||
| 內層一般都是 metrics 表達式 | 這裏開始展示執行到了葉子節點的情況。 | ||
| rv, err := evalRollupFunc(qt, ec, "default_rollup", rollupDefault, e, re, nil) | |||
| func evalRollupFunc | |||
| return evalRollupFuncWithoutAt(qt, ec, funcName, rf, expr, re, iafc) | |||
| func evalRollupFuncWithoutAt | |||
| rvs, err = evalRollupFuncWithMetricExpr(qt, ecNew, funcName, rf, expr, me, iafc, re.Window) | |||
| func evalRollupFuncWithMetricExpr | |||
| tss, err := evalRollupFuncNoCache(qt, ec, funcName, rf, expr, me, iafc, window, pointsPerSeries) | |||
| func evalRollupFuncNoCache | |||
| tfss := searchutil.ToTagFilterss(me.LabelFilterss) | 把 metrics 相關的表達式,變成標籤過濾的對象 | ||
| sq = storage.NewMultiTenantSearchQuery(ts, minTimestamp, ec.End, tfss, ec.MaxSeries) | 把 [][]storage.TagFilter 構造成 SearchQuery 對象 | ||
| rss, isPartial, err := netstorage.ProcessSearchQuery(qt, ec.DenyPartialResponse, sq, ec.Deadline) | 把請求發到 storage 節點,得到了 Results 對象 | ||
| evalRollupNoIncrementalAggregate(qt, funcName, keepMetricNames, rss, rcs, preFunc, sharedTimestamps) | |||
| func evalRollupNoIncrementalAggregate | |||
| rss.RunParallel(qt, ... | 當 vmstorage 返回 metricBlock 數據塊後,開始並行執行,做各種聚合運算。 | ||
| app/vmselect/netstorage/
netstorage.go |
func (rss *Results) RunParallel | ||
| rowsProcessedTotal, err := rss.runParallel(qt, f) | 在與核數相匹配的協程中並行執行 | ||
| func (rss *Results) runParallel | |||
| err = tsw.do(&tmpResult.rs, 0) | 每個 time series的數據上調用 do 方法 | ||
| func (tsw *timeseriesWork) do | |||
| err := tsw.pts.Unpack(r, rss.tbfs, rss.tr) | 把 metricBlock 數據進行反序列化,變成與 data point 數量相等的 []timestamp 和 []values | ||
| func (pts *packedTimeseries) Unpack | |||
| dedupInterval := storage.GetDedupInterval() | 查詢時,拉取全局的 dedup 間隔配置 | ||
| mergeSortBlocks(dst, sbh, dedupInterval) | 去重邏輯。去掉重複的 timestamp |
2.2 查詢過程概述
- 服務啓動流程
- main() 中啓動了http服務
- 提供 callback 函數來對應到 /select/0/prometheus/api/v1/xxx 下面的查詢
- 最終請求觸發時,走到
QueryRangeHandler中進行處理
- 查詢過程
- 通過
Exec()函數來處理查詢 - 使用
parsePromQL()來解析 promql 表達式,把表達式變成 8 中基本語句的嵌套。8種語句包含:metricsql.MetricExpr: metric 的過濾表達式,主要是 tag 層面的過濾metricsql.RollupExpr: 可以理解為 increase(), rate() 這樣的區間聚合函數metricsql.FuncExpr: 執行 metricsQL 內部提供的函數,例如 label_replace() 等metricsql.AggrFuncExpr: 執行聚合函數,例如 max, sum, avg 等metricsql.BinaryOpExpr: 執行布爾表達式的運算,主要有:and / or / unlessmetricsql.NumberExpr: 數值常量表達式metricsql.StringExpr: 把字符串看成一個獨立的時間序列metricsql.DurationExpr: 產生新的 timestamp 的序列
- 根據 promQL 表達式的結構,在不同的部分嵌套執行
evalExpr(), 直到執行完成整個表達式
- 通過
- 調用存儲後端:
- 當 promql 的表達式是 MetricExpr 時,通過 vm-select 與 vm-storage 之間的二進制 rpc 協議來通訊
- vmselect 向所有的 vmstorage 廣播
- 發送到 vmstorage 上的請求都是要求返回 metricName + timestamp + value 這樣形式的請求
- vmstorage 根據 tag 的過濾表達式,先在索引中找到符合條件的 tsid
- 再根據 tsid 在數據去尋找每個 tsid 對應的 block,然後直接返回未解碼的 block 數據
- vmselect 會收到來自 vmstorage 的 MetricBlock 結構
- 最終收到的數據類似這樣的結構: map[metricName] -> block list
- 數據反序列化及其去重
- 當收到多個 vmstorage 節點返回的數據後,創建 N 個協程(N一般與CPU核數相等)來做反序列化、dedup 和 表達式計算
- 每條 metricName 下的 block list 會逐個調用 unpack 來變成 []timestamp 和 []values 的數組
- 所有的 block list 會建立成一個堆,按照 timestamp 來排序
- 以 dedupInterval 為時間窗口,逐個檢查 timestamp。在 dedupInterval 時間窗口內的多條數據,只取一條
- 表達式計算
- 重新回到
evalExpr()函數,做 sum 等表達式的計算 - 全部計算完成後返回 json 數據給 grafana
- 重新回到
3. 總結
- vmselect / vmstorage 是存算分離的架構
- vmstorage 內部組織了索引和數據兩部分
- vmstorage 通過 tag 來在索引中過濾,找到後返回整個 block 數據
- vmselect 把查詢廣播到所有的 vmstorage,得到 metricBlock 後進行 promql 表達式的計算
- promQL 被解析為一顆樹,通過遞歸執行。
- 表達式從樹梢執行到樹根
最後,為了便於分析 VictoriaMetrics 系列的源碼,我又專門建立了一個源碼倉庫來存放增加了註釋的源碼:
https://github.com/ahfuzhang/code_comments
Have Fun. 😃
