博客 / 詳情

返回

一個月搞定100+表遷移:我的“偷師”Navicat實戰覆盤

個人聲明:本文所有代碼示例均已脱敏處理,僅保留核心技術邏輯,不涉及任何敏感業務信息。


前情提要:一個堪稱"社死"的工期

還記得那天,老闆把我叫到辦公室,遞過來一份需求文檔:"下個月要把項目遷移到新平台,數據這塊你來搞定。"

我打開文檔,掃了一眼,差點當場石化:

需求清單

  • 100+張數據表要遷移(還要支持後續動態新增)
  • 雙鏈路同步:MySQL到MySQL、MongoDB到PostgreSQL
  • 不能寫死配置,要能靈活擴展
  • 工期不到1個月

技術約束

  • 源環境(塔外)和目標環境(塔內)網絡完全隔離
  • 塔外只能讀源庫,無法訪問目標庫
  • 塔內只能寫目標庫,無法訪問源庫
  • 兩端唯一的橋樑:阿里雲OSS(塔外只能寫,塔內可以讀寫)
  • 塔內不支持MongoDB,必須用PostgreSQL替代

數據規模

  • 單表最大1000萬+行數據
  • 單店鋪單表50萬+行(涉及1000+個店鋪)
  • 總計100+張表

那一刻,我腦海裏浮現的畫面是:在公司地下室瘋狂寫MyBatis <select><insert>語句直到猝死...

但最終,我不僅提前5天完成遷移,還搞出了一套能讓後續表秒級上線的"全自動化流水線"。怎麼做到的?

答案就藏在Navicat的"導入/導出"功能裏——直接構造SQL文件上傳OSS,塔內執行,複雜邏輯全都在塔外處理!


一眼望去的七大技術難點

在開始動手前,我先梳理了一下面臨的挑戰:

難點1:表結構千差萬別
100+張表,每張表的字段、類型、主鍵都不一樣。傳統MyBatis方式意味着要寫100+個Mapper、100+個實體類。後續新增表還得繼續寫,代碼複用度≈0

難點2:同步策略多樣化
100+張表需要支持四種同步策略,條件各不相同:

  • 全表同步:基礎配置表,數據量小,TRUNCATE後一次性插入全部數據
  • 公司級條件同步:按company_id維度同步,支持條件過濾
  • 店鋪級增量同步:有is_deletedupdate_time的表,按shop_id+時間條件增量同步
  • 店鋪級全量同步:物理刪除的表,按shop_id維度全量同步單店鋪數據

每張表的策略和條件都不同,需要支持靈活配置

難點3:數據內容包含特殊字符
某些字段的內容包含分號、單引號等SQL特殊字符,如果不處理,生成的SQL文件會在執行時語法報錯。

難點4:超大數據量
單表1000萬+數據,一次性加載到內存必然OOM。而且生成的SQL文件可能幾百MB,網絡傳輸和存儲都是問題。

難點5:MongoDB到PostgreSQL的類型鴻溝
MongoDB的ObjectId、BSON對象、數組類型,PostgreSQL都不支持。需要做複雜的類型映射和轉換。

難點6:網絡隔離架構
塔外和塔內網絡完全隔離,傳統的ETL工具(DataX)根本用不了。它們都是"讀→處理→寫"的單機模式,需要同時訪問源庫和目標庫。

解決方案:自己搭建一個類似navicat的導入/導出,能動態執行SQL的功能。

難點7:表間依賴關係導致的順序問題
部分表之間存在外鍵依賴關係(如order_items依賴orders),如果併發同步:

  • order_items先執行插入,但orders還未同步 → 外鍵約束失敗
  • 需要識別依賴關係,先同步父表,再同步子表,保證數據完整性

解決方案:塔內掃描SQL文件時,優先處理父表,再併發處理其他表


靈感來源:Navicat是怎麼做的?

某天深夜,我打開Navicat準備手動導出第一批測試數據。盯着"導出嚮導"發呆的時候,突然腦子裏閃過一個念頭:

Navicat是怎麼做到導出任意表的?

我點開導出的.sql文件:

-- 刪除舊錶
DROP TABLE IF EXISTS `demo_table`;

