文章目錄
- 統一日誌與鏈路追蹤 Sleuth + Zipkin 實踐
- 目錄
- 一、分佈式追蹤核心概念
- 分佈式追蹤的基本概念
- 追蹤數據模型
- 二、Sleuth 自動埋點機制
- Sleuth 自動配置架構
- HTTP 請求自動追蹤
- 三、Trace 上下文傳播原理
- 上下文傳播機制
- Feign 客户端集成
- 四、Zipkin 架構與數據流
- Zipkin 系統架構
- Zipkin 集成配置
- Zipkin 報告器實現
- ⚡ 五、消息中間件集成
- Kafka 消息追蹤
- 六、SkyWalking vs Jaeger 對比
- 功能對比分析
- 遷移到 SkyWalking
- 七、生產環境最佳實踐
- 生產級配置
- 性能優化建議
- 一、分佈式追蹤核心概念
- 二、Sleuth 自動埋點機制
- 三、Trace 上下文傳播原理
- 四、Zipkin 架構與數據流
- ⚡ 五、消息中間件集成
- 六、SkyWalking vs Jaeger 對比
- 七、生產環境最佳實踐
調用鏈追蹤核心元素:
Span 數據結構定義:
/**
 * Span data model: a single unit of work inside a distributed trace.
 *
 * <p>Besides the Lombok-generated accessors and builder, this class provides:
 * <ul>
 *   <li>fluent mutators ({@link #name(String)}, {@link #kind(Kind)}, {@link #timestamp(long)},
 *       {@link #localEndpoint(Endpoint)}) that return {@code this};</li>
 *   <li>the {@link #tag(String, String)} and {@link #annotation(Annotation)} helpers.</li>
 * </ul>
 * The tracer, the servlet filter and the client interceptors chain these calls directly on
 * span instances.
 *
 * <p>Time units: {@code timestamp} is epoch milliseconds and {@code duration} is milliseconds.
 * The Zipkin reporter multiplies both by 1000 when exporting.
 */
@Data
@Builder
@AllArgsConstructor
public class Span {
    // Identity
    private String traceId;        // trace ID, globally unique
    private String spanId;         // ID of this unit of work
    private String parentSpanId;   // parent span ID, used to build the call tree; null for a root
    private String name;           // operation name
    // Timing
    private long timestamp;        // start time, epoch milliseconds
    private long duration;         // elapsed time, milliseconds
    // Context
    private Kind kind;             // CLIENT, SERVER, PRODUCER or CONSUMER
    private boolean shared;        // Zipkin "shared" flag: a server span reusing the client's span ID
    private boolean debug;         // debug flag
    private boolean sampled;       // whether this span is recorded and reported
    // Endpoints
    private Endpoint localEndpoint;   // local service endpoint
    private Endpoint remoteEndpoint;  // remote service endpoint
    // Annotations and tags; both are created lazily by annotation(..) / tag(..)
    private List<Annotation> annotations; // point-in-time events
    private Map<String, String> tags;     // key/value tags
    // Status
    private boolean error;         // whether an error occurred
    private String errorMessage;   // error message, if any

    /**
     * Sets the operation name.
     *
     * @param name operation name
     * @return this span, for chaining
     */
    public Span name(String name) {
        this.name = name;
        return this;
    }

    /**
     * Sets the span kind.
     *
     * @param kind CLIENT, SERVER, PRODUCER or CONSUMER
     * @return this span, for chaining
     */
    public Span kind(Kind kind) {
        this.kind = kind;
        return this;
    }

    /**
     * Sets the start time.
     *
     * @param timestamp epoch milliseconds
     * @return this span, for chaining
     */
    public Span timestamp(long timestamp) {
        this.timestamp = timestamp;
        return this;
    }

    /**
     * Sets the local service endpoint.
     *
     * @param localEndpoint endpoint of the service recording this span
     * @return this span, for chaining
     */
    public Span localEndpoint(Endpoint localEndpoint) {
        this.localEndpoint = localEndpoint;
        return this;
    }

    /**
     * Adds or replaces a tag.
     *
     * <p>Null keys and null values are ignored. Zipkin's span builder does not accept null tag
     * values, and a single null would otherwise make the whole span unreportable.
     *
     * @param key tag name
     * @param value tag value
     * @return this span, for chaining
     */
    public Span tag(String key, String value) {
        if (key == null || value == null) {
            return this;
        }
        if (tags == null) {
            tags = new java.util.LinkedHashMap<>();
        }
        tags.put(key, value);
        return this;
    }

    /**
     * Appends a point-in-time annotation. A null annotation is ignored.
     *
     * @param annotation the event to record
     * @return this span, for chaining
     */
    public Span annotation(Annotation annotation) {
        if (annotation == null) {
            return this;
        }
        if (annotations == null) {
            annotations = new java.util.ArrayList<>();
        }
        annotations.add(annotation);
        return this;
    }

    /**
     * Span kind.
     */
    public enum Kind {
        CLIENT,   // outgoing client call
        SERVER,   // server-side handling
        PRODUCER, // message producer
        CONSUMER  // message consumer
    }

    /**
     * Service endpoint information.
     */
    @Data
    @AllArgsConstructor
    public static class Endpoint {
        private String serviceName; // service name
        private String ip;          // IP address
        private int port;           // port number
    }

    /**
     * Point-in-time annotation.
     */
    @Data
    @AllArgsConstructor
    public static class Annotation {
        private long timestamp; // epoch milliseconds
        private String value;   // annotation value, e.g. "sr", "cs"
    }
}
Sleuth 自動配置架構
Sleuth 自動埋點組件:
/**
 * Core tracing configuration.
 *
 * <p>Registers the sampler, trace context and tracer. Each of these is registered only when the
 * application has not defined its own bean of that type. It also registers three propagation
 * hooks: the servlet filter, the async executor wrapper and the RestTemplate customizer.
 */
@Configuration
@EnableAspectJAutoProxy
@Slf4j
public class SleuthAutoConfiguration {

    /** Default tracer, built from the sampler and thread-bound trace context. */
    @Bean
    @ConditionalOnMissingBean
    public Tracer tracer(Sampler sampler, TraceContext traceContext) {
        Tracer defaultTracer = new DefaultTracer(sampler, traceContext);
        return defaultTracer;
    }

