博客 / 詳情

返回

開源數據集成平台SeaTunnel:MySQL實時同步到es

一、前言

  • 最近,項目有幾個表要從 MySQL 實時同步到 另一個 MySQL,也有同步到 ElasticSearch 的。
  • 目前,公司生產環境同步,用的是 阿里雲的 DTS,每個同步任務每月 500多元,有點小貴。
  • 其他環境:MySQL同步到ES,用的是 CloudCanal,不支持 數據轉換,添加同步字段比較麻煩,社區版限制5個任務,不夠用;MySQL同步到MySQL,用的是 debezium,不支持寫入 ES。
  • 恰好3年前用過 SeaTunnel 的 前身 WaterDrop,那就開始吧。本文以 2.3.1 版本,Ubuntu 系統為例

二、開源數據集成平台SeaTunnel

1. 簡介

  • SeaTunnel 是 Apache 軟件基金會下的一個高性能開源大數據集成工具,為數據集成場景提供靈活易用、易擴展並支持千億級數據集成的解決方案。
  • Seaunnel 為實時(CDC)和批量數據提供高性能數據同步能力,支持十種以上數據源,已經在B站、騰訊雲、字節等數百家公司使用。
  • 可以選擇 SeaTunnel Zeta 引擎上運行,也可以在 Apache Flink 或 Spark 引擎上運行。
    seatunnel-architecture.png

2. 安裝

  • 下載,這裏選擇 2.3.1 版本,執行 tar -xzvf apache-seatunnel-*.tar.gz 解壓縮
  • 因為 2.3.2 版本,MySQL-CDC 找不到驅動,bug修復詳見

    Caused by: java.sql.SQLException: No suitable driver
          at java.sql/java.sql.DriverManager.getDriver(DriverManager.java:298)
          at com.zaxxer.hikari.util.DriverDataSource.<init>(DriverDataSource.java:106)
          ... 20 more
    
          ... 11 more
    
          at org.apache.seatunnel.engine.client.job.ClientJobProxy.waitForJobComplete(ClientJobProxy.java:122)
          at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:181)

3. 安裝 connectors 插件

  • 執行 bash bin/install-plugin.sh,國內建議先配置 maven 鏡像,不然容易失敗 或者 慢
  • 官方文檔寫着執行 sh bin/install-plugin.sh,我在 Ubuntu 20.04.2 LTS 上執行報錯(bin/install-plugin.sh: 54: Bad substitution),我提了PR
    seatunnel-install-connectors-error.png

4. 編寫配置文件

  • config 目錄下,新建配置文件:如 mysql-es-test.conf
  • 添加 env 配置
    因為是 實時同步,這裏 job.mode = "STREAMING",execution.parallelism 是 併發數

    env {
    # You can set flink configuration here
    execution.parallelism = 1
    job.mode = "STREAMING"
    checkpoint.interval = 2000
    #execution.checkpoint.interval = 10000
    #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
    }
  • MySQL 實時同步,需開啓 binlog
  • 添加 數據源 配置
    result_table_name 取個 臨時表名,便於後續使用。table-names 必須是 數據庫.表名,base-url 必須指定 數據庫。
    startup.mode 默認是 INITIAL,先同步歷史數據,後增量同步,詳情點擊

    source {
    MySQL-CDC {
      result_table_name = "t1"
      server-id = 5656
      username = "root"
      password = "pwd"
      table-names = ["db.t1"]
      base-url = "jdbc:mysql://host:3306/db"
    }
    }
  • 添加 轉換 配置,sql 比較靈活。
    函數列表請點擊

    transform {
    Sql {
      source_table_name = "t1"
      query = "SELECT id, alias_name aliasName FROM t1 WHERE c1 = '1'"
    }
    }
  • 添加 輸出 配置
    CDC 實時同步 es,必須配置 primary_keys

    sink {
      Elasticsearch {
          hosts = ["host:9200"]
          username = "elastic"
          password = "pwd"
    
          index = "index_t1"
          # cdc required options
          primary_keys = ["id"]
      }
    }
  • 最終配置截圖
    seatunnel-mysql-cdc-es.png

5. 啓動任務

這裏以 本地模式為例,另有 集羣、spark、flink 模式。

./bin/seatunnel.sh -e local --config ./config/mysql-es-test.conf

三、總結

  • 開源數據集成平台SeaTunnel 能夠比較方便的進行 MySQL 實時同步到 es 等,免費,還方便添加 同步字段。更多強大功能,請看官方文檔。
  • 新版本自帶 同步引擎,不用依賴 spark、flink 等運行,降低了 小數據量同步場景 部署複雜度
  • 新版本開始提供 UI界面,目前強依賴 調度平台 Apache DolphinScheduler

本文遵守【CC BY-NC】協議,轉載請保留原文出處及本版權聲明,否則將追究法律責任。
本文首先發佈於 https://www.890808.xyz/ ,其他平台需要審核更新慢一些。

javalover123

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.