目錄

  • 0、準備
  • 1、Spark 讀寫 Doris
  • 1.1 準備 Spark 環境
  • 1.2 使用 Spark Doris Connector
  • 1.2.1 SQL 方式讀寫數據
  • 1.2.2 DataFrame 方式讀寫數據(batch)
  • 1.2.3 RDD 方式讀取數據
  • 1.2.4 配置和字段類型映射
  • 1.3 使用 JDBC 的方式(不推薦)
  • 2、Flink Doris Connector
  • 2.1、準備 Flink 環境
  • 2.2 SQL 方式讀寫
  • 2.3 DataStream 讀寫
  • 2.3.1 Source
  • 2.3.2 Sink
  • 2.4 通用配置項和字段類型映射
  • 3 DataX doriswriter
  • 3.1 編譯
  • 3.2 使用
  • 3.3 參數説明
  • 4 ODBC 外部表
  • 4.1 使用方式
  • 4.2 使用 ODBC 的 MySQL 外表
  • 4.3 使用ODBC的Oracle外表
  • 5 Doris On ES
  • 5.1 原理
  • 5.2 使用方式
  • 5.2.1 Doris 中創建 ES 外表
  • 5.2.2 啓用列式掃描優化查詢速度
  • 5.2.3 探測 keyword 類型字段
  • 5.2.4 開啓節點自動發現,
  • 5.2.5 配置 https 訪問模式
  • 5.2.6 查詢用法
  • 5.3 最佳實踐
  • 5.3.1 時間類型字段使用建議
  • 5.3.2 獲取 ES 元數據字段_id

0、準備

CREATE TABLE table1
(
 siteid INT DEFAULT '10',
 citycode SMALLINT,
 username VARCHAR(32) DEFAULT '',
 pv BIGINT SUM DEFAULT '0'
)
AGGREGATE KEY(siteid, citycode, username)
DISTRIBUTED BY HASH(siteid) BUCKETS 10
PROPERTIES("replication_num" = "1");
insert into table1 values
(1,1,'jim',2),
(2,1,'grace',2),
(3,2,'tom',2),
(4,3,'bush',3),
(5,3,'helen',3);

1、Spark 讀寫 Doris

1.1 準備 Spark 環境

創建 maven 工程,編寫 pom.xml 文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>
 <groupId>com.atguigu.doris</groupId>
 <artifactId>spark-demo</artifactId>
 <version>1.0-SNAPSHOT</version>
 <properties>
 <scala.binary.version>2.12</scala.binary.version>
 <spark.version>3.0.0</spark.version>
 <maven.compiler.source>8</maven.compiler.source>
 <maven.compiler.target>8</maven.compiler.target>
 </properties>
 <dependencies>
 <!-- Spark 的依賴引入 -->
 <dependency>
 <groupId>org.apache.spark</groupId>
 <artifactId>sparkcore_${scala.binary.version}</artifactId>
 <scope>provided</scope>
 <version>${spark.version}</version>
 </dependency>
 <dependency>
 <groupId>org.apache.spark</groupId>
 <artifactId>sparksql_${scala.binary.version}</artifactId>
 <scope>provided</scope>
 <version>${spark.version}</version>
 </dependency>
 <dependency>
 <groupId>org.apache.spark</groupId>
 <artifactId>sparkhive_${scala.binary.version}</artifactId>
 <scope>provided</scope>
 <version>${spark.version}</version>
 </dependency>
 <!-- 引入 Scala -->
 <dependency>
 <groupId>org.scala-lang</groupId>
 <artifactId>scala-library</artifactId>
 <version>2.12.10</version>
 </dependency>
<dependency>
 <groupId>com.alibaba</groupId>
 <artifactId>fastjson</artifactId>
 <version>1.2.47</version>
 </dependency>
 <dependency>
 <groupId>mysql</groupId>
 <artifactId>mysql-connector-java</artifactId>
 <version>5.1.49</version>
 </dependency>
 <!--spark-doris-connector-->
 <dependency>
 <groupId>org.apache.doris</groupId>
 <artifactId>spark-doris-connector-3.1_2.12</artifactId>
 <!--<artifactId>spark-doris-connector-
2.3_2.11</artifactId>-->
 <version>1.0.1</version>
 </dependency>
 </dependencies>
 <build>
 <plugins>
 <!--編譯 scala 所需插件-->
 <plugin>
 <groupId>org.scala-tools</groupId>
 <artifactId>maven-scala-plugin</artifactId>
 <version>2.15.1</version>
 <executions>
 <execution>
 <id>compile-scala</id>
 <goals>
 <goal>add-source</goal>
 <goal>compile</goal>
 </goals>
 </execution>
 <execution>
 <id>test-compile-scala</id>
 <goals>
 <goal>add-source</goal>
 <goal>testCompile</goal>
 </goals>
 </execution>
 </executions>
 </plugin>
 <plugin>
 <groupId>net.alchim31.maven</groupId>
 <artifactId>scala-maven-plugin</artifactId>
 <version>3.2.2</version>
 <executions>
 <execution>
 <!-- 聲明綁定到 maven 的 compile 階段 -->
 <goals>
 <goal>compile</goal>
<goal>testCompile</goal>
 </goals>
 </execution>
 </executions>
 </plugin>
 <!-- assembly 打包插件 -->
 <plugin>
 <groupId>org.apache.maven.plugins</groupId>
 <artifactId>maven-assembly-plugin</artifactId>
 <version>3.0.0</version>
 <executions>
 <execution>
 <id>make-assembly</id>
 <phase>package</phase>
 <goals>
 <goal>single</goal>
 </goals>
 </execution>
 </executions>
 <configuration>
 <archive>
 <manifest>
 </manifest>
 </archive>
 <descriptorRefs>
 <descriptorRef>jar-withdependencies</descriptorRef>
 </descriptorRefs>
 </configuration>
 </plugin>
 <!-- <plugin>
 <groupId>org.apache.maven.plugins</groupId>
 <artifactId>maven-compiler-plugin</artifactId>
 <version>3.6.1</version>
 <!– 所有的編譯都依照 JDK1.8 –>
 <configuration>
 <source>1.8</source>
 <target>1.8</target>
 </configuration>
 </plugin>-->
 </plugins>
 </build>
</project>

1.2 使用 Spark Doris Connector

Spark Doris Connector 可以支持通過 Spark 讀取 Doris 中存儲的數據,也支持通過
Spark 寫入數據到 Doris。

1.2.1 SQL 方式讀寫數據

package com.atuigu.doris.spark
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
 * TODO
 *
 * @version 1.0
 * @author cjp
 */
