博客 / 詳情

返回

Apache Doris 支持 Arrow Flight SQL 協議,數據傳輸效率實現百倍飛躍

近年來,隨着數據科學、數據湖分析等場景的興起,對數據讀取和傳輸速度提出更高的要求。而 JDBC/ODBC 作為與數據庫交互的主流標準,在應對大規模數據讀取和傳輸時顯得力不從心,無法滿足高性能、低延遲等數據處理需求。為提供更高效的數據傳輸方案,Apache Doris 在 2.1 版本中基於 Arrow Flight SQL 協議實現了高速數據傳輸鏈路,使得數據傳輸性能實現百倍飛躍

基於 Arrow Flight SQL 的高速數據傳輸鏈路

在 Apache Doris 中,查詢結果以列存格式的 Block 組織。在之前版本中,如需將這些數據通過 MySQL Client 或 JDBC/ODBC 驅動傳輸至目標客户端時,需要先將 Block 序列化為行存格式的 Bytes,如果目標客户端是類似 Pandas 的列存數據科學組件或列存數據庫,還需將行存格式的 Bytes 再反序列化為列存格式,而序列化/反序列化操作是一個非常耗時的過程。

在 Apache Doris 2.1 版本 中,我們基於 Arrow Flight SQL 構建了高速數據傳輸鏈路,它支持主流語言通過 SQL 從 Doris 高速讀取大規模數據,極大提升了其他系統與 Apache Doris 間數據傳輸效率。若目標客户端同樣支持 Arrow 列存格式,整體傳輸過程將完全避免序列化/反序列化操作,徹底消除因此帶來時間及性能損耗。此外,依賴於 Arrow Flight 多節點和多核架構特性,實現了數據傳輸的完全並行化,極大提高了數據吞吐能力。

以 Python 讀取 Apache Doris 中數據為例,Apache Doris 先將列存的 Block 快速轉換為列存的 Arrow RecordBatch,隨後在 Python 客户端中,將 Arrow RecordBatch 轉換為同樣列存的 Pandas DataFrame 中,轉換速度極快,保障了數據傳輸的時效性。

基於 Arrow Flight SQL 的高速數據傳輸鏈路.png

不僅如此,Arrow Flight SQL 還提供了通用的 JDBC 驅動,支持與同樣遵循 Arrow Flight SQL 協議的數據庫無縫交互,這不僅增強了 Apache Doris 的兼容性,還為其拓展了更廣泛的應用場景。

性能測試

為了直觀地展示引入 Arrow Flight SQL 後對數據傳輸性能的提升效果,我們特地對 Python 使用 Pymysql、Pandas 以及 Arrow Flight SQL 這三種方式讀取 Apache Doris 中數據的耗時進行了對比。測試數據集如下:

性能測試-數據集.png

分別使用 Pymysql、Pandas、Arrow Flight SQL 對不同類型數據的傳輸進行了測試,測試結果如下:

性能測試-傳輸測試.png

從測試結果來看,Arrow Flight SQL 在所有列類型的傳輸上都展現出了顯著的性能優勢。在絕大多數讀取場景中,Arrow Flight SQL 的性能提升超 20 倍,而在部分場景中甚至實現了百倍的性能飛躍,為大數據處理和分析提供了強有力的保障。

性能測試.png

使用介紹

Apache Doris 支持 Arrow Flight SQL 後,我們得以利用 Python 的 ADBC Driver 輕鬆連接 Doris,實現數據的極速讀取。接下來,我們將使用 Python(版本要求 >= 3.9)的 ADBC Driver 執行一系列常見的數據庫語法操作,包括 DDL、DML、設置 Session 變量以及 Show 語句等。

01 安裝 Library

Library 被髮布在 PyPI,可通過以下方式簡單安裝:

pip install adbc_driver_manager
pip install adbc_driver_flightsql

在代碼中import 以下模塊/庫來使用已安裝的 Library:

import adbc_driver_manager
import adbc_driver_flightsql.dbapi as flight_sql