    /** Default sampler: every request is sampled. */
    @Bean
    @ConditionalOnMissingBean
    public Sampler sampler() {
        Sampler everything = Sampler.ALWAYS_SAMPLE;
        return everything;
    }

    /** Default holder for the current thread's span. */
    @Bean
    @ConditionalOnMissingBean
    public TraceContext traceContext() {
        TraceContext context = new DefaultTraceContext();
        return context;
    }

    /**
     * Servlet filter that joins or starts a trace for each incoming HTTP request.
     * NOTE(review): TracingFilter is also annotated with @Component; confirm it is not
     * registered twice.
     */
    @Bean
    public TracingFilter tracingFilter(Tracer tracer) {
        TracingFilter filter = new TracingFilter(tracer);
        return filter;
    }

    /**
     * Executor wrapper that carries the trace context into async tasks.
     * NOTE(review): the TracingAsyncTaskExecutor shown later in this article holds an
     * AsyncTaskExecutor delegate and a TraceContextExecutor; confirm a single-argument
     * (Tracer) constructor actually exists.
     */
    @Bean
    @ConditionalOnMissingBean
    public TracingAsyncTaskExecutor tracingAsyncTaskExecutor(Tracer tracer) {
        TracingAsyncTaskExecutor executor = new TracingAsyncTaskExecutor(tracer);
        return executor;
    }

    /**
     * Appends a tracing interceptor to every RestTemplate built through Spring Boot's
     * RestTemplateBuilder, after any interceptors it already has.
     */
    @Bean
    public RestTemplateCustomizer restTemplateCustomizer(Tracer tracer) {
        return template -> {
            List<ClientHttpRequestInterceptor> chain = new ArrayList<>(template.getInterceptors());
            ClientHttpRequestInterceptor tracing = new TracingClientHttpRequestInterceptor(tracer);
            chain.add(tracing);
            template.setInterceptors(chain);
        };
    }
}
/**
 * Default {@link Tracer} implementation.
 *
 * <p>Span creation:
 * <ul>
 *   <li>A root span gets a random 128-bit trace ID.</li>
 *   <li>A child span inherits its parent's trace ID and records the parent's span ID.</li>
 *   <li>Sampling is decided once, at the root, by the configured {@link Sampler}. Children of an
 *       unsampled parent are unsampled too.</li>
 * </ul>
 *
 * <p>Closing a sampled span sets its duration in milliseconds and hands the span to the reporter
 * registered through {@link #setSpanReporter}. Until one is registered, closed spans are only
 * logged.
 */
@Component
@Slf4j
public class DefaultTracer implements Tracer {
    /** Reporter used until {@link #setSpanReporter} is called: discards spans. */
    private static final java.util.function.Consumer<Span> NO_OP_REPORTER = span -> { };
    /** Sixteen zeros, used to left-pad hex IDs to a fixed width. */
    private static final String HEX_PADDING = "0000000000000000";

    private final Sampler sampler;
    private final TraceContext traceContext;
    private volatile java.util.function.Consumer<Span> spanReporter = NO_OP_REPORTER;

    public DefaultTracer(Sampler sampler, TraceContext traceContext) {
        this.sampler = sampler;
        this.traceContext = traceContext;
    }

    /**
     * Returns the context holding the current thread's span. The HTTP filter and the Kafka
     * producer wrapper use it to bind and read the active span.
     *
     * @return the trace context this tracer was built with
     */
    public TraceContext getTraceContext() {
        return traceContext;
    }

    /**
     * Registers the sink for finished spans, e.g. {@code zipkinSpanReporter::report}.
     *
     * @param spanReporter receives every sampled span on {@link #close(Span)}; {@code null}
     *     restores the no-op reporter
     */
    public void setSpanReporter(java.util.function.Consumer<Span> spanReporter) {
        this.spanReporter = spanReporter != null ? spanReporter : NO_OP_REPORTER;
    }

    /**
     * Creates the next span for the current thread.
     *
     * @return a child of the thread's current span, or a new root span if there is none
     */
    @Override
    public Span nextSpan() {
        Span currentSpan = traceContext.getCurrentSpan();
        return currentSpan != null ? createChildSpan(currentSpan) : createRootSpan();
    }

    /**
     * Creates a span under an explicit parent.
     *
     * @param parent parent span; {@code null} starts a new trace
     * @return the new span
     */
    @Override
    public Span nextSpan(Span parent) {
        return parent == null ? createRootSpan() : createChildSpan(parent);
    }

    /** Creates a root span, or an empty unsampled span if the sampler declines. */
    private Span createRootSpan() {
        if (!sampler.isSampled()) {
            return Span.builder().sampled(false).build();
        }
        return Span.builder()
                .traceId(generateTraceId())
                .spanId(generateSpanId())
                .parentSpanId(null)
                .name("root")
                .timestamp(System.currentTimeMillis())
                .kind(Span.Kind.SERVER)
                .sampled(true)
                .build();
    }

    /** Creates a child in the parent's trace; an unsampled parent yields an unsampled child. */
    private Span createChildSpan(Span parent) {
        if (!parent.isSampled()) {
            return Span.builder().sampled(false).build();
        }
        return Span.builder()
                .traceId(parent.getTraceId())
                .spanId(generateSpanId())
                .parentSpanId(parent.getSpanId())
                .name("child")
                .timestamp(System.currentTimeMillis())
                .kind(Span.Kind.CLIENT)
                .sampled(true)
                .localEndpoint(parent.getLocalEndpoint())
                .build();
    }

    /** Generates a 128-bit trace ID as 32 lower-case hex characters; never all zeros. */
    private String generateTraceId() {
        java.util.concurrent.ThreadLocalRandom random = java.util.concurrent.ThreadLocalRandom.current();
        long high;
        long low;
        do {
            high = random.nextLong();
            low = random.nextLong();
        } while (high == 0L && low == 0L);
        return toHex(high) + toHex(low);
    }

    /** Generates a 64-bit span ID as 16 lower-case hex characters; never zero. */
    private String generateSpanId() {
        java.util.concurrent.ThreadLocalRandom random = java.util.concurrent.ThreadLocalRandom.current();
        long id;
        do {
            id = random.nextLong();
        } while (id == 0L);
        return toHex(id);
    }

