Flink CDC系列之:Kafka的Debezium JSON 結構定義類DebeziumJsonStruct

這是一個 Debezium JSON 結構定義類,使用枚舉來定義 Debezium JSON 格式的字段結構和位置信息。

類概述

public class DebeziumJsonStruct

這個類通過嵌套枚舉定義了 Debezium JSON 格式的完整結構,包括字段名稱和位置信息。

嵌套枚舉詳解
DebeziumStruct - 頂層結構

enum DebeziumStruct {
    SCHEMA(0, "schema"),
    PAYLOAD(1, "payload");

作用:定義 Debezium JSON 的頂層結構字段。

字段説明:

  • SCHEMA(0, “schema”):表結構信息(位置0,字段名"schema")
  • PAYLOAD(1, “payload”):數據負載信息(位置1,字段名"payload")

對應的 JSON 結構:

{
  "schema": { ... },    // SCHEMA 字段
  "payload": { ... }    // PAYLOAD 字段
}

DebeziumPayload - 數據負載結構

enum DebeziumPayload {
    BEFORE(0, "before"),
    AFTER(1, "after"),
    OPERATION(2, "op"),
    SOURCE(3, "source");

作用:定義 payload 對象內部的字段結構。

字段説明:

  • BEFORE(0, “before”):變更前的數據(位置0,字段名"before")
  • AFTER(1, “after”):變更後的數據(位置1,字段名"after")
  • OPERATION(2, “op”):操作類型(位置2,字段名"op")
  • SOURCE(3, “source”):源數據庫信息(位置3,字段名"source")

對應的 JSON 結構:

{
  "payload": {
    "before": { ... },      // BEFORE 字段
    "after": { ... },       // AFTER 字段  
    "op": "c",             // OPERATION 字段
    "source": { ... }      // SOURCE 字段
  }
}

DebeziumSource - 源信息結構

enum DebeziumSource {
    DATABASE(0, "db"),
    TABLE(1, "table");

作用:定義 source 對象內部的字段結構。

字段説明:

  • DATABASE(0, “db”):數據庫名稱(位置0,字段名"db")
  • TABLE(1, “table”):表名稱(位置1,字段名"table")

對應的 JSON 結構:

{
  "source": {
    "db": "inventory",     // DATABASE 字段
    "table": "users"       // TABLE 字段
  }
}

枚舉設計特點

位置和名稱雙重映射

private final int position;    // 位置索引
private final String fieldName; // 字段名稱

優勢:

  • 可以通過位置快速訪問
  • 可以通過名稱清晰標識

類型安全

使用枚舉替代字符串常量,避免拼寫錯誤:

// 安全的方式
DebeziumPayload.BEFORE.getFieldName()  // 返回 "before"

// 不安全的方式(容易拼錯)
String field = "befor";  // 拼寫錯誤

易於擴展
添加新字段只需在對應枚舉中添加新值:

enum DebeziumPayload {
    // 現有字段...
    TIMESTAMP(4, "ts_ms"),      // 新增時間戳字段
    TRANSACTION(5, "transaction"); // 新增事務字段

實際使用場景
在序列化器中的使用

public class DebeziumJsonSerializationSchema {
    
    public byte[] serialize(Event event) {
        // 構建 Debezium JSON 結構
        ObjectNode rootNode = objectMapper.createObjectNode();
        
        // 使用枚舉確保字段名稱一致
        rootNode.set(
            DebeziumStruct.SCHEMA.getFieldName(), 
            buildSchemaNode()
        );
        
        rootNode.set(
            DebeziumStruct.PAYLOAD.getFieldName(),
            buildPayloadNode(event)
        );
        
        return objectMapper.writeValueAsBytes(rootNode);
    }
    