02 連接 Doris

創建與 Doris Arrow Flight SQL 服務交互的客户端。需提供 Doris FE 的 Host、Arrow Flight Port 、登陸用户名以及密碼,並進行以下配置。

修改 Doris FE 和 BE 的配置參數:

  • 修改fe/conf/fe.confarrow_flight_sql_port為一個可用端口,如 9090。
  • 修改 be/conf/be.confarrow_flight_port為一個可用端口,如 9091。

假設 Doris 實例中 FE 和 BE 的 Arrow Flight SQL 服務將分別在端口 9090 和 9091 上運行,且 Doris 用户名/密碼為“user”/“pass”,那麼連接過程如下所示:

conn = flight_sql.connect(uri="grpc://127.0.0.1:9090", db_kwargs={
            adbc_driver_manager.DatabaseOptions.USERNAME.value: "user",
            adbc_driver_manager.DatabaseOptions.PASSWORD.value: "pass",
        })
cursor = conn.cursor()

連接完成後,可以通過 SQL 使返回的 Cursor 與 Doris 交互,執行例如建表、獲取元數據、導入數據、查詢等操作。

03 建表與獲取元數據

將 Query 傳遞給 cursor.execute()函數,執行建表與獲取元數據操作:

cursor.execute("DROP DATABASE IF EXISTS arrow_flight_sql FORCE;")
print(cursor.fetchallarrow().to_pandas())

cursor.execute("create database arrow_flight_sql;")
print(cursor.fetchallarrow().to_pandas())

cursor.execute("show databases;")
print(cursor.fetchallarrow().to_pandas())

cursor.execute("use arrow_flight_sql;")
print(cursor.fetchallarrow().to_pandas())

cursor.execute("""CREATE TABLE arrow_flight_sql_test
    (
         k0 INT,
         k1 DOUBLE,
         K2 varchar(32) NULL DEFAULT "" COMMENT "",
         k3 DECIMAL(27,9) DEFAULT "0",
         k4 BIGINT NULL DEFAULT '10',
         k5 DATE,
    )
    DISTRIBUTED BY HASH(k5) BUCKETS 5
    PROPERTIES("replication_num" = "1");""")
print(cursor.fetchallarrow().to_pandas())

cursor.execute("show create table arrow_flight_sql_test;")
print(cursor.fetchallarrow().to_pandas())

如果 StatusResult返回 0 ,則説明 Query 執行成功(這樣設計的原因是為了兼容 JDBC)。

  StatusResult
0            0

  StatusResult
0            0

                   Database
0         __internal_schema
1          arrow_flight_sql
..                      ...
507             udf_auth_db

[508 rows x 1 columns]

  StatusResult
0            0

  StatusResult
0            0
                   Table                                       Create Table
