文章目錄

  • 統一日誌與鏈路追蹤 Sleuth + Zipkin 實踐
  • 目錄
  • 一、分佈式追蹤核心概念
  • 分佈式追蹤的基本概念
  • 追蹤數據模型
  • 二、Sleuth 自動埋點機制
  • ️ Sleuth 自動配置架構
  • HTTP 請求自動追蹤
  • 三、Trace 上下文傳播原理
  • 上下文傳播機制
  • Feign 客户端集成
  • 四、Zipkin 架構與數據流
  • ️ Zipkin 系統架構
  • Zipkin 集成配置
  • Zipkin 報告器實現
  • ⚡ 五、消息中間件集成
  • Kafka 消息追蹤
  • 六、SkyWalking vs Jaeger 對比
  • 功能對比分析
  • 遷移到 SkyWalking
  • 七、生產環境最佳實踐
  • 生產級配置
  • 性能優化建議
  • 一、分佈式追蹤核心概念
  • 二、Sleuth 自動埋點機制
  • 三、Trace 上下文傳播原理
  • 四、Zipkin 架構與數據流
  • ⚡ 五、消息中間件集成
  • 六、SkyWalking vs Jaeger 對比
  • 七、生產環境最佳實踐

調用鏈追蹤核心元素


Zipkin+Sleuth 鏈路追蹤整合_51CTO博客_HTTP

Span 數據結構定義

/**
 * Span data model.
 * Represents a single unit of work in a distributed system.
 *
 * <p>Lombok generates the accessors ({@code @Data}), the builder ({@code @Builder}) and the
 * all-arguments constructor. The hand-written fluent methods at the bottom of the class mutate
 * this instance and return it so that calls can be chained on an existing span.
 */
@Data
@Builder
@AllArgsConstructor
public class Span {
    // Identity
    private String traceId;          // trace ID shared by all spans of one trace
    private String spanId;           // ID of this unit of work
    private String parentSpanId;     // parent span ID, used to build the call tree
    private String name;             // span name describing the operation

    // Timing
    private long timestamp;          // start timestamp
    // NOTE(review): originally documented as microseconds; confirm the unit against the code
    // that sets and reports this value.
    private long duration;

    // Context
    private Kind kind;               // CLIENT, SERVER, PRODUCER or CONSUMER
    private boolean shared;          // whether the span is shared
    private boolean debug;           // whether debug mode is on

    // Endpoints
    private Endpoint localEndpoint;  // local service endpoint
    private Endpoint remoteEndpoint; // remote service endpoint

    // Annotations and tags (may be null until the first annotation/tag is added)
    private List<Annotation> annotations;  // timestamped events
    private Map<String, String> tags;      // business tags

    // Status
    private boolean error;           // whether an error occurred
    private String errorMessage;     // error message

    // Sampling decision. Declared last so the positions of the pre-existing
    // all-arguments constructor parameters are unchanged (one parameter is appended).
    private boolean sampled;

    /**
     * Sets the span name.
     *
     * @param name new span name
     * @return this span
     */
    public Span name(String name) {
        this.name = name;
        return this;
    }

    /**
     * Sets the span kind.
     *
     * @param kind new span kind
     * @return this span
     */
    public Span kind(Kind kind) {
        this.kind = kind;
        return this;
    }

    /**
     * Sets the start timestamp.
     *
     * @param timestamp new start timestamp
     * @return this span
     */
    public Span timestamp(long timestamp) {
        this.timestamp = timestamp;
        return this;
    }

    /**
     * Sets the local endpoint.
     *
     * @param localEndpoint new local endpoint
     * @return this span
     */
    public Span localEndpoint(Endpoint localEndpoint) {
        this.localEndpoint = localEndpoint;
        return this;
    }

    /**
     * Adds or replaces a tag. A null key or null value is ignored, so callers can pass
     * optional values (for example a missing request header) without checking first.
     *
     * @param key   tag key
     * @param value tag value
     * @return this span
     */
    public Span tag(String key, String value) {
        if (key == null || value == null) {
            return this;
        }
        if (tags == null) {
            tags = new java.util.LinkedHashMap<>();
        }
        tags.put(key, value);
        return this;
    }

    /**
     * Appends a timestamped annotation. A null annotation is ignored.
     *
     * @param annotation annotation to append
     * @return this span
     */
    public Span annotation(Annotation annotation) {
        if (annotation == null) {
            return this;
        }
        if (annotations == null) {
            annotations = new java.util.ArrayList<>();
        }
        annotations.add(annotation);
        return this;
    }

    /**
     * Span kind.
     */
    public enum Kind {
        CLIENT,     // client-side call
        SERVER,     // server-side handling
        PRODUCER,   // message producer
        CONSUMER    // message consumer
    }

    /**
     * Service endpoint information.
     */
    @Data
    @AllArgsConstructor
    public static class Endpoint {
        private String serviceName;  // service name
        private String ip;           // IP address
        private int port;            // port number
    }

    /**
     * Timestamped annotation.
     */
    @Data
    @AllArgsConstructor
    public static class Annotation {
        private long timestamp;      // timestamp
        private String value;        // annotation value
    }
}

️ Sleuth 自動配置架構

Sleuth 自動埋點組件

/**
 * Core auto-configuration for tracing.
 * Declares the beans that create spans and attach tracing to HTTP, async and
 * RestTemplate calls.
 */
@Configuration
@EnableAspectJAutoProxy
@Slf4j
public class SleuthAutoConfiguration {

    /**
     * Tracer built from the configured sampler and trace context.
     * Skipped when another {@code Tracer} bean is already defined.
     */
    @Bean
    @ConditionalOnMissingBean
    public Tracer tracer(Sampler sampler, TraceContext traceContext) {
        return new DefaultTracer(sampler, traceContext);
    }

    /**
     * Sampler used when none is defined elsewhere: samples everything.
     */
    @Bean
    @ConditionalOnMissingBean
    public Sampler sampler() {
        return Sampler.ALWAYS_SAMPLE;
    }