object SQLDemo {
 def main( args: Array[String] ): Unit = {
 val sparkConf = new SparkConf().setAppName("SQLDemo")
 .setMaster("local[*]") //TODO 要打包提交集羣執行,註釋掉
 val sparkSession = 
SparkSession.builder().config(sparkConf).getOrCreate()
 sparkSession.sql(
 """
 |CREATE TEMPORARY VIEW spark_doris
 |USING doris
 |OPTIONS(
 | "table.identifier"="test_db.table1",
 | "fenodes"="hadoop1:8030",
 | "user"="test",
 | "password"="test"
 |);
 """.stripMargin)
 //讀取數據
 // sparkSession.sql("select * from spark_doris").show()
 //寫入數據
 sparkSession.sql("insert into spark_doris 
values(99,99,'haha',5)")
 }
}

1.2.2 DataFrame 方式讀寫數據(batch)

package com.atuigu.doris.spark
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
 * TODO
 *
 * @version 1.0
 * @author cjp
 */
object DataFrameDemo {
 def main( args: Array[String] ): Unit = {
 val sparkConf = new SparkConf().setAppName("DataFrameDemo")
 .setMaster("local[*]") //TODO 要打包提交集羣執行,註釋掉
 val sparkSession = 
SparkSession.builder().config(sparkConf).getOrCreate()
// 讀取數據
 // val dorisSparkDF = sparkSession.read.format("doris")
 // .option("doris.table.identifier", "test_db.table1")
 // .option("doris.fenodes", "hadoop1:8030")
 // .option("user", "test")
 // .option("password", "test")
 // .load()
 // dorisSparkDF.show()
 // 寫入數據
 import sparkSession.implicits._
 val mockDataDF = List(
 (11,23, "haha", 8),
 (11, 3, "hehe", 9),
 (11, 3, "heihei", 10)
 ).toDF("siteid", "citycode", "username","pv")
 mockDataDF.show(5)
 mockDataDF.write.format("doris")
 .option("doris.table.identifier", "test_db.table1")
 .option("doris.fenodes", "hadoop1:8030")
 .option("user", "test")
 .option("password", "test")
 //指定你要寫入的字段
// .option("doris.write.fields", "user")
 .save()
 }
}

1.2.3 RDD 方式讀取數據

package com.atuigu.doris.spark
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
/**
 * TODO
 *
 * @version 1.0
 * @author cjp
 */
object RDDDemo {
 def main( args: Array[String] ): Unit = {
 val sparkConf = new SparkConf().setAppName("RDDDemo")
 .setMaster("local[*]") //TODO 要打包提交集羣執行,註釋掉
 val sc = new SparkContext(sparkConf)
 import org.apache.doris.spark._
 val dorisSparkRDD = sc.dorisRDD(
 tableIdentifier = Some("test_db.table1"),
 cfg = Some(Map(
 "doris.fenodes" -> "hadoop1:8030",
 "doris.request.auth.user" -> "test",
 "doris.request.auth.password" -> "test"
 ))
 )
 dorisSparkRDD.collect().foreach(println)
 }
}

1.2.4 配置和字段類型映射

1)通用配置項

Key

Default Value

Comment

doris.fenodes


Doris FE http 地址,支持多個地址,使用逗號分隔

doris.table.identifier


Doris 表名,如:db1.tbl1

doris.request.retries

3

向 Doris 發送請求的重試次數

doris.request.connect.timeout.ms

30000

向 Doris 發送請求的連接超時時間

doris.request.read.timeout.ms

30000

向 Doris 發送請求的讀取超時時間

doris.request.query.timeout.s

3600

查詢 doris 的超時時間,默認值為 1 小時,-1 表示無超時限制

doris.request.tablet.size

Integer.MAX_VALUE

一個 RDD Partition 對應的Doris Tablet 個數。此數值設置越小,則會生成越多的Partition。從而提升 Spark 側的並行度,但同時會對 Doris造成更大的壓力。

doris.batch.size

1024

一次從 BE 讀取數據的最大行數。增大此數值可減少Spark 與 Doris 之間建立連接的次數。從而減輕網絡延遲所帶來的的額外時間開銷。

doris.exec.mem.limit

2147483648

單個查詢的內存限制。默認為 2GB,單位為字節

doris.deserialize.arrow.async

false

是否支持異步轉換 Arrow 格式 到 spark-doris-connector迭代所需的 RowBatch

doris.deserialize.queue.size

64

異步轉換 Arrow 格式的內部處理隊列,當doris.deserialize.arrow.async為 true 時生效

doris.write.fields


指定寫入 Doris 表的字段或者字段順序,多列之間使用逗號分隔。默認寫入時要按照 Doris 表字段順序寫入全部字段。

sink.batch.size

10000

單次寫 BE 的最大行數

sink.max-retries

1

寫 BE 失敗之後的重試次數

2)SQL 和 Dataframe 專有配置

Key

Default Value

Comment

user


訪問 Doris 的用户名

password


訪問 Doris 的密碼

doris.filter.query.in.max.count

100

謂詞下推中,in 表達式 value列表元素最大數量。超過此數量,則 in 表達式條件過濾在 Spark 側處理。

3)RDD 專有配置

Key

Default Value

Comment

doris.request.auth.user


訪問 Doris 的用户名

doris.request.auth.password


訪問 Doris 的密碼

doris.read.field


讀取 Doris 表的列名列表,多列之間使用逗號分隔

doris.filter.query


過濾讀取數據的表達式,此表達式透傳給 Doris。Doris使用此表達式完成源端數據過濾

4)Doris 和 Spark 列類型映射關係:

Doris Type

Spark Type

NULL_TYPE

DataTypes.NullType

BOOLEAN

DataTypes.BooleanType

TINYINT

DataTypes.ByteType

SMALLINT

DataTypes.ShortType

INT

DataTypes.IntegerType

BIGINT

DataTypes.LongType

FLOAT

DataTypes.FloatType

DOUBLE

DataTypes.DoubleType

DATE

DataTypes.StringType1

DATETIME

DataTypes.StringType1

BINARY

DataTypes.BinaryType

DECIMAL

DecimalType

CHAR

DataTypes.StringType

LARGEINT

DataTypes.StringType

VARCHAR

DataTypes.StringType

DECIMALV2

DecimalType

TIME

DataTypes.DoubleType

HLL

Unsupported datatype

注:Connector 中,將 DATE 和 DATETIME 映射為 String。由於 Doris 底層存儲引擎處
理邏輯,直接使用時間類型時,覆蓋的時間範圍無法滿足需求。所以使用 String 類型直接返回對應的時間可讀文本。

1.3 使用 JDBC 的方式(不推薦)

這種方式是早期寫法,Spark 無法感知 Doris 的數據分佈,會導致打到 Doris 的查詢壓力
非常大。

package com.atuigu.doris.spark
import java.util.Properties
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
object JDBCDemo {
 def main(args: Array[String]): Unit = {
 val sparkConf = new 
SparkConf().setAppName("JDBCDemo").setMaster("local[*]")
 val sparkSession = 
SparkSession.builder().config(sparkConf).getOrCreate()
 // 讀取數據
// val df=sparkSession.read.format("jdbc")
// .option("url","jdbc:mysql://hadoop1:9030/test_db")
// .option("user","test")
// .option("password","test")
// .option("dbtable","table1")
// .load()
//
// df.show()
 // 寫入數據
 import sparkSession.implicits._
 val mockDataDF = List(
 (11,23, "haha", 8),
 (11, 3, "hehe", 9),
 (11, 3, "heihei", 10)
 ).toDF("siteid", "citycode", "username","pv")
 val prop = new Properties()
 prop.setProperty("user", "root")
 prop.setProperty("password", "123456")
 df.write.mode(SaveMode.Append)
 .jdbc("jdbc:mysql://hadoop1:9030/test_db", "table1", prop)
 }
}

2、Flink Doris Connector

Flink Doris Connector 可以支持通過 Flink 操作(讀取、插入、修改、刪除) Doris 中
存儲的數據。
Flink Doris Connector Sink 的內部實現是通過 Stream load 服務向 Doris 寫入數據, 同時
也支持 Stream load 請求參數的配置設定。
版本兼容如下:

Connector

Flink

Doris

Java

Scala

1.11.6-2.12-xx

1.11.x

0.13+

8

2.12

1.12.7-2.12-xx