    private ObjectNode buildPayloadNode(Event event) {
        ObjectNode payloadNode = objectMapper.createObjectNode();
        
        // 設置操作類型
        payloadNode.put(
            DebeziumPayload.OPERATION.getFieldName(),
            getOperationCode(event.op())
        );
        
        // 設置前後數據
        payloadNode.set(
            DebeziumPayload.BEFORE.getFieldName(),
            convertToJson(event.before())
        );
        
        payloadNode.set(
            DebeziumPayload.AFTER.getFieldName(), 
            convertToJson(event.after())
        );
        
        // 設置源信息
        payloadNode.set(
            DebeziumPayload.SOURCE.getFieldName(),
            buildSourceNode(event.tableId())
        );
        
        return payloadNode;
    }
    
    private ObjectNode buildSourceNode(TableId tableId) {
        ObjectNode sourceNode = objectMapper.createObjectNode();
        
        sourceNode.put(
            DebeziumSource.DATABASE.getFieldName(),
            tableId.getSchemaName()
        );
        
        sourceNode.put(
            DebeziumSource.TABLE.getFieldName(),
            tableId.getTableName()
        );
        
        return sourceNode;
    }
}

在反序列化器中的使用

public class DebeziumJsonDeserializationSchema {
    
    public Event deserialize(byte[] message) {
        JsonNode rootNode = objectMapper.readTree(message);
        
        // 使用枚舉字段名獲取節點
        JsonNode payloadNode = rootNode.get(
            DebeziumStruct.PAYLOAD.getFieldName()
        );
        
        // 解析操作類型
        String operation = payloadNode.get(
            DebeziumPayload.OPERATION.getFieldName()
        ).asText();
        
        // 解析源信息
        JsonNode sourceNode = payloadNode.get(
            DebeziumPayload.SOURCE.getFieldName()
        );
        
        String database = sourceNode.get(
            DebeziumSource.DATABASE.getFieldName()
        ).asText();
        
        String table = sourceNode.get(
            DebeziumSource.TABLE.getFieldName()
        ).asText();
        
        // ... 其他解析邏輯
    }
}

生成的完整 JSON 示例
基於這個結構定義,會生成如下格式的 JSON:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {"type": "int32", "optional": false, "field": "id"},
          {"type": "string", "optional": true, "field": "name"}
        ],
        "optional": true,
        "name": "mysql-server-1.inventory.users.Value",
        "field": "before"
      }
    ],
    "optional": false,
    "name": "mysql-server-1.inventory.users.Envelope"
  },
  "payload": {
    "before": null,
    "after": {
      "id": 1001,
      "name": "張三"
    },
    "op": "c",
    "source": {
      "db": "inventory",
      "table": "users"
    }
  }
}

擴展建議
當前定義比較基礎,實際 Debezium 格式包含更多字段,可以擴展為:

enum DebeziumPayload {
    BEFORE(0, "before"),
    AFTER(1, "after"), 
    OPERATION(2, "op"),
    SOURCE(3, "source"),
    TIMESTAMP(4, "ts_ms"),
    TRANSACTION(5, "transaction");
}

enum DebeziumSource {
    DATABASE(0, "db"),
    TABLE(1, "table"),
    SERVER_ID(2, "server_id"),
    BINLOG_FILE(3, "file"),
    BINLOG_POSITION(4, "pos"),
    CONNECTOR(5, "connector"),
    SNAPSHOT(6, "snapshot");
}

總結:DebeziumJsonStruct 是一個結構定義類,它通過嵌套枚舉:

✅ 定義標準結構:完整定義 Debezium JSON 格式的字段結構

✅ 提供類型安全:使用枚舉替代字符串常量,避免拼寫錯誤

✅ 支持雙向訪問:既可以通過名稱訪問,也可以通過位置訪問

✅ 易於維護擴展:集中管理字段定義,支持輕鬆添加新字段

✅ 提高代碼可讀性:清晰的枚舉名稱使代碼意圖更明確

這個類是 Debezium JSON 序列化和反序列化組件的基礎,確保了數據格式的規範性和一致性。