    /**
     * Trace context used when none is defined elsewhere.
     */
    @Bean
    @ConditionalOnMissingBean
    public TraceContext traceContext() {
        return new DefaultTraceContext();
    }

    /**
     * Servlet filter for incoming HTTP requests.
     */
    @Bean
    public TracingFilter tracingFilter(Tracer tracer) {
        return new TracingFilter(tracer);
    }

    /**
     * Tracing support for asynchronous tasks.
     */
    @Bean
    @ConditionalOnMissingBean
    public TracingAsyncTaskExecutor tracingAsyncTaskExecutor(Tracer tracer) {
        return new TracingAsyncTaskExecutor(tracer);
    }

    /**
     * Customizer that appends the tracing interceptor to a RestTemplate's
     * existing interceptor list.
     */
    @Bean
    public RestTemplateCustomizer restTemplateCustomizer(Tracer tracer) {
        return template -> {
            // Copy first, then append, then install the new list on the template.
            List<ClientHttpRequestInterceptor> chain = new ArrayList<>(template.getInterceptors());
            chain.add(new TracingClientHttpRequestInterceptor(tracer));
            template.setInterceptors(chain);
        };
    }
}
    /**
     * Default tracer implementation.
     *
     * <p>Creates root and child spans, generates random hex identifiers and computes the
     * duration of a span when it is closed.
     */
    @Component
    @Slf4j
    public class DefaultTracer implements Tracer {
        private static final int TRACE_ID_BYTES = 16; // 128-bit trace ID
        private static final int SPAN_ID_BYTES = 8;   // 64-bit span ID

        private final Sampler sampler;
        private final TraceContext traceContext;

        /**
         * @param sampler      decides whether a new root span is sampled
         * @param traceContext holder of the current span
         */
        public DefaultTracer(Sampler sampler, TraceContext traceContext) {
            this.sampler = sampler;
            this.traceContext = traceContext;
        }

        /**
         * Returns the trace context this tracer reads the current span from.
         *
         * @return the trace context passed to the constructor
         */
        public TraceContext getTraceContext() {
            return traceContext;
        }

        /**
         * Creates a child of the current span, or a root span when there is no current span.
         *
         * @return the new span
         */
        @Override
        public Span nextSpan() {
            return nextSpan(traceContext.getCurrentSpan());
        }

        /**
         * Creates a child of {@code parent}, or a root span when {@code parent} is null.
         *
         * @param parent parent span, may be null
         * @return the new span
         */
        @Override
        public Span nextSpan(Span parent) {
            return parent == null ? createRootSpan() : createChildSpan(parent);
        }

        /**
         * Creates a root span. When the sampler declines, an unsampled span without
         * identifiers is returned.
         */
        private Span createRootSpan() {
            if (!sampler.isSampled()) {
                return Span.builder().sampled(false).build();
            }
            return Span.builder()
                    .traceId(randomHex(TRACE_ID_BYTES))
                    .spanId(randomHex(SPAN_ID_BYTES))
                    .parentSpanId(null)
                    .name("root")
                    .timestamp(System.currentTimeMillis())
                    .kind(Span.Kind.SERVER)
                    .sampled(true)
                    .build();
        }

        /**
         * Creates a child span that inherits the parent's trace ID and local endpoint.
         * An unsampled parent yields an unsampled child without identifiers.
         */
        private Span createChildSpan(Span parent) {
            if (!parent.isSampled()) {
                return Span.builder().sampled(false).build();
            }
            return Span.builder()
                    .traceId(parent.getTraceId())
                    .spanId(randomHex(SPAN_ID_BYTES))
                    .parentSpanId(parent.getSpanId())
                    .name("child")
                    .timestamp(System.currentTimeMillis())
                    .kind(Span.Kind.CLIENT)
                    .sampled(true)
                    .localEndpoint(parent.getLocalEndpoint())
                    .build();
        }

        /**
         * Generates {@code byteCount} random bytes and returns them hex-encoded.
         * Uses the per-thread generator so concurrent request threads do not contend on
         * one shared {@code Random} instance.
         */
        private String randomHex(int byteCount) {
            byte[] bytes = new byte[byteCount];
            java.util.concurrent.ThreadLocalRandom.current().nextBytes(bytes);
            return Hex.encodeHexString(bytes);
        }

        /**
         * Closes a sampled span: stores the elapsed milliseconds since its timestamp as the
         * duration and hands the span to {@code reportSpan}. Null and unsampled spans are ignored.
         *
         * @param span span to close, may be null
         */
        @Override
        public void close(Span span) {
            if (span == null || !span.isSampled()) {
                return;
            }
            span.setDuration(System.currentTimeMillis() - span.getTimestamp());
            log.debug("Span關閉: traceId={}, spanId={}, duration={}ms",
                    span.getTraceId(), span.getSpanId(), span.getDuration());
            // Report the span to the collector.
            // NOTE(review): reportSpan is not defined in this listing; confirm where it lives.
            reportSpan(span);
        }
    }

HTTP 攔截器實現

/**
 * HTTP request tracing filter.
 *
 * <p>For each HTTP request it continues the trace described by the incoming B3 headers, or
 * starts a new span, makes that span current for the duration of the request, echoes the
 * trace identifiers on the response and closes the span afterwards.
 */
@Component
@Slf4j
public class TracingFilter implements Filter {
    private static final String TRACE_HEADER = "X-B3-TraceId";
    private static final String SPAN_HEADER = "X-B3-SpanId";
    private static final String PARENT_HEADER = "X-B3-ParentSpanId";
    private static final String SAMPLED_HEADER = "X-B3-Sampled";

    private final Tracer tracer;

    public TracingFilter(Tracer tracer) {
        this.tracer = tracer;
    }