1.12.x

0.13.+

8

2.12

1.13.5-2.12-xx

1.13.x

0.13.+

8

2.12

1.14.4-2.12-xx

1.14.x

0.13.+

8

2.12

2.1、準備 Flink 環境

創建 maven 工程,編寫 pom.xml 文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>
 <groupId>com.atguigu.doris</groupId>
 <artifactId>flink-demo</artifactId>
 <version>1.0-SNAPSHOT</version>
 <properties>
 <maven.compiler.source>8</maven.compiler.source>
 <maven.compiler.target>8</maven.compiler.target>
 <flink.version>1.13.1</flink.version>
 <java.version>1.8</java.version>
 <scala.binary.version>2.12</scala.binary.version>
 <slf4j.version>1.7.30</slf4j.version>
 </properties>
 <dependencies>
 <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-java</artifactId>
 <version>${flink.version}</version>
 <scope>provided</scope> <!--不會打包到依賴中,只參與編譯,不
參與運行 -->
 </dependency>
 <dependency>
<groupId>org.apache.flink</groupId>
 <artifactId>flink-streamingjava_${scala.binary.version}</artifactId>
 <version>${flink.version}</version>
 <scope>provided</scope>
 </dependency>
 <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flinkclients_${scala.binary.version}</artifactId>
 <version>${flink.version}</version>
 <scope>provided</scope>
 </dependency>
 <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-table-plannerblink_${scala.binary.version}</artifactId>
 <version>${flink.version}</version>
 <scope>provided</scope>
 </dependency>
 <!---->
 <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-runtimeweb_${scala.binary.version}</artifactId>
 <version>${flink.version}</version>
 <scope>provided</scope>
 </dependency>
 <dependency>
 <groupId>org.slf4j</groupId>
 <artifactId>slf4j-api</artifactId>
 <version>${slf4j.version}</version>
 <scope>provided</scope>
 </dependency>
 <dependency>
 <groupId>org.slf4j</groupId>
 <artifactId>slf4j-log4j12</artifactId>
 <version>${slf4j.version}</version>
 <scope>provided</scope>
 </dependency>
 <dependency>
 <groupId>org.apache.logging.log4j</groupId>
 <artifactId>log4j-to-slf4j</artifactId>
 <version>2.14.0</version>
 <scope>provided</scope>
 </dependency>
 <dependency>
 <groupId>mysql</groupId>
 <artifactId>mysql-connector-java</artifactId>
 <version>5.1.49</version>
 </dependency>
 <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-statebackendrocksdb_${scala.binary.version}</artifactId>
 <version>${flink.version}</version>
 </dependency>
 <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-sequence-file</artifactId>
 <version>${flink.version}</version>
 </dependency>
 <dependency>
 <groupId>com.ververica</groupId>
 <artifactId>flink-connector-mysql-cdc</artifactId>
 <version>2.0.0</version>
 </dependency>
 <!--flink-doris-connector-->
 <dependency>
 <groupId>org.apache.doris</groupId>
 <!--<artifactId>flink-doris-connector-
1.14_2.12</artifactId>-->
 <artifactId>flink-doris-connector-1.13_2.12</artifactId>
 <!--<artifactId>flink-doris-connector-
1.12_2.12</artifactId>-->
 <!--<artifactId>flink-doris-connector-
1.11_2.12</artifactId>-->
 <version>1.0.3</version>
 </dependency>
 </dependencies>
 <build>
 <plugins>
 <plugin>
 <groupId>org.apache.maven.plugins</groupId>
 <artifactId>maven-shade-plugin</artifactId>
 <version>3.2.4</version>
 <executions>
 <execution>
 <phase>package</phase>
 <goals>
 <goal>shade</goal>
 </goals>
 <configuration>
 <artifactSet>
 <excludes>
 
<exclude>com.google.code.findbugs:jsr305</exclude>
 <exclude>org.slf4j:*</exclude>
 <exclude>log4j:*</exclude>
 
<exclude>org.apache.hadoop:*</exclude>
</excludes>
 </artifactSet>
 <filters>
 <filter>
 <!-- Do not copy the signatures in 
the META-INF folder.
 Otherwise, this might cause 
SecurityExceptions when using the JAR. -->
 <artifact>*:*</artifact>
 <excludes>
 <exclude>META-INF/*.SF</exclude>
 <exclude>META-INF/*.DSA</exclude>
 <exclude>META-INF/*.RSA</exclude>
 </excludes>
 </filter>
 </filters>
 <transformers combine.children="append">
 <transformer 
implementation="org.apache.maven.plugins.shade.resource.ServicesR
esourceTransformer">
 </transformer>
 </transformers>
 </configuration>
 </execution>
 </executions>
 </plugin>
 </plugins>
 </build>
</project>

2.2 SQL 方式讀寫

package com.atuigu.doris.flink;
import 
org.apache.flink.streaming.api.environment.StreamExecutionEnviron
ment;
import 
org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* TODO
*
* @author cjp
* @version 1.0
*/
public class SQLDemo {
 public static void main(String[] args) {
 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(1);
 StreamTableEnvironment tableEnv =
StreamTableEnvironment.create(env);
 tableEnv.executeSql("CREATE TABLE flink_doris (\n" +
 " siteid INT,\n" +
" citycode SMALLINT,\n" +
 " username STRING,\n" +
 " pv BIGINT\n" +
 " ) \n" +
 " WITH (\n" +
 " 'connector' = 'doris',\n" +
 " 'fenodes' = 'hadoop1:8030',\n" +
 " 'table.identifier' = 'test_db.table1',\n" +
 " 'username' = 'test',\n" +
 " 'password' = 'test'\n" +
 ")\n");
 // 讀取數據
// tableEnv.executeSql("select * from flink_doris").print();
 // 寫入數據
 tableEnv.executeSql("insert into 
flink_doris(siteid,username,pv) values(22,'wuyanzu',3)");
 }
}

2.3 DataStream 讀寫

2.3.1 Source

package com.atuigu.doris.flink;
import org.apache.doris.flink.cfg.DorisStreamOptions;
import org.apache.doris.flink.datastream.DorisSourceFunction;
import 
org.apache.doris.flink.deserialization.SimpleListDeserializationS
chema;
import 
org.apache.flink.streaming.api.environment.StreamExecutionEnviron
ment;
import 
org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.util.Properties;
/**
* TODO
*
* @author cjp
* @version 1.0
*/
public class DataStreamSourceDemo {
 public static void main(String[] args) throws Exception {
 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(1);
 Properties properties = new Properties();
 properties.put("fenodes","hadoop1:8030");
 properties.put("username","test");
 properties.put("password","test");
 properties.put("table.identifier","test_db.table1");
 env.addSource(new DorisSourceFunction(
 new DorisStreamOptions(properties),
 new SimpleListDeserializationSchema()
 )
 ).print();
 env.execute();
 }
}

2.3.2 Sink

1)Json 數據流寫法一

