數據,是有保質期的。正如冰箱裏的牛奶,今天新鮮,明天可能就有點酸,後天直接倒掉。數據的價值,也會隨着時間的推移而遞減。
過去那些“老派”的 OLAP 系統,只能批量處理賬目,對實時性要求高的“流水賬”就力不從心了。它們在面對高併發實時寫入和複雜的分析查詢時,常常會露出疲態,數據延遲、查詢性能、併發處理和數據更新等問題層出不窮。
當所有人都焦慮於如何讓數據“快”起來的時候,Apache Doris 在底層邏輯上進行了一系列顛覆性的技術迭代,能快速接入各種數據源,並且擁有強大的實時更新能力,讓你的數據從產生的那一刻起,就具備了“快”的生命力。
到底 Doris 是怎麼做到,讓數據流動得如此低延遲?這正是我們接下來要深挖的“冰山之下”。
實時更新的挑戰
在實現實時更新的過程中,系統需要應對多個方面的挑戰,這些挑戰直接關係到系統在實際業務場景中的穩定性與性能表現:
- 數據延遲:實時更新的核心在於“快”。數據從產生到可查詢的過程必須儘可能短,實際生產中要求在 5-10 秒可見,理想情況下甚至要求低於 1 秒可見。同時,還需具備足夠的寫入吞吐能力,保證在併發寫入場景下也能穩定運行。
- 查詢性能:一邊持續高頻地接收數據更新,一邊還能保持百毫秒級別的查詢響應,對底層系統架構提出了極高要求。如何在更新密集的情況下,仍然提供快速、穩定的查詢體驗,是實時 OLAP 系統必須解決的問題。
- 併發處理:實時分析場景多面向終端用户,不僅查詢操作需要支持高併發,同時寫入也常常是併發的。傳統的表或者分區級別的寫衝突處理機制影響範圍較大,會影響數據寫入效率與業務體驗。理想狀態下,系統應允許用户制定衝突處理策略,從而提高數據接入的靈活性與可控性。
- 數據流維護與易用性保障:實時數據流的維護受多項複雜因素的影響,例如 TP 系統通過 CDC 捕獲的刪除操作以及 Schema 變更帶來的下游兼容性問題。同時,在鏈路重啓或容災恢復時,如何確保數據既不重複和不丟失,這對數據一致性的要求非常高。
這些挑戰正是評估一個 OLAP 系統是否真正具備實時更新能力的關鍵指標。
常見方案對比
在面臨數據更新的上述挑戰時,市面上的常用方案通常涉及三個關鍵點,分別是表達方式、更新實現和衝突解決,這些方案各有其適用場景。
- 在表達方式上, Snowflake、Redshift、Iceberg、Databricks 和 Hudi 通常使用 MERGE INTO 來處理數據更新,這要求變更數據必須先落盤成為 MERGE 的數據源,因此可能帶來一定的數據延遲和額外的 I/O 開銷。相比之下,Doris 和 ClickHouse 採用更加輕量的方式,通過特定列值表示刪除操作,使得寫入和刪除可以統一在同一數據流中處理,更加契合實時處理需求,尤其適用於 OLTP 類事務變更、訂單或賬單狀態更新等場景。
-
在更新實現方面, 業界常見的方案有四種:
- **Copy on Write**:在寫入時找出需要更新的文件,讀取並結合新的更新,生成新文件再寫入。這種方式優化了讀取性能,但在寫入時會顯著增加 I/O 開銷,特別是在隨機更新情況下,會引發大量讀寫 I/O,限制了實時更新能力。此方案的典型產品包括 Redshift、ClickHouse、Snowflake、Iceberg 和 Hudi。
- **Merge on Read**:在寫入時僅需添加新數據,讀取時再合併新舊數據,類似於 LSM Tree。這種方式優化了寫入性能,但查詢效率較低,難以滿足某些實時場景的查詢延遲要求,典型產品包括 Iceberg、Hudi 和 Doris Merge on Read Unique 表。
- **Delete bitmap / deletion vector:** 標記刪除的實現方案。在寫入時標記文件中被刪除的數據,並寫入刪除標記及新數據,查詢時跳過刪除標記的數據行。此方式既能避免 Copy on Write 的 I/O 放大效應,也獲得了 Copy on Write 的查詢性能。但是對於沒有主鍵索引的實現,生成刪除標記(Delete Bitmap / Delete Vector)時 I/O 和 CPU 消耗較大,效率低下,難以滿足高頻實時寫入場景。
- **Delete bitmap / deletion vector + primary index:** 標記刪除與主鍵索引結合的方式。主鍵索引能夠降低標記刪除時的查詢 I/O 和 CPU 消耗,使高頻實時更新成為可能。Doris 的 Merge on Write Unique 表採用了這種實現方式。
- 在衝突解決方面, 經典的寫寫衝突會導致寫入無法並行,從而顯著降低寫入吞吐量。Doris 提供了基於業務語義的衝突機制,可很好避免該問題(參考文檔)。而 Redshift、Snowflake、Iceberg 和 Hudi 等則採用了文件級別的衝突處理,因而不具備實時更新的能力。
Apache Doris 作為一款為實時分析場景打造的高性能 MPP 分析型數據庫,具備強大的數據寫入能力、亞秒級查詢性能以及出色的併發處理能力,因此成為構建面向用户的實時數據服務的優選方案。基於上述常見方案,本文將詳細拆解 Apache Doris 實時更新技術的核心設計,揭示其如何實現“極低延遲”的數據流動性。
為什麼 Apache Doris 實時更新更具優勢?
傳統 OLAP 數據庫主要用於批量分析,數據更新週期通常以小時甚至天為單位,適合以報表為主的內部系統。然而,隨着業務及數據服務的多樣化發展,越來越多的分析應用開始面向終端用户,要求亞秒級的查詢延遲和秒級的數據更新。 在此背景下,要求實時分析數據庫能夠應對高速寫入數據(每秒百萬級別的數據導入)、並在大規模場景下提供實時查詢。Apache Doris 憑藉其主鍵模型、數據延遲、查詢性能、併發處理、易用性等多方面特性的表現,在分析領域展現了獨特的實時更新能力。
01 主鍵模型
Doris 提供了主鍵表,確保數據主鍵的唯一性,支持基於主鍵的 upsert 語義。以下是一個以 user_id 主鍵的表的創建示例:
CREATE TABLE IF NOT EXISTS example_tbl_unique
(
user_id LARGEINT NOT NULL,
user_name VARCHAR(50) NOT NULL,
city VARCHAR(20),
age SMALLINT,
sex TINYINT
)
UNIQUE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;
在這個表中,初始數據包括四行: 101、102、103、104 。當新寫入 101、102 之後,表中的數據仍然保持是四行,但原先 101 和 102 的數據會被更新。
參考文檔:主鍵模型的導入更新
02 數據延遲
Doris 提供強一致性語義,確保數據寫入後立即可見,從而滿足低延遲的實時數據更新需求。數據組織使用 LSM tree 的方式組織,寫入操作採用刪除標記(Delete Bitmap)方式,相較於傳統的 Copy-on-Write 機制,能夠顯著減少 I/O 操作,提高寫入效率。這一設計不僅降低了存儲空間的浪費,還減輕了系統的負擔,從而提供了更高效的數據處理能力。
此外,Doris 利用主鍵索引優化了在更新數據時定位和檢索相歷史數據的性能,進一步提升了寫入速度並降低了資源消耗。通過這些設計,Doris 實現了綜合性的低延遲數據更新,能夠提供 1s 以下的數據延遲,滿足高效實時分析和快速數據響應的需求。
03 查詢性能
在更新場景下,Doris 採用標記刪除(Delete Bitmap)方式加速查詢性能。與 Merge-on-Read 的實現相比,標記刪除能夠避免在查詢時進行大量的刪除邏輯計算,從而減少查詢延遲並提升整體性能,確保查詢響應時間低於百毫秒,並支持高併發訪問。
此外,Doris 基於以下幾項技術,進一步提升了查詢性能:
- 分區和分桶裁剪技術: 智能跳過無關數據,進一步優化數據掃描過程,減少不必要的數據讀取,顯著提高查詢效率。
- 向量化技術: 在處理大規模數據時,通過批量化處理多個數據操作,減少 CPU 的上下文切換,顯著提升數據處理速度,尤其適用於大數據量的查詢場景。
- 優化器: 通過智能的查詢計劃選擇和執行,自動根據查詢條件調整最佳執行路徑,避免不必要的計算開銷,進一步提高查詢響應速度。
- 豐富的索引:包括點查索引和跳數索引。點查索引常用於加速點查,包括前綴索引和倒排索引,原理是通過索引定位到滿足 WHERE 條件的有哪些行,直接讀取那些行。跳數索引常用於加速分析,包括 ZoneMap 索引、BloomFilter 索引、NGram BloomFilter 索引,原理是通過索引確定不滿足 WHERE 條件的數據塊,跳過這些不滿足條件的數據塊,只讀取可能滿足條件的數據塊並再進行一次逐行過濾,最終得到滿足條件的行。
這些技術的結合使得 Doris 在高併發環境下能夠保持穩定的低延遲,確保其在秒級和毫秒級查詢性能上表現出色,滿足實時數據處理的嚴格要求。
04 併發處理
Doris 主鍵表支持應用語義處理衝突,在高併發亂序寫入時能夠保證數據的最終一致性。建表時,可以通過指定 SEQUENCE COLUMN 來自定義 MVCC 的衝突處理邏輯,Doris 的寫入負載均衡機制優先選擇 SEQUENCE 列較大的行。這一機制不僅適用於寫入衝突,還同樣適用於存量數據。
CREATE TABLE test.test_table
(
user_id bigint,
date date,
group_id bigint,
modify_date date,
keyword VARCHAR(128)
)
UNIQUE KEY(user_id, date, group_id)
DISTRIBUTED BY HASH (user_id) BUCKETS 32
PROPERTIES(
"function_column.sequence_col" = 'modify_date',
"replication_num" = "1",
"in_memory" = "false"
);
例如,在 OLTP 表中,modify_date 字段每次更新時都會設置為當前時間。在將 OLTP 數據庫的 CDC 同步到 Doris 時,可以將 modify_date 指定為 SEQUENCE 列。這樣,具有較大 modify_date 的數據行將生效,而如果後寫入的數據 modify_date 較小,則存量數據不會被更新。這一機制使得實時數據同步的衝突處理變得非常簡單,同時不影響寫入效率。
參考文檔:[主鍵模型的更新併發控制
](https://doris.apache.org/zh-CN/docs/data-operate/update/uniqu...)
05 易用性
- 首先,Doris 確保每次數據寫入的一致性和完整性,保證在高併發和實時更新環境中,數據始終保持一致並立即可見。結合標記刪除機制,Doris 使數據更新更加高效,減少了存儲開銷,並提升了查詢性能。
- 其次,Doris 還支持在線 Schema 變更,允許動態調整表結構,從而簡化數據流的維護,避免複雜的數據遷移過程。同時,靈活的列更新功能使數據更新更為高效,特別是在頻繁更新部分數據時,避免了全表更新帶來的性能開銷。
- 最後,Doris 支持隱藏列標記刪除方式,即為每個 Unique 表生成隱藏的
DORIS_DELETE_SIGN列,利用該標誌直接進行刪除操作,避免了傳統的複雜刪除步驟,提升了系統性能。同時,Doris 還支持將 SEQUENCE 列與刪除標誌結合使用,確保過期數據的刪除不會影響新數據,簡化了實時數據流中的更新與刪除操作。
受益於寫入原子性、強一致性語義,以及靈活的在線 Schema 變更和列更新機制等機制,Doris 能夠在高併發和實時更新場景中高效處理數據,簡化開發工作,並提升系統的響應速度和可靠性。
參考文檔:基於導入的批量刪除
生態融合
Doris 提供豐富的 API 和連接器,方便與現有的數據處理工具和框架(如 Spark、Flink、Kafka)進行集成,增強了生態靈活性,使得 Doris 能夠為用户提供更加強大的數據處理能力,適應多樣化的業務需求和技術環境。
01 Kafka
Kafka Connect 是一款可擴展、可靠的在 Apache Kafka 和其他系統之間進行數據傳輸的工具,可以定義 Connectors 將大量數據遷入遷出 Kafka,並通過 Doris Kafka Connector 將上游 topic 中的數據讀取後寫入到 Doris 中。
在 Kafka Connect 集羣上新增一個 Doris Sink 的 Connector,示例如下:
詳細步驟參考文檔
curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
"name":"test-doris-sink-cluster",
"config":{
"connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
"topics":"topic_test",
"doris.topic2table.map": "topic_test:test_kafka_tbl",
"buffer.count.records":"50000",
"buffer.flush.time":"120",
"buffer.size.bytes":"5000000",
"doris.urls":"10.10.10.1",
"doris.user":"root",
"doris.password":"",
"doris.http.port":"8030",
"doris.query.port":"9030",
"doris.database":"test_db",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"org.apache.kafka.connect.json.JsonConverter"
}
}'
02 Flink
Apache Flink 是一個框架和分佈式處理引擎,用於在無界和有界數據流上進行有狀態的計算。可以使用 Flink Doris Connector 將上游的數據,比如 Kafka、MySQL 等產生的數據,實時寫入至 Doris。
使用 Flink 自帶的 DataGen 模擬數據寫入 Doris 中,示例如下:
具體步驟參考文檔
SET 'execution.checkpointing.interval' = '30s';
CREATE TABLE student_source (
id INT,
name STRING,
age INT
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.name.length' = '20',
'fields.id.min' = '1',
'fields.id.max' = '100000',
'fields.age.min' = '3',
'fields.age.max' = '30'
);
-- doris sink
CREATE TABLE student_sink (
id INT,
name STRING,
age INT
)
WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'test.student',
'username' = 'root',
'password' = 'password',
'sink.label-prefix' = 'doris_label'
);
INSERT INTO student_sink SELECT * FROM student_source;
03 Spark Structured Streaming
Structured Streaming 是一個構建在 Spark SQL 引擎之上的可擴展、容錯的流處理引擎。藉助 Structured Streaming,可以高效地讀取上游數據源,並通過 Spark Doris Connector ,以 Stream Load 的方式將數據實時寫入 Doris,實現端到端的流式數據處理流程。
使用 Spark 自帶的 rate 數據源模擬數據寫入 Doris 中,示例如下:
完整代碼參考: https://github.com/apache/doris-spark-connector/blob/master/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/example/DorisWriteStreamExample.scala
val spark = SparkSession.builder()
.appName("RateSourceExample")
.master("local[1]")
.getOrCreate()
val rateStream = spark.readStream
.format("rate")
.option("rowsPerSecond", 10)
.load()
rateStream.writeStream
.format("doris")
.option("checkpointLocation", "/tmp/checkpoint")
.option("doris.table.identifier", "db.table")
.option("doris.fenodes", "127.0.0.1:8030")
.option("user", "root")
.option("password", "")
.start()
.awaitTermination()
spark.stop();
實時數據分析最佳實踐
用户案例 1:中通快遞
隨着中通快遞業務的持續增長,昔日雙 11 的業務高峯現已成為每日常態,原有數據架構在數據時效性、查詢效率、與維護成本方面,均面臨着較大的挑戰。為此,中通快遞引入 SelectDB,藉助其高效的數據更新、低延時的實時寫入與優異的查詢性能,在快遞業務實時分析場景、BI 報表與離線分析場景、高併發分析場景中均進行了應用實踐。
在實時分析場景中,基於 SelectDB 靈活豐富的 SQL 函數公式、高吞吐量的計算能力,實現了結果表的查詢加速, 能夠達到每秒上 2K+ 數量級的 QPS 併發查詢,數據報表更新及時度大大提高。
SelectDB 的引入滿足了複雜與簡單的實時分析需求。目前,SelectDB 日處理數據超過 6 億條,數據總量超過 45 億條,字段總量超過 200 列,並實現服務器資源節省 2/3、查詢時長從 10 分鐘降至秒級的數十倍提升。
用户案例 2:招聯金融
招聯金融(全稱“招聯消費金融股份有限公司”)旗下擁有“好期貸”“信用付”兩大消費金融產品體系,為用户提供全線上、免擔保、低利率的普惠消費信貸服務。早期採用 Lambda 架構,包含 ClickHouse、Spark、Impala、Hive、Kudu、Vertica 等,受限於運維依賴性高、資源利用率低、數據時效性低、併發能力弱等諸多問題。引入 Apache Doris 進行架構升級後,實現了高效實時分析、架構簡化、混合部署與彈性伸縮等多項目標。
在客羣篩選分析場景中,之前使用 Vertica 計算引擎處理 2.4 億條數據耗時 30-60 分鐘,替換為 Doris 後用時降至 5 分鐘,性能提升 6 倍以上,並且 Doris 作為開源數據庫,相比商業化產品 Vertica 有顯著的成本優勢。
結束語
以上就是 Apache Doris 在分析領域的實時更新能力詳細介紹。在主鍵表方面,Doris 支持易用的 UPSERT 語義,結合主鍵索引和標記刪除機制,確保了優異的寫入性能和低延遲的查詢性能。此外,用户自定義的衝突解決機制進一步提升了實時寫入的併發能力,快速的 Schema 變更功能則避免了實時數據流的中斷。列更新及靈活的列更新選項為更廣泛的實時場景提供了便捷支持。
展望未來,我們將在以下幾方面重點投入:
- 降低數據可見性延遲,以實現更加實時的數據訪問體驗;
- 提升生態工具在自動調整 Schema 方面的能力,並擴展 Light Schema 的適用範圍;
- 更加靈活的列更新,為用户提供更加高效、靈活的數據管理能力。