    /** Unsigned, zero-padded, 16-character lower-case hex form of {@code value}. */
    private static String toHex(long value) {
        String hex = Long.toHexString(value);
        return HEX_PADDING.substring(hex.length()) + hex;
    }

    /**
     * Finishes a span: sets its duration in milliseconds (from its start timestamp) and reports
     * it. Unsampled and null spans are ignored.
     *
     * @param span the span to close
     */
    @Override
    public void close(Span span) {
        if (span == null || !span.isSampled()) {
            return;
        }
        span.setDuration(System.currentTimeMillis() - span.getTimestamp());
        log.debug("Span關閉: traceId={}, spanId={}, duration={}ms",
                span.getTraceId(), span.getSpanId(), span.getDuration());
        emit(span);
    }

    /**
     * Hands a finished span to the reporter.
     *
     * <p>Reporter failures are logged, not thrown. {@code close} typically runs in a
     * {@code finally} block, where a new exception would hide the request's own.
     */
    private void emit(Span span) {
        try {
            spanReporter.accept(span);
        } catch (RuntimeException e) {
            log.warn("Span報告失敗: traceId={}, spanId={}", span.getTraceId(), span.getSpanId(), e);
        }
    }
}
HTTP 攔截器實現:
/**
 * Servlet filter that traces every HTTP request.
 *
 * <p>For each request it:
 * <ol>
 *   <li>Joins the caller's span when B3 headers are present, otherwise starts a new trace
 *       through the {@link Tracer}.</li>
 *   <li>Binds that span to the current thread while the request is processed.</li>
 *   <li>Records the "sr"/"ss" annotations and HTTP tags.</li>
 *   <li>Closes the span and clears the thread's trace context.</li>
 * </ol>
 */
@Component
@Slf4j
public class TracingFilter implements Filter {
    private static final String TRACE_HEADER = "X-B3-TraceId";
    private static final String SPAN_HEADER = "X-B3-SpanId";
    private static final String PARENT_HEADER = "X-B3-ParentSpanId";
    private static final String SAMPLED_HEADER = "X-B3-Sampled";

    private final Tracer tracer;

    public TracingFilter(Tracer tracer) {
        this.tracer = tracer;
    }

    @Override
    public void doFilter(ServletRequest request, ServletResponse response,
                         FilterChain chain) throws IOException, ServletException {
        if (!(request instanceof HttpServletRequest) || !(response instanceof HttpServletResponse)) {
            // Nothing HTTP-specific to trace; pass the request through untouched.
            chain.doFilter(request, response);
            return;
        }
        HttpServletRequest httpRequest = (HttpServletRequest) request;
        HttpServletResponse httpResponse = (HttpServletResponse) response;

        Span span = extractOrCreateSpan(httpRequest);
        try {
            tracer.getTraceContext().setCurrentSpan(span);
            addTracingHeaders(httpResponse, span);
            logServerReceived(span, httpRequest);
            chain.doFilter(request, response);
            logServerSent(span, httpResponse);
        } catch (Exception e) {
            span.setError(true);
            span.setErrorMessage(e.getMessage());
            span.tag("error", "true");
            throw e;
        } finally {
            try {
                tracer.close(span);
            } finally {
                // Always unbind: servlet threads are pooled and would otherwise leak this span.
                tracer.getTraceContext().clear();
            }
        }
    }

    /**
     * Joins the caller's span from B3 headers, or starts a new span if they are absent.
     *
     * <p>When joining, the server side reuses the caller's span ID. Zipkin expects such a span
     * to be flagged {@code shared} so it is merged with the client side, not treated as a
     * duplicate.
     */
    private Span extractOrCreateSpan(HttpServletRequest request) {
        String traceId = request.getHeader(TRACE_HEADER);
        String spanId = request.getHeader(SPAN_HEADER);
        String spanName = request.getMethod() + " " + request.getRequestURI();

        if (traceId != null && spanId != null) {
            return Span.builder()
                    .traceId(traceId)
                    .spanId(spanId)
                    .parentSpanId(request.getHeader(PARENT_HEADER))
                    .sampled(isSampled(request.getHeader(SAMPLED_HEADER)))
                    .shared(true)
                    .kind(Span.Kind.SERVER)
                    .timestamp(System.currentTimeMillis())
                    .name(spanName)
                    .localEndpoint(buildEndpoint(request))
                    .build();
        }
        return tracer.nextSpan()
                .name(spanName)
                .kind(Span.Kind.SERVER)
                .localEndpoint(buildEndpoint(request));
    }

    /** Interprets X-B3-Sampled: "1", or the legacy "true", means sampled. */
    private static boolean isSampled(String header) {
        return "1".equals(header) || "true".equalsIgnoreCase(header);
    }

    /** Echoes the trace identifiers on the response so callers can correlate it. */
    private void addTracingHeaders(HttpServletResponse response, Span span) {
        if (span.isSampled()) {
            response.setHeader(TRACE_HEADER, span.getTraceId());
            response.setHeader(SPAN_HEADER, span.getSpanId());
            response.setHeader(SAMPLED_HEADER, "1");
        }
    }

    /** Records the server-receive ("sr") event and the request's HTTP tags. */
    private void logServerReceived(Span span, HttpServletRequest request) {
        if (!span.isSampled()) {
            return;
        }
        span.annotation(new Span.Annotation(System.currentTimeMillis(), "sr"));
        tagIfPresent(span, "http.method", request.getMethod());
        tagIfPresent(span, "http.path", request.getRequestURI());
        tagIfPresent(span, "http.host", request.getServerName());
        // User-Agent is optional; a null tag value would make the span unreportable.
        tagIfPresent(span, "http.user_agent", request.getHeader("User-Agent"));
        log.debug("服務器接收請求: {} {}, traceId: {}",
                request.getMethod(), request.getRequestURI(), span.getTraceId());
    }

    /** Records the server-send ("ss") event and the response status. */
    private void logServerSent(Span span, HttpServletResponse response) {
        if (!span.isSampled()) {
            return;
        }
        span.annotation(new Span.Annotation(System.currentTimeMillis(), "ss"));
        span.tag("http.status_code", String.valueOf(response.getStatus()));
        log.debug("服務器發送響應: status={}, traceId={}",
                response.getStatus(), span.getTraceId());
    }