package com.atuigu.doris.flink;
import org.apache.doris.flink.cfg.*;
import org.apache.doris.flink.datastream.DorisSourceFunction;
import 
org.apache.doris.flink.deserialization.SimpleListDeserializationS
chema;
import 
org.apache.flink.streaming.api.environment.StreamExecutionEnviron
ment;
import java.util.Properties;
/**
* TODO
*
* @author cjp
* @version 1.0
*/
public class DataStreamJsonSinkDemo {
 public static void main(String[] args) throws Exception {
 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(1);
 Properties pro = new Properties();
 pro.setProperty("format", "json");
 pro.setProperty("strip_outer_array", "true");
 env
 .fromElements(
 "{\"longitude\": \"116.405419\", \"city\": \"
北京\", \"latitude\": \"39.916927\"}"
 )
 .addSink(
 DorisSink.sink(
 DorisReadOptions.builder().build(),
 DorisExecutionOptions.builder()
 .setBatchSize(3)
 .setBatchIntervalMs(0L)
 .setMaxRetries(3)
 .setStreamLoadProp(pro).build(),
 DorisOptions.builder()
 .setFenodes("FE_IP:8030")
 .setTableIdentifier("db.table")
 .setUsername("root")
 .setPassword("").build()
 ));
// .addSink(
// DorisSink.sink(
// DorisOptions.builder()
// .setFenodes("FE_IP:8030")
// .setTableIdentifier("db.table")
// .setUsername("root")
// .setPassword("").build()
// ));
 env.execute();
 }
}

2)RowData 數據流

package com.atuigu.doris.flink;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.cfg.DorisSink;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import 
org.apache.flink.streaming.api.environment.StreamExecutionEnviron
ment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.*;
/**
* TODO
*
* @author cjp
* @version 1.0
*/
public class DataStreamRowDataSinkDemo {
 public static void main(String[] args) throws Exception {
 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(1);
 DataStream<RowData> source = env.fromElements("")
 .map(new MapFunction<String, RowData>() {
 @Override
 public RowData map(String value) throws Exception 
{
 GenericRowData genericRowData = new 
GenericRowData(4);
 genericRowData.setField(0, 33);
 genericRowData.setField(1, new Short("3"));
 genericRowData.setField(2, 
StringData.fromString("flink-stream"));
 genericRowData.setField(3, 3L);
 return genericRowData;
 }
 });
 LogicalType[] types = {new IntType(), new SmallIntType(), 
new VarCharType(32), new BigIntType()};
 String[] fields = {"siteid", "citycode", "username", "pv"};
 source.addSink(
 DorisSink.sink(
 fields,
 types,
 DorisReadOptions.builder().build(),
 DorisExecutionOptions.builder()
 .setBatchSize(3)
 .setBatchIntervalMs(0L)
 .setMaxRetries(3)
 .build(),
 DorisOptions.builder()
 .setFenodes("hadoop1:8030")
 .setTableIdentifier("test_db.table1")
 .setUsername("test")
 .setPassword("test").build()
 ));
 env.execute();
 }
}

2.4 通用配置項和字段類型映射

1)通用配置項:

Key

Default Value

Comment

fenodes


Doris FE http 地址

table.identifier


Doris 表名,如:db1.tbl1

username


訪問 Doris 的用户名

password


訪問 Doris 的密碼

doris.request.retries

3

向 Doris 發送請求的重試次數

doris.request.connect.timeout.ms

30000

向 Doris 發送請求的連接超時時間

doris.request.read.timeout.ms

30000

向 Doris 發送請求的讀取超時時間

doris.request.query.timeout.s

3600

查詢 doris 的超時時間,默認值為 1 小時,-1 表示無超時限制

doris.request.tablet.size

Integer. MAX_VALUE

一個 Partition 對應的 Doris Tablet 個數。此數值設置越小,則會生成越多的 Partition。從而提升 Flink 側的並行度,但同時會對Doris 造成更大的壓力。

doris.batch.size

1024

一次從 BE 讀取數據的最大行數。增大此數值可減少 flink 與 Doris 之間建立連接的次數。從而減輕網絡延遲所帶來的的額外時間開銷。

doris.exec.mem.limit

2147483648

單個查詢的內存限制。默認為 2GB,單位為字節

doris.deserialize.arrow.async

false

是否支持異步轉換 Arrow 格式到 flink-dorisconnector 迭代所需的 RowBatch

doris.deserialize.queue.size

64

異步轉換 Arrow 格式的內部處理隊列,當doris.deserialize.arrow.async 為 true 時生效

doris.read.field


讀取 Doris 表的列名列表,多列之間使用逗號分隔

doris.filter.query


過濾讀取數據的表達式,此表達式透傳給Doris。Doris 使用此表達式完成源端數據過濾。

sink.batch.size

10000

單次寫 BE 的最大行數

sink.max-retries

1

寫 BE 失敗之後的重試次數

sink.batch.interval

10s

flush 間隔時間,超過該時間後異步線程將緩存中數據寫入 BE。 默認值為 10 秒,支持時間單位 ms、s、min、h 和 d。設置為 0表示關閉定期寫入。

sink.properties.*


Stream load 的導入參數例如’sink.properties.column_separator’ = ‘, ‘定義列分隔符’sink.properties.escape_delimiters’ = ‘true’特殊字符作為分隔符,’\x01’會被轉換為二進制的 0x01’sink.properties.format’ = ‘json’‘sink.properties.strip_outer_array’ = 'true’JSON 格式導入

sink.enable-delete

true

是否啓用刪除。此選項需要 Doris 表開啓批量刪除功能(0.15+版本默認開啓),只支持Uniq 模型。

sink.batch.bytes

10485760

單次寫 BE 的最大數據量,當每個 batch 中記錄的數據量超過該閾值時,會將緩存數據寫入 BE。默認值為 10MB

2)Doris 和 Flink 列類型映射關係:

Doris Type

Flink Type

NULL_TYPE

NULL

BOOLEAN

BOOLEAN

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DATE

STRING

DATETIME

STRING

DECIMAL

DECIMAL

CHAR

STRING

LARGEINT

STRING

VARCHAR

STRING

DECIMALV2

DECIMAL

TIME

DOUBLE

HLL

Unsupported datatype

3 DataX doriswriter

DorisWriter 支持將大批量數據寫入 Doris 中。DorisWriter 通過 Doris 原生支持 Stream
load 方式導入數據, DorisWriter 會將 reader 讀取的數據進行緩存在內存中,拼接成 Json 文本,然後批量導入至 Doris。

3.1 編譯

可以自己編譯,也可以直接使用我們編譯好的包。
1)進入之前的容器環境

docker run -it \
-v /opt/software/.m2:/root/.m2 \
-v /opt/software/apache-doris-0.15.0-incubating-src/:/root/apachedoris-0.15.0-incubating-src/ \
apache/incubator-doris:build-env-for-0.15.0

2)運行 init-env.sh

cd /root/apache-doris-0.15.0-incubating-src/extension/DataX
sh init-env.sh

3)手動上傳依賴
上傳 alibaba-datax-maven-m2-20210928.tar.gz,解壓:

tar -zxvf alibaba-datax-maven-m2-20210928.tar.gz -C /opt/software

拷貝解壓後的文件到 maven 倉庫

sudo cp -r /opt/software/alibaba/datax/
/opt/software/.m2/repository/com/alibaba/

4)編譯 doriswriter:
(1)單獨編譯 doriswriter 插件:

cd /root/apache-doris-0.15.0-incubating-src/extension/DataX/DataX
mvn clean install -pl plugin-rdbms-util,doriswriter -DskipTests

(2)編譯整個 DataX 項目:

cd /root/apache-doris-0.15.0-incubating-src/extension/DataX/DataX
mvn package assembly:assembly -Dmaven.test.skip=true

產出在 target/datax/datax/.
hdfsreader, hdfswriter and oscarwriter 這三個插件需要額外的 jar 包。如果你並不需要這
些插件,可以在 DataX/pom.xml 中刪除這些插件的模塊。
5)拷貝編譯好的插件到 DataX