-- 重建表結構
CREATE TABLE `demo_table` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(50) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB;

-- 插入數據
INSERT INTO `demo_table` VALUES (1, 'test');

豁然開朗!Navicat的核心邏輯就是:

  1. SHOW CREATE TABLE獲取表結構
  2. SELECT *查詢數據
  3. 生成標準SQL文件
  4. 用户手動在目標庫執行

如果我把這套邏輯自動化呢?

  • 塔外:自動查表結構、自動查數據、自動生成SQL、自動上傳OSS
  • 塔內:自動掃描OSS、自動讀取SQL文件、自動執行

這不就完美契合了"塔外-塔內"的架構約束嗎!


核心方案設計

整體架構流程

f91cc3bf56d0539d6a6a3560ded4f617

技術選型説明

塔外系統技術棧

組件 選型 使用場景 選型理由
消息隊列 RocketMQ 觸發同步,異步解耦進行SQL文件構造 支持TAG過濾(MySQLToMySQL/MongodbToPgSQL)
順序消費保證數據一致性,支持可後續擴展同步類型例如RedisToMySQL
流式處理 JDBC Stream
MongoTemplate
讀取超大表數據 避免OOMsetFetchSize(Integer.MIN_VALUE)啓用MySQL服務器端遊標,Mongo使用流式讀取的api,內存佔用恆定
配置管理 MySQL配置表 管理同步規則 配置驅動,新增表無需改代碼,支持佔位符動態替換{shopId}/{companyId}
文件上傳 阿里雲OSS SDK SQL文件上傳 唯一能打通塔外塔內的橋樑,可用性99.995%,支持大文件

塔內系統技術棧

組件 選型 使用場景 選型理由
併發控制 CompletableFuture 併發處理多個SQL文件 JDK8原生,無需引入第三方庫,輕量級異步編程
文件下載 阿里雲OSS SDK SQL文件下載和刪除 流式下載,支持逐行讀取,執行成功後立即刪除防止重複
批量執行 JDBC Batch SQL批量執行 1000條/批平衡性能和內存,setAutoCommit(true)防止事務過大

第一難:100+張表結構各異,怎麼動態生成SQL?

傳統方案的絕望之路

如果用傳統MyBatis寫法,畫面會是這樣:

<!-- 表1的Mapper -->
<select id="queryTable1">
    SELECT id, name, create_time FROM table_1 WHERE shop_id = #{shopId}
</select>

<!-- 表2的Mapper -->
<select id="queryTable2">
    SELECT id, title, status FROM table_2 WHERE company_id = #{companyId}
</select>

<!-- ...重複100次... -->

手寫100個Mapper?別説一個月,一年都寫不完!而且後續新增表還得繼續寫,代碼複用度約等於0。

靈感來源:SHOW CREATE TABLE

MySQL提供了一個神器:SHOW CREATE TABLE

SHOW CREATE TABLE `user_info`;

輸出:

CREATE TABLE `user_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `username` varchar(50) DEFAULT NULL,
  `create_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB;

拿到建表語句 = 拿到了一切表信息(字段名、類型、主鍵...)

核心實現:動態解析表結構

public TableStructure getTableStructure(DataSource ds, String tableName) {
    String sql = "SHOW CREATE TABLE `" + tableName + "`";
    
    try (Connection conn = ds.getConnection();
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery(sql)) {
        
        if (rs.next()) {
            String ddl = rs.getString(2);  // 第2列是DDL語句
            
            // 核心:正則解析DDL語句
            List<String> columns = parseColumns(ddl);      // 提取字段名
            String primaryKey = parsePrimaryKey(ddl);      // 提取主鍵
            
            return new TableStructure(columns, primaryKey);
        }
    }
    return null;
}