    /** Adds a tag only when the value is non-null. */
    private static void tagIfPresent(Span span, String key, String value) {
        if (value != null) {
            span.tag(key, value);
        }
    }
}
Trace 上下文跨服務傳播:
/**
 * Thread-bound trace context.
 *
 * <p>Holds, per thread, the current span and a map of extra (custom) context entries. It also
 * converts between the current span and B3 HTTP headers.
 *
 * <p>State lives in {@link ThreadLocal}s. Code that binds a span must call {@link #clear()}
 * afterwards, because pooled threads are reused.
 */
@Component
@Slf4j
public class TraceContext {
    private static final String TRACE_HEADER = "X-B3-TraceId";
    private static final String SPAN_HEADER = "X-B3-SpanId";
    private static final String PARENT_HEADER = "X-B3-ParentSpanId";
    private static final String SAMPLED_HEADER = "X-B3-Sampled";

    private final ThreadLocal<Span> currentSpan = new ThreadLocal<>();
    private final ThreadLocal<Map<String, String>> extraContext = new ThreadLocal<>();

    /**
     * @return the span bound to the current thread, or {@code null}
     */
    public Span getCurrentSpan() {
        return currentSpan.get();
    }

    /**
     * Binds a span to the current thread.
     *
     * @param span the span, or {@code null} to unbind
     */
    public void setCurrentSpan(Span span) {
        currentSpan.set(span);
        if (span != null) {
            log.debug("設置當前Span: traceId={}, spanId={}",
                    span.getTraceId(), span.getSpanId());
        }
    }

    /**
     * Returns the current thread's mutable extra-context map, creating it on first use.
     *
     * @return the map; never {@code null}
     */
    public Map<String, String> getExtraContext() {
        Map<String, String> extra = extraContext.get();
        if (extra == null) {
            extra = new java.util.HashMap<>();
            extraContext.set(extra);
        }
        return extra;
    }

    /**
     * Removes the current span and extra context from this thread.
     */
    public void clear() {
        currentSpan.remove();
        if (extraContext.get() != null) {
            extraContext.get().clear();
        }
        extraContext.remove();
    }

    /**
     * Writes the current sampled span into outgoing HTTP headers as B3, followed by the
     * extra context. Does nothing when no sampled span is bound.
     *
     * @param headers outgoing request headers
     */
    public void inject(HttpHeaders headers) {
        Span span = getCurrentSpan();
        if (span == null || !span.isSampled()) {
            return;
        }
        headers.set(TRACE_HEADER, span.getTraceId());
        headers.set(SPAN_HEADER, span.getSpanId());
        // A root span has no parent; omit the header rather than sending a null value.
        if (span.getParentSpanId() != null) {
            headers.set(PARENT_HEADER, span.getParentSpanId());
        }
        headers.set(SAMPLED_HEADER, "1");
        injectCustomContext(headers);
    }

    /**
     * Copies the current thread's extra-context entries into the headers.
     *
     * <p>NOTE(review): keys are used verbatim as header names; confirm this matches what
     * receiving services expect. Subclasses may override this to change the encoding.
     *
     * @param headers outgoing request headers
     */
    protected void injectCustomContext(HttpHeaders headers) {
        Map<String, String> extra = extraContext.get();
        if (extra == null) {
            return;
        }
        extra.forEach((key, value) -> {
            if (key != null && value != null) {
                headers.set(key, value);
            }
        });
    }

    /**
     * Reads a B3 span context from incoming HTTP headers.
     *
     * @param headers incoming request headers
     * @return the extracted span, or {@code null} when trace or span ID is missing
     */
    public Span extract(HttpHeaders headers) {
        String traceId = headers.getFirst(TRACE_HEADER);
        String spanId = headers.getFirst(SPAN_HEADER);
        if (traceId == null || spanId == null) {
            return null;
        }
        String sampled = headers.getFirst(SAMPLED_HEADER);
        return Span.builder()
                .traceId(traceId)
                .spanId(spanId)
                .parentSpanId(headers.getFirst(PARENT_HEADER))
                // "1" is the B3 value; "true" is accepted for legacy senders.
                .sampled("1".equals(sampled) || "true".equalsIgnoreCase(sampled))
                .build();
    }
}
/**
 * Propagates the trace context across threads.
 *
 * <p>{@link #wrap(Runnable)} and {@link #wrap(Callable)} work in three steps:
 * <ol>
 *   <li>At wrap time, capture the caller's current span and extra context.</li>
 *   <li>When the task runs, install that context on the executing thread.</li>
 *   <li>Afterwards, restore whatever context that thread had before.</li>
 * </ol>
 * Restoring, rather than blindly clearing, keeps the submitter's context intact when the task
 * runs on the submitting thread itself, e.g. under a caller-runs rejection policy or a
 * synchronous executor.
 */
@Component
@Slf4j
public class TraceContextExecutor {
    private final TraceContext traceContext;

    public TraceContextExecutor(TraceContext traceContext) {
        this.traceContext = traceContext;
    }

    /**
     * Wraps a Runnable so it runs with the caller's trace context.
     *
     * @param task the task to wrap
     * @return a Runnable that installs the captured context around {@code task}
     */
    public Runnable wrap(Runnable task) {
        Span capturedSpan = traceContext.getCurrentSpan();
        Map<String, String> capturedContext = copyExtraContext();
        return () -> {
            Snapshot previous = attach(capturedSpan, capturedContext);
            try {
                task.run();
            } finally {
                restore(previous);
            }
        };
    }

    /**
     * Wraps a Callable so it runs with the caller's trace context.
     *
     * @param task the task to wrap
     * @return a Callable that installs the captured context around {@code task}
     */
    public <T> Callable<T> wrap(Callable<T> task) {
        Span capturedSpan = traceContext.getCurrentSpan();
        Map<String, String> capturedContext = copyExtraContext();
        return () -> {
            Snapshot previous = attach(capturedSpan, capturedContext);
            try {
                return task.call();
            } finally {
                restore(previous);
            }
        };
    }

    /** Returns a copy of the current thread's extra context, or null when it is empty. */
    private Map<String, String> copyExtraContext() {
        Map<String, String> extra = traceContext.getExtraContext();
        return (extra == null || extra.isEmpty()) ? null : new java.util.HashMap<>(extra);
    }