Sudo cp -r /opt/software/apache-doris-0.15.0-incubating-src/extension/DataX/doriswriter/target/datax/plugin/writer/dorisw
riter /opt/module/datax/plugin/writer

3.2 使用

1)準備測試表

MySQL 建表、插入測試數據
CREATE TABLE `sensor` (
 `id` varchar(255) NOT NULL,
 `ts` bigint(255) DEFAULT NULL,
 `vc` int(255) DEFAULT NULL,
 PRIMARY KEY (`id`)
)
insert into sensor values('s_2',3,3),('s_9',9,9);
Doris 建表
CREATE TABLE `sensor` (
 `id` varchar(255) NOT NULL,
 `ts` bigint(255) DEFAULT NULL,
 `vc` int(255) DEFAULT NULL
)
DISTRIBUTED BY HASH(`id`) BUCKETS 10;

2)編寫 json 文件

vim mysql2doris.json
{
 "job": {
 "setting": {
 "speed": {
 "channel": 1
 },
 "errorLimit": {
 "record": 0,
 "percentage": 0
 }
 },
 "content": [
 {
 "reader": {
 "name": "mysqlreader", 
 "parameter": {
 "column": [
 "id",
 "ts",
 "vc"
 ],
 "connection": [
 {
 "jdbcUrl": [
 "jdbc:mysql://hadoop1:3306/test"
 ], 
 "table": [
 "sensor"
 ]
 }
 ], 
 "username": "root", 
 "password": "000000"
 }
 }, 
 "writer": {
 "name": "doriswriter",
 "parameter": {
 "feLoadUrl": ["hadoop1:8030", "hadoop2:8030", 
"hadoop3:8030"],
 "beLoadUrl": ["hadoop1:8040", "hadoop2:8040", 
"hadoop3:8040"],
 "jdbcUrl": "jdbc:mysql://hadoop1:9030/",
 "database": "test_db",
 "table": "sensor",
 "column": ["id", "ts", "vc"],
 "username": "test",
 "password": "test",
 "postSql": [],
 "preSql": [],
 "loadProps": {
 },
 "maxBatchRows" : 500000,
 "maxBatchByteSize" : 104857600,
 "labelPrefix": "my_prefix",
 "lineDelimiter": "\n"
 }
 }
 }
 ]
 }
}

3)運行 datax 任務

bin/datax.py job/mysql2doris.json

3.3 參數説明

⚫ jdbcUrl
描述:Doris 的 JDBC 連接串,用户執行 preSql 或 postSQL。
必選:是
默認值:無
⚫ feLoadUrl
描述:和 beLoadUrl 二選一。作為 Stream Load 的連接目標。格式為 “ip:port”。其中
IP 是 FE 節點 IP,port 是 FE 節點的 http_port。可以填寫多個,doriswriter 將以輪詢的方式訪問。
必選:否
默認值:無
⚫ beLoadUrl
描述:和 feLoadUrl 二選一。作為 Stream Load 的連接目標。格式為 “ip:port”。其中 IP
是 BE 節點 IP,port 是 BE 節點的 webserver_port。可以填寫多個,doriswriter 將以輪詢的方式訪問。
必選:否
默認值:無
⚫ username
描述:訪問 Doris 數據庫的用户名
必選:是
默認值:無
⚫ password
描述:訪問 Doris 數據庫的密碼
必選:否
默認值:空
⚫ database
描述:需要寫入的 Doris 數據庫名稱。
必選:是
默認值:無
⚫ table
描述:需要寫入的 Doris 表名稱。
必選:是
默認值:無
⚫ column
描述:目的表需要寫入數據的字段,這些字段將作為生成的 Json 數據的字段名。字段之間用英文逗號分隔。例如: “column”: [“id”,“name”,“age”]。
必選:是
默認值:否
⚫ preSql
描述:寫入數據到目的表前,會先執行這裏的標準語句。
必選:否
默認值:無
⚫ postSql
描述:寫入數據到目的表後,會執行這裏的標準語句。
必選:否
默認值:無
⚫ maxBatchRows
描述:每批次導入數據的最大行數。和 maxBatchByteSize 共同控制每批次的導入數量。
每批次數據達到兩個閾值之一,即開始導入這一批次的數據。
必選:否
默認值:500000
⚫ maxBatchByteSize
描述:每批次導入數據的最大數據量。和 ** maxBatchRows** 共同控制每批次的導入
數量。每批次數據達到兩個閾值之一,即開始導入這一批次的數據。
必選:否
默認值:104857600
⚫ labelPrefix
描述:每批次導入任務的 label 前綴。最終的 label 將有 labelPrefix + UUID + 序號 組

必選:否
默認值:datax_doris_writer_
⚫ lineDelimiter
描述:每批次數據包含多行,每行為 Json 格式,每行的的分隔符即為 lineDelimiter。
支持多個字節, 例如’\x02\x03’。
必選:否
默認值:\n
⚫ loadProps
描述:StreamLoad 的請求參數,詳情參照 StreamLoad 介紹頁面。
必選:否
默認值:無
⚫ connectTimeout
描述:StreamLoad 單次請求的超時時間, 單位毫秒(ms)。
必選:否
默認值:-1

4 ODBC 外部表

ODBC External Table Of Doris 提供了 Doris 通過數據庫訪問的標準接口(ODBC)來訪問
外部表,外部表省去了繁瑣的數據導入工作,讓 Doris 可以具有了訪問各式數據庫的能力,並藉助 Doris 本身的 OLAP 的能力來解決外部表的數據分析問題:
(1)支持各種數據源接入 Doris
(2)支持 Doris 與各種數據源中的表聯合查詢,進行更加複雜的分析操作
(3)通過 insert into 將 Doris 執行的查詢結果寫入外部的數據源

4.1 使用方式

1)Doris 中創建 ODBC 的外表
方式一:不使用 Resource 創建 ODBC 的外表。

CREATE EXTERNAL TABLE `baseall_oracle` (
 `k1` decimal(9, 3) NOT NULL COMMENT "",
 `k2` char(10) NOT NULL COMMENT "",
 `k3` datetime NOT NULL COMMENT "",
 `k5` varchar(20) NOT NULL COMMENT "",
 `k6` double NOT NULL COMMENT ""
) ENGINE=ODBC
COMMENT "ODBC"
PROPERTIES (
"host" = "192.168.0.1",
"port" = "8086",
"user" = "test",
"password" = "test",
"database" = "test",
"table" = "baseall",
"driver" = "Oracle 19 ODBC driver",
"odbc_type" = "oracle"
);

方式二:通過 ODBC_Resource 來創建 ODBC 外表(推薦使用的方式)。

CREATE EXTERNAL RESOURCE `oracle_odbc`
PROPERTIES (
"type" = "odbc_catalog",
"host" = "192.168.0.1",
"port" = "8086",
"user" = "test",
"password" = "test",
"database" = "test",
"odbc_type" = "oracle",
"driver" = "Oracle 19 ODBC driver"
);
 
CREATE EXTERNAL TABLE `baseall_oracle` (
 `k1` decimal(9, 3) NOT NULL COMMENT "",
 `k2` char(10) NOT NULL COMMENT "",
 `k3` datetime NOT NULL COMMENT "",
 `k5` varchar(20) NOT NULL COMMENT "",
 `k6` double NOT NULL COMMENT ""
) ENGINE=ODBC
COMMENT "ODBC"
PROPERTIES (
"odbc_catalog_resource" = "oracle_odbc",
"database" = "test",
"table" = "baseall"
);

