博客 / 詳情

返回

DTS按業務場景批量遷移阿里雲MySQL表實戰(下):遷移管理平台設計與實現

本文是 DTS按業務場景批量遷移阿里雲MySQL表實戰(上):技術選型和API對接 的後續,應用狀態模式,完成業務系統中的遷移模塊。DTS的對接方式可參考前文。

遷移管理平台設計與實現

完成DTS API對接後,就需要考慮如何將DTS和業務系統有機結合實現整套的遷移流程。

出於信息安全角度的考慮,本文刪除了大量涉及實際業務的實現代碼。

業務約束

從業務出發,最好的體驗肯定是用户無感的,即遷移完成後,確認新舊錶數據一致,直接切換到新表查詢。

如果遷移期間,用户對舊錶進行了寫入,新表可能會少數據,不能貿然切換,要做數據的對比。如果用户一直在寫入,就要一直反覆的對比、確認,有增量數據就要刪除新表重新遷移,流程複雜。

和業務方溝通,得知對方可以接受:禁止寫入正在遷移的公司的表,向用户報錯,等遷移完成後再恢復使用。
這樣流程就簡單多了,開始遷移時,將舊錶重命名增加特殊的後綴,就能防止用户操作,並確保舊錶數據不發生變更。

技術校驗

如何判斷一個公司是否遷移成功?最嚴謹的方式是逐表逐行數據對比,但是在使用DTS的情況下並無必要。我採取的比較策略是,在遷移前後:

  1. 源表目標表數量相同

  2. 對應表數據量相同、數據最後更新時間(如有此列)相同

只要滿足以上要求,就認為數據是一致的。可以通過SELECT COUNT(*), MAX(updateTime) FROM table_name一次性獲取。

遷移狀態機

經過分析和簡化,遷移狀態機如下:

image

可以發現,每個狀態都可以進行“推進”和“回滾”兩個動作,很適合使用狀態模式來實現。狀態模式的實現先放一放,看看幾個基本數據結構:

遷移任務

@Data
public class TableMigrateTask {
  // 主鍵
  private Long id;
  // 公司id
  private Long companyId;
  // 原始分庫位
  private Long oldSchemaId;
  // 新分庫位
  private Long newSchemaId;
  // 任務狀態
  private Integer state;
  // DTS任務狀態,和阿里雲定義一致
  private String dtsStatus;
  // DTS任務已刪除(釋放)
  private Boolean dtsDeleted;
  // DTS實例id
  private String dtsInstanceId;
  // DTS任務id
  private String dtsJobId;
  // 失敗原因
  private String failedReason;
  // 狀態跳轉時的信息,輔助排查問題
  private String transitInfo;
  // 遷移前表數據統計,json格式,表名、數據量、最後更新時間
  private String tableStatisticsSrc;
  // 遷移表結果統計,json格式kv結構,表名-數據量
  private String tableStatisticsDest;
}

遷移上下文

遷移狀態機實際處理的對象,封裝了一些服務,可以視為領域對象(充血模型)。

@Setter
public class TableMigrateExecuteContext {

  // 持有的任務對象
  @Getter
  private TableMigrateTask tableMigrateTask;

  // 當前的狀態
  @Getter
  private TableMigrateState currentState;

  private TableMigrateTaskRepository tableMigrateTaskRepository;

  private TableMigrateQueryService tableMigrateQueryService;

  private TableMigrateService tableMigrateService;

  private TableArchiveService tableArchiveService;

  private DataSourceHolder dataSourceHolder;

  public void createInstanceAndConfigureDtsJob() {
      // 調用DTS API創建任務
  }

  public DescribeDtsJobDetailResponse queryDtsMigJob() {
      // 調用DTS API查詢
  }

  public void switchRoute(long oldSchemaId, long newSchemaId) {
      // 將分表以外的單表update為newSchemaId
  }

  public void stopDtsMigJob() {
      // 調用DTS API停止
  }

  public void updateTableMigrateTask(TableMigrateTask modifiedTask) {
       // 更新持有的任務(持久化)
  }