    /** Saves this thread's context, then replaces it with the captured one. */
    private Snapshot attach(Span span, Map<String, String> context) {
        Snapshot previous = new Snapshot(traceContext.getCurrentSpan(), copyExtraContext());
        install(span, context);
        return previous;
    }

    /** Puts back the context this thread had before {@link #attach}. */
    private void restore(Snapshot previous) {
        install(previous.span, previous.context);
    }

    /** Clears this thread's context and binds the given span and extra entries. */
    private void install(Span span, Map<String, String> context) {
        traceContext.clear();
        if (span != null) {
            traceContext.setCurrentSpan(span);
        }
        if (context != null) {
            Map<String, String> extra = traceContext.getExtraContext();
            if (extra != null) {
                extra.putAll(context);
            }
        }
    }

    /** A thread's trace state, kept so it can be restored after a wrapped task runs. */
    private static final class Snapshot {
        private final Span span;
        private final Map<String, String> context;

        Snapshot(Span span, Map<String, String> context) {
            this.span = span;
            this.context = context;
        }
    }

    /**
     * {@link AsyncTaskExecutor} decorator that passes every task through
     * {@link TraceContextExecutor#wrap}, so tasks run with the submitter's trace context.
     *
     * <p>Declared static so it does not capture an enclosing instance. It is not a
     * {@code @Component}: it needs a specific delegate executor, so it should be created
     * explicitly where that delegate is chosen.
     */
    public static class TracingAsyncTaskExecutor implements AsyncTaskExecutor {
        private final AsyncTaskExecutor delegate;
        private final TraceContextExecutor contextExecutor;

        public TracingAsyncTaskExecutor(AsyncTaskExecutor delegate,
                                        TraceContextExecutor contextExecutor) {
            this.delegate = delegate;
            this.contextExecutor = contextExecutor;
        }

        @Override
        public void execute(Runnable task) {
            delegate.execute(contextExecutor.wrap(task));
        }

        @Override
        public void execute(Runnable task, long startTimeout) {
            delegate.execute(contextExecutor.wrap(task), startTimeout);
        }

        @Override
        public Future<?> submit(Runnable task) {
            return delegate.submit(contextExecutor.wrap(task));
        }

        @Override
        public <T> Future<T> submit(Callable<T> task) {
            return delegate.submit(contextExecutor.wrap(task));
        }
    }
}
Feign 客户端追蹤集成:
/**
 * Feign request interceptor that starts a CLIENT span for each outgoing Feign call.
 *
 * <p>If a sampled span is active on the calling thread, it:
 * <ul>
 *   <li>creates a child span;</li>
 *   <li>records the "cs" annotation and HTTP tags;</li>
 *   <li>writes B3 headers into the request.</li>
 * </ul>
 * The nested {@link TracingFeignLogger} finishes that span when Feign logs the response.
 */
@Component
@Slf4j
public class TracingFeignInterceptor implements RequestInterceptor {
    private final Tracer tracer;
    private final TraceContext traceContext;

    public TracingFeignInterceptor(Tracer tracer, TraceContext traceContext) {
        this.tracer = tracer;
        this.traceContext = traceContext;
    }

    @Override
    public void apply(RequestTemplate template) {
        Span currentSpan = traceContext.getCurrentSpan();
        if (currentSpan == null || !currentSpan.isSampled()) {
            return;
        }
        String method = template.method();
        String url = template.url();
        long startMillis = System.currentTimeMillis();

        // Client span for this call, started at the same instant as its "cs" event.
        Span clientSpan = tracer.nextSpan(currentSpan)
                .name("feign:" + method + " " + url)
                .kind(Span.Kind.CLIENT)
                .timestamp(startMillis);
        clientSpan.annotation(new Span.Annotation(startMillis, "cs"));

        // B3 propagation: the downstream server joins the client span.
        template.header("X-B3-TraceId", clientSpan.getTraceId());
        template.header("X-B3-SpanId", clientSpan.getSpanId());
        template.header("X-B3-ParentSpanId", currentSpan.getSpanId());
        template.header("X-B3-Sampled", "1");

        clientSpan.tag("http.method", method);
        clientSpan.tag("http.url", url);
        clientSpan.tag("component", "feign");

        // NOTE(review): confirm the Feign version in use lets RequestTemplate carry attributes;
        // TracingFeignLogger reads the span back under the same key.
        template.attribute("clientSpan", clientSpan);
        log.debug("Feign請求追蹤: {} {}, traceId: {}", method, url, clientSpan.getTraceId());
    }

    /**
     * Feign logger that completes the client span. For each response it records "cr", tags the
     * status code, stores the elapsed time and closes the span created by
     * {@link TracingFeignInterceptor#apply}.
     */
    @Component
    public class TracingFeignLogger extends feign.Logger {
        @Override
        protected void log(String configKey, String format, Object... args) {
            if (log.isDebugEnabled()) {
                log.debug(format, args);
            }
        }

        @Override
        protected void logRequest(String configKey, Level logLevel, Request request) {
            // Only produce request logs when debug logging is on.
            if (log.isDebugEnabled()) {
                super.logRequest(configKey, logLevel, request);
            }
        }

        @Override
        protected Response logAndRebufferResponse(String configKey, Level logLevel,
                                                  Response response, long elapsedTime) throws IOException {
            Request request = response.request();
            Span clientSpan = (Span) request.requestTemplate().attribute("clientSpan");
            if (clientSpan != null) {
                clientSpan.annotation(new Span.Annotation(System.currentTimeMillis(), "cr"));
                clientSpan.tag("http.status_code", String.valueOf(response.status()));
                // Span durations are milliseconds. DefaultTracer#close computes them in ms and
                // ZipkinSpanReporter converts ms to µs. Feign's elapsedTime is already in ms.
                clientSpan.setDuration(elapsedTime);
                tracer.close(clientSpan);
            }
            return super.logAndRebufferResponse(configKey, logLevel, response, elapsedTime);
        }
    }
}
Zipkin 系統架構
Zipkin 數據流架構:
Spring Cloud Sleuth Zipkin 配置:
# application.yml: Zipkin / Sleuth configuration
spring:
  zipkin:
    # Zipkin server address
    base-url: http://zipkin-server:9411
    # Enable reporting spans to Zipkin
    enabled: true
    # Service name reported to Zipkin
    service:
      name: user-service
    # Resolve the host part of base-url through service discovery (the host is then treated
    # as a service id). To use a fixed server instead, disable this and point base-url at it
    # directly, e.g. http://localhost:9411
    locator:
      discovery:
        enabled: true
    # How spans are sent: web, kafka or rabbit
    sender:
      type: web
    # Gzip-compress span payloads
    compression:
      enabled: true
    # Connection settings, in milliseconds
    connect-timeout: 5000
    read-timeout: 10000
  sleuth:
    # Sampling rate: 1.0 means 100% of requests are sampled
    sampler:
      probability: 1.0
    # HTTP tracing
    web:
      client:
        enabled: true
      # Paths that are not traced. This is a regular expression, so alternatives are
      # separated by "|", not by commas.
      skip-pattern: /health|/info
    # @Async tracing
    async:
      enabled: true
    # Messaging tracing
    messaging:
      enabled: true
    # @Scheduled task tracing
    scheduled:
      enabled: true
    # Redis tracing
    redis:
      enabled: true
    # JDBC tracing
    jdbc:
      enabled: true