參數説明:

參數

説明

hosts

外表數據庫的 IP 地址

driver

ODBC 外表 Driver 名,需要和 be/conf/odbcinst.ini 中的 Driver 名一致。

odbc_type

外表數據庫的類型,當前支持 oracle, mysql, postgresql

user

外表數據庫的用户名

password

對應用户的密碼信息

2)ODBC Driver 的安裝和配置
各大主流數據庫都會提供 ODBC 的訪問 Driver,用户可以執行參照各數據庫官方推薦
的方式安裝對應的 ODBC Driver LiB 庫。
安裝完成之後,查找對應的數據庫的 Driver Lib 庫的路徑,並且修改 be/conf/odbcinst.ini
的配置:

[MySQL Driver]
Description = ODBC for MySQL
Driver = /usr/lib64/libmyodbc8w.so
FileUsage = 1

上述配置[]裏的對應的是 Driver 名,在建立外部表時需要保持外部表的 Driver 名和配置
文件之中的一致。

Driver= 這個要根據實際 BE 安裝 Driver 的路徑來填寫,本質上就是一個動態庫的路徑,
這裏需要保證該動態庫的前置依賴都被滿足。

切記,這裏要求所有的 BE 節點都安裝上相同的 Driver,並且安裝路徑相同,同時有相
同的 be/conf/odbcinst.ini 的配置

4.2 使用 ODBC 的 MySQL 外表

CentOS 數據庫 ODBC 版本對應關係:

Doris能否替代spark_#大數據


MySQL 與 Doris 的數據類型匹配:

Doris能否替代spark_#scala_02


1)安裝 unixODBC

安裝
yum install -y unixODBC unixODBC-devel libtool-ltdl libtool-ltdl-devel
查看是否安裝成功
odbcinst -j

2)安裝 MySQL 對應版本的 ODBC(每個 BE 節點都要)

下載
wget https://downloads.mysql.com/archives/get/p/10/file/mysqlconnector-odbc-5.3.11-1.el7.x86_64.rpm
安裝
yum install -y mysql-connector-odbc-5.3.11-1.el7.x86_64.rpm
查看是否安裝成功
myodbc-installer -d -l

3)配置 unixODBC,驗證通過 ODBC 訪問 Mysql

編輯 ODBC 配置文件
vim /etc/odbc.ini
[mysql]
Description = Data source MySQL
Driver = MySQL ODBC 5.3 Unicode Driver
Server = hadoop1
Host = hadoop1
Database = test
Port = 3306
User = root
Password = 000000
測試鏈接
isql -v mysql

4)準備 MySQL 表

CREATE TABLE `test_cdc` (
 `id` int NOT NULL AUTO_INCREMENT,
 `name` varchar(255) DEFAULT NULL,
 PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=91234 DEFAULT CHARSET=utf8mb4;
INSERT INTO `test_cdc` VALUES (123, 'this is a update');
INSERT INTO `test_cdc` VALUES (1212, '測試 flink CDC');
INSERT INTO `test_cdc` VALUES (1234, '這是測試');
INSERT INTO `test_cdc` VALUES (11233, 'zhangfeng_1');
INSERT INTO `test_cdc` VALUES (21233, 'zhangfeng_2');
INSERT INTO `test_cdc` VALUES (31233, 'zhangfeng_3');
INSERT INTO `test_cdc` VALUES (41233, 'zhangfeng_4');
INSERT INTO `test_cdc` VALUES (51233, 'zhangfeng_5');
INSERT INTO `test_cdc` VALUES (61233, 'zhangfeng_6');
INSERT INTO `test_cdc` VALUES (71233, 'zhangfeng_7');
INSERT INTO `test_cdc` VALUES (81233, 'zhangfeng_8');
INSERT INTO `test_cdc` VALUES (91233, 'zhangfeng_9');

5)修改 Doris 的配置文件(每個 BE 節點都要,不用重啓 BE)
在 BE 節點的 conf/odbcinst.ini,添加我們的剛才註冊的的 ODBC 驅動([MySQL ODBC
5.3.11]這部分)。

# Driver from the postgresql-odbc package
# Setup from the unixODBC package
[PostgreSQL]
Description = ODBC for PostgreSQL
Driver = /usr/lib/psqlodbc.so
Setup = /usr/lib/libodbcpsqlS.so
FileUsage = 1
# Driver from the mysql-connector-odbc package
# Setup from the unixODBC package
[MySQL ODBC 5.3.11]
Description = ODBC for MySQL
Driver= /usr/lib64/libmyodbc5w.so
FileUsage = 1
# Driver from the oracle-connector-odbc package
# Setup from the unixODBC package
[Oracle 19 ODBC driver]
Description=Oracle ODBC driver for Oracle 19
Driver=/usr/lib/libsqora.so.19.1

6)Doris 建 Resource
通過 ODBC_Resource 來創建 ODBC 外表,這是推薦的方式,這樣 resource 可以複用。

CREATE EXTERNAL RESOURCE `mysql_5_3_11`
PROPERTIES (
"host" = "hadoop1",
"port" = "3306",
"user" = "root",
"password" = "000000",
"database" = "test",
"table" = "test_cdc",
"driver" = "MySQL ODBC 5.3.11", --名稱要和上面[]裏的名稱一致
"odbc_type" = "mysql",
"type" = "odbc_catalog")

7)基於 Resource 創建 Doris 外表

CREATE EXTERNAL TABLE `test_odbc_5_3_11` (
 `id` int NOT NULL ,
 `name` varchar(255) null
) ENGINE=ODBC
COMMENT "ODBC"
PROPERTIES (
"odbc_catalog_resource" = "mysql_5_3_11", --名稱就是 resource 的名稱
"database" = "test",
"table" = "test_cdc"
);

8)查詢 Doris 外表

select * from `test_odbc_5_3_11`;

4.3 使用ODBC的Oracle外表

CentOS 數據庫 ODBC 版本對應關係:

Doris能否替代spark_#spark_03


與 Doris 的數據類型匹配:

Doris能否替代spark_#scala_04


1)安裝 unixODBC

安裝
yum install -y unixODBC unixODBC-devel libtool-ltdl libtool-ltdldevel
查看是否安裝成功
odbcinst -j

2)安裝 Oracle 對應版本的 ODBC(每個 BE 節點都要)
下載 4 個安裝包

wget
https://download.oracle.com/otn_software/linux/instantclient/1913
000/oracle-instantclient19.13-sqlplus-19.13.0.0.0-2.x86_64.rpm
wget 
https://download.oracle.com/otn_software/linux/instantclient/1913
000/oracle-instantclient19.13-devel-19.13.0.0.0-2.x86_64.rpm
wget 
https://download.oracle.com/otn_software/linux/instantclient/1913
000/oracle-instantclient19.13-odbc-19.13.0.0.0-2.x86_64.rpm
wget 
https://download.oracle.com/otn_software/linux/instantclient/1913
000/oracle-instantclient19.13-basic-19.13.0.0.0-2.x86_64.rpm
安裝 4 個安裝包
rpm -ivh oracle-instantclient19.13-basic-19.13.0.0.0-2.x86_64.rpm
rpm -ivh oracle-instantclient19.13-devel-19.13.0.0.0-2.x86_64.rpm
rpm -ivh oracle-instantclient19.13-odbc-19.13.0.0.0-2.x86_64.rpm
rpm -ivh oracle-instantclient19.13-sqlplus-19.13.0.0.0-
2.x86_64.rpm