    /**
     * Traces the request and delegates to the rest of the chain.
     * Non-HTTP requests are passed through untraced instead of failing on a cast.
     *
     * @throws IOException      propagated from the chain
     * @throws ServletException propagated from the chain
     */
    @Override
    public void doFilter(ServletRequest request, ServletResponse response,
                         FilterChain chain) throws IOException, ServletException {
        if (!(request instanceof HttpServletRequest) || !(response instanceof HttpServletResponse)) {
            chain.doFilter(request, response);
            return;
        }
        HttpServletRequest httpRequest = (HttpServletRequest) request;
        HttpServletResponse httpResponse = (HttpServletResponse) response;

        // 1. Extract the span from the request headers, or create one.
        Span span = extractOrCreateSpan(httpRequest);
        try {
            // 2. Make the span current.
            tracer.getTraceContext().setCurrentSpan(span);
            // 3. Add tracing headers to the response (before the body can be committed).
            addTracingHeaders(httpResponse, span);
            // 4. Record the "server received" event.
            logServerReceived(span, httpRequest);
            // 5. Continue processing the request.
            chain.doFilter(request, response);
            // 6. Record the "server sent" event.
            logServerSent(span, httpResponse);
        } catch (Exception e) {
            // 7. Record the error, then rethrow unchanged.
            span.setError(true);
            span.setErrorMessage(e.getMessage());
            span.tag("error", "true");
            throw e;
        } finally {
            // 8. Close the span, 9. then clear the context.
            tracer.close(span);
            tracer.getTraceContext().clear();
        }
    }

    /**
     * Builds a span from the B3 request headers when both trace ID and span ID are present;
     * otherwise asks the tracer for a new span.
     */
    private Span extractOrCreateSpan(HttpServletRequest request) {
        String traceId = request.getHeader(TRACE_HEADER);
        String spanId = request.getHeader(SPAN_HEADER);
        String operation = request.getMethod() + " " + request.getRequestURI();

        if (traceId == null || spanId == null) {
            // No usable upstream context: create a new span.
            return tracer.nextSpan()
                    .name(operation)
                    .kind(Span.Kind.SERVER)
                    .localEndpoint(buildEndpoint(request));
        }

        // NOTE(review): an absent X-B3-Sampled header is treated as "not sampled" here; the B3
        // specification describes absence as deferring the decision to the receiver. Confirm.
        return Span.builder()
                .traceId(traceId)
                .spanId(spanId)
                .parentSpanId(request.getHeader(PARENT_HEADER))
                .sampled(isSampledHeader(request.getHeader(SAMPLED_HEADER)))
                .kind(Span.Kind.SERVER)
                .timestamp(System.currentTimeMillis())
                .name(operation)
                .localEndpoint(buildEndpoint(request))
                .build();
    }

    /**
     * Interprets an {@code X-B3-Sampled} value: {@code "1"} and the legacy {@code "true"}
     * (any case) mean sampled; anything else, including null, means not sampled.
     */
    private static boolean isSampledHeader(String value) {
        return "1".equals(value) || "true".equalsIgnoreCase(value);
    }

    /**
     * Echoes trace ID, span ID and the sampled flag on the response for sampled spans.
     */
    private void addTracingHeaders(HttpServletResponse response, Span span) {
        if (!span.isSampled()) {
            return;
        }
        response.setHeader(TRACE_HEADER, span.getTraceId());
        response.setHeader(SPAN_HEADER, span.getSpanId());
        response.setHeader(SAMPLED_HEADER, "1");
    }

    /**
     * Records the "sr" annotation and HTTP request tags on a sampled span.
     */
    private void logServerReceived(Span span, HttpServletRequest request) {
        if (!span.isSampled()) {
            return;
        }
        span.annotation(new Annotation(System.currentTimeMillis(), "sr"));
        span.tag("http.method", request.getMethod());
        span.tag("http.path", request.getRequestURI());
        span.tag("http.host", request.getServerName());
        // The header is optional; a null tag value would make the Zipkin span builder throw
        // later and the whole span would be dropped.
        String userAgent = request.getHeader("User-Agent");
        if (userAgent != null) {
            span.tag("http.user_agent", userAgent);
        }
        log.debug("服務器接收請求: {} {}, traceId: {}",
                request.getMethod(), request.getRequestURI(), span.getTraceId());
    }

    /**
     * Records the "ss" annotation and the response status on a sampled span.
     */
    private void logServerSent(Span span, HttpServletResponse response) {
        if (!span.isSampled()) {
            return;
        }
        span.annotation(new Annotation(System.currentTimeMillis(), "ss"));
        span.tag("http.status_code", String.valueOf(response.getStatus()));
        log.debug("服務器發送響應: status={}, traceId={}",
                response.getStatus(), span.getTraceId());
    }
}

Trace 上下文跨服務傳播

/**
 * Trace context manager.
 *
 * <p>Holds the current span and an extra key/value context per thread, and converts between
 * the current span and B3 HTTP headers.
 */
@Component
@Slf4j
public class TraceContext {
    private final ThreadLocal<Span> currentSpan = new ThreadLocal<>();
    private final ThreadLocal<Map<String, String>> extraContext = new ThreadLocal<>();

    /**
     * Returns the span bound to the calling thread.
     *
     * @return the current span, or null when none is set
     */
    public Span getCurrentSpan() {
        return currentSpan.get();
    }

    /**
     * Binds {@code span} to the calling thread.
     *
     * @param span span to make current, may be null
     */
    public void setCurrentSpan(Span span) {
        currentSpan.set(span);
        if (span != null) {
            log.debug("設置當前Span: traceId={}, spanId={}",
                    span.getTraceId(), span.getSpanId());
        }
    }

    /**
     * Returns the calling thread's extra context map, creating an empty one on first use.
     * The returned map is the live per-thread instance.
     *
     * @return the extra context map, never null
     */
    public Map<String, String> getExtraContext() {
        Map<String, String> extra = extraContext.get();
        if (extra == null) {
            extra = new java.util.HashMap<>();
            extraContext.set(extra);
        }
        return extra;
    }