# Actuator exposure. Sleuth provides no "zipkin" actuator endpoint and no "zipkin" metrics
# exporter; spans are shipped by the spring.zipkin.* settings above.
management:
  endpoints:
    web:
      exposure:
        include: health,info,metrics
# Log pattern that shows the service name, traceId and spanId
logging:
  pattern:
    level: "%5p [${spring.zipkin.service.name:},%X{traceId:-},%X{spanId:-}]"
Zipkin Span 報告器:
/**
* Zipkin Span 報告器
* 負責將Span數據發送到Zipkin服務器
*/
@Component
@Slf4j
public class ZipkinSpanReporter implements SpanReporter {
private final ZipkinRestTemplateSender sender;
private final ObjectMapper objectMapper;
private final MeterRegistry meterRegistry;
// 指標監控
private final Counter spansSentCounter;
private final Counter spansDroppedCounter;
private final Timer sendTimer;
public ZipkinSpanReporter(ZipkinRestTemplateSender sender,
ObjectMapper objectMapper,
MeterRegistry meterRegistry) {
this.sender = sender;
this.objectMapper = objectMapper;
this.meterRegistry = meterRegistry;
// 初始化指標
this.spansSentCounter = meterRegistry.counter("zipkin.spans.sent");
this.spansDroppedCounter = meterRegistry.counter("zipkin.spans.dropped");
this.sendTimer = meterRegistry.timer("zipkin.send.duration");
}
@Override
public void report(Span span) {
if (!span.isSampled()) {
spansDroppedCounter.increment();
return;
}
try {
// 轉換Span為Zipkin格式
zipkin2.Span zipkinSpan = convertToZipkinSpan(span);
// 發送Span到Zipkin
sendTimer.record(() -> {
try {
sender.sendSpans(Collections.singletonList(zipkinSpan));
spansSentCounter.increment();
if (log.isDebugEnabled()) {
log.debug("Span發送成功: traceId={}, spanId={}",
span.getTraceId(), span.getSpanId());
}
} catch (Exception e) {
log.error("Span發送失敗", e);
spansDroppedCounter.increment();
}
});
} catch (Exception e) {
log.error("Span轉換失敗", e);
spansDroppedCounter.increment();
}
}
/**
* 轉換內部Span為Zipkin格式
*/
private zipkin2.Span convertToZipkinSpan(Span span) {
zipkin2.Span.Builder builder = zipkin2.Span.newBuilder()
.traceId(span.getTraceId())
.id(span.getSpanId())
.name(span.getName())
.timestamp(span.getTimestamp() * 1000) // 轉換為微秒
.duration(span.getDuration() * 1000); // 轉換為微秒
// 設置父Span
if (span.getParentSpanId() != null) {
builder.parentId(span.getParentSpanId());
}
// 設置本地端點
if (span.getLocalEndpoint() != null) {
builder.localEndpoint(zipkin2.Endpoint.newBuilder()
.serviceName(span.getLocalEndpoint().getServiceName())
.ip(span.getLocalEndpoint().getIp())
.port(span.getLocalEndpoint().getPort())
.build());
}
// 設置遠程端點
if (span.getRemoteEndpoint() != null) {
builder.remoteEndpoint(zipkin2.Endpoint.newBuilder()
.serviceName(span.getRemoteEndpoint().getServiceName())
.ip(span.getRemoteEndpoint().getIp())
.port(span.getRemoteEndpoint().getPort())
.build());
}
// 添加註解
if (span.getAnnotations() != null) {
for (Annotation annotation : span.getAnnotations()) {
builder.addAnnotation(annotation.getTimestamp() * 1000, annotation.getValue());
}
}
// 添加標籤
if (span.getTags() != null) {
for (Map.Entry<String, String> tag : span.getTags().entrySet()) {
builder.putTag(tag.getKey(), tag.getValue());
}
}
// 設置Kind
if (span.getKind() != null) {
switch (span.getKind()) {
case CLIENT:
builder.kind(zipkin2.Span.Kind.CLIENT);
break;
case SERVER:
builder.kind(zipkin2.Span.Kind.SERVER);
break;
case PRODUCER:
builder.kind(zipkin2.Span.Kind.PRODUCER);
break;
case CONSUMER:
builder.kind(zipkin2.Span.Kind.CONSUMER);
break;
}
}
// 設置共享標誌
if (span.isShared()) {
builder.shared(true);
}
// 設置調試標誌
if (span.isDebug()) {
builder.debug(true);
}
return builder.build();
}
}
/**
* Zipkin REST API 發送器
*/
@Component
@Slf4j
public class ZipkinRestTemplateSender {
private final RestTemplate restTemplate;
private final String zipkinBaseUrl;
private final ObjectMapper objectMapper;
public ZipkinRestTemplateSender(RestTemplate restTemplate,
@Value("${spring.zipkin.base-url}") String zipkinBaseUrl,
ObjectMapper objectMapper) {
this.restTemplate = restTemplate;
this.zipkinBaseUrl = zipkinBaseUrl;
this.objectMapper = objectMapper;
}
/**
* 批量發送Span到Zipkin
*/
public void sendSpans(List<zipkin2.Span> spans) {
if (spans.isEmpty()) {
return;
}
try {
// 序列化Span列表
String jsonSpans = objectMapper.writeValueAsString(spans);
// 構建請求
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
headers.set("Content-Encoding", "gzip");
HttpEntity<byte[]> request = new HttpEntity<>(
gzipCompress(jsonSpans), headers);
// 發送到Zipkin
ResponseEntity<String> response = restTemplate.postForEntity(
zipkinBaseUrl + "/api/v2/spans", request, String.class);
if (!response.getStatusCode().is2xxSuccessful()) {
log.warn("Zipkin響應異常: {}", response.getStatusCode());
}
} catch (Exception e) {
throw new RuntimeException("發送Span到Zipkin失敗", e);
}
}
/**
* GZIP壓縮
*/
private byte[] gzipCompress(String data) throws IOException {
ByteArrayOutputStream bos = new ByteArrayOutputStream(data.length());
GZIPOutputStream gzip = new GZIPOutputStream(bos);
gzip.write(data.getBytes(StandardCharsets.UTF_8));
gzip.close();
return bos.toByteArray();
}
}
⚡ 五、消息中間件集成
Kafka 消息追蹤集成:
/**
* Kafka tracing configuration.
* Wraps the application's ProducerFactory and ConsumerFactory with tracing decorators so that
* the trace context travels in Kafka record headers.
*
* NOTE(review): TracingProducerFactory is also annotated with @Component, so it may be
* registered twice. It also implements ProducerFactory, which means the plain ProducerFactory
* parameter below can match more than one bean. Consider a @Qualifier on the raw factory.
* TracingConsumerFactory is not shown here; presumably it extracts the headers on consume.
*/
@Configuration
@Slf4j
public class KafkaTracingConfiguration {
// Producer side: producers created by this factory add B3 headers to each outgoing record.
@Bean
public TracingProducerFactory<String, String> tracingProducerFactory(
ProducerFactory<String, String> producerFactory,
Tracer tracer) {
return new TracingProducerFactory<>(producerFactory, tracer);
}
// Consumer side: wraps the application's ConsumerFactory with the same Tracer.
@Bean
public TracingConsumerFactory<String, String> tracingConsumerFactory(
ConsumerFactory<String, String> consumerFactory,
Tracer tracer) {
return new TracingConsumerFactory<>(consumerFactory, tracer);
}
}
/**
 * {@link ProducerFactory} decorator. Producers it creates attach the current trace context to
 * every outgoing record as B3 headers.
 *
 * <p>NOTE(review): this class is also registered by
 * KafkaTracingConfiguration#tracingProducerFactory. Keeping both the @Component and the @Bean
 * registration may create two instances.
 */
