Flink CDC系列之:Kafka的Debezium JSON 結構定義類DebeziumJsonStruct
這是一個 Debezium JSON 結構定義類,使用枚舉來定義 Debezium JSON 格式的字段結構和位置信息。
類概述
public class DebeziumJsonStruct
這個類通過嵌套枚舉定義了 Debezium JSON 格式的完整結構,包括字段名稱和位置信息。
嵌套枚舉詳解
DebeziumStruct - 頂層結構
enum DebeziumStruct {
SCHEMA(0, "schema"),
PAYLOAD(1, "payload");
作用:定義 Debezium JSON 的頂層結構字段。
字段説明:
- SCHEMA(0, “schema”):表結構信息(位置0,字段名"schema")
- PAYLOAD(1, “payload”):數據負載信息(位置1,字段名"payload")
對應的 JSON 結構:
{
"schema": { ... }, // SCHEMA 字段
"payload": { ... } // PAYLOAD 字段
}
DebeziumPayload - 數據負載結構
enum DebeziumPayload {
BEFORE(0, "before"),
AFTER(1, "after"),
OPERATION(2, "op"),
SOURCE(3, "source");
作用:定義 payload 對象內部的字段結構。
字段説明:
- BEFORE(0, “before”):變更前的數據(位置0,字段名"before")
- AFTER(1, “after”):變更後的數據(位置1,字段名"after")
- OPERATION(2, “op”):操作類型(位置2,字段名"op")
- SOURCE(3, “source”):源數據庫信息(位置3,字段名"source")
對應的 JSON 結構:
{
"payload": {
"before": { ... }, // BEFORE 字段
"after": { ... }, // AFTER 字段
"op": "c", // OPERATION 字段
"source": { ... } // SOURCE 字段
}
}
DebeziumSource - 源信息結構
enum DebeziumSource {
DATABASE(0, "db"),
TABLE(1, "table");
作用:定義 source 對象內部的字段結構。
字段説明:
- DATABASE(0, “db”):數據庫名稱(位置0,字段名"db")
- TABLE(1, “table”):表名稱(位置1,字段名"table")
對應的 JSON 結構:
{
"source": {
"db": "inventory", // DATABASE 字段
"table": "users" // TABLE 字段
}
}
枚舉設計特點
位置和名稱雙重映射
private final int position; // 位置索引
private final String fieldName; // 字段名稱
優勢:
- 可以通過位置快速訪問
- 可以通過名稱清晰標識
類型安全
使用枚舉替代字符串常量,避免拼寫錯誤:
// 安全的方式
DebeziumPayload.BEFORE.getFieldName() // 返回 "before"
// 不安全的方式(容易拼錯)
String field = "befor"; // 拼寫錯誤
易於擴展
添加新字段只需在對應枚舉中添加新值:
enum DebeziumPayload {
// 現有字段...
TIMESTAMP(4, "ts_ms"), // 新增時間戳字段
TRANSACTION(5, "transaction"); // 新增事務字段
實際使用場景
在序列化器中的使用
public class DebeziumJsonSerializationSchema {
public byte[] serialize(Event event) {
// 構建 Debezium JSON 結構
ObjectNode rootNode = objectMapper.createObjectNode();
// 使用枚舉確保字段名稱一致
rootNode.set(
DebeziumStruct.SCHEMA.getFieldName(),
buildSchemaNode()
);
rootNode.set(
DebeziumStruct.PAYLOAD.getFieldName(),
buildPayloadNode(event)
);
return objectMapper.writeValueAsBytes(rootNode);
}
private ObjectNode buildPayloadNode(Event event) {
ObjectNode payloadNode = objectMapper.createObjectNode();
// 設置操作類型
payloadNode.put(
DebeziumPayload.OPERATION.getFieldName(),
getOperationCode(event.op())
);
// 設置前後數據
payloadNode.set(
DebeziumPayload.BEFORE.getFieldName(),
convertToJson(event.before())
);
payloadNode.set(
DebeziumPayload.AFTER.getFieldName(),
convertToJson(event.after())
);
// 設置源信息
payloadNode.set(
DebeziumPayload.SOURCE.getFieldName(),
buildSourceNode(event.tableId())
);
return payloadNode;
}
private ObjectNode buildSourceNode(TableId tableId) {
ObjectNode sourceNode = objectMapper.createObjectNode();
sourceNode.put(
DebeziumSource.DATABASE.getFieldName(),
tableId.getSchemaName()
);
sourceNode.put(
DebeziumSource.TABLE.getFieldName(),
tableId.getTableName()
);
return sourceNode;
}
}
在反序列化器中的使用
public class DebeziumJsonDeserializationSchema {
public Event deserialize(byte[] message) {
JsonNode rootNode = objectMapper.readTree(message);
// 使用枚舉字段名獲取節點
JsonNode payloadNode = rootNode.get(
DebeziumStruct.PAYLOAD.getFieldName()
);
// 解析操作類型
String operation = payloadNode.get(
DebeziumPayload.OPERATION.getFieldName()
).asText();
// 解析源信息
JsonNode sourceNode = payloadNode.get(
DebeziumPayload.SOURCE.getFieldName()
);
String database = sourceNode.get(
DebeziumSource.DATABASE.getFieldName()
).asText();
String table = sourceNode.get(
DebeziumSource.TABLE.getFieldName()
).asText();
// ... 其他解析邏輯
}
}
生成的完整 JSON 示例
基於這個結構定義,會生成如下格式的 JSON:
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{"type": "int32", "optional": false, "field": "id"},
{"type": "string", "optional": true, "field": "name"}
],
"optional": true,
"name": "mysql-server-1.inventory.users.Value",
"field": "before"
}
],
"optional": false,
"name": "mysql-server-1.inventory.users.Envelope"
},
"payload": {
"before": null,
"after": {
"id": 1001,
"name": "張三"
},
"op": "c",
"source": {
"db": "inventory",
"table": "users"
}
}
}
擴展建議
當前定義比較基礎,實際 Debezium 格式包含更多字段,可以擴展為:
enum DebeziumPayload {
BEFORE(0, "before"),
AFTER(1, "after"),
OPERATION(2, "op"),
SOURCE(3, "source"),
TIMESTAMP(4, "ts_ms"),
TRANSACTION(5, "transaction");
}
enum DebeziumSource {
DATABASE(0, "db"),
TABLE(1, "table"),
SERVER_ID(2, "server_id"),
BINLOG_FILE(3, "file"),
BINLOG_POSITION(4, "pos"),
CONNECTOR(5, "connector"),
SNAPSHOT(6, "snapshot");
}
總結:DebeziumJsonStruct 是一個結構定義類,它通過嵌套枚舉:
✅ 定義標準結構:完整定義 Debezium JSON 格式的字段結構
✅ 提供類型安全:使用枚舉替代字符串常量,避免拼寫錯誤
✅ 支持雙向訪問:既可以通過名稱訪問,也可以通過位置訪問
✅ 易於維護擴展:集中管理字段定義,支持輕鬆添加新字段
✅ 提高代碼可讀性:清晰的枚舉名稱使代碼意圖更明確
這個類是 Debezium JSON 序列化和反序列化組件的基礎,確保了數據格式的規範性和一致性。