动态

详情 返回 返回

[源碼閲讀][vmselect] 從promql 到一條曲線,計算過程是怎麼樣的? - 动态 详情

作者:張富春(ahfuzhang),轉載時請註明作者和引用鏈接,謝謝!

  • cnblogs博客
  • zhihu
  • Github
  • 公眾號:一本正經的瞎扯

以最經典的計算 qps 的曲線為例,vmselect 內部是如何計算的?

1 grafana 通過 query_range 接口發起請求

通常會在 grafana 中配置一個 line chart,然後使用以下的 promql 表達式來計算每分鐘的請求量:

sum by (path) (increase(http_request_total{job="myApp"}[1m]))

grafana 會向所配置數據源的 vmselect 發送類似的請求:

POST /select/0/prometheus/api/v1/query_range HTTP/1.1
Host: xxx
Content-Type: application/x-www-form-urlencoded

start=${開始時間}&end=${結束時間}&step=15s&query=sum by (path) (increase(http_request_total{job="myApp"}[1m]))
  • queuy_range 的 API 格式請看:https://prometheus.io/docs/prometheus/latest/querying/api/#range-queries

2. vmselect 中的處理流程

2.1 函數調用過程:

TL;DR
可以直接跳到下一節看源碼分析

文件 函數 調用代碼 説明
app/vmselect/main.go func main() Main 函數
go httpserver.Serve(listenAddrs, requestHandler 啓動http 服務
func requestHandler http的callback 函數
return selectHandler(qt, startTime, w, r, p, at) 執行 http://vmselect:8481/select/ 這個路徑
func selectHandler 查詢的處理函數
prometheus.QueryRangeHandler(qt, startTime, at, w, r) promql 查詢的處理函數
/query_range 這條API 的處理代碼
app/vmselect/prometheus/
prometheus.go
func QueryRangeHandler
queryRangeHandler(qt, startTime, at, w, query 從http協議中取出參數,執行範圍查詢
func queryRangeHandler
result, err := promql.Exec(qt, ec, query, false) 組織好 promql.EvalConfig 對象
app/vmselect/promql/exec.go func Exec 執行查詢表達式的函數
e, err := parsePromQLWithCache(q) 解析查詢表達式
qid := activeQueriesV.Add(ec, q) 記錄當前正在查詢哪個表達式
rv, err := evalExpr(qt, ec, e) 執行解析後的表達式, metricsql.Expr對象
app/vmselect/promql/eval.go func evalExpr evalExpr會根據 promql的結構嵌套執行,直到葉子節點。
rv, err := evalExprInternal(qt, ec, e)
func evalExprInternal 逐個種類判斷,一共八個種類。 是哪種表達式,就執行對應的分支
rv, err := evalAggrFunc(qtChild, ec, ae) 執行聚合表達式。(選擇最常見的一種表達式來分析)
func evalAggrFunc
callbacks := getIncrementalAggrFuncCallbacks(ae.Name) 根據表達式中的聚合函數名,找到對應的執行代碼。 例如:函數 sum() 對應着一個 golang 的 func
fe, nrf := tryGetArgRollupFuncWithMetricExpr(ae) 如果 sum() 裏面還有類似 increase() 這樣的 rollup 函數,則執行這一步
args, re, err := evalRollupFuncArgs(qt, ec, fe) 先執行 rollup() 函數裏面的表達式
rf, err := nrf(args) 得到表達式的結果後,再執行 rollup() 函數
func evalRollupFuncArgs
ts, err := evalExpr(qt, ec, arg) 嵌套執行表達式,又回到函數 func evalExpr
內層一般都是 metrics 表達式 這裏開始展示執行到了葉子節點的情況。
rv, err := evalRollupFunc(qt, ec, "default_rollup", rollupDefault, e, re, nil)
func evalRollupFunc
return evalRollupFuncWithoutAt(qt, ec, funcName, rf, expr, re, iafc)
func evalRollupFuncWithoutAt
rvs, err = evalRollupFuncWithMetricExpr(qt, ecNew, funcName, rf, expr, me, iafc, re.Window)
func evalRollupFuncWithMetricExpr
tss, err := evalRollupFuncNoCache(qt, ec, funcName, rf, expr, me, iafc, window, pointsPerSeries)
func evalRollupFuncNoCache
tfss := searchutil.ToTagFilterss(me.LabelFilterss) 把 metrics 相關的表達式,變成標籤過濾的對象
sq = storage.NewMultiTenantSearchQuery(ts, minTimestamp, ec.End, tfss, ec.MaxSeries) 把 [][]storage.TagFilter 構造成 SearchQuery 對象
rss, isPartial, err := netstorage.ProcessSearchQuery(qt, ec.DenyPartialResponse, sq, ec.Deadline) 把請求發到 storage 節點,得到了 Results 對象
evalRollupNoIncrementalAggregate(qt, funcName, keepMetricNames, rss, rcs, preFunc, sharedTimestamps)
func evalRollupNoIncrementalAggregate
rss.RunParallel(qt, ... 當 vmstorage 返回 metricBlock 數據塊後,開始並行執行,做各種聚合運算。
app/vmselect/netstorage/
netstorage.go
func (rss *Results) RunParallel
rowsProcessedTotal, err := rss.runParallel(qt, f) 在與核數相匹配的協程中並行執行
func (rss *Results) runParallel
err = tsw.do(&tmpResult.rs, 0) 每個 time series的數據上調用 do 方法
func (tsw *timeseriesWork) do
err := tsw.pts.Unpack(r, rss.tbfs, rss.tr) 把 metricBlock 數據進行反序列化,變成與 data point 數量相等的 []timestamp 和 []values
func (pts *packedTimeseries) Unpack
dedupInterval := storage.GetDedupInterval() 查詢時,拉取全局的 dedup 間隔配置
mergeSortBlocks(dst, sbh, dedupInterval) 去重邏輯。去掉重複的 timestamp

2.2 查詢過程概述

  • 服務啓動流程
    • main() 中啓動了http服務
    • 提供 callback 函數來對應到 /select/0/prometheus/api/v1/xxx 下面的查詢
    • 最終請求觸發時,走到 QueryRangeHandler 中進行處理
  • 查詢過程
    • 通過 Exec() 函數來處理查詢
    • 使用 parsePromQL() 來解析 promql 表達式,把表達式變成 8 中基本語句的嵌套。8種語句包含:
      • metricsql.MetricExpr: metric 的過濾表達式,主要是 tag 層面的過濾
      • metricsql.RollupExpr: 可以理解為 increase(), rate() 這樣的區間聚合函數
      • metricsql.FuncExpr: 執行 metricsQL 內部提供的函數,例如 label_replace() 等
      • metricsql.AggrFuncExpr: 執行聚合函數,例如 max, sum, avg 等
      • metricsql.BinaryOpExpr: 執行布爾表達式的運算,主要有:and / or / unless
      • metricsql.NumberExpr: 數值常量表達式
      • metricsql.StringExpr: 把字符串看成一個獨立的時間序列
      • metricsql.DurationExpr: 產生新的 timestamp 的序列
    • 根據 promQL 表達式的結構,在不同的部分嵌套執行 evalExpr(), 直到執行完成整個表達式
  • 調用存儲後端:
    • 當 promql 的表達式是 MetricExpr 時,通過 vm-select 與 vm-storage 之間的二進制 rpc 協議來通訊
    • vmselect 向所有的 vmstorage 廣播
    • 發送到 vmstorage 上的請求都是要求返回 metricName + timestamp + value 這樣形式的請求
    • vmstorage 根據 tag 的過濾表達式,先在索引中找到符合條件的 tsid
    • 再根據 tsid 在數據去尋找每個 tsid 對應的 block,然後直接返回未解碼的 block 數據
    • vmselect 會收到來自 vmstorage 的 MetricBlock 結構
    • 最終收到的數據類似這樣的結構: map[metricName] -> block list
  • 數據反序列化及其去重
    • 當收到多個 vmstorage 節點返回的數據後,創建 N 個協程(N一般與CPU核數相等)來做反序列化、dedup 和 表達式計算
    • 每條 metricName 下的 block list 會逐個調用 unpack 來變成 []timestamp 和 []values 的數組
    • 所有的 block list 會建立成一個堆,按照 timestamp 來排序
    • 以 dedupInterval 為時間窗口,逐個檢查 timestamp。在 dedupInterval 時間窗口內的多條數據,只取一條
  • 表達式計算
    • 重新回到 evalExpr() 函數,做 sum 等表達式的計算
    • 全部計算完成後返回 json 數據給 grafana

3. 總結

  • vmselect / vmstorage 是存算分離的架構
    • vmstorage 內部組織了索引和數據兩部分
    • vmstorage 通過 tag 來在索引中過濾,找到後返回整個 block 數據
    • vmselect 把查詢廣播到所有的 vmstorage,得到 metricBlock 後進行 promql 表達式的計算
  • promQL 被解析為一顆樹,通過遞歸執行。
    • 表達式從樹梢執行到樹根

最後,為了便於分析 VictoriaMetrics 系列的源碼,我又專門建立了一個源碼倉庫來存放增加了註釋的源碼:
https://github.com/ahfuzhang/code_comments

Have Fun. 😃

user avatar ligaai 头像 shumile_5f6954c414184 头像 eolink 头像 tssc 头像 runyubingxue 头像 dalideshoushudao 头像 wnhyang 头像 aitibao_shichangyingxiao 头像 shouke 头像 fanjiapeng 头像 javalover 头像 yejianfeixue 头像
点赞 63 用户, 点赞了这篇动态!
点赞

Add a new 评论

Some HTML is okay.