一、前言
- 最近,項目有幾個表要從 MySQL 實時同步到 另一個 MySQL,也有同步到 ElasticSearch 的。
- 目前,公司生產環境同步,用的是 阿里雲的 DTS,每個同步任務每月 500多元,有點小貴。
- 其他環境:MySQL同步到ES,用的是 CloudCanal,不支持 數據轉換,添加同步字段比較麻煩,社區版限制5個任務,不夠用;MySQL同步到MySQL,用的是 debezium,不支持寫入 ES。
- 恰好3年前用過 SeaTunnel 的 前身 WaterDrop,那就開始吧。本文以 2.3.1 版本,Ubuntu 系統為例
二、開源數據集成平台SeaTunnel
1. 簡介
- SeaTunnel 是 Apache 軟件基金會下的一個高性能開源大數據集成工具,為數據集成場景提供靈活易用、易擴展並支持千億級數據集成的解決方案。
- Seaunnel 為實時(CDC)和批量數據提供高性能數據同步能力,支持十種以上數據源,已經在B站、騰訊雲、字節等數百家公司使用。
- 可以選擇 SeaTunnel Zeta 引擎上運行,也可以在 Apache Flink 或 Spark 引擎上運行。
2. 安裝
- 下載,這裏選擇 2.3.1 版本,執行 tar -xzvf apache-seatunnel-*.tar.gz 解壓縮
-
因為 2.3.2 版本,MySQL-CDC 找不到驅動,bug修復詳見
Caused by: java.sql.SQLException: No suitable driver at java.sql/java.sql.DriverManager.getDriver(DriverManager.java:298) at com.zaxxer.hikari.util.DriverDataSource.<init>(DriverDataSource.java:106) ... 20 more ... 11 more at org.apache.seatunnel.engine.client.job.ClientJobProxy.waitForJobComplete(ClientJobProxy.java:122) at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:181)
3. 安裝 connectors 插件
- 執行 bash bin/install-plugin.sh,國內建議先配置
maven鏡像,不然容易失敗 或者 慢 - 官方文檔寫着執行 sh bin/install-plugin.sh,我在 Ubuntu 20.04.2 LTS 上執行報錯(bin/install-plugin.sh: 54: Bad substitution),我提了PR
4. 編寫配置文件
- config 目錄下,新建配置文件:如 mysql-es-test.conf
-
添加 env 配置
因為是 實時同步,這裏 job.mode = "STREAMING",execution.parallelism 是 併發數env { # You can set flink configuration here execution.parallelism = 1 job.mode = "STREAMING" checkpoint.interval = 2000 #execution.checkpoint.interval = 10000 #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" } - MySQL 實時同步,需開啓 binlog
-
添加 數據源 配置
result_table_name 取個 臨時表名,便於後續使用。table-names 必須是 數據庫.表名,base-url 必須指定 數據庫。
startup.mode 默認是 INITIAL,先同步歷史數據,後增量同步,詳情點擊source { MySQL-CDC { result_table_name = "t1" server-id = 5656 username = "root" password = "pwd" table-names = ["db.t1"] base-url = "jdbc:mysql://host:3306/db" } } -
添加 轉換 配置,sql 比較靈活。
函數列表請點擊transform { Sql { source_table_name = "t1" query = "SELECT id, alias_name aliasName FROM t1 WHERE c1 = '1'" } } -
添加 輸出 配置
CDC 實時同步 es,必須配置 primary_keyssink { Elasticsearch { hosts = ["host:9200"] username = "elastic" password = "pwd" index = "index_t1" # cdc required options primary_keys = ["id"] } } - 最終配置截圖
5. 啓動任務
這裏以 本地模式為例,另有 集羣、spark、flink 模式。
./bin/seatunnel.sh -e local --config ./config/mysql-es-test.conf
三、總結
- 開源數據集成平台SeaTunnel 能夠比較方便的進行 MySQL 實時同步到 es 等,免費,還方便添加 同步字段。更多強大功能,請看官方文檔。
- 新版本自帶 同步引擎,不用依賴 spark、flink 等運行,降低了 小數據量同步場景 部署複雜度
- 新版本開始提供 UI界面,目前強依賴 調度平台 Apache DolphinScheduler
本文遵守【CC BY-NC】協議,轉載請保留原文出處及本版權聲明,否則將追究法律責任。
本文首先發佈於 https://www.890808.xyz/ ,其他平台需要審核更新慢一些。