    /**
     * Removes the current span and the extra context from the calling thread.
     */
    public void clear() {
        currentSpan.remove();
        Map<String, String> extra = extraContext.get();
        if (extra != null) {
            extra.clear();
        }
        extraContext.remove();
    }

    /**
     * Writes the current span's B3 headers into {@code headers}.
     * Does nothing when there is no current span or it is not sampled.
     *
     * @param headers outgoing HTTP headers to populate
     */
    public void inject(HttpHeaders headers) {
        Span span = getCurrentSpan();
        if (span == null || !span.isSampled()) {
            return;
        }
        headers.set("X-B3-TraceId", span.getTraceId());
        headers.set("X-B3-SpanId", span.getSpanId());
        // A root span has no parent; omit the header rather than storing a null value.
        if (span.getParentSpanId() != null) {
            headers.set("X-B3-ParentSpanId", span.getParentSpanId());
        }
        headers.set("X-B3-Sampled", "1");
        // Inject custom context.
        // NOTE(review): injectCustomContext is not defined in this listing; confirm where it lives.
        injectCustomContext(headers);
    }

    /**
     * Builds a span from B3 headers.
     *
     * @param headers incoming HTTP headers
     * @return a span carrying the extracted identifiers, or null when the trace ID or span ID
     *         header is missing
     */
    public Span extract(HttpHeaders headers) {
        String traceId = headers.getFirst("X-B3-TraceId");
        String spanId = headers.getFirst("X-B3-SpanId");
        if (traceId == null || spanId == null) {
            return null;
        }
        return Span.builder()
                .traceId(traceId)
                .spanId(spanId)
                .parentSpanId(headers.getFirst("X-B3-ParentSpanId"))
                .sampled("1".equals(headers.getFirst("X-B3-Sampled")))
                .build();
    }
}
    /**
     * Cross-thread context propagation.
     *
     * <p>Wraps tasks so that the span and extra context of the thread that submits the task
     * are installed on the thread that runs it.
     */
    @Component
    @Slf4j
    public class TraceContextExecutor {
        private final TraceContext traceContext;

        /**
         * @param traceContext context holder to read from and write to
         */
        public TraceContextExecutor(TraceContext traceContext) {
            this.traceContext = traceContext;
        }

        /**
         * Wraps a {@link Runnable} so it runs with the caller's trace context.
         * Whatever context the executing thread had before is restored afterwards, so a task
         * that runs on the submitting thread does not wipe that thread's context.
         *
         * @param task task to wrap
         * @return a wrapper that installs the captured context around {@code task}
         */
        public Runnable wrap(Runnable task) {
            Span capturedSpan = traceContext.getCurrentSpan();
            Map<String, String> capturedExtra = copyExtraContext();
            return () -> {
                Span previousSpan = traceContext.getCurrentSpan();
                Map<String, String> previousExtra = copyExtraContext();
                try {
                    install(capturedSpan, capturedExtra);
                    task.run();
                } finally {
                    install(previousSpan, previousExtra);
                }
            };
        }

        /**
         * Wraps a {@link Callable} so it runs with the caller's trace context.
         * Whatever context the executing thread had before is restored afterwards.
         *
         * @param task task to wrap
         * @param <T>  result type
         * @return a wrapper that installs the captured context around {@code task}
         */
        public <T> Callable<T> wrap(Callable<T> task) {
            Span capturedSpan = traceContext.getCurrentSpan();
            Map<String, String> capturedExtra = copyExtraContext();
            return () -> {
                Span previousSpan = traceContext.getCurrentSpan();
                Map<String, String> previousExtra = copyExtraContext();
                try {
                    install(capturedSpan, capturedExtra);
                    return task.call();
                } finally {
                    install(previousSpan, previousExtra);
                }
            };
        }

        /**
         * Returns a snapshot of the calling thread's extra context, or null when it is empty.
         */
        private Map<String, String> copyExtraContext() {
            Map<String, String> extra = traceContext.getExtraContext();
            if (extra == null || extra.isEmpty()) {
                return null;
            }
            return new java.util.HashMap<>(extra);
        }

        /**
         * Replaces the calling thread's context with the given span and extra entries.
         * Null arguments leave the corresponding part cleared.
         */
        private void install(Span span, Map<String, String> extra) {
            traceContext.clear();
            if (span != null) {
                traceContext.setCurrentSpan(span);
            }
            if (extra != null) {
                traceContext.getExtraContext().putAll(extra);
            }
        }

        /**
         * Async task executor that wraps every task with the trace context before
         * handing it to the delegate.
         */
        @Component
        public static class TracingAsyncTaskExecutor implements AsyncTaskExecutor {
            private final AsyncTaskExecutor delegate;
            private final TraceContextExecutor contextExecutor;

            /**
             * @param delegate        executor that actually runs the tasks
             * @param contextExecutor wrapper used to propagate the trace context
             */
            public TracingAsyncTaskExecutor(AsyncTaskExecutor delegate,
                                            TraceContextExecutor contextExecutor) {
                this.delegate = delegate;
                this.contextExecutor = contextExecutor;
            }

            @Override
            public void execute(Runnable task) {
                delegate.execute(contextExecutor.wrap(task));
            }

            @Override
            public void execute(Runnable task, long startTimeout) {
                delegate.execute(contextExecutor.wrap(task), startTimeout);
            }

            @Override
            public Future<?> submit(Runnable task) {
                return delegate.submit(contextExecutor.wrap(task));
            }

            @Override
            public <T> Future<T> submit(Callable<T> task) {
                return delegate.submit(contextExecutor.wrap(task));
            }
        }
    }

Feign 客户端追蹤集成

/**
 * Feign client tracing interceptor.
 *
 * <p>For a sampled current span it creates a client span, writes the B3 headers onto the
 * outgoing request and stores the client span on the request template so the nested logger
 * can close it when the response arrives.
 */