  /**
   * 狀態推進
   *
   * @return 返回信息,不成功時非空
   */
  public String forward() {
    return currentState.forward(this);
  }

  /** 狀態回滾 */
  public String rollback() {
    return currentState.rollback(this);
  }

  /**
   * 重命名舊錶
   *
   * @param forward true-遷移場景,加——migold,反之則不加
   * @param ignoreExited 是否忽略已存在的表,僅在初始態的回滾場景可用
   */
  public void renameOldTableNames(boolean forward, boolean ignoreExited) {
      // 注意要考慮源庫中是否存在和舊錶相同的同名表
  }


  public void updateDestTableInfo() {
     // 更新目標表的統計信息
  }

  public void archiveNewTables(List<String> needArchiveTables) {
      // 歸檔新表  
  }

  /**
   * 刪除表
   *
   * @param newTable true-新表,false-舊錶
   */
  public void deleteTables(boolean newTable) {
      // 批量執行DROP TABLE
  }

  public void deleteDtsInstanceAndJob() {
      // 調用DTS API釋放實例
  }
}

工廠類

@Component
public class TableMigrateContextFactory {

  @Resource private TableMigrateTaskRepository tableMigrateTaskRepository;
  @Resource private TableMigrateQueryService tableMigrateQueryService;
  @Resource private TableMigrateService tableMigrateService;
  @Resource private TableArchiveService tableArchiveService;
  @Resource private DataSourceHolder dataSourceHolder;

  public TableMigrateExecuteContext buildContext(long taskId) {
    TableMigrateTask task = tableMigrateTaskRepository.getById(taskId);
    if (task == null || task.getStatus() == 0) {
      throw new BizException("表遷移任務不存在或已被刪除");
    }
    TableMigrateExecuteContext context = new TableMigrateExecuteContext();
    context.setTableMigrateTask(task);
    context.setCurrentState(buildState(TableMigrateStateEnum.getByValue(task.getState())));

    // 服務注入
    context.setTableMigrateTaskRepository(tableMigrateTaskRepository);
    context.setTableMigrateQueryService(tableMigrateQueryService);
    context.setTableMigrateService(tableMigrateService);
    context.setTableArchiveService(tableArchiveService);
    context.setDataSourceHolder(dataSourceHolder);
    return context;
  }

  private TableMigrateState buildState(TableMigrateStateEnum stateEnum) {
    switch (stateEnum) {
      case INIT:
        return new MigrateInitState();
      case FAILED:
        return new MigrateFailedState();
      case PROCESSING:
        return new MigrateProcessingState();
      case NEED_SWITCH:
        return new MigrateNeedSwitchState();
      case SWITCHED:
        return new MigrateSwitchedState();
      case FINISH:
        return new MigrateFinishState();
      default:
        throw new BizException("遷移狀態非法");
    }
  }
}

遷移狀態

我在做本次的系統設計時,對狀態模式做了一些回顧和參考。遷移狀態是狀態模式的核心,從設計模式的角度來看,狀態模式“允許對象在其內部狀態改變時動態調整自身行為,使得對象的表現形態如同修改了其所屬類。”

以下是各個類的繼承關係:

image

對應的狀態如下:

類名 含義 説明
TableMigrateState 接口定義
AbstractMigrateState 狀態抽象類
AbstractFinalState 終態抽象類 終態很多操作都是不支持的,和AbstractMigrateState分開更簡潔
MigrateInitState 初始 記錄要遷移的統計數據和配置
MigrateProcessingState 遷移中 DTS進行遷移動作的狀態
MigrateNeedSwitchState 遷移完成待切換分庫位 數據已同步在新表,但還不可以通過業務功能直接訪問
MigrateSwitchedState 分庫位已切換待刪除舊錶 數據已同步在新表,且能通過業務功能直接訪問
MigrateFinishState 遷移完成 數據已同步在新表且能訪問,舊錶已刪除
MigrateFailedState 遷移失敗 回滾,舊錶恢復訪問,新表如果有則刪除

