博客 / 詳情

返回

最佳實踐 | 在 EMR Serverless Spark 中實現 Doris 讀寫操作

背景信息

EMR Serverless Spark 是一款面向 Data+AI 的高性能 Lakehouse 產品。它為企業提供了一站式的數據平台服務,包括任務開發、調試、調度和運維等,極大地簡化了數據處理和模型訓練的全流程。同時,它100%兼容開源 Spark 生態,能夠無縫集成到客户現有的數據平台。使用 EMR Serverless Spark,企業可以更專注於數據處理分析和模型訓練調優,提高工作效率。

Apache Doris是一個高性能、實時的分析型數據庫,能夠較好地滿足報表分析、即席查詢、數據湖聯邦查詢加速等使用場景。更多信息,請參見Apache Doris 簡介

基於Apache Doris官方提供的Spark Connector,EMR Serverless Spark可以在開發時添加對應的配置來連接Doris。通過結合Apache Doris與EMR Serverless Spark,您可以高效地進行數據讀取、寫入和分析操作,從而實現端到端的數據處理流程。

EMR Serverless Spark 新用户可 免費領取 1000 CU*小時 資源包,歡迎體驗。

前提條件

  • 已創建Serverless Spark工作空間,詳情請參見創建工作空間
  • 已創建Doris集羣。

如果是在EMR on ECS創建包含Doris服務的數據分析(OLAP)集羣,詳情請參見創建集羣。本文以在EMR on ECS創建包含Doris服務的集羣為例,後續簡稱EMR Doris集羣。

使用限制

EMR Serverless Spark引擎的版本要求為esr-2.5.0、esr-3.1.0、esr-4.1.0及以上版本。

操作流程

步驟一:獲取Doris Spark Connector JAR並上傳至OSS

您需要查閲Doris的官方文檔 Spark Doris Connector。該文檔通常會列出不同版本的連接器與不同版本的 Spark 引擎的兼容情況。您需要確認您正在使用的 Spark 版本與 Doris Spark Connector 版本之間的兼容性。

  1. 訪問Doris Spark Connector的GitHub倉庫,選擇合適的版本進行下載。

Doris Spark Connector JAR包的命名格式為 spark-doris-connector-spark-${spark_version}-${connector_version}.jar。例如,您使用的引擎版本為esr-3.1.0 (Spark 3.4.3, Scala 2.12),則可以下載 spark-doris-connector-spark-3.4-24.0.0.jar

  1. 將下載的Spark Connector JAR上傳至阿里雲OSS中,上傳操作可以參見簡單上傳**

步驟二:創建網絡連接

EMR Serverless Spark需要能夠打通與EMR Doris集羣之間的網絡才可以正常訪問Doris服務。更多網絡連接信息,請參見EMR Serverless Spark與其他VPC間網絡互通

重要:配置安全組規則時,端口範圍請根據實際需求選擇性開放必要的端口。端口範圍的取值為1~65535。本文示例需開啓HTTP 端口(8031)、RPC 端口(9061)以及Webserver端口(8041)。

步驟三:在EMR Doris集羣中創建庫表

  1. 使用SSH方式登錄集羣,詳情請參見登錄集羣
  2. 執行以下命令,連接EMR Doris集羣。
mysql -h127.0.0.1  -P 9031 -uroot
  1. 創建數據庫和表。`
    `
CREATE DATABASE IF NOT EXISTS testdb;
USE testdb;
CREATE TABLE test (    id INT,     name STRING) PROPERTIES("replication_num" = "1");

4. 插入測試數據。

INSERT INTO test VALUES (1, 'a'), (2, 'b'), (3, 'c');

5. 查詢數據。

SELECT * FROM test;

返回信息如下圖所示。

步驟四:EMR Serverless Spark讀取Doris表

使用 SQL 會話讀 Doris 表
  1. 創建SQL會話,詳情請參見管理SQL會話

創建會話時,在引擎版本下拉列表中選擇與Doris Spark Connector版本對應的引擎版本,在網絡連接中選擇步驟二中創建好的網絡連接,並在Spark配置中添加以下參數來加載Doris Spark Connector。

spark.user.defined.jars  oss://<bucketname>/path/connector.jar

其中,oss://<bucketname>/path/connector.jar為您步驟一中上傳至OSS的Doris Spark Connector的路徑。例如,oss://emr-oss/spark/spark-doris-connector-spark-3.4-24.0.0.jar

  1. 數據開發頁面,選擇創建一個SQL > SparkSQL類型的任務,然後在右上角選擇創建好的SQL會話。

