Stories

Detail Return Return

KafkaMQ 日誌採集最佳實踐 - Stories Detail

概述

Kafka 是由 LinkedIn 開發、後由Apache軟件基金會維護的分佈式流處理平台,採用Scala和Java編寫。它本質是一個高吞吐、持久化的發佈-訂閲消息系統,專注於處理實時數據流(如用户行為日誌、點擊流等)。在收集日誌的場景中,Kafka 可以作為一個消息中間件,用於接收、存儲和轉發大量的日誌,鏈路,指標數據。

觀測雲

觀測雲是一款專為 IT 工程師打造的全鏈路可觀測產品,它集成了基礎設施監控、應用程序性能監控和日誌管理,為整個技術棧提供實時可觀察性。這款產品能夠幫助工程師全面瞭解端到端的用户體驗追蹤,瞭解應用內函數的每一次調用,以及全面監控雲時代的基礎設施。此外,觀測雲還具備快速發現系統安全風險的能力,為數字化時代提供安全保障。

本實踐主要是通過觀測雲消費 Kafka 隊列收集到的日誌數據,並將數據通過 Pipeline 進行字段提取和分類,便於用户對日誌數據進行可視化分析。

部署 Kafka

目前 DataKit 支持的 Kafka 版本有 [ version:0.8.2 ~ 3.2.0 ]。

下載 3.2.0 版本,解壓即可使用。

wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.13-3.2.0.tgz

1、啓動 Zookeeper 服務

$ bin/zookeeper-server-start.sh config/zookeeper.properties

2、啓動 KafkaServer

$ bin/kafka-server-start.sh config/server.properties

3、創建 Topic

創建名為 testlog 的 Topic 。

$ bin/kafka-topics.sh --create --topic testlog --bootstrap-server localhost:9092

4、啓動 Producer

$ bin/kafka-console-producer.sh --topic testlog --bootstrap-server localhost:9092

部署 DataKit

DataKit 是一個開源的、跨平台的數據收集和監控工具,由觀測雲開發並維護。它旨在幫助用户收集、處理和分析各種數據源,如日誌、指標和事件,以便進行有效的監控和故障排查。DataKit 支持多種數據輸入和輸出格式,可以輕鬆集成到現有的監控系統中。

登錄觀測雲控制枱,在「集成」 - 「DataKit」選擇對應安裝方式,當前採用 Linux 主機部署 DataKit。

圖片

開啓 KafkaMQ 採集器

進入 DataKit 安裝目錄下 (默認是 /usr/local/datakit/conf.d/ ) 的 conf.d/kafkamq 目錄,複製 kafkamq.conf.sample 並命名為 kafkamq.conf

類似如下:

-rwxr-xr-x 1 root root 2574 Apr 30 23:52 kafkamq.conf
-rwxr-xr-x 1 root root 2579 May  1 00:40 kafkamq.conf.sample

調整 kafkamq 採集器配置如下:

  • addrs = ["localhost:9092"],該文采集器 DataKit 和 Kafka 安裝到同一台操作系統中,localhost 即可。
  • kafka_version = "3.2.0",該文使用 Kafka 的版本。
  • [inputs.kafkamq.custom],刪除註釋符號“#”。
  • [inputs.kafkamq.custom.log_topic_map],刪除註釋符號“#”。
  • "testlog"="log.p",testlog 為 Topic 的名字,log.p 為觀測雲 Pipeline 可編程數據處理器的日誌字段提取規則配置。涉及的業務日誌和 log.p 的內容詳細見下面的《使用 Pipeline》。
  • 其他一些配置説明:

    • group_id = "datakit-group":消費者組名稱,相同組內消費者共享分區消費進度。不同消費者組可獨立消費同一主題。
    • assignor = "roundrobin":分區輪詢分配給消費者,​適合組內消費者訂閲相同主題列表​,實現負載均衡。

注意:開啓或調整 DataKit 的配置,需重啓採集器(shell 下執行 datakit service -R)。