狀態接口

public interface TableMigrateState {

  /**
   * 前進到下一狀態
   *
   * @param context
   * @return 失敗的提示信息
   */
  String forward(TableMigrateExecuteContext context);

  /**
   * 回滾操作
   *
   * @param context
   * @return 失敗的提示信息
   */
  String rollback(TableMigrateExecuteContext context);

  /**
   * 獲取當前的狀態對應枚舉
   */
  TableMigrateStateEnum getState();

  /**
   * 獲取下一個狀態
   */
  TableMigrateState getNextState();

  /**
   * 獲取回滾的狀態
   */
  TableMigrateState getRollbackState();
}

狀態抽象類

public abstract class AbstractMigrateState implements TableMigrateState {

  @Override
  public String forward(TableMigrateExecuteContext context) {
    TableMigrateTask task = context.getTableMigrateTask();

    // 1. 前置校驗
    // 根據校驗結果,判斷是留在當前狀態,還是直接回滾到遷移失敗狀態

    // 2. 實際動作,由實現類完成
   // 簡單起見,在實際動作裏的異常都自動回滾

    // 3. 狀態跳轉
    transit(context, getNextState());
    return null;
  }

  @Override
  public String rollback(TableMigrateExecuteContext context) {
    TableMigrateTask task = context.getTableMigrateTask();
    // 1. 當前狀態校驗
    checkCurrentState(context);

    // 2. 回滾操作,如果發生異常,保持在當前狀態

    // 3. 狀態跳轉
    transit(context, getRollbackState());
    return null;
  }

  /**
   * 前置校驗
   *
   * @param context
   */
  protected PreCheckResult preCheck(TableMigrateExecuteContext context) {
    return checkCurrentState(context);
  }

  protected PreCheckResult checkCurrentState(TableMigrateExecuteContext context) {
      // 檢查當前狀態是否符合預期,構造檢查結果
  }

  /**
   * 改變當前執行上下文狀態, 不做其他的業務操作
   *
   * @param context
   * @param nextState
   */
  private void transit(TableMigrateExecuteContext context, TableMigrateState nextState) {
    TableMigrateTask task = context.getTableMigrateTask();
    task.setState(nextState.getState().getValue());
    context.updateTableMigrateTask(task);
    context.setCurrentState(nextState);
  }
  
  protected abstract void doForward(TableMigrateExecuteContext context);

  /**
   * 回滾操作
   *
   * <p>需要保證冪等,如果單個回滾操作失敗,可以重複執行
   *
   * @param context
   */
  protected abstract void doRollback(TableMigrateExecuteContext context);

  /**
   * 舊錶更名
   *
   * @param context
   * @param forward true-遷移場景,舊錶加後綴; false-回滾場景,舊錶刪除後綴
   */
  protected void renameOldTableNames(
      TableMigrateExecuteContext context, boolean forward, boolean ignoreExited) {
    context.renameOldTableNames(forward, ignoreExited);
  }

  /**
   * 刪除新表
   *
   * <p>新表的刪除,最好不要共用這個方法
   *
   * @param context
   */
  protected void deleteNewTables(TableMigrateExecuteContext context) {
    context.deleteTables(true);
  }

  /** 前置校驗結果 */
  @Data
  @AllArgsConstructor
  public static class PreCheckResult {
    /** 中斷,需要回滾 */
    public static final int ABORT = -1;

    /** 校驗通過 */
    public static final int PASS = 0;

    /** 校驗不通過,保持原有狀態 */
    public static final int NOT_PASS = 1;

    private int code;

    private String msg;

    public static PreCheckResult buildPass() {
      return new PreCheckResult(PASS, null);
    }

    public boolean isPass() {
      return this.code == PASS;
    }
  }
}

終態抽象類

public abstract class AbstractFinalState implements TableMigrateState {
  @Override
  public String forward(TableMigrateExecuteContext context) {
    return "當前狀態【" + getState().getValue() + " " + getState().getDes() + "】已是終態,不能進行下一步操作";
  }