3)驗證 ODBC 驅動動態鏈接庫是否正確

ldd /usr/lib/oracle/19.13/client64/lib/libsqora.so.19.1

4)配置 unixODBC,驗證通過 ODBC 連接 Oracle

vim /etc/odbcinst.ini
添加如下內容:
[Oracle 19 ODBC driver]
Description = Oracle ODBC driver for Oracle 19
Driver = 
/usr/lib/oracle/19.13/client64/lib/libsqora.so.19.1
vim /etc/odbc.ini
添加如下內容:
[oracle]
Driver = Oracle 19 ODBC driver ---名稱是上面 oracle 部分用[]括起來的內容
ServerName =hadoop2:1521/orcl --oracle 數據 ip 地址,端口及 SID
UserID = atguigu
Password = 000000
驗證
isql oracle

5)修改 Doris 的配置(每個 BE 節點都要,不用重啓)
修改 BE 節點 conf/odbcinst.ini 文件,加入剛才/etc/odbcinst.ini 添加的一樣內容,並刪除原先的 Oracle 配置。

[Oracle 19 ODBC driver]
Description = Oracle ODBC driver for Oracle 19
Driver = 
/usr/lib/oracle/19.13/client64/lib/libsqora.so.19.1

6)創建 Resource

CREATE EXTERNAL RESOURCE `oracle_19`
PROPERTIES (
 "host" = "hadoop2",
 "port" = "1521",
 "user" = "atguigu",
 "password" = "000000",
 "database" = "orcl", --數據庫示例名稱,也就是 ORACLE_SID
 "driver" = "Oracle 19 ODBC driver", --名稱一定和 be odbcinst.ini
裏的 oracle 部分的[]裏的內容一樣
 "odbc_type" = "oracle",
 "type" = "odbc_catalog"
);

7)基於 Resource 創建 Doris 外表

CREATE EXTERNAL TABLE `oracle_odbc` (
 id int,
 name VARCHAR(20) NOT NULL
) ENGINE=ODBC
COMMENT "ODBC"
PROPERTIES (
 "odbc_catalog_resource" = "oracle_19", 
 "database" = "orcl",
 "table" = "student"
);

8)查詢 Doris 外表

select * from oracle_odbc;

5 Doris On ES

Doris-On-ES 將 Doris 的分佈式查詢規劃能力和 ES(Elasticsearch)的全文檢索能力相結合,提供更完善的 OLAP 分析場景解決方案:
(1)ES 中的多 index 分佈式 Join 查詢
(2)Doris 和 ES 中的表聯合查詢,更復雜的全文檢索過濾

5.1 原理

Doris能否替代spark_#spark_05


(1)創建 ES 外表後,FE 會請求建表指定的主機,獲取所有節點的 HTTP 端口信息以

及 index 的 shard 分佈信息等,如果請求失敗會順序遍歷 host 列表直至成功或完全失敗

(2)查詢時會根據 FE 得到的一些節點信息和 index 的元數據信息,生成查詢計劃併發

給對應的 BE 節點

(3)BE 節點會根據就近原則即優先請求本地部署的 ES 節點,BE 通過 HTTP Scroll 方

式流式的從 ES index 的每個分片中併發的從_source 或 docvalue 中獲取數據

(4)Doris 計算完結果後,返回給用户

5.2 使用方式

5.2.1 Doris 中創建 ES 外表

1)創建 ES 索引

PUT test
{
 "settings": {
 "index": {
 "number_of_shards": "1",
 "number_of_replicas": "0"
 }
 },
 "mappings": {
 "doc": { // ES 7.x 版本之後創建索引時不需要指定 type,會有一個默認且唯
一的`_doc` type
 "properties": {
 "k1": {
 "type": "long"
 },
 "k2": {
 "type": "date"
 },
 "k3": {
 "type": "keyword"
 },
 "k4": {
 "type": "text",
 "analyzer": "standard"
 },
 "k5": {
 "type": "float"
 }
 }
 }
 }
}

2)ES 索引導入數據

POST /_bulk
{"index":{"_index":"test","_type":"doc"}}
{ "k1" : 100, "k2": "2020-01-01", "k3": "Trying out Elasticsearch", 
"k4": "Trying out Elasticsearch", "k5": 10.0}
{"index":{"_index":"test","_type":"doc"}}
{ "k1" : 100, "k2": "2020-01-01", "k3": "Trying out Doris", "k4": 
"Trying out Doris", "k5": 10.0}
{"index":{"_index":"test","_type":"doc"}}
{ "k1" : 100, "k2": "2020-01-01", "k3": "Doris On ES", "k4": "Doris 
On ES", "k5": 10.0}
{"index":{"_index":"test","_type":"doc"}}
{ "k1" : 100, "k2": "2020-01-01", "k3": "Doris", "k4": "Doris", 
"k5": 10.0}
{"index":{"_index":"test","_type":"doc"}}
{ "k1" : 100, "k2": "2020-01-01", "k3": "ES", "k4": "ES", "k5": 
10.0}

3)Doris 中創建 ES 外表

CREATE EXTERNAL TABLE `es_test` (
 `k1` bigint(20) COMMENT "",
 `k2` datetime COMMENT "",
 `k3` varchar(20) COMMENT "",
 `k4` varchar(100) COMMENT "",
 `k5` float COMMENT ""
) ENGINE=ELASTICSEARCH // ENGINE 必須是 Elasticsearch
PROPERTIES (
"hosts" = 
"http://hadoop1:9200,http://hadoop2:9200,http://hadoop3:9200",
"index" = "test",
"type" = "doc",
"user" = "",
"password" = ""
);

參數説明:

參數

説明

hosts

ES 集羣地址,可以是一個或多個,也可以是 ES 前端的負載均衡地址

index

對應的 ES 的 index 名字,支持 alias,如果使用 doc_value,需要使用真實的名稱

type

index 的 type,不指定的情況會使用_doc

user

ES 集羣用户名

password

對應用户的密碼信息

➢ ES 7.x 之前的集羣請注意在建表的時候選擇正確的索引類型 type
➢ 認證方式目前僅支持 Http Basic 認證,並且需要確保該用户有訪問: /_cluster/state/、
_nodes/http 等路徑和 index 的讀權限; 集羣未開啓安全認證,用户名和密碼不需要
設置
➢ Doris 表中的列名需要和 ES 中的字段名完全匹配,字段類型應該保持一致
➢ ENGINE 必須是 Elasticsearch

Doris On ES 一個重要的功能就是過濾條件的下推: 過濾條件下推給 ES,這樣只有真正
滿足條件的數據才會被返回,能夠顯著的提高查詢性能和降低 Doris 和 Elasticsearch 的 CPU、memory、IO 使用量

下面的操作符(Operators)會被優化成如下 ES Query:

SQL syntax

ES 5.x+ syntax

=

term query

in

terms query

, < , >= , ⇐ | range query
and | bool.filter
or | bool.should
not | bool.must_not
not in | bool.must_not + terms query
is_not_null | exists query
is_null | bool.must_not + exists query
esquery | ES 原生 json 形式的 QueryDSL

數據類型映射:

Doris能否替代spark_spark_06

5.2.2 啓用列式掃描優化查詢速度

"enable_docvalue_scan" = "true"