@Component
@Slf4j
public class TracingFeignInterceptor implements RequestInterceptor {
    private final Tracer tracer;
    private final TraceContext traceContext;

    /**
     * @param tracer       creates and closes the client span
     * @param traceContext source of the current span
     */
    public TracingFeignInterceptor(Tracer tracer, TraceContext traceContext) {
        this.tracer = tracer;
        this.traceContext = traceContext;
    }

    /**
     * Adds tracing headers to the outgoing request.
     * Does nothing when there is no current span or it is not sampled.
     *
     * @param template outgoing request template
     */
    @Override
    public void apply(RequestTemplate template) {
        Span currentSpan = traceContext.getCurrentSpan();
        if (currentSpan == null || !currentSpan.isSampled()) {
            return;
        }

        // Create the client span as a child of the current span.
        Span clientSpan = tracer.nextSpan(currentSpan)
                .name("feign:" + template.method() + " " + template.url())
                .kind(Span.Kind.CLIENT)
                .timestamp(System.currentTimeMillis());

        // Record the "client sent" event.
        clientSpan.annotation(new Annotation(System.currentTimeMillis(), "cs"));

        // Inject the tracing headers.
        template.header("X-B3-TraceId", clientSpan.getTraceId());
        template.header("X-B3-SpanId", clientSpan.getSpanId());
        template.header("X-B3-ParentSpanId", currentSpan.getSpanId());
        template.header("X-B3-Sampled", "1");

        // Business tags.
        clientSpan.tag("http.method", template.method());
        clientSpan.tag("http.url", template.url());
        clientSpan.tag("component", "feign");

        // Keep the client span with the request so the response side can find it.
        // NOTE(review): confirm that the Feign version in use provides RequestTemplate.attribute.
        template.attribute("clientSpan", clientSpan);

        log.debug("Feign請求追蹤: {} {}, traceId: {}",
                template.method(), template.url(), clientSpan.getTraceId());
    }

    /**
     * Feign logger that also finishes the client span when the response is logged.
     */
    @Component
    public class TracingFeignLogger extends feign.Logger {

        /**
         * Writes a Feign log line at debug level.
         * Feign passes a {@code String.format}-style pattern, so it is formatted here; SLF4J
         * would not fill {@code %s} placeholders.
         */
        @Override
        protected void log(String configKey, String format, Object... args) {
            if (log.isDebugEnabled()) {
                log.debug(String.format(methodTag(configKey) + format, args));
            }
        }

        /**
         * Logs the request only when debug logging is enabled.
         */
        @Override
        protected void logRequest(String configKey, Level logLevel, Request request) {
            if (log.isDebugEnabled()) {
                super.logRequest(configKey, logLevel, request);
            }
        }

        /**
         * Finishes the client span stored on the request, then delegates to the default
         * response logging.
         *
         * @throws IOException propagated from the superclass
         */
        @Override
        protected Response logAndRebufferResponse(String configKey, Level logLevel,
                                                  Response response, long elapsedTime) throws IOException {
            Request request = response.request();
            Span clientSpan = (Span) request.requestTemplate().attribute("clientSpan");
            if (clientSpan != null) {
                // Record the "client received" event.
                clientSpan.annotation(new Annotation(System.currentTimeMillis(), "cr"));
                clientSpan.tag("http.status_code", String.valueOf(response.status()));
                // Store the elapsed time unscaled: the Zipkin reporter in this listing already
                // multiplies the duration by 1000, so scaling here would apply it twice.
                clientSpan.setDuration(elapsedTime);
                // Close the client span.
                tracer.close(clientSpan);
            }
            return super.logAndRebufferResponse(configKey, logLevel, response, elapsedTime);
        }
    }
}

️ Zipkin 系統架構

Zipkin 數據流架構


Zipkin+Sleuth 鏈路追蹤整合_51CTO博客_HTTP_02

Spring Cloud Sleuth Zipkin 配置

# application.yml Zipkin配置
spring:
  zipkin:
    # Zipkin服務器地址
    base-url: http://zipkin-server:9411
    # 啓用Zipkin報告
    enabled: true
    # 服務名稱
    service:
      name: user-service
    # 定位信息
    location:
      # 自動發現(通過服務發現)
      discovery:
        enabled: true
      # 或者直接指定
      host: localhost
      port: 9411
    # 發送配置
    sender:
      type: web # 支持web, kafka, rabbit
    # 壓縮配置
    compression:
      enabled: true
    # 連接配置
    connect-timeout: 5000
    read-timeout: 10000
  sleuth:
    # 採樣率配置
    sampler:
      probability: 1.0 # 1.0表示100%採樣
    # HTTP請求追蹤
    web:
      client:
        enabled: true
      # 跳過某些路徑
      skip-pattern: /health,/info
    # 異步追蹤
    async:
      enabled: true
    # 消息追蹤
    messaging:
      enabled: true
    # 調度任務追蹤
    schedule:
      enabled: true
    # Redis追蹤
    redis:
      enabled: true
    # 數據庫追蹤
    jdbc:
      enabled: true
# Zipkin客户端高級配置
management:
  endpoints:
    web:
      exposure:
        include: health,info,metrics,zipkin
  endpoint:
    zipkin:
      enabled: true
  metrics:
    export:
      zipkin:
        enabled: true
# 日誌配置(顯示TraceId)
logging:
  pattern:
    level: "%5p [${spring.zipkin.service.name:},%X{traceId:-},%X{spanId:-}]"

Zipkin Span 報告器