  @Override
  public String rollback(TableMigrateExecuteContext context) {
    return "當前狀態【" + getState().getValue() + " " + getState().getDes() + "】已是終態,不能進行撤銷操作";
  }

  @Override
  public TableMigrateState getNextState() {
    throw new BizException("當前狀態" + getState().getValue() + "已是終態,沒有後續狀態可跳轉");
  }

  @Override
  public TableMigrateState getRollbackState() {
    throw new BizException("當前狀態" + getState().getValue() + "已是終態,沒有後續撤銷態可跳轉");
  }
}

初始

public class MigrateInitState extends AbstractMigrateState {
  @Override
  protected void doForward(TableMigrateExecuteContext context) {
    TableMigrateTask task = context.getTableMigrateTask();
    if (task.getOldSchemaId().equals(task.getNewSchemaId())) {
      throw new BizException("遷移前後的分庫位id相同");
    }

    // 1. 舊錶更名, 直接阻止後續的變更
    // 歸檔不影響RENAME
    renameOldTableNames(context, true, false);

    // 2. 創建DTS任務並回寫到task字段
    // 創建失敗則直接拋異常,回滾
    // 此處DTS任務是直接提交執行的,並不能確定當前實際是哪個狀態,因此狀態留空
    context.createInstanceAndConfigureDtsJob();
  }
  
  @Override
  protected void doRollback(TableMigrateExecuteContext context) {
    // 初始態回滾時,舊錶可能還沒有更名
    renameOldTableNames(context, false, true);
  }

  @Override
  public TableMigrateStateEnum getState() {
    return TableMigrateStateEnum.INIT;
  }

  @Override
  public TableMigrateState getNextState() {
    return new MigrateProcessingState();
  }

  @Override
  public TableMigrateState getRollbackState() {
    return new MigrateFailedState();
  }
}

遷移中

public class MigrateProcessingState extends AbstractMigrateState {

  /** DTS-未初始化的狀態 */
  private static final Set<String> DTS_NOT_INIT =
      Sets.newHashSet(
          DtsJobStatusEnum.NOT_STARTED.getCode(), DtsJobStatusEnum.NOT_CONFIGURED.getCode());

  /** DTS-處理中的狀態 */
  private static final Set<String> DTS_PROCESSING =
      Sets.newHashSet(
          DtsJobStatusEnum.PRECHECKING.getCode(),
          DtsJobStatusEnum.PRECHECK_PASS.getCode(),
          DtsJobStatusEnum.INITIALIZING.getCode(),
          DtsJobStatusEnum.SYNCHRONIZING.getCode(),
          DtsJobStatusEnum.MIGRATING.getCode(),
          DtsJobStatusEnum.SUSPENDING.getCode(),
          DtsJobStatusEnum.MODIFYING.getCode(),
          DtsJobStatusEnum.RETRYING.getCode(),
          DtsJobStatusEnum.UPGRADING.getCode(),
          DtsJobStatusEnum.DOWNGRADING.getCode(),
          DtsJobStatusEnum.LOCKED.getCode());

  /** DTS-失敗的狀態 */
  private static final Set<String> DTS_FAILED =
      Sets.newHashSet(
          DtsJobStatusEnum.PRECHECK_FAILED.getCode(),
          DtsJobStatusEnum.INITIALIZE_FAILED.getCode(),
          DtsJobStatusEnum.FAILED.getCode(),
          DtsJobStatusEnum.MIGRATION_FAILED.getCode());