1)參數説明
是否開啓通過 ES/Lucene 列式存儲獲取查詢字段的值,默認為 false。開啓後 Doris 從 ES中獲取數據會遵循以下兩個原則:
(1)盡力而為: 自動探測要讀取的字段是否開啓列式存儲(doc_value: true),如果獲取的
字段全部有列存,Doris 會從列式存儲中獲取所有字段的值
(2)自動降級: 如果要獲取的字段只要有一個字段沒有列存,所有字段的值都會從行
存_source 中解析獲取

2)優勢:
默認情況下,Doris On ES 會從行存也就是_source 中獲取所需的所有列,_source 的存
儲採用的行式+json 的形式存儲,在批量讀取性能上要劣於列式存儲,尤其在只需要少數列的情況下尤為明顯,只獲取少數列的情況下,docvalue 的性能大約是_source 性能的十幾倍。
3)注意
text 類型的字段在 ES 中是沒有列式存儲,因此如果要獲取的字段值有 text 類型字段會
自動降級為從_source 中獲取.
在獲取的字段數量過多的情況下(>= 25),從 docvalue中獲取字段值的性能會和從_source中獲取字段值基本一樣。

5.2.3 探測 keyword 類型字段

"enable_keyword_sniff" = "true"

參數説明:
是否對 ES 中字符串類型分詞類型(text) fields 進行探測,獲取額外的未分詞(keyword)字
段名(multi-fields 機制)
在 ES 中可以不建立 index 直接進行數據導入,這時候 ES 會自動創建一個新的索引,
針對字符串類型的字段 ES 會創建一個既有 text 類型的字段又有 keyword 類型的字段,這就是 ES 的 multi fields 特性,mapping 如下:

"k4": {
 "type": "text",
 "fields": {
 "keyword": { 
 "type": "keyword",
 "ignore_above": 256
 }
 }
}

對 k4 進行條件過濾時比如=,Doris On ES 會將查詢轉換為 ES 的 TermQuery。
SQL 過濾條件:

k4 = "Doris On ES"
轉換成 ES 的 query DSL 為:
"term" : {
 "k4": "Doris On ES"
}

因為 k4 的第一字段類型為 text,在數據導入的時候就會根據 k4 設置的分詞器(如果沒
有設置,就是 standard 分詞器)進行分詞處理得到 doris、on、es 三個 Term,如下 ES analyze
API 分析:

POST /_analyze
{
 "analyzer": "standard",
 "text": "Doris On ES"
}

分詞的結果是:

{
 "tokens": [
 {
 "token": "doris",
 "start_offset": 0,
 "end_offset": 5,
 "type": "<ALPHANUM>",
 "position": 0
 },
 {
 "token": "on",
 "start_offset": 6,
 "end_offset": 8,
 "type": "<ALPHANUM>",
 "position": 1
 },
 {
 "token": "es",
 "start_offset": 9,
 "end_offset": 11,
 "type": "<ALPHANUM>",
 "position": 2
 }
 ]
}

查詢時使用的是:

"term" : {
 "k4": "Doris On ES"
}

Doris On ES 這個 term 匹配不到詞典中的任何 term,不會返回任何結果,而啓用
enable_keyword_sniff: true 會自動將 k4 = "Doris On ES"轉換成 k4.keyword = "Doris On ES"來完全匹配 SQL 語義,轉換後的 ES query DSL 為:

"term" : {
 "k4.keyword": "Doris On ES"
}

k4.keyword 的類型是 keyword,數據寫入 ES 中是一個完整的 term,所以可以匹配。

5.2.4 開啓節點自動發現,

"nodes_discovery" = "true"

參數説明:
是否開啓 es 節點發現,默認為 true。
當配置為 true 時,Doris 將從 ES 找到所有可用的相關數據節點(在上面分配的分片)。
如果 ES 數據節點的地址沒有被 Doris BE 訪問,則設置為 false。ES 集羣部署在與公共 Internet隔離的內網,用户通過代理訪問。

5.2.5 配置 https 訪問模式

"http_ssl_enabled" = "true"

參數説明:
ES 集羣是否開啓 https 訪問模式。
目前 fe/be 實現方式為信任所有,這是臨時解決方案,後續會使用真實的用户配置證書。

5.2.6 查詢用法

完成在 Doris 中建立 ES 外表後,除了無法使用 Doris 中的數據模型(rollup、預聚合、
物化視圖等)外並無區別。
1)基本查詢

select * from es_table where k1 > 1000 and k3 ='term' or k4 like 
'fu*z_'

2)擴展的 esquery(field, QueryDSL)
通過 esquery(field, QueryDSL)函數將一些無法用 sql 表述的 query 如 match_phrase、geoshape 等下推給 ES 進行過濾處理,esquery 的第一個列名參數用於關聯 index,第二個參數是 ES 的基本 Query DSL 的 json 表述,使用花括號{}包含,json 的 root key 有且只能有一個,如 match_phrase、geo_shape、bool 等。
(1)match_phrase 查詢:

select * from es_table where esquery(k4, '{
 "match_phrase": {
 "k4": "doris on es"
 }
 }');

(2)geo 相關查詢:

select * from es_table where esquery(k4, '{
 "geo_shape": {
 "location": {
 "shape": {
 "type": "envelope",
 "coordinates": [
 [
 13,
 53
 ],
 [
 14,
 52
 ]
 ]
 },
 "relation": "within"
 }
 }
 }');

(3)bool 查詢:

select * from es_table where esquery(k4, ' {
 "bool": {
 "must": [
 {
 "terms": {
 "k1": [
 11,
 12
 ]
 }
 },
 {
 "terms": {
 "k2": [
 100
 ]
 }
 }
 ]
 }
 }');

5.3 最佳實踐

5.3.1 時間類型字段使用建議

在 ES 中,時間類型的字段使用十分靈活,但是在 Doris On ES 中如果對時間類型字段
的類型設置不當,則會造成過濾條件無法下推。
創建索引時對時間類型格式的設置做最大程度的格式兼容:

"dt": {
 "type": "date",
 "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
}

在 Doris 中建立該字段時建議設置為 date 或 datetime,也可以設置為 varchar 類型, 使用
如下 SQL 語句都可以直接將過濾條件下推至 ES:

select * from doe where k2 > '2020-06-21';
select * from doe where k2 < '2020-06-21 12:00:00'; 
select * from doe where k2 < 1593497011; 
select * from doe where k2 < now();
select * from doe where k2 < date_format(now(), '%Y-%m-%d');

注意:
(1)在 ES 中如果不對時間類型的字段設置 format, 默認的時間類型字段格式為
strict_date_optional_time||epoch_millis
(2)導入到 ES 的日期字段如果是時間戳需要轉換成 ms, ES 內部處理時間戳都是按照
ms 進行處理的, 否則 Doris On ES 會出現顯示錯誤。

5.3.2 獲取 ES 元數據字段_id

導入文檔在不指定_id 的情況下 ES 會給每個文檔分配一個全局唯一的_id 即主鍵, 用户
也可以在導入時為文檔指定一個含有特殊業務意義的_id; 如果需要在 Doris On ES 中獲取該字段值,建表時可以增加類型為 varchar 的_id 字段:

CREATE EXTERNAL TABLE `doe` (
 `_id` varchar COMMENT "",
 `city` varchar COMMENT ""
) ENGINE=ELASTICSEARCH
PROPERTIES (
"hosts" = "http://127.0.0.1:8200",
"user" = "root",
"password" = "root",
"index" = "doe",
"type" = "doc"
}

注意:
(1)_id 字段的過濾條件僅支持=和 in 兩種
(2)_id 字段只能是 varchar 類型