@Component
@Slf4j
public class TracingProducerFactory<K, V> implements ProducerFactory<K, V> {
    private final ProducerFactory<K, V> delegate;
    private final Tracer tracer;

    public TracingProducerFactory(ProducerFactory<K, V> delegate, Tracer tracer) {
        this.delegate = delegate;
        this.tracer = tracer;
    }

    @Override
    public Producer<K, V> createProducer() {
        return new TracingProducer<>(delegate.createProducer(), tracer);
    }

    /**
     * Producer wrapper that injects trace headers before delegating each send.
     *
     * <p>NOTE(review): the remaining Producer methods (flush, close, partitionsFor, metrics,
     * transaction methods, ...) are not shown here. They must all delegate to {@code delegate}.
     */
    private static class TracingProducer<K, V> implements Producer<K, V> {
        private final Producer<K, V> delegate;
        private final Tracer tracer;

        public TracingProducer(Producer<K, V> delegate, Tracer tracer) {
            this.delegate = delegate;
            this.tracer = tracer;
        }

        @Override
        public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
            return delegate.send(injectTraceContext(record));
        }

        @Override
        public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
            return delegate.send(injectTraceContext(record), callback);
        }

        /**
         * Starts a PRODUCER span for the record and writes B3 headers into it. Does nothing
         * when no sampled span is active.
         *
         * @return the same record, with headers added
         */
        private ProducerRecord<K, V> injectTraceContext(ProducerRecord<K, V> record) {
            Span currentSpan = tracer.getTraceContext().getCurrentSpan();
            if (currentSpan == null || !currentSpan.isSampled()) {
                return record;
            }
            long now = System.currentTimeMillis();
            Span producerSpan = tracer.nextSpan(currentSpan)
                    .name("kafka:produce:" + record.topic())
                    .kind(Span.Kind.PRODUCER)
                    .timestamp(now);
            producerSpan.annotation(new Span.Annotation(now, "ms"));
            producerSpan.tag("messaging.system", "kafka");
            producerSpan.tag("messaging.destination", record.topic());
            producerSpan.tag("messaging.destination_kind", "topic");

            Headers headers = record.headers();
            putHeader(headers, "X-B3-TraceId", producerSpan.getTraceId());
            putHeader(headers, "X-B3-SpanId", producerSpan.getSpanId());
            putHeader(headers, "X-B3-ParentSpanId", currentSpan.getSpanId());
            putHeader(headers, "X-B3-Sampled", "1");
            // Custom header naming the sending service.
            putHeader(headers, "X-Trace-Service", "user-service");

            log.debug("Kafka消息追蹤 - 發送: topic={}, traceId={}",
                    record.topic(), producerSpan.getTraceId());
            // Sending is asynchronous, so the producer span is closed immediately. Its duration
            // covers header injection, not broker acknowledgement.
            tracer.close(producerSpan);
            return record;
        }

        /**
         * Replaces, rather than appends, a header encoded as UTF-8. A record whose headers were
         * copied from an upstream message would otherwise carry two trace IDs.
         */
        private static void putHeader(Headers headers, String key, String value) {
            headers.remove(key);
            headers.add(key, value.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
    }
}
三大追蹤系統對比:
| 特性 | Spring Cloud Sleuth + Zipkin | SkyWalking | Jaeger |
|------|------------------------------|------------|--------|
| 架構模式 | 客戶端-服務器 | 探針-收集器 | 客戶端-收集器 |
| 數據存儲 | ES, MySQL, Cassandra | ES, H2, MySQL, TiDB | Cassandra, ES(Kafka 可作為寫入緩衝) |
| UI界面 | Zipkin UI | SkyWalking UI | Jaeger UI |
| 語言支持 | Java為主,多語言支持 | 多語言探針 | 多語言客戶端 |
| 性能開銷 | 中等 | 低 | 低-中等 |
| 安裝部署 | 簡單 | 中等 | 簡單 |
| 生態系統 | Spring Cloud生態 | Apache項目 | CNCF項目 |
| 監控維度 | 調用鏈、延遲 | 拓撲圖、指標、追蹤 | 分佈式追蹤 |
SkyWalking 配置示例:
# agent.config: SkyWalking Java agent settings.
# Each value uses ${ENV_VAR:default}: the environment variable wins, otherwise the default applies.
# Service name shown in SkyWalking
agent.service_name=${SW_AGENT_NAME:user-service}
# OAP backend (collector) address, host:port
collector.backend_service=${SW_AGENT_COLLECTOR:127.0.0.1:11800}
# Sampling: number of trace segments sampled per 3 seconds. The default -1 (non-positive)
# is documented by SkyWalking as "no limit", i.e. everything is sampled.
agent.sample_n_per_3_secs=${SW_AGENT_SAMPLE:-1}
# Endpoints whose names end with these suffixes (static resources) are not traced
agent.ignore_suffix=${SW_AGENT_IGNORE_SUFFIX:.jpg,.jpeg,.png,.gif,.css,.js}
# Cross-process propagation setting.
# NOTE(review): this key does not appear in the stock agent.config; confirm the agent version
# in use recognizes it, otherwise it is silently ignored.
agent.cross_process_propagation_config=${SW_AGENT_CROSS_PROPAGATION:true}
Sleuth + Zipkin 生產配置:
spring:
  sleuth:
    # Sampling: 10% is a common production starting point
    sampler:
      probability: 0.1
    # Log correlation. traceId and spanId are put into the SLF4J MDC by default; this list
    # names additional keys to copy into the MDC.
    # NOTE(review): the property name differs between Sleuth versions
    # (2.x: spring.sleuth.log.slf4j.whitelisted-mdc-keys,
    #  3.x: spring.sleuth.baggage.correlation-fields). Confirm for the version in use.
    log:
      slf4j:
        whitelisted-mdc-keys: traceId,spanId,parentSpanId
    # @Async tracing
    async:
      enabled: true
      configurer:
        enabled: true
    # @Scheduled tracing. skip-pattern is a regex matched against the scheduled bean's fully
    # qualified class name.
    # NOTE(review): adjust to the real health-check task class.
    scheduled:
      enabled: true
      skip-pattern: .*HealthCheckTask
    # Messaging tracing
    messaging:
      enabled: true
      rabbit:
        enabled: true
      kafka:
        enabled: true
  zipkin:
    # Production Zipkin cluster
    base-url: http://zipkin-cluster:9411
    sender:
      type: web
    # Gzip-compress span payloads
    compression:
      enabled: true
    # Timeouts in milliseconds
    connect-timeout: 5000
    read-timeout: 10000
    # Connection pooling (e.g. 100 total / 20 per route) cannot be set through
    # spring.zipkin.*. Provide a ZipkinRestTemplateCustomizer backed by a pooled HTTP client.