0  arrow_flight_sql_test  CREATE TABLE `arrow_flight_sql_test` (\n  `k0`...

04 導入數據

執行 INSERT INTO,向所創建表中導入少量測試數據:

cursor.execute("""INSERT INTO arrow_flight_sql_test VALUES
        ('0', 0.1, "ID", 0.0001, 9999999999, '2023-10-21'),
        ('1', 0.20, "ID_1", 1.00000001, 0, '2023-10-21'),
        ('2', 3.4, "ID_1", 3.1, 123456, '2023-10-22'),
        ('3', 4, "ID", 4, 4, '2023-10-22'),
        ('4', 122345.54321, "ID", 122345.54321, 5, '2023-10-22');""")
print(cursor.fetchallarrow().to_pandas())

如下所示則證明導入成功:

  StatusResult
0            0

如果需要導入大批量數據到 Doris,可以使用 pydoris 執行 Stream Load 來實現。

05 執行查詢

接着對上面導入的表進行查詢查詢,包括聚合、排序、Set Session Variable 等操作。

cursor.execute("select * from arrow_flight_sql_test order by k0;")
print(cursor.fetchallarrow().to_pandas())

cursor.execute("set exec_mem_limit=2000;")
print(cursor.fetchallarrow().to_pandas())

cursor.execute("show variables like \"%exec_mem_limit%\";")
print(cursor.fetchallarrow().to_pandas())

cursor.execute("select k5, sum(k1), count(1), avg(k3) from arrow_flight_sql_test group by k5;")
print(cursor.fetchallarrow().to_pandas())

結果如下所示:

   k0            k1    K2                k3          k4          k5
0   0       0.10000    ID       0.000100000  9999999999  2023-10-21
1   1       0.20000  ID_1       1.000000010           0  2023-10-21
2   2       3.40000  ID_1       3.100000000      123456  2023-10-22
3   3       4.00000    ID       4.000000000           4  2023-10-22
4   4  122345.54321    ID  122345.543210000           5  2023-10-22

[5 rows x 6 columns]

  StatusResult
0            0

    Variable_name Value Default_Value Changed
0  exec_mem_limit  2000    2147483648       1

           k5  Nullable(Float64)_1  Int64_2 Nullable(Decimal(38, 9))_3
0  2023-10-22         122352.94321        3            40784.214403333
1  2023-10-21              0.30000        2                0.500050005

[2 rows x 5 columns]

06 完整代碼

# Doris Arrow Flight SQL Test

# step 1, library is released on PyPI and can be easily installed.
# pip install adbc_driver_manager
# pip install adbc_driver_flightsql
import adbc_driver_manager
import adbc_driver_flightsql.dbapi as flight_sql

# step 2, create a client that interacts with the Doris Arrow Flight SQL service.
# Modify arrow_flight_sql_port in fe/conf/fe.conf to an available port, such as 9090.
# Modify arrow_flight_port in be/conf/be.conf to an available port, such as 9091.
conn = flight_sql.connect(uri="grpc://127.0.0.1:9090", db_kwargs={
            adbc_driver_manager.DatabaseOptions.USERNAME.value: "root",
            adbc_driver_manager.DatabaseOptions.PASSWORD.value: "",
        })
cursor = conn.cursor()

# interacting with Doris via SQL using Cursor
def execute(sql):
    print("\n### execute query: ###\n " + sql)
    cursor.execute(sql)
    print("### result: ###")
    print(cursor.fetchallarrow().to_pandas())

# step3, execute DDL statements, create database/table, show stmt.
execute("DROP DATABASE IF EXISTS arrow_flight_sql FORCE;")
execute("show databases;")
execute("create database arrow_flight_sql;")
execute("show databases;")
execute("use arrow_flight_sql;")
execute("""CREATE TABLE arrow_flight_sql_test
    (
         k0 INT,
         k1 DOUBLE,
         K2 varchar(32) NULL DEFAULT "" COMMENT "",
         k3 DECIMAL(27,9) DEFAULT "0",
         k4 BIGINT NULL DEFAULT '10',
         k5 DATE,
    )
    DISTRIBUTED BY HASH(k5) BUCKETS 5
    PROPERTIES("replication_num" = "1");""")
execute("show create table arrow_flight_sql_test;")


# step4, insert into
execute("""INSERT INTO arrow_flight_sql_test VALUES
        ('0', 0.1, "ID", 0.0001, 9999999999, '2023-10-21'),
        ('1', 0.20, "ID_1", 1.00000001, 0, '2023-10-21'),
        ('2', 3.4, "ID_1", 3.1, 123456, '2023-10-22'),
        ('3', 4, "ID", 4, 4, '2023-10-22'),
        ('4', 122345.54321, "ID", 122345.54321, 5, '2023-10-22');""")


# step5, execute queries, aggregation, sort, set session variable
execute("select * from arrow_flight_sql_test order by k0;")
execute("set exec_mem_limit=2000;")
execute("show variables like \"%exec_mem_limit%\";")
execute("select k5, sum(k1), count(1), avg(k3) from arrow_flight_sql_test group by k5;")

# step6, close cursor 
cursor.close()

大規模數據傳輸場景應用示例

01 Python

在 Python 中,通過 ADBC Driver 連接到已支持 Arrow Flight SQL 的 Doris 後,可以使用多種 ADBC API 從 Doris 加載 Clickbench 數據集到 Python。具體如下:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import adbc_driver_manager
import adbc_driver_flightsql.dbapi as flight_sql
import pandas
from datetime import datetime

my_uri = "grpc://0.0.0.0:`fe.conf_arrow_flight_port`"
my_db_kwargs = {
    adbc_driver_manager.DatabaseOptions.USERNAME.value: "root",
    adbc_driver_manager.DatabaseOptions.PASSWORD.value: "",
}
sql = "select * from clickbench.hits limit 1000000;"

# PEP 249 (DB-API 2.0) API wrapper for the ADBC Driver Manager.
def dbapi_adbc_execute_fetchallarrow():
    conn = flight_sql.connect(uri=my_uri, db_kwargs=my_db_kwargs)
    cursor = conn.cursor()
    start_time = datetime.now()
    cursor.execute(sql)
    arrow_data = cursor.fetchallarrow()
    dataframe = arrow_data.to_pandas()
    print("\n##################\n dbapi_adbc_execute_fetchallarrow" + ", cost:" + str(datetime.now() - start_time) + ", bytes:" + str(arrow_data.nbytes) + ", len(arrow_data):" + str(len(arrow_data)))
    print(dataframe.info(memory_usage='deep'))
    print(dataframe)

# ADBC reads data into pandas dataframe, which is faster than fetchallarrow first and then to_pandas.
def dbapi_adbc_execute_fetch_df():
    conn = flight_sql.connect(uri=my_uri, db_kwargs=my_db_kwargs)
    cursor = conn.cursor()
    start_time = datetime.now()
    cursor.execute(sql)
    dataframe = cursor.fetch_df()    
    print("\n##################\n dbapi_adbc_execute_fetch_df" + ", cost:" + str(datetime.now() - start_time))
    print(dataframe.info(memory_usage='deep'))
    print(dataframe)

# Can read multiple partitions in parallel.
def dbapi_adbc_execute_partitions():
    conn = flight_sql.connect(uri=my_uri, db_kwargs=my_db_kwargs)
    cursor = conn.cursor()
    start_time = datetime.now()
    partitions, schema = cursor.adbc_execute_partitions(sql)
    cursor.adbc_read_partition(partitions[0])
    arrow_data = cursor.fetchallarrow()
    dataframe = arrow_data.to_pandas()
    print("\n##################\n dbapi_adbc_execute_partitions" + ", cost:" + str(datetime.now() - start_time) + ", len(partitions):" + str(len(partitions)))
    print(dataframe.info(memory_usage='deep'))
    print(dataframe)

dbapi_adbc_execute_fetchallarrow()
dbapi_adbc_execute_fetch_df()
dbapi_adbc_execute_partitions()

執行結果如下(忽略重複輸出),從 Doris 加載 100 萬行 105 列 780M 的 Clickbench 數據集,僅用時 3s

##################
 dbapi_adbc_execute_fetchallarrow, cost:0:00:03.548080, bytes:784372793, len(arrow_data):1000000
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000000 entries, 0 to 999999
Columns: 105 entries, CounterID to CLID
dtypes: int16(48), int32(19), int64(6), object(32)
memory usage: 2.4 GB
None
        CounterID   EventDate               UserID            EventTime              WatchID  JavaEnable                                              Title  GoodEvent  ...  UTMCampaign  UTMContent  UTMTerm  FromTag  HasGCLID          RefererHash              URLHash  CLID
0          245620  2013-07-09  2178958239546411410  2013-07-09 19:30:27  8302242799508478680           1  OWAProfessionov — Мой Круг (СВАО Интернет-магазин          1  ...                                                    0 -7861356476484644683 -2933046165847566158     0
999999       1095  2013-07-03  4224919145474070397  2013-07-03 14:36:17  6301487284302774604           0  @дневники Sinatra (ЛАДА, цена для деталли кто ...          1  ...                                                    0  -296158784638538920  1335027772388499430     0

[1000000 rows x 105 columns]

##################
 dbapi_adbc_execute_fetch_df, cost:0:00:03.611664
##################
 dbapi_adbc_execute_partitions, cost:0:00:03.483436, len(partitions):1
##################
 low_level_api_execute_query, cost:0:00:03.523598, stream.address:139992182177600, rows:-1, bytes:784322926, len(arrow_data):1000000
##################
 low_level_api_execute_partitions, cost:0:00:03.738128streams.size:3, 1, -1

02 JDBC

Arrow Flight SQL 協議的開源 JDBC 驅動兼容標準的 JDBC API,可用於大多數 BI 工具通過 JDBC 訪問 Doris,並支持高速傳輸 Apache Arrow 數據。使用方法與通過 MySQL 協議的 JDBC 驅動連接 Doris 類似,只需將鏈接 URL 中的jdbc:mysql 換成 jdbc:arrow-flight-sql,查詢返回的結果依然是 JDBC 的 ResultSet 數據結構。

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

Class.forName("org.apache.arrow.driver.jdbc.ArrowFlightJdbcDriver");
String DB_URL = "jdbc:arrow-flight-sql://0.0.0.0:9090?useServerPrepStmts=false"
        + "&cachePrepStmts=true&useSSL=false&useEncryption=false";
String USER = "root";
String PASS = "";

Connection conn = DriverManager.getConnection(DB_URL, USER, PASS);
Statement stmt = conn.createStatement();
ResultSet resultSet = stmt.executeQuery("show tables;");
while (resultSet.next()) {
    String col1 = resultSet.getString(1);
    System.out.println(col1);
}

resultSet.close();
stmt.close();
conn.close();

03 JAVA

與 Python 類似,JAVA 也可以直接創建 ADBC Client 讀取 Doris 中數據。在這過程中,首先需獲取 FlightInfo,隨後連接每一個 Endpoint 拉取數據。

// method one
AdbcStatement stmt = connection.createStatement()
stmt.setSqlQuery("SELECT * FROM " + tableName)
// executeQuery, two steps:
// 1. Execute Query and get returned FlightInfo;
// 2. Create FlightInfoReader to sequentially traverse each Endpoint;
QueryResult queryResult = stmt.executeQuery()


// method two
AdbcStatement stmt = connection.createStatement()
stmt.setSqlQuery("SELECT * FROM " + tableName)
// Execute Query and parse each Endpoint in FlightInfo, and use the Location and Ticket to construct a PartitionDescriptor
partitionResult = stmt.executePartitioned();
partitionResult.getPartitionDescriptors()
//Create ArrowReader for each PartitionDescriptor to read data
ArrowReader reader = connection2.readPartition(partitionResult.getPartitionDescriptors().get(0).getDescriptor()))

04 Spark

對於 Spark,除了可以通過 JDBC 和 JAVA 方式連接 Flight SQL Server 外,還可以使用開源的 Spark-Flight-Connector ,該組件支持 Spark 作為 Client 讀寫 Flight SQL Server。其原因是 Arrow 數據格式與 Doris 中的 Block 數據格式的轉換速度非常快,相較於 CSV 與 Block 格式之間的轉換,其速度提升了 10 倍之多,並且 Arrow 數據格式對 Map、Array 等複雜類型的支持也更加出色。

結束語

目前,已有多家社區企業用户驗證並使用 Arrow Flight SQL 從 Doris 加載數據到 Python、Spark、Flink,測試結果説明,該方式的讀取速度相較於以往有了顯著的提升。未來,Apache Doris 計劃支持 Arrow Flight SQL 寫入,屆時由主流編程語言構建的系統均可藉助 ADBC 客户端來讀寫 Doris,實現高速的數據交互;並計劃利用 Arrow Flight 的並行化能力實現多 BE 並行讀取,還可以藉助 Arrow Flight SQL 實現 Doris 和 Doris、 Spark 和 Doris 之間的聯邦查詢。

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.