  @Override
  public PreCheckResult preCheck(TableMigrateExecuteContext context) {
    checkCurrentState(context);
    // 校驗DTS任務已完成
    TableMigrateTask task = context.getTableMigrateTask();
    DescribeDtsJobDetailResponse dtsJobDetailResponse = context.queryDtsMigJob();
    if (dtsJobDetailResponse == null || dtsJobDetailResponse.getBody() == null) {
      return new PreCheckResult(
          PreCheckResult.NOT_PASS,
          String.format(
              "DTS任務結果查詢失敗, 返回結果為空。instanceId=%s, jobId=%s",
              task.getDtsInstanceId(), task.getDtsJobId()));
    }
    DescribeDtsJobDetailResponseBody responseBody = dtsJobDetailResponse.getBody();
    if (BooleanUtils.isNotTrue(responseBody.getSuccess())) {
      return new PreCheckResult(
          PreCheckResult.NOT_PASS,
          String.format(
              "DTS任務結果查詢失敗, 接口調用結果為失敗。instanceId=%s, jobId=%s",
              task.getDtsInstanceId(), task.getDtsJobId()));
    }
    DtsJobStatusEnum dtsJobStatusEnum = DtsJobStatusEnum.getByCode(responseBody.getStatus());
    if (dtsJobStatusEnum == null) {
      return new PreCheckResult(
          PreCheckResult.NOT_PASS,
          String.format(
              "DTS任務狀態非法,請稍後重試。instanceId=%s, jobId=%s, 阿里雲狀態=%s",
              task.getDtsInstanceId(), task.getDtsJobId(), responseBody.getStatus()));
    }
    task.setDtsStatus(dtsJobStatusEnum.getCode());
    if (DTS_NOT_INIT.contains(dtsJobStatusEnum.getCode())) {
      return new PreCheckResult(
          PreCheckResult.NOT_PASS,
          String.format(
              "DTS任務狀態異常,尚未初始化。instanceId=%s, jobId=%s, 阿里雲狀態=%s",
              task.getDtsInstanceId(), task.getDtsJobId(), responseBody.getStatus()));
    }
    if (DTS_PROCESSING.contains(dtsJobStatusEnum.getCode())) {
      return new PreCheckResult(
          PreCheckResult.NOT_PASS,
          String.format(
              "DTS任務仍在處理中。instanceId=%s, jobId=%s, 阿里雲狀態=%s",
              task.getDtsInstanceId(), task.getDtsJobId(), responseBody.getStatus()));
    }
    if (DTS_FAILED.contains(dtsJobStatusEnum.getCode())) {
      return new PreCheckResult(
          PreCheckResult.ABORT,
          String.format(
              "DTS任務執行失敗。instanceId=%s, jobId=%s, 阿里雲狀態=%s",
              task.getDtsInstanceId(), task.getDtsJobId(), responseBody.getStatus()));
    }
    if (StringUtils.equals(DtsJobStatusEnum.FINISHED.getCode(), dtsJobStatusEnum.getCode())) {
      return PreCheckResult.buildPass();
    }
    return new PreCheckResult(
        PreCheckResult.NOT_PASS,
        String.format(
            "DTS任務狀態非法。instanceId=%s, jobId=%s, 阿里雲狀態=%s",
            task.getDtsInstanceId(), task.getDtsJobId(), responseBody.getStatus()));
  }

  @Override
  protected void doForward(TableMigrateExecuteContext context) {
    // 已在校驗時更新任務狀態,並根據DTS執行狀態判斷要不要回滾

    // 1. 將遷移後的表,按照原表的狀態進行歸檔

    // 2. 寫入遷移後的表統計信息
    context.updateDestTableInfo();
  }

  @Override
  protected void doRollback(TableMigrateExecuteContext context) {
    // 1. 嘗試中止DTS任務,對已完成的DTS任務調用不會拋異常

    // 2. 新表刪除
    deleteNewTables(context);

    // 3. 舊錶名稱還原
    renameOldTableNames(context, false, false);
  }

  @Override
  public TableMigrateStateEnum getState() {
    return TableMigrateStateEnum.PROCESSING;
  }

  @Override
  public TableMigrateState getNextState() {
    return new MigrateNeedSwitchState();
  }

  @Override
  public TableMigrateState getRollbackState() {
    return new MigrateFailedState();
  }
}

遷移完成待切換分庫位

public class MigrateNeedSwitchState extends AbstractMigrateState {