更多操作,請參見SparkSQL開發(鏈接:https://x.sm.cn/BJPDwLp)

  1. 拷貝如下代碼到新增的SparkSQL頁籤中,並根據需要修改相應的參數信息,然後單擊運行
CREATE TEMPORARY VIEW testUSING dorisOPTIONS(  "table.identifier" = "testdb.test",  "fenodes" = "<doris_address>:<http_port>",  "user" = "<user>",  "password" = "<password>");
SELECT * FROM test;

其中,涉及參數信息説明如下。

參數 描述 示例
testdb.test Doris服務中實際的數據庫和表名。 1. 如果您使用的是其他Doris集羣,請根據實際情況填寫相應的配置。<br/>2. 如果您使用的是EMR on ECS中創建的集羣,則填寫如下參數值:<br/>+ testdb.test:本文以testdb.test為例。 <br/>+ <doris_address>:您可以在EMR on ECS控制枱Doris集羣的節點管理頁面,單擊emr-master前的圖標,查看內網IP地址。 <br/>+ <http_port>:默認為8031。 <br/>+ <user>:默認用户名為root <br/>+ <password>:默認密碼為空。
<doris_address> Doris服務所在的節點內網IP地址。
<http_port> Doris服務監聽HTTP請求的端口號。
<user> 用於連接Doris服務的用户名。
<password> 用於連接Doris服務的用户密碼。

如果能夠正常返回數據,則表明配置正確。

使用 Notebook 會話讀 Doris 表
  1. 創建Notebook會話,詳情請參見管理Notebook會話

創建會話時,在引擎版本下拉列表中選擇與Doris Spark Connector版本對應的引擎版本,在網絡連接中選擇步驟二中創建好的網絡連接,並在Spark配置中添加以下參數來加載Doris Spark Connector。

spark.user.defined.jars  oss://<bucketname>/path/connector.jar

其中,oss://<bucketname>/path/connector.jar 為您步驟一中上傳至OSS的Doris Spark Connector的路徑。例如,oss://emr-oss/spark/spark-doris-connector-spark-3.4-24.0.0.jar

  1. 數據開發頁面,選擇創建一個Python > Notebook類型的任務,然後在右上角選擇創建的Notebook會話。

更多操作,請參見管理Notebook會話。

  1. 拷貝如下代碼到新增的Notebook頁籤中,並根據需要修改相應的參數信息,然後單擊運行
dorisSparkDF = spark.read.format("doris") \  .option("doris.table.identifier", "testdb.test") \  .option("doris.fenodes", "<doris_address>:<http_port>") \  .option("user", "<user>") \  .option("password", "<password>") \  .load()
  dorisSparkDF.show(3)

其中,涉及參數信息説明如下。

參數 描述 示例
testdb.test Doris服務中實際的數據庫和表名。 1. 如果您使用的是其他Doris集羣,請根據實際情況填寫相應的配置。<br/>2. 如果您使用的是EMR on ECS中創建的集羣,則填寫如下參數值:<br/>+ testdb.test:本文以testdb.test為例。 <br/>+ <doris_address>:您可以在EMR on ECS控制枱Doris集羣的節點管理頁面,單擊emr-master前的圖標,查看內網IP地址。 <br/>+ <http_port>:默認為8031。 <br/>+ <user>:默認用户名為root <br/>+ <password>:默認密碼為空。
<doris_address> Doris服務所在的節點內網IP地址。
<http_port> Doris服務監聽HTTP請求的端口號。
<user> 用於連接Doris服務的用户名。
<password> 用於連接Doris服務的用户密碼。

如果能夠正常返回數據,則表明配置正確。

步驟五:EMR Serverless Spark寫入Doris表

使用 SQL 會話寫 Doris 表

拷貝如下代碼到前一個步驟中新增的SparkSQL頁籤中,並根據需要修改相應的參數信息,然後單擊運行

CREATE TEMPORARY VIEW test_writeUSING dorisOPTIONS(  "table.identifier" = "testdb.test",  "fenodes" = "<doris_address>:<http_port>",  "user" = "<user>",  "password" = "<password>");
INSERT INTO test_write VALUES (4, 'd'), (5, 'e');SELECT * FROM test_write;

如果能夠返回以下數據,則表明數據寫入成功。

使用 Notebook 會話寫 Doris 表

拷貝如下代碼到前一個步驟中新增的Notebook頁籤中,並根據需要修改相應的參數信息,然後單擊運行

data = [(7, 'f'), (8, 'g')]mockDataDF = spark.createDataFrame(data, ["id", "name"])mockDataDF.write.mode("append").format("doris") \  .option("doris.table.identifier", "testdb.test") \  .option("doris.fenodes", "<doris_address>:<http_port>") \  .option("user", "<user>") \  .option("password", "<password>") \  .save()  dorisSparkDF = spark.read.format("doris") \  .option("doris.table.identifier", "testdb.test") \  .option("doris.fenodes", "<doris_address>:<http_port>") \  .option("user", "<user>") \  .option("password", "<password>") \  .load()
  dorisSparkDF.show(10)

如果能夠返回以下數據,則表明數據寫入成功。

如果您在使用 EMR Serverless Spark 的過程中遇到任何疑問,可加入釘釘羣58570004119諮詢。

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.