# Log pattern and levels
logging:
  pattern:
    level: "%5p [${spring.application.name:},%X{traceId:-},%X{spanId:-}]"
  level:
    org.springframework.cloud.sleuth: INFO
    brave: WARN
    zipkin2: WARN
# Monitoring. Sleuth provides no "sleuth" actuator endpoint and no "zipkin" metrics exporter.
management:
  endpoints:
    web:
      exposure:
        include: health,metrics,prometheus
高併發場景優化:
/**
* 高性能追蹤配置
* 針對高併發場景優化
*/
@Configuration
@Slf4j
public class HighPerformanceTracingConfig {
/**
* 異步Span報告器
* 避免阻塞業務線程
*/
@Bean
@Primary
public SpanReporter asyncSpanReporter(SpanReporter delegate) {
return new AsyncSpanReporter(delegate);
}
/**
* 批量Span報告器
* 減少網絡請求
*/
@Bean
public SpanReporter batchingSpanReporter(SpanReporter delegate) {
return BatchingSpanReporter.wrap(delegate)
.maxBatchSize(100) // 最大批量大小
.maxConcurrentBatches(5) // 最大併發批次
.batchInterval(Duration.ofSeconds(5)) // 批量間隔
.build();
}
/**
* 採樣策略優化
*/
@Bean
public Sampler adaptiveSampler() {
return new AdaptiveSampler()
.baseProbability(0.01) // 基礎採樣率1%
.maxProbability(0.5) // 最大采樣率50%
.windowSize(1000) // 採樣窗口
.build();
}
/**
* 自適應採樣器
*/
@Slf4j
public static class AdaptiveSampler implements Sampler {
private final double baseProbability;
private final double maxProbability;
private final int windowSize;
private final AtomicInteger requestCount = new AtomicInteger(0);
private final AtomicInteger sampleCount = new AtomicInteger(0);
private volatile double currentProbability;
public AdaptiveSampler(double baseProbability, double maxProbability, int windowSize) {
this.baseProbability = baseProbability;
this.maxProbability = maxProbability;
this.windowSize = windowSize;
this.currentProbability = baseProbability;
}
@Override
public boolean isSampled() {
int total = requestCount.incrementAndGet();
// 滑動窗口
if (total % windowSize == 0) {
adjustSamplingRate();
}
// 概率採樣
return Math.random() < currentProbability;
}
private void adjustSamplingRate() {
int sampled = sampleCount.get();
double actualRate = (double) sampled / windowSize;
// 動態調整採樣率
if (actualRate < baseProbability * 0.5) {
currentProbability = Math.min(currentProbability * 1.2, maxProbability);
} else if (actualRate > baseProbability * 1.5) {
currentProbability = Math.max(currentProbability * 0.8, baseProbability);
}
// 重置計數器
requestCount.set(0);
sampleCount.set(0);
log.info("調整採樣率: {} -> {}", actualRate, currentProbability);
}
}
}
洞察:分佈式追蹤是微服務可觀測性的核心支柱。合理的採樣策略、高效的數據收集和智能的上下文傳播,是構建生產級追蹤系統的關鍵。理解數據流和性能影響,才能在業務需求和系統開銷之間找到最佳平衡。