關鍵亮點

  1. 表名轉義:防止關鍵字衝突(如表名叫orderuser
  2. 正則解析DDL:一次性獲取字段、主鍵、類型信息
  3. 零硬編碼:任何表都能自動處理,後續新增表只需加配置

你問怎麼知道哪張表要同步?表名從哪來?請繼續往下看...(第三難中有解決方案,通過配置表實現)

這裏用到JDBC編程,適合當前業務需求(古法編程,不得已而為之)

生成完整SQL文件

拿到表結構後,生成標準SQL文件:

// 1. 先刪除目標環境的舊數據(保證冪等性)
String deleteStatement = "DELETE FROM `user_info` WHERE shop_id = 12345;\n";

// 2. 批量插入新數據(每批1000條)
String insertStatement = 
    "INSERT INTO `user_info` (`id`, `username`, `create_time`) VALUES\n" +
    "(1, 'Alice', '2025-01-01 12:00:00'),\n" +
    "(2, 'Bob', '2025-01-02 13:00:00');\n";

上傳到OSS後,塔內直接逐行讀取執行,完美!


第二難:數據裏有分號,SQL會被切割炸掉!

問題現場

默認SQL語句以;結尾,但數據內容可能包含各種特殊情況:

-- 情況1: 數據中包含分號
INSERT INTO `content` VALUES (1, '教程:Java;Spring;MyBatis');

-- 情況2: 數據以分號結尾
INSERT INTO `config` VALUES (2, 'path=/usr/local/bin;');

-- 情況3: 數據中有換行符,且以;結尾
INSERT INTO `article` VALUES (3, '第一行
第二行;
第三行');

塔內如果用;判斷SQL結束:

String line = reader.readLine();  
// 只讀到: INSERT INTO `content` VALUES (1, '教程:Java
// 數據被截斷了!

導致SQL切割錯位、語法報錯。

解決方案:特殊符號標記 + 逐行讀取

核心思路:每條SQL獨佔一行,用特殊符號;#END#標記結束

塔外生成SQL時

// 關鍵:使用特殊符號作為SQL結束標記
String SPECIAL_DELIMITER = ";#END#";

// 構造SQL(數據內容裏的分號、換行符都不處理)
String sql = "INSERT INTO `content` VALUES (1, 'Java;Spring')";  

// 寫入文件:每條SQL獨佔一行,以特殊符號結尾
writer.write(sql + SPECIAL_DELIMITER);
writer.write(System.lineSeparator());  // 系統換行符

上傳到OSS的文件內容:

INSERT INTO `content` VALUES (1, 'Java;Spring');#END#
INSERT INTO `config` VALUES (2, 'path=/usr/bin;');#END#
INSERT INTO `article` VALUES (3, '第一行\n第二行');#END#

説明

  • 每條SQL獨佔一行(以System.lineSeparator()換行)
  • 每條SQL以;#END#結尾(完整的SQL結束標記)
  • 數據內容裏的分號;、換行符\n等都保持原樣

塔內執行前還原

try (BufferedReader reader = new BufferedReader(
         new InputStreamReader(ossStream))) {
    
    List<String> sqlBatch = new ArrayList<>();
    StringBuilder currentSql = new StringBuilder();
    String line;
    
    while ((line = reader.readLine()) != null) {
        // 拼接當前行
        currentSql.append(line);
        
        // 檢查是否是完整的SQL(以;#END#結尾)
        if (currentSql.toString().endsWith(";#END#")) {
            // 還原:特殊符號 → 正常分號
            String realSql = currentSql.toString().replace(";#END#", ";");
            
            // 添加到批次
            sqlBatch.add(realSql);
            currentSql.setLength(0);  // 清空,準備下一條SQL
            
            // 批量執行(每500條一批)
            if (sqlBatch.size() >= 100) {
                executeBatch(stmt, sqlBatch);
                sqlBatch.clear();
            }
        }
    }
    
    // 執行剩餘SQL
    if (!sqlBatch.isEmpty()) {
        executeBatch(stmt, sqlBatch);
    }
}

為什麼選;#END#

  • 足夠長,不會和數據內容衝突(實測幾千萬條數據從未衝突)
  • 標記明確,易於理解
  • 塔內處理簡單,一行代碼搞定

關鍵點:為什麼塔內要逐行讀取?

原因一:SQL文件可能很大

單個SQL文件可能達到幾百MB(如50萬行數據),如果一次性讀取:

  • 內存佔用過高:100MB文件加載需要幾百MB+內存,而且多線程處理更容易造成OOM
  • GC壓力大:大對象頻繁創建和回收

原因二:無法按普通分號切割

如果用;切割會出錯:

// ❌ 錯誤做法
String[] sqls = allContent.split(";");  // 會誤切數據裏的分號!

正確做法:逐行拼接,遇到;#END#才算完整

// ✅ 正確做法
StringBuilder currentSql = new StringBuilder();
while ((line = reader.readLine()) != null) {
    currentSql.append(line);
    
    if (currentSql.toString().endsWith(";#END#")) {
        String sql = currentSql.toString().replace(";#END#", ";");
        executeBatch(sql);
        currentSql.setLength(0);  // 清空,準備下一條
    }
}

SQL文件格式示例

DELETE FROM `table` WHERE id = 1;#END#
INSERT INTO `table` VALUES (1, 'data;with;semicolons');#END#
INSERT INTO `table` VALUES (2, 'line1\nline2');#END#

第三難:同步策略多樣化,怎麼靈活配置?

背景:四種同步策略

同步策略 適用場景 SQL操作 數據範圍
全表同步 基礎配置表(數據量小,千行級) TRUNCATE + INSERT 整張表的所有數據
公司級條件同步 按公司維度管理的表 DELETE WHERE company_id=? + INSERT 單個公司的所有數據
店鋪級增量同步 有軟刪除標記和更新時間的表 DELETE WHERE shop_id=? AND ... + INSERT 單店鋪增量數據
店鋪級全量同步 物理刪除的表 DELETE WHERE shop_id=? + INSERT 單店鋪全部數據

問題:100+張表裏,四種策略混雜,查詢條件各不相同。需要靈活配置每張表的同步策略和WHERE條件。

解決方案:配置驅動 + 佔位符

核心思想:把同步策略、查詢條件放到配置表裏,每張表單獨配置

配置表設計

CREATE TABLE `sync_config` (
    `id` int PRIMARY KEY,
    `table_name` varchar(100),
    `table_level` varchar(20),     -- company/shop
    `sync_type` int,                -- 0:全表, 1:條件同步
    `where_condition` text,         -- WHERE條件模板(支持佔位符)
    `delete_strategy` varchar(20)   -- TRUNCATE/DELETE
);

配置示例

-- 全表同步
INSERT INTO sync_config VALUES (1, 'sys_config', 'company', 0, NULL, 'TRUNCATE');

-- 公司級條件同步
INSERT INTO sync_config VALUES (2, 'company_settings', 'company', 1, 
    'company_id = {companyId} AND status = 1', 'DELETE');

-- 店鋪級增量同步
INSERT INTO sync_config VALUES (3, 'user_table', 'shop', 1, 
    'shop_id = {shopId} AND update_time > {lastTime}', 'DELETE');

-- 店鋪級全量同步
INSERT INTO sync_config VALUES (4, 'order_table', 'shop', 1, 
    'shop_id = {shopId}', 'DELETE');

佔位符替換邏輯

private String buildWhereCondition(String template, SyncContext ctx) {
    if (template == null) return "";  // 全表同步,無WHERE條件
    
    return template
        .replace("{shopId}", String.valueOf(ctx.getShopId()))
        .replace("{companyId}", String.valueOf(ctx.getCompanyId()))
        .replace("{lastTime}", ctx.getLastSyncTime());
}

SQL生成過程(以店鋪級增量同步為例)

步驟1:構造查詢SQL

// 佔位符替換後得到WHERE條件
String whereCondition = "shop_id = 123 AND update_time > '2025-01-15 00:00:00'";

// 構造SELECT語句
String selectSql = "SELECT * FROM user_table WHERE " + whereCondition;

步驟2:流式讀取並生成SQL文件

關鍵點:從ResultSet元數據動態獲取字段,而非寫死字段名

try (ResultSet rs = stmt.executeQuery(selectSql)) {
    ResultSetMetaData metadata = rs.getMetaData();
    int columnCount = metadata.getColumnCount();
    
    // 從元數據獲取列名列表
    List<String> columnNames = new ArrayList<>();
    for (int i = 1; i <= columnCount; i++) {
        columnNames.add(metadata.getColumnName(i));
    }
    
    // 1. 先寫DELETE語句
    writer.write("DELETE FROM user_table WHERE " + whereCondition + ";#END#");
    writer.write(System.lineSeparator());
    
    // 2. 構造INSERT語句頭部(字段名從元數據獲取)
    String insertHeader = "INSERT INTO `user_table` (" + 
        String.join(", ", columnNames) + ") VALUES\n";
    
    StringBuilder values = new StringBuilder();
    int batchCount = 0;
    
    // 3. 流式讀取數據並拼接VALUES
    while (rs.next()) {
        values.append("(");
        for (int i = 1; i <= columnCount; i++) {
            if (i > 1) values.append(", ");
            // 根據字段類型格式化值(動態處理)
            values.append(formatValue(rs, i, metadata.getColumnType(i)));
        }
        values.append(")");
        batchCount++;
        
        // 每10行生成一條INSERT
        if (batchCount >= 10) {
            writer.write(insertHeader + values.toString() + ";#END#");
            writer.write(System.lineSeparator());
            values.setLength(0);
            batchCount = 0;
        } else {
            values.append(", ");
        }
    }
    
    // 4. 處理剩餘數據
    if (batchCount > 0) {
        writer.write(insertHeader + values.toString() + ";#END#");
    }
}

最終生成的SQL文件

DELETE FROM user_table WHERE shop_id = 123 AND update_time > '2025-01-15 00:00:00';#END#
INSERT INTO `user_table` (id, shop_id, username, update_time) VALUES
(1, 123, 'Alice', '2025-01-16 10:00:00'),
(2, 123, 'Bob', '2025-01-16 11:00:00');#END#

優勢總結

靈活性:四種策略自由配置,滿足不同表的需求
可擴展:新增表只需加配置,代碼零改動
佔位符:支持{shopId}{companyId}{lastTime}等動態參數
零硬編碼:字段名從元數據動態獲取,適配任意表結構


第四難:單表50W+數據,如何防止OOM?

問題:傳統方式的內存殺手

// 反面教材:一次性加載全部數據
String sql = "SELECT * FROM huge_table WHERE shop_id = 123";
List<Map<String, Object>> allRows = jdbcTemplate.queryForList(sql);  // 直接OOM

單店鋪單表可能50W+行,全部加載到內存會導致OutOfMemoryError。

解決方案:流式讀取 + 臨時文件

MySQL流式讀取

private void generateSQL(DataSource ds, String sql) throws SQLException {
    try (Connection conn = ds.getConnection();
         Statement stmt = conn.createStatement(
             ResultSet.TYPE_FORWARD_ONLY,    // 只向前遍歷
             ResultSet.CONCUR_READ_ONLY)) {  // 只讀模式
        
        // 核心:啓用MySQL流式讀取
        stmt.setFetchSize(Integer.MIN_VALUE);  // MySQL JDBC特殊約定!
        
        try (ResultSet rs = stmt.executeQuery(sql)) {
            int batchCount = 0;
            StringBuilder sqlValues = new StringBuilder();
            
            while (rs.next()) {  // 逐行處理
                sqlValues.append("(");
                for (int i = 1; i <= columnCount; i++) {
                    sqlValues.append(formatValue(rs, i));
                }
                sqlValues.append(")");
                batchCount++;
                
                // 每10行生成一條INSERT
                if (batchCount >= 10) {
                    writeInsert(sqlValues.toString());
                    sqlValues.setLength(0);  // 清空緩衝
                    batchCount = 0;
                }
            }
        }
    }
}

核心技巧

  • stmt.setFetchSize(Integer.MIN_VALUE):MySQL JDBC的特殊約定,啓用服務器端遊標
  • 每次只拉取1行數據到客户端,內存佔用恆定
  • 批量拼接VALUES:多行生成一條INSERT,減少SQL數量

MongoDB流式讀取

CloseableIterator<Document> iterator = 
    mongoTemplate.stream(query, Document.class, collectionName);

try {
    while (iterator.hasNext()) {
        Document doc = iterator.next();  // 逐文檔處理
        processDocument(doc);
    }
} finally {
    iterator.close();  // ⚠️ 必須手動關閉,否則連接泄漏!
}

塔內執行:流式讀取

try (BufferedReader reader = new BufferedReader(
         new InputStreamReader(ossStream))) {
    
    List<String> sqlBatch = new ArrayList<>();
    StringBuilder currentSql = new StringBuilder();
    String line;
    
    while ((line = reader.readLine()) != null) {
        // 拼接當前行
        currentSql.append(line);
        
        // 檢查是否是完整的SQL(以;#END#結尾)
        if (currentSql.toString().endsWith(";#END#")) {
            // 還原:特殊符號 → 正常分號
            String realSql = currentSql.toString().replace(";#END#", ";");
            
            // 添加到批次
            sqlBatch.add(realSql);
            currentSql.setLength(0);  // 清空,準備下一條SQL
            
            // 批量執行(每100條一批,塔外10條數據構造成1個insert語句)
            if (sqlBatch.size() >= 100) {
                executeBatch(stmt, sqlBatch);
                sqlBatch.clear();
            }
        }
    }
    
    // 執行剩餘SQL
    if (!sqlBatch.isEmpty()) {
        executeBatch(stmt, sqlBatch);
    }
    
    // 關鍵:自動提交,避免事務過大
    conn.setAutoCommit(true);
}

為什麼setAutoCommit(true)

單文件可能幾千條SQL,如果在一個事務裏會導致:

  • 鎖表時間過長
  • 回滾日誌暴漲
  • 內存佔用飆升

自動提交後,每條SQL獨立提交,避免以上問題。

效果對比:

方案 內存佔用 風險
一次性加載 2GB(50W行) 必然OOM
流式處理 50MB(常量級) 穩定

第五難:MongoDB到PostgreSQL的類型轉換

問題

MongoDB和PostgreSQL的數據類型完全不兼容:

MongoDB PostgreSQL 問題
ObjectId 無對應類型 主鍵轉換
BSON對象 JSONB 嵌套結構
數組 Array 類型聲明

解決方案

在配置表的擴展字段定義類型映射:

{
  "mongoCollection": "user_profile",
  "pgTable": "user_profile",
  "fieldMapping": {
    "_id": "id",
    "preferences": "preferences",
    "tags": "tags"
  },
  "typeMapping": {
    "_id": "OBJECTID_TO_VARCHAR",
    "preferences": "JSONB",
    "tags": "INTEGER_ARRAY"
  }
}

類型轉換代碼:

private String convertValue(Object value, String typeRule) {
    if (value == null) return "NULL";
    
    switch (typeRule) {
        case "JSONB":
            // {name: "test"} → '{"name":"test"}'::jsonb
            String json = toJsonString(value);
            return "'" + escapeSql(json) + "'::jsonb";
            
        case "INTEGER_ARRAY":
            // [1,2,3] → ARRAY[1,2,3]::INTEGER[]
            List<Integer> list = (List) value;
            return "ARRAY[" + String.join(",", list) + "]::INTEGER[]";
            
        case "OBJECTID_TO_VARCHAR":
            // ObjectId("507f...") → '507f...'
            return "'" + value.toString() + "'";
            
        default:
            return convertDefault(value);
    }
}

覆盤:一個月完成遷移的關鍵

整體架構:塔外-塔內雙鏈路

┌──────────── 塔外系統 (Outer) ────────────┐
│                                         │
│  ① API觸發同步                          │
│  ② 查詢配置表 → 拆分公司級/店鋪級配置       │
│  ③ 構建MQ消息 → 投遞RocketMQ             │
│  ④ MQ Consumer                         │
│     ├─ SHOW CREATE TABLE 獲取表結構       │
│     ├─ 流式讀取源數據庫                    │
│     ├─ 生成 DELETE + INSERT SQL          │
│     ├─ 分號替換為特殊符號                  │
│     └─ 上傳到 OSS                        │
└───────────────────────────────────────────┘
                    │
                    │ OSS中轉
                    ↓
┌──────────── 塔內系統 (Inner) ────────────┐
│                                         │
│  ⑤ 定時任務 / 手動觸發                    │
│  ⑥ 掃描OSS目錄 → 獲取待處理SQL文件列表      │
│  ⑦ 流式下載SQL文件 → 逐行讀取              │
│     ├─ 特殊符號還原為分號                  │
│     ├─ 批量執行(1000條/批)                │
│     └─ setAutoCommit(true) 防止事務過大   │
│  ⑧ 執行成功 → 立即刪除OSS文件              │
└───────────────────────────────────────────┘

核心亮點總結

技術點 傳統方案 本方案 效果
表結構獲取 手寫100個Mapper SHOW CREATE TABLE動態解析 零硬編碼,支持任意表
SQL分隔符 ;判斷結束 特殊符號;#END# 支持數據含分號、換行符
同步策略 全量同步or硬編碼 配置表+佔位符 靈活配置,4種策略
大數據量處理 一次性加載(OOM) 流式讀取+臨時文件 常量級內存,50W+行穩定
擴展性 新增表需改代碼 只需加配置 秒級上線新表同步

做對的3件事

1. 從工具中偷師學藝
Navicat的導入/導出功能啓發了整體方案,SHOW CREATE TABLE是突破口

2. 把複雜邏輯放在塔外
塔內只負責執行SQL,邏輯簡單;塔外可以隨意調試、優化

3. 配置驅動,而非代碼驅動
新增表只需加配置,不改代碼。後續維護成本趨近於0

最終效果

指標 數據
遷移表數量 200+張(含後續新增)
最大單表數據 1000+萬行
首次全量同步 10-30分鐘
日常增量同步 公司級表約30秒,店鋪級表約1分鐘
內存佔用 穩定在200MB左右
OOM次數 0(連續運行3個月)
工期 25天(提前5天完成)


寫在最後

以上便是我這次遷移實戰的全部分享。絕非標準答案,但希望能為你帶來一絲靈感。

這次遷移讓我深刻體會到:
好的架構不是設計出來的,而是從實際問題中"偷"出來的。

當你面對技術難題時,不妨問自己:

  • 有沒有現成的工具已經解決了類似問題?不要重複造輪子!!(Navicat)
  • 數據庫/框架本身提供了什麼能力?(SHOW CREATE TABLE、setFetchSize)
  • 能否用配置代替硬編碼?(配置表+佔位符)

感謝那些"默默扛下所有"的技術細節

  • SHOW CREATE TABLE —— 你扛下了表結構解析的苦活
  • stmt.setFetchSize(Integer.MIN_VALUE) —— 你默默守護了內存安全
  • ;#END# —— 你可能是全網最詭異但最實用的分隔符
  • RocketMQ的TAG過濾 —— 你讓消息路由變得優雅
  • CompletableFuture —— 你讓塔內併發處理成為可能
  • System.lineSeparator() —— 你讓SQL文件格式清晰明瞭

最後送大家一段話

寫代碼的時候,我們都是站在巨人肩膀上的追夢人。

技術本身沒有高低貴賤,能解決問題的就是好技術。不要盲目追求所謂的"最佳實踐",在約束下求最優解,才是工程師的智慧。

願你在技術的道路上,既能仰望星空,也能腳踏實地。


"在技術的世界裏,沒有完美的方案,只有最合適的選擇。
而最合適的選擇,往往來自於對問題本質的深刻理解。"
—— 一個在生產環境爬坑的後端開發


文章的最後,想和你多聊兩句。

技術之路,常常是熱鬧與孤獨並存。那些深夜的調試、靈光一閃的方案、還有踩坑爬起後的頓悟,如果能有人一起聊聊,該多好。

為此,我建了一個小花園——我的微信公眾號「[努力的小鄭]」。

這裏沒有高深莫測的理論堆砌,只有我對後端開發、系統設計和工程實踐的持續思考與沉澱。它更像我的數字筆記本,記錄着那些值得被記住的解決方案和思維火花。

如果你覺得今天的文章還有一點啓發,或者單純想找一個同行者偶爾聊聊技術、談談思考,那麼,歡迎你來坐坐。
85f114bceb12e933bb817ec5fecdfef7

願你前行路上,總有代碼可寫,有夢可追,也有燈火可親。

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.