  @Override
  protected void doForward(TableMigrateExecuteContext context) {
    // 單表路由切換
    context.switchRoute(
        context.getTableMigrateTask().getOldSchemaId(),
        context.getTableMigrateTask().getNewSchemaId());
  }

  @Override
  protected void doRollback(TableMigrateExecuteContext context) {
    // 1. 新表刪除
    deleteNewTables(context);

    // 2. 舊錶名稱還原
    renameOldTableNames(context, false, false);

    // 3. 單表和路由恢復
    // 可能已經在forward時做過,因此也做復原
    context.switchRoute(
        context.getTableMigrateTask().getNewSchemaId(),
        context.getTableMigrateTask().getOldSchemaId());
  }

  @Override
  public TableMigrateStateEnum getState() {
    return TableMigrateStateEnum.NEED_SWITCH;
  }

  @Override
  public TableMigrateState getNextState() {
    return new MigrateSwitchedState();
  }

  @Override
  public TableMigrateState getRollbackState() {
    return new MigrateFailedState();
  }
}

分庫位已切換待刪除舊錶

public class MigrateSwitchedState extends AbstractMigrateState {

  @Override
  protected void doForward(TableMigrateExecuteContext context) {
    // 刪除舊錶
    context.deleteTables(false);
  }

  @Override
  protected void doRollback(TableMigrateExecuteContext context) {
    // 1. 新表刪除
    deleteNewTables(context);

    // 2. 舊錶名稱還原
    renameOldTableNames(context, false, false);

    // 3. 單表和路由恢復
    context.switchRoute(
        context.getTableMigrateTask().getNewSchemaId(),
        context.getTableMigrateTask().getOldSchemaId());
  }

  @Override
  public TableMigrateStateEnum getState() {
    return TableMigrateStateEnum.SWITCHED;
  }

  @Override
  public TableMigrateState getNextState() {
    return new MigrateFinishState();
  }

  @Override
  public TableMigrateState getRollbackState() {
    return new MigrateFailedState();
  }
}

遷移完成

public class MigrateFinishState extends AbstractFinalState {
  @Override
  public TableMigrateStateEnum getState() {
    return TableMigrateStateEnum.FINISH;
  }
}

遷移失敗

public class MigrateFailedState extends AbstractFinalState {
  @Override
  public TableMigrateStateEnum getState() {
    return TableMigrateStateEnum.FAILED;
  }
}

操作流程

  1. 創建遷移任務TableMigrateTask

  2. 通過工廠類,使用任務id創建包含遷移任務TableMigrateTask的遷移上下文TableMigrateExecuteContext

  3. 調用TableMigrateExecuteContext的forward()推進狀態、rollback()回滾狀態

查詢相關功能實現

DTS任務列表查詢

因為已經把DTS的相關字段持久化了,可以通過業務系統相關的遷移任務表實現分頁查詢。

不過在“遷移中”跳轉到“遷移完成待切換分庫位”的過程中,DTS也會經歷多個狀態,典型的有Prechecking、Migrating、Finished等(見DescribeDtsJobDetail_數據傳輸_API文檔的Status字段説明),可以通過接口獲取最新的狀態並寫入遷移任務表。

如何查詢待遷移的表?

回顧一下,要遷移的表分以下三種形式:

  • 後綴是公司id,如table_companyId

  • 後綴是 公司_年份,如table_company_year

  • 後綴是業務id,如table_bizId

對於同一個前綴,以companyId=123為例,第一、三種表都可以精確匹配:

  • 第一種表每個公司只有一張,比如table_a_123、table_b_123;

  • 第三種表每個bizId同一個公司只有一張,比如bizId可以取1、2,那麼會存在table_c_1、table_c_2、table_d_1、table_d_2,並且,bizId是有限的,可以從一張bizId_table表獲取所有可選值。

  • 對於第二種表year的取值範圍,雖然可以類似bizId一樣去找,但是並沒有直接的關係表。

我想到了兩種方案,最後選擇了第二種。

SHOW TABLES LIKE查詢指定前綴