/**
* Zipkin Span 報告器
* 負責將Span數據發送到Zipkin服務器
*/
@Component
@Slf4j
public class ZipkinSpanReporter implements SpanReporter {
private final ZipkinRestTemplateSender sender;
private final ObjectMapper objectMapper;
private final MeterRegistry meterRegistry;
// 指標監控
private final Counter spansSentCounter;
private final Counter spansDroppedCounter;
private final Timer sendTimer;
public ZipkinSpanReporter(ZipkinRestTemplateSender sender,
ObjectMapper objectMapper,
MeterRegistry meterRegistry) {
this.sender = sender;
this.objectMapper = objectMapper;
this.meterRegistry = meterRegistry;
// 初始化指標
this.spansSentCounter = meterRegistry.counter("zipkin.spans.sent");
this.spansDroppedCounter = meterRegistry.counter("zipkin.spans.dropped");
this.sendTimer = meterRegistry.timer("zipkin.send.duration");
}
@Override
public void report(Span span) {
if (!span.isSampled()) {
spansDroppedCounter.increment();
return;
}
try {
// 轉換Span為Zipkin格式
zipkin2.Span zipkinSpan = convertToZipkinSpan(span);
// 發送Span到Zipkin
sendTimer.record(() -> {
try {
sender.sendSpans(Collections.singletonList(zipkinSpan));
spansSentCounter.increment();
if (log.isDebugEnabled()) {
log.debug("Span發送成功: traceId={}, spanId={}",
span.getTraceId(), span.getSpanId());
}
} catch (Exception e) {
log.error("Span發送失敗", e);
spansDroppedCounter.increment();
}
});
} catch (Exception e) {
log.error("Span轉換失敗", e);
spansDroppedCounter.increment();
}
}
/**
* 轉換內部Span為Zipkin格式
*/
private zipkin2.Span convertToZipkinSpan(Span span) {
zipkin2.Span.Builder builder = zipkin2.Span.newBuilder()
.traceId(span.getTraceId())
.id(span.getSpanId())
.name(span.getName())
.timestamp(span.getTimestamp() * 1000) // 轉換為微秒
.duration(span.getDuration() * 1000);  // 轉換為微秒
// 設置父Span
if (span.getParentSpanId() != null) {
builder.parentId(span.getParentSpanId());
}
// 設置本地端點
if (span.getLocalEndpoint() != null) {
builder.localEndpoint(zipkin2.Endpoint.newBuilder()
.serviceName(span.getLocalEndpoint().getServiceName())
.ip(span.getLocalEndpoint().getIp())
.port(span.getLocalEndpoint().getPort())
.build());
}
// 設置遠程端點
if (span.getRemoteEndpoint() != null) {
builder.remoteEndpoint(zipkin2.Endpoint.newBuilder()
.serviceName(span.getRemoteEndpoint().getServiceName())
.ip(span.getRemoteEndpoint().getIp())
.port(span.getRemoteEndpoint().getPort())
.build());
}
// 添加註解
if (span.getAnnotations() != null) {
for (Annotation annotation : span.getAnnotations()) {
builder.addAnnotation(annotation.getTimestamp() * 1000, annotation.getValue());
}
}
// 添加標籤
if (span.getTags() != null) {
for (Map.Entry<String, String> tag : span.getTags().entrySet()) {
  builder.putTag(tag.getKey(), tag.getValue());
  }
  }
  // 設置Kind
  if (span.getKind() != null) {
  switch (span.getKind()) {
  case CLIENT:
  builder.kind(zipkin2.Span.Kind.CLIENT);
  break;
  case SERVER:
  builder.kind(zipkin2.Span.Kind.SERVER);
  break;
  case PRODUCER:
  builder.kind(zipkin2.Span.Kind.PRODUCER);
  break;
  case CONSUMER:
  builder.kind(zipkin2.Span.Kind.CONSUMER);
  break;
  }
  }
  // 設置共享標誌
  if (span.isShared()) {
  builder.shared(true);
  }
  // 設置調試標誌
  if (span.isDebug()) {
  builder.debug(true);
  }
  return builder.build();
  }
  }
  /**
  * Zipkin REST API 發送器
  */
  @Component
  @Slf4j
  public class ZipkinRestTemplateSender {
  private final RestTemplate restTemplate;
  private final String zipkinBaseUrl;
  private final ObjectMapper objectMapper;
  public ZipkinRestTemplateSender(RestTemplate restTemplate,
  @Value("${spring.zipkin.base-url}") String zipkinBaseUrl,
  ObjectMapper objectMapper) {
  this.restTemplate = restTemplate;
  this.zipkinBaseUrl = zipkinBaseUrl;
  this.objectMapper = objectMapper;
  }
  /**
  * 批量發送Span到Zipkin
  */
  public void sendSpans(List<zipkin2.Span> spans) {
    if (spans.isEmpty()) {
    return;
    }
    try {
    // 序列化Span列表
    String jsonSpans = objectMapper.writeValueAsString(spans);
    // 構建請求
    HttpHeaders headers = new HttpHeaders();
    headers.setContentType(MediaType.APPLICATION_JSON);
    headers.set("Content-Encoding", "gzip");
    HttpEntity<byte[]> request = new HttpEntity<>(
      gzipCompress(jsonSpans), headers);
      // 發送到Zipkin
      ResponseEntity<String> response = restTemplate.postForEntity(
        zipkinBaseUrl + "/api/v2/spans", request, String.class);
        if (!response.getStatusCode().is2xxSuccessful()) {
        log.warn("Zipkin響應異常: {}", response.getStatusCode());
        }
        } catch (Exception e) {
        throw new RuntimeException("發送Span到Zipkin失敗", e);
        }
        }
        /**
        * GZIP壓縮
        */
        private byte[] gzipCompress(String data) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream(data.length());
        GZIPOutputStream gzip = new GZIPOutputStream(bos);
        gzip.write(data.getBytes(StandardCharsets.UTF_8));
        gzip.close();
        return bos.toByteArray();
        }
        }

⚡ 五、消息中間件集成

Kafka 消息追蹤集成

/**
 * Kafka tracing support.
 * Declares producer and consumer factory beans that wrap the application's factories
 * together with the tracer.
 */
