簡介
Apache Kafka 是一個分佈式流處理平台,主要用於構建實時數據流管道和應用程序。在收集業務日誌的場景中,Kafka 可以作為一個消息中間件,用於接收、存儲和轉發大量的日誌數據。將 Kafka 與其他系統(如 Elasticsearch、Flume、Spark Streaming 等)集成,以提供更豐富的日誌處理和分析功能。本文提到的是和觀測雲集成,即通過觀測雲的採集器 Datakit 採集 Kafka 中的業務日誌,下面通過一些例子瞭解下觀測雲的快速集成效果。
實踐環境
前置條件
- 註冊觀測雲
軟件和中間件
- Kafka3.2.0
- Datakit採集器
- JDK 8
硬件
- 雲服務器 CentOS7.9 64位 4vCPU,8GB 內存,100GB 雲盤一台。
接入方案
準備 Kafka 環境
安裝 Kafka
下載 3.2.0 版本,解壓即可使用。
wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.13-3.2.0.tgz
注:目前 Datakit 支持的 Kafka 版本有[version:0.8.2 ~ 3.2.0]
啓動 Zookeeper 服務
$ bin/zookeeper-server-start.sh config/zookeeper.properties
啓動 KafkaServer
$ bin/kafka-server-start.sh config/server.properties
創建 Topic
創建名為 testlog 的 Topic 。
$ bin/kafka-topics.sh --create --topic testlog --bootstrap-server localhost:9092
啓動 Producer
$ bin/kafka-console-producer.sh --topic testlog --bootstrap-server localhost:9092
安裝 DataKit
參考官網文檔安裝 DataKit 採集器
TOKEN 依據你的觀測雲工作空間來填寫
DK_DATAWAY=https://openway.guance.com?token=<TOKEN> bash -c "$(curl -L https://static.guance.com/datakit/install.sh)"
開啓 Kafka 採集器
進入 DataKit 安裝目錄下 (默認是 /usr/local/datakit/conf.d/ ) 的 conf.d/kafkamq 目錄,複製 kafkamq.conf.sample 並命名為 kafkamq.conf 。
類似如下:
-rwxr-xr-x 1 root root 2574 Apr 30 23:52 kafkamq.conf
-rwxr-xr-x 1 root root 2579 May 1 00:40 kafkamq.conf.sample
調製 kafka 採集器配置如下:
- addrs = ["localhost:9092"],該文采集器 DataKit 和 Kafka 安裝到同一台操作系統中,localhost 即可。
- kafka_version = "3.2.0",該文使用 Kafka 的版本。
- [inputs.kafkamq.custom],刪除註釋符號“#”。
- [inputs.kafkamq.custom.log_topic_map],刪除註釋符號“#”。
- "testlog"="log.p",testlog 為 Topic 的名字,log.p 為觀測雲 Pipeline 可編程數據處理器的日誌字段提取規則配置。涉及的業務日誌和 log.p 的內容詳細見下面的《使用 Pipeline》。
# {"version": "1.28.1", "desc": "do NOT edit this line"}
[[inputs.kafkamq]]
addrs = ["localhost:9092"]
# your kafka version:0.8.2 ~ 3.2.0
kafka_version = "3.2.0"
group_id = "datakit-group"
# consumer group partition assignment strategy (range, roundrobin, sticky)
assignor = "roundrobin"
## rate limit.
#limit_sec = 100
## sample
# sampling_rate = 1.0
## kafka tls config
# tls_enable = true
# tls_security_protocol = "SASL_PLAINTEXT"
# tls_sasl_mechanism = "PLAIN"
# tls_sasl_plain_username = "user"
# tls_sasl_plain_password = "pw"
## -1:Offset Newest, -2:Offset Oldest
offsets=-1
## skywalking custom
#[inputs.kafkamq.skywalking]
## Required!send to datakit skywalking input.
#dk_endpoint="http://localhost:9529"
#thread = 8
#topics = [
# "skywalking-metrics",
# "skywalking-profilings",
# "skywalking-segments",
# "skywalking-managements",
# "skywalking-meters",
# "skywalking-logging",
#]
#namespace = ""
## Jaeger from kafka. Please make sure your Datakit Jaeger collector is open !!!
#[inputs.kafkamq.jaeger]
## Required! ipv6 is "[::1]:9529"
#dk_endpoint="http://localhost:9529"
#thread = 8
#source: agent,otel,others...
#source = "agent"
## Required! topics
#topics=["jaeger-spans","jaeger-my-spans"]
## user custom message with PL script.
[inputs.kafkamq.custom]
#spilt_json_body = true
#thread = 8
## spilt_topic_map determines whether to enable log splitting for specific topic based on the values in the spilt_topic_map[topic].
#[inputs.kafkamq.custom.spilt_topic_map]
# "log_topic"=true
# "log01"=false
[inputs.kafkamq.custom.log_topic_map]
"testlog"="log.p"
# "log01"="log_01.p"
#[inputs.kafkamq.custom.metric_topic_map]
# "metric_topic"="metric.p"
# "metric01"="rum_apm.p"
#[inputs.kafkamq.custom.rum_topic_map]
# "rum_topic"="rum_01.p"
# "rum_02"="rum_02.p"
#[inputs.kafkamq.remote_handle]
## Required!
#endpoint="http://localhost:8080"
## Required! topics
#topics=["spans","my-spans"]
# send_message_count = 100
# debug = false
# is_response_point = true
# header_check = false
## Receive and consume OTEL data from kafka.
#[inputs.kafkamq.otel]
#dk_endpoint="http://localhost:9529"
#trace_api="/otel/v1/trace"
#metric_api="/otel/v1/metric"
#trace_topics=["trace1","trace2"]
#metric_topics=["otel-metric","otel-metric1"]
#thread = 8
## todo: add other input-mq
注意:開啓或調整 DataKit 的配置,需重啓採集器(shell 下執行 datakit service -R)。
使用 Pipeline
log.p 規則內容
data = load_json(message)
protocol = data["protocol"]
response_code = data["response_code"]
set_tag(protocol,protocol)
set_tag(response_code,response_code)
group_between(response_code,[200,300],"info","status")
group_between(response_code,[400,499],"warning","status")
group_between(response_code,[500,599],"error","status")
time = data["start_time"]
set_tag(time,time)
default_time(time)
效果展示
發送業務日誌樣例
業務日誌樣例文件如下:
#info
{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":204,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-05-01T00:37:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}
#error
{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":504,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-05-01T00:39:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}
#warn
{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":404,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-05-01T00:38:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}
日誌發送命令
在 Producer 啓動後,分別發送如下三條日誌內容,三條日誌一條為 info 級別("response_code":204),另一條為 error 級別("response_code":504),最後一條為 warn 級別日誌("response_code":404)。
>{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":204,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-04-30T08:47:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}
>
>
>
>
>{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":504,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-04-30T08:47:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}
>
>
>
>{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":404,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-04-30T08:47:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}
通過 DataKit 採集到 Kafka 的三條業務日誌
使用 Pipeline 對業務日誌進行字段提取
下圖 protocol、response_code 以及 time 都是使用 Pipeline 提取後的效果。