[[inputs.kafkamq]]
  addrs = ["localhost:9092"]
  # your kafka version:0.8.2 ~ 3.2.0
  kafka_version = "3.2.0"
  group_id = "datakit-group"
  # consumer group partition assignment strategy (range, roundrobin, sticky)
  

  ## rate limit.
  #limit_sec = 100
  ## sample
  # sampling_rate = 1.0

  ## kafka tls config
  # tls_enable = true
  ## PLAINTEXT/SASL_SSL/SASL_PLAINTEXT
  # tls_security_protocol = "SASL_PLAINTEXT"
  ## PLAIN/SCRAM-SHA-256/SCRAM-SHA-512/OAUTHBEARER,default is PLAIN.
  # tls_sasl_mechanism = "PLAIN"
  # tls_sasl_plain_username = "user"
  # tls_sasl_plain_password = "pw"
  ## If tls_security_protocol is SASL_SSL, then ssl_cert must be configured.
  # ssl_cert = "/path/to/host.cert"

  ## -1:Offset Newest, -2:Offset Oldest
  offsets=-1

  ## skywalking custom
  #[inputs.kafkamq.skywalking]
  ## Required: send to datakit skywalking input.
  #  dk_endpoint="http://localhost:9529"
  #  thread = 8 
  #  topics = [
  #    "skywalking-metrics",
  #    "skywalking-profilings",
  #    "skywalking-segments",
  #    "skywalking-managements",
  #    "skywalking-meters",
  #    "skywalking-logging",
  #  ]
  #  namespace = ""

  ## Jaeger from kafka. Please make sure your Datakit Jaeger collector is open!
  #[inputs.kafkamq.jaeger]
  ## Required: ipv6 is "[::1]:9529"
  #  dk_endpoint="http://localhost:9529"
  #  thread = 8 
  #  source: agent,otel,others...
  #  source = "agent"
  #  # Required: topics
  #  topics=["jaeger-spans","jaeger-my-spans"]

  ## user custom message with PL script.
  [inputs.kafkamq.custom]
    #spilt_json_body = true
    #thread = 8
    #storage_index = "" # NOTE: only working on logging collection

    ## spilt_topic_map determines whether to enable log splitting for specific topic based on the values in the spilt_topic_map[topic].
    #[inputs.kafkamq.custom.spilt_topic_map]
    #  "log_topic"=true
    #  "log01"=false
    [inputs.kafkamq.custom.log_topic_map]
       "test_log"="log.p"
    #  "log01"="log_01.p"
    #[inputs.kafkamq.custom.metric_topic_map]
    #  "metric_topic"="metric.p"
    #  "metric01"="rum_apm.p"
    #[inputs.kafkamq.custom.rum_topic_map]
    #  "rum_topic"="rum_01.p"
    #  "rum_02"="rum_02.p"

  #[inputs.kafkamq.remote_handle]
    ## Required
    #endpoint="http://localhost:8080"
    ## Required topics
    #topics=["spans","my-spans"]
    # send_message_count = 100
    # debug = false
    # is_response_point = true
    # header_check = false

  ## Receive and consume OTEL data from kafka.
  #[inputs.kafkamq.otel]
    #dk_endpoint="http://localhost:9529"
    #trace_api="/otel/v1/traces"
    #metric_api="/otel/v1/metrics"
    #trace_topics=["trace1","trace2"]
    #metric_topics=["otel-metric","otel-metric1"]
    #thread = 8 

  ## todo: add other input-mq

編寫 Pipeline

log.p 規則內容:

data = load_json(message)
protocol = data["protocol"]
response_code = data["response_code"]
set_tag(protocol,protocol)set_tag(response_code,response_code)
group_between(response_code,[200,300],"info","status")
group_between(response_code,[400,499],"warning","status")
group_between(response_code,[500,599],"error","status")
time = data["start_time"]
set_tag(time,time)
default_time(time)

效果展示

發送業務日誌樣例

業務日誌樣例文件如下:

#info
{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":204,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-05-01T00:37:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}
#error
{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":504,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-05-01T00:39:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}
#warn
{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":404,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-05-01T00:38:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}

日誌發送命令

在 Producer 啓動後,分別發送如下三條日誌內容,三條日誌一條為 info 級別("response_code":204),另一條為 error 級別("response_code":504),最後一條為 warn 級別日誌("response_code":404)。

>{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":204,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-04-30T08:47:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}
>{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":504,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-04-30T08:47:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}
>{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":404,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-04-30T08:47:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}

效果

  • 通過 DataKit 採集到 Kafka 的三條業務日誌

圖片

  • 使用 Pipeline 對日誌進行字段提取的效果展示

下圖 protocol、response_code 以及 time 都是使用 Pipeline 提取後的效果。

圖片

結語

觀測雲通過集成 KafkaMQ ,實現了 Kafka 隊列日誌數據的高效採集和處理,並結合觀測雲的 Pipeline 功能,能夠實時採集業務日誌並進行字段提取和分類,便於後續分析和可視化;此外,DataKit 的 KafkaMQ 採集器可擴展應用於其他數據處理場景,如還支持鏈路(如開源 otel,skywalking,jaeger),指標,RUM 等數據的消費,這種集成方案提升了系統的可觀測性,同時反映了觀測雲平台的開放和包容性,加速了企業的數字化轉型。

user avatar automq Avatar
Favorites 1 users favorite the story!
Favorites

Add a new Comments

Some HTML is okay.