@Configuration
@Slf4j
public class KafkaTracingConfiguration {

    /**
     * Producer factory that wraps {@code producerFactory} with tracing.
     */
    @Bean
    public TracingProducerFactory<String, String> tracingProducerFactory(
            ProducerFactory<String, String> producerFactory,
            Tracer tracer) {
        TracingProducerFactory<String, String> tracingFactory =
                new TracingProducerFactory<>(producerFactory, tracer);
        return tracingFactory;
    }

    /**
     * Consumer factory that wraps {@code consumerFactory} with tracing.
     */
    @Bean
    public TracingConsumerFactory<String, String> tracingConsumerFactory(
            ConsumerFactory<String, String> consumerFactory,
            Tracer tracer) {
        TracingConsumerFactory<String, String> tracingFactory =
                new TracingConsumerFactory<>(consumerFactory, tracer);
        return tracingFactory;
    }
}
            /**
             * Tracing producer factory.
             *
             * <p>Wraps every producer created by the delegate factory so that B3 trace
             * headers are added to outgoing records.
             */
            @Component
            @Slf4j
            public class TracingProducerFactory<K, V> implements ProducerFactory<K, V> {
                private static final String TRACE_HEADER = "traceContext";

                private final ProducerFactory<K, V> delegate;
                private final Tracer tracer;

                /**
                 * @param delegate factory that creates the real producers
                 * @param tracer   tracer used to create and close producer spans
                 */
                public TracingProducerFactory(ProducerFactory<K, V> delegate, Tracer tracer) {
                    this.delegate = delegate;
                    this.tracer = tracer;
                }

                /**
                 * Creates a producer from the delegate factory and wraps it with tracing.
                 */
                @Override
                public Producer<K, V> createProducer() {
                    return new TracingProducer<>(delegate.createProducer(), tracer);
                }

                /**
                 * Producer wrapper that injects trace headers before sending.
                 *
                 * <p>NOTE(review): only the two {@code send} overloads are shown in this
                 * listing; the remaining {@code Producer} methods still need to delegate.
                 */
                private static class TracingProducer<K, V> implements Producer<K, V> {
                    private final Producer<K, V> delegate;
                    private final Tracer tracer;

                    public TracingProducer(Producer<K, V> delegate, Tracer tracer) {
                        this.delegate = delegate;
                        this.tracer = tracer;
                    }

                    @Override
                    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
                        return delegate.send(injectTraceContext(record));
                    }

                    @Override
                    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
                        return delegate.send(injectTraceContext(record), callback);
                    }

                    /**
                     * Adds B3 headers for a new producer span to {@code record} when the
                     * current span is sampled; otherwise returns the record untouched.
                     * The record's headers are modified in place.
                     */
                    private ProducerRecord<K, V> injectTraceContext(ProducerRecord<K, V> record) {
                        Span currentSpan = tracer.getTraceContext().getCurrentSpan();
                        if (currentSpan == null || !currentSpan.isSampled()) {
                            return record;
                        }

                        // Create the producer span as a child of the current span.
                        Span producerSpan = tracer.nextSpan(currentSpan)
                                .name("kafka:produce:" + record.topic())
                                .kind(Span.Kind.PRODUCER)
                                .timestamp(System.currentTimeMillis());

                        // Record the "message sent" event and messaging tags.
                        producerSpan.annotation(new Annotation(System.currentTimeMillis(), "ms"));
                        producerSpan.tag("messaging.system", "kafka");
                        producerSpan.tag("messaging.destination", record.topic());
                        producerSpan.tag("messaging.destination_kind", "topic");

                        // Inject into the message headers.
                        Headers headers = record.headers();
                        putHeader(headers, "X-B3-TraceId", producerSpan.getTraceId());
                        putHeader(headers, "X-B3-SpanId", producerSpan.getSpanId());
                        putHeader(headers, "X-B3-ParentSpanId", currentSpan.getSpanId());
                        putHeader(headers, "X-B3-Sampled", "1");
                        // Custom tracing header.
                        // NOTE(review): the service name is hard-coded; confirm whether it
                        // should come from configuration.
                        putHeader(headers, "X-Trace-Service", "user-service");

                        log.debug("Kafka消息追蹤 - 發送: topic={}, traceId={}",
                                record.topic(), producerSpan.getTraceId());

                        // Close the producer span immediately (the send itself is asynchronous).
                        tracer.close(producerSpan);
                        return record;
                    }

                    /**
                     * Sets a header to a single UTF-8 value. Any existing values for the key
                     * are removed first, because {@code Headers.add} appends and a record
                     * sent twice would otherwise carry duplicate trace headers.
                     */
                    private static void putHeader(Headers headers, String key, String value) {
                        headers.remove(key);
                        headers.add(key, value.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                    }
                }
            }

三大追蹤系統對比

特性

Spring Cloud Sleuth + Zipkin

SkyWalking

Jaeger

架構模式

客户端-服務器

探針-收集器

客户端-收集器

數據存儲

ES, MySQL, Cassandra

ES, H2, MySQL, TiDB

Cassandra, ES, Kafka

UI界面

Zipkin UI

SkyWalking UI

Jaeger UI

語言支持

Java為主,多語言支持

多語言探針

多語言客户端

性能開銷

中等

低

低-中等

安裝部署

簡單

中等

簡單

生態系統

Spring Cloud生態

Apache項目

CNCF項目

監控維度

調用鏈、延遲

拓撲圖、指標、追蹤

分佈式追蹤

SkyWalking 配置示例

# agent.config
# 服務名稱
agent.service_name=${SW_AGENT_NAME:user-service}
# 後端服務地址
collector.backend_service=${SW_AGENT_COLLECTOR:127.0.0.1:11800}
# 採樣配置
agent.sample_n_per_3_secs=${SW_AGENT_SAMPLE:-1}
# 忽略後綴
agent.ignore_suffix=${SW_AGENT_IGNORE_SUFFIX:.jpg,.jpeg,.png,.gif,.css,.js}
# 跨進程傳播配置
agent.cross_process_propagation_config=${SW_AGENT_CROSS_PROPAGATION:true}