這是最先考慮到的方案,比較直接,而且在開發、測試環境中運行良好。但是在線上就不行了,將所有表查詢一次要數分鐘,調用早已超時。我想這應該和線上環境表數量過多導致元數據獲取變慢有關,每次查詢需要上百ms,累計耗時長達數分鐘。

DatabaseMetaData一次性獲取所有表後過濾

可以通過DataSource的元數據,一次性獲取數據源對應庫的所有的表,再將表名進行過濾。經過測試,10W級數據獲取全部表的時間在3~7S之間,和方案一相比快很多。

以下代碼片段展示瞭如何獲取所有表名,忽略異常處理。

String physicalSchemaName = dataSourceHolder.getPhysicalSchemaName(logicalSchemaName);
HikariDataSource dataSource = dataSourceHolder.getDataSourceByPhysicalSchemaName(physicalSchemaName);
try (Connection connection = dataSource.getConnection()) {
    DatabaseMetaData metaData = connection.getMetaData();
    ResultSet catalogs = metaData.getCatalogs();
    while (catalogs.next()) {
        String tableCat = catalogs.getString("TABLE_CAT");
        if (!StringUtils.equals(tableCat, physicalSchemaName)) {
            // 判斷庫名是否一致。如果多個庫實際上在同一個RDS實例,元數據實際上是這些庫的,而非單個庫的
            continue;
        }

        // 獲取指定數據庫中的所有表名
        ResultSet tableResultSet =
            metaData.getTables(physicalSchemaName, null, "%", new String[] {"TABLE"});
        int count = 0;
        while (tableResultSet.next()) {
          count++;
          String physicalTableName = tableResultSet.getString("TABLE_NAME");
          // 確定是否是要查的表,判斷邏輯此處省略【注1】
         }
        LoggerUtils.info("本次共查詢了" + count + "個表的元數據");
      }
      LoggerUtils.info(
          "獲取" + physicalSchemaName + "庫的所有表元數據總耗時:" + (System.currentTimeMillis() - t1) + "ms");
    } catch (SQLException e) {
      LoggerUtils.error("獲取分庫元數據失敗, 發生SQL異常", e);
      throw new BizException("獲取分庫元數據失敗, 發生SQL異常", e);
    }
}

對數據庫中每個表判定它是否是當前公司的表,對於第一、三種表,可以將後者放到一個HashSet中,每次循環時對比;對於第二種表,字符串前綴匹配無疑要花大量的時間。

為了加速前綴匹配,可以使用經典的數據結構——前綴匹配樹。前綴匹配樹的代碼如下:

public class StringPrefixTrie {
  private final Node root = new Node();

  static class Node {
    boolean isEnd;
    Map<Character, Node> children = new HashMap<>();
  }

  /**
   * 增加一個待匹配的模式
   *
   * @param pattern
   */
  public void addPattern(String pattern) {
    Node current = root;
    for (char c : pattern.toCharArray()) {
      current = current.children.computeIfAbsent(c, k -> new Node());
    }
    current.isEnd = true;
  }

  /**
   * 是否滿足前綴匹配
   *
   * @param str
   * @return
   */
  public String match(String str) {
    Node current = root;
    for (int i = 0; i < str.length(); i++) {
      current = current.children.get(str.charAt(i));
      if (current == null) {
        // 沒有匹配到任何前綴
        return null;
      }
      if (current.isEnd) {
        // 返回匹配到的任意一個前綴
        return str.substring(0, i + 1);
      }
    }
    return null;
  }
}

對所有需要前綴匹配的表的前綴調用addPattern(),在循環中先判斷是否滿足前綴匹配,再判斷精準匹配即可。

後續規劃

  1. 批量遷移功能,將遷移批量化、自動化:

    1. 批量撈取公司,判斷是否需要遷移、遷移成本。理想情況下,數據量少但表多的公司,是遷移到其他庫的最佳候選,大大降低源庫的表量又節約了複製數據的時間

    2. 多個公司id提交、創建任務、狀態流轉

  2. 自動校驗遷移是否成功

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.