Sleuth + Zipkin 生產配置

spring:
  sleuth:
    # 採樣配置
    sampler:
      probability: 0.1  # 生產環境建議10%採樣率
    # 日誌關聯
    log:
      slf4j:
        whitelist-mdc-keys: traceId,spanId,parentSpanId
    # 異步配置
    async:
      enabled: true
      configurer:
        enabled: true
    # 調度任務配置
    schedule:
      enabled: true
      skip-pattern: healthCheckTask
    # 消息配置
    messaging:
      enabled: true
      rabbit:
        enabled: true
      kafka:
        enabled: true
  zipkin:
    # 生產環境Zipkin集羣
    base-url: http://zipkin-cluster:9411
    # 發送配置
    sender:
      type: web
    # 壓縮啓用
    compression:
      enabled: true
    # 連接池配置
    rest-template:
      max-total-connections: 100
      max-per-route: 20
      connection-timeout: 5000
      read-timeout: 10000
# 日誌模式配置
logging:
  pattern:
    level: "%5p [${spring.application.name:-},%X{traceId:-},%X{spanId:-}]"
  level:
    org.springframework.cloud.sleuth: INFO
    brave: WARN
    zipkin2: WARN
# 監控配置
management:
  endpoints:
    web:
      exposure:
        include: health,metrics,prometheus,sleuth
  endpoint:
    sleuth:
      enabled: true
  metrics:
    export:
      zipkin:
        enabled: true
        step: 1m

高併發場景優化

/**
 * High-performance tracing configuration, aimed at high-concurrency workloads.
 *
 * <p>NOTE(review): both reporter beans take a {@code SpanReporter} and return a
 * {@code SpanReporter}; confirm which bean each {@code delegate} parameter is meant to
 * resolve to (a qualifier may be required).
 */
@Configuration
@Slf4j
public class HighPerformanceTracingConfig {

    /**
     * Asynchronous span reporter, intended to keep reporting off the business threads.
     */
    @Bean
    @Primary
    public SpanReporter asyncSpanReporter(SpanReporter delegate) {
        return new AsyncSpanReporter(delegate);
    }

    /**
     * Batching span reporter, intended to reduce the number of network requests.
     */
    @Bean
    public SpanReporter batchingSpanReporter(SpanReporter delegate) {
        return BatchingSpanReporter.wrap(delegate)
                .maxBatchSize(100)                     // maximum batch size
                .maxConcurrentBatches(5)               // maximum concurrent batches
                .batchInterval(Duration.ofSeconds(5))  // batch interval
                .build();
    }

    /**
     * Adaptive sampler: 1% base rate, 50% ceiling, re-evaluated every 1000 requests.
     */
    @Bean
    public Sampler adaptiveSampler() {
        // AdaptiveSampler only declares the three-argument constructor; it has no builder.
        return new AdaptiveSampler(0.01, 0.5, 1000);
    }

    /**
     * Sampler whose probability is re-evaluated once per window of requests.
     *
     * <p>At the end of each window the observed sampling rate is compared with the base
     * probability: below half of it the probability is raised by 20% (capped at the maximum),
     * above one and a half times it the probability is lowered by 20% (floored at the base).
     */
    @Slf4j
    public static class AdaptiveSampler implements Sampler {
        private final double baseProbability;
        private final double maxProbability;
        private final int windowSize;
        private final AtomicInteger requestCount = new AtomicInteger(0);
        private final AtomicInteger sampleCount = new AtomicInteger(0);
        private volatile double currentProbability;

        /**
         * @param baseProbability starting and minimum sampling probability
         * @param maxProbability  maximum sampling probability
         * @param windowSize      number of requests per adjustment window; must be positive
         * @throws IllegalArgumentException when {@code windowSize} is not positive
         */
        public AdaptiveSampler(double baseProbability, double maxProbability, int windowSize) {
            if (windowSize <= 0) {
                // Would otherwise fail with a divide-by-zero on the first request.
                throw new IllegalArgumentException("windowSize must be positive: " + windowSize);
            }
            this.baseProbability = baseProbability;
            this.maxProbability = maxProbability;
            this.windowSize = windowSize;
            this.currentProbability = baseProbability;
        }

        /**
         * Makes a probabilistic sampling decision and, every {@code windowSize} requests,
         * adjusts the probability.
         *
         * @return true when this request should be sampled
         */
        @Override
        public boolean isSampled() {
            int total = requestCount.incrementAndGet();
            boolean sampled =
                    java.util.concurrent.ThreadLocalRandom.current().nextDouble() < currentProbability;
            if (sampled) {
                // Without this count the observed rate is always zero and the probability
                // only ever climbs to the maximum.
                sampleCount.incrementAndGet();
            }
            if (total % windowSize == 0) {
                adjustSamplingRate();
            }
            return sampled;
        }

        /**
         * Compares the window's observed sampling rate with the base probability, adjusts
         * the current probability and resets both counters.
         */
        private void adjustSamplingRate() {
            int sampled = sampleCount.getAndSet(0);
            requestCount.set(0);
            double actualRate = (double) sampled / windowSize;

            if (actualRate < baseProbability * 0.5) {
                currentProbability = Math.min(currentProbability * 1.2, maxProbability);
            } else if (actualRate > baseProbability * 1.5) {
                currentProbability = Math.max(currentProbability * 0.8, baseProbability);
            }
            log.info("調整採樣率: {} -> {}", actualRate, currentProbability);
        }
    }
}

洞察:分佈式追蹤是微服務可觀測性的核心支柱。合理的採樣策略、高效的數據收集和智能的上下文傳播,是構建生產級追蹤系統的關鍵。理解數據流和性能影響,才能在業務需求和系統開銷之間找到最佳平衡。