動態

詳情 返回 返回

時序數據庫DolphinDB文本數據加載教程 - 動態 詳情

DolphinDB提供以下4個函數,將文本數據導入內存或數據庫:

loadText: 將文本文件導入為內存表。

ploadText: 將文本文件並行導入為分區內存表。與loadText函數相比,速度更快。

loadTextEx: 將文本文件導入數據庫中,包括分佈式數據庫,本地磁盤數據庫或內存數據庫。

textChunkDS:將文本文件劃分為多個小數據源,再通過mr函數進行靈活的數據處理。

DolphinDB的文本數據導入不僅靈活,功能豐富,而且速度非常快。DolphinDB與Clickhouse, MemSQL, Druid, Pandas等業界流行的系統相比,單線程導入的速度更快,最多可達一個數量級的優勢;多線程並行導入的情況下,速度優勢更加明顯。

本教程介紹文本數據導入時的常見問題,相應的解決方案以及注意事項。

  1. 自動識別數據格式

大多數其它系統中,導入文本數據時,需要由用户指定數據的格式。為了方便用户,DolphinDB在導入數據時,能夠自動識別數據格式。

自動識別數據格式包括兩部分:字段名稱識別和數據類型識別。如果文件的第一行沒有任何一列以數字開頭,那麼系統認為第一行是文件頭,包含了字段名稱。DolphinDB會抽取少量部分數據作為樣本,並自動推斷各列的數據類型。因為是基於部分數據,某些列的數據類型的識別可能有誤。但是對於大多數文本文件,無須手動指定各列的字段名稱和數據類型,就能正確地導入到DolphinDB中。

請注意:DolphinDB支持自動識別大部分DolphinDB提供的數據類型,但是目前暫不支持識別UUID和IPADDR類型,在後續版本中會支持。

loadText函數用於將數據導入DolphinDB內存表。下例調用loadText函數導入數據,並查看生成的數據表的結構。例子中涉及到的數據文件請參考附錄。

dataFilePath="/home/data/candle_201801.csv"
tmpTB=loadText(filename=dataFilePath);

查看數據表前5行數據:

select top 5 * from tmpTB;

symbol exchange cycle tradingDay date       time     open  high  low   close volume  turnover   unixTime
------ -------- ----- ---------- ---------- -------- ----- ----- ----- ----- ------- ---------- -------------
000001 SZSE     1     2018.01.02 2018.01.02 93100000 13.35 13.39 13.35 13.38 2003635 2.678558E7 1514856660000
000001 SZSE     1     2018.01.02 2018.01.02 93200000 13.37 13.38 13.33 13.33 867181  1.158757E7 1514856720000
000001 SZSE     1     2018.01.02 2018.01.02 93300000 13.32 13.35 13.32 13.35 903894  1.204971E7 1514856780000
000001 SZSE     1     2018.01.02 2018.01.02 93400000 13.35 13.38 13.35 13.35 1012000 1.352286E7 1514856840000
000001 SZSE     1     2018.01.02 2018.01.02 93500000 13.35 13.37 13.35 13.37 1601939 2.140652E7 1514856900000

調用schema函數查看錶結構(字段名稱、數據類型等信息):

tmpTB.schema().colDefs;

name       typeString typeInt comment
---------- ---------- ------- -------
symbol     SYMBOL     17
exchange   SYMBOL     17
cycle      INT        4
tradingDay DATE       6
date       DATE       6
time       INT        4
open       DOUBLE     16
high       DOUBLE     16
low        DOUBLE     16
close      DOUBLE     16
volume     INT        4
turnover   DOUBLE     16
unixTime   LONG       5
  1. 指定數據導入格式

本教程講述的4個數據加載函數中,均可用schema參數指定一個表,內含各字段的名稱、類型、格式、需要導入的列等信息。該表可包含以下4列:

  • name:字符串,表示列名
  • type:字符串,表示每列的數據類型
  • format:字符串,表示日期或時間列的格式
  • col:整型,表示要加載的列的下標。該列的值必須是升序。

其中,name和type這兩列是必需的,而且必須是前兩列。format和col這兩列是可選的,且沒有先後關係的要求。

例如,我們可以使用下面的數據表作為schema參數:

name         type
----------   -------
timestamp    SECOND
ID           INT
qty          INT
price        DOUBLE

2.1 提取文本文件的schema

extractTextSchema函數用於獲取文本文件的schema,包括字段名稱和數據類型等信息。

例如,使用extractTextSchema函數得到本教程中示例文件的表結構:

dataFilePath="/home/data/candle_201801.csv"
schemaTB=extractTextSchema(dataFilePath)
schemaTB;

name       type
---------- ------
symbol     SYMBOL
exchange   SYMBOL
cycle      INT
tradingDay DATE
date       DATE
time       INT
open       DOUBLE
high       DOUBLE
low        DOUBLE
close      DOUBLE
volume     INT
turnover   DOUBLE
unixTime   LONG

通過extractTextSchema函數得到數據文件的表結構schemaTB以後,若表中自動解析的數據類型不符合預期,可以使用SQL語句對該表進行修改,從而得到滿足要求的表結構。

2.2 指定字段名稱和類型

當系統自動識別的字段名稱或者數據類型不符合預期或需求時,可以通過設置schema參數為文本文件中的每列指定字段名稱和數據類型。

例如,若導入數據的volume列被自動識別為INT類型,而需要的volume類型是LONG類型,就需要通過schema參數指定volumne列類型為LONG。下面的例子中,首先調用extractTextSchema函數得到文本文件的表結構,再根據需求修改表中列的數據類型。

dataFilePath="/home/data/candle_201801.csv"
schemaTB=extractTextSchema(dataFilePath)
update schemaTB set type="LONG" where name="volume";

使用loadText函數導入文本文件,將數據按照schemaTB所規定的字段數據類型導入到數據庫中。

tmpTB=loadText(filename=dataFilePath,schema=schemaTB);

查看錶中前五行的數據,volume列數據以長整型的形式正常顯示:

select top 5 * from tmpTB;

symbol exchange cycle tradingDay date       time     open  high  low   close volume  turnover   unixTime
------ -------- ----- ---------- ---------- -------- ----- ----- ----- ----- ------- ---------- -------------
000001 SZSE     1     2018.01.02 2018.01.02 93100000 13.35 13.39 13.35 13.38 2003635 2.678558E7 1514856660000
000001 SZSE     1     2018.01.02 2018.01.02 93200000 13.37 13.38 13.33 13.33 867181  1.158757E7 1514856720000
000001 SZSE     1     2018.01.02 2018.01.02 93300000 13.32 13.35 13.32 13.35 903894  1.204971E7 1514856780000
000001 SZSE     1     2018.01.02 2018.01.02 93400000 13.35 13.38 13.35 13.35 1012000 1.352286E7 1514856840000
000001 SZSE     1     2018.01.02 2018.01.02 93500000 13.35 13.37 13.35 13.37 1601939 2.140652E7 1514856900000

上例介紹了修改數據類型的情況,若要修改表中的字段名稱,也可以通過同樣的方法實現。

請注意,若DolphinDB對日期和時間相關數據類型的解析不符合預期,需要通過本教程第2.3小節的方式解決。

2.3 指定日期和時間類型的格式

對於日期列或時間列的數據,如果DolphinDB識別的數據類型不符合預期,不僅需要在schema的type列指定數據類型,還需要在format列中指定格式(用字符串表示),如"MM/dd/yyyy"。如何表示日期和時間格式請參考日期和時間的調整及格式。

下面結合例子具體説明對日期和時間列指定數據類型的方法。

在DolphinDB中執行以下腳本,生成本例所需的數據文件。

dataFilePath="/home/data/timeData.csv"
t=table(["20190623 14:54:57","20190623 15:54:23","20190623 16:30:25"] as time,`AAPL`MS`IBM as sym,2200 5400 8670 as qty,54.78 59.64 65.23 as price)
saveText(t,dataFilePath);

加載數據前,使用extractTextSchema函數獲取該數據文件的schema:

schemaTB=extractTextSchema(dataFilePath)
schemaTB;

name  type
----- ------
time  SECOND
sym   SYMBOL
qty   INT
price DOUBLE

顯然,系統識別time列的數據類型不符合預期。如果直接加載該文件,time列的數據將為空。為了能夠正確加載該文件time列的數據,需要指定time列的數據類型為DATETIME,並且指定該列的格式為"yyyyMMdd HH:mm:ss"。

update schemaTB set type="DATETIME" where name="time"
schemaTB[`format]=["yyyyMMdd HH:mm:ss",,,];

導入數據並查看,數據顯示正確:

tmpTB=loadText(dataFilePath,,schemaTB)
tmpTB;

time                sym  qty  price
------------------- ---- ---- -----
2019.06.23T14:54:57 AAPL 2200 54.78
2019.06.23T15:54:23 MS   5400 59.64
2019.06.23T16:30:25 IBM  8670 65.23

2.4 導入指定列

在導入數據時,可以通過schema參數指定只導入文本文件中的某幾列。

下例中,只需加載文本文件中symbol, date, open, high, close, volume, turnover這7列。

首先,調用extractTextSchema函數得到目標文本文件的表結構。

dataFilePath="/home/data/candle_201801.csv"
schemaTB=extractTextSchema(dataFilePath);

使用rowNo函數為各列生成列號,賦值給schema表中的col列,然後修改schema表,僅保留表示需要導入的字段的行。

update schemaTB set col = rowNo(name)
schemaTB=select * from schemaTB where name in `symbol`date`open`high`close`volume`turnover;
請注意:
1.列號從0開始。上例中第一列symbol列對應的列號是0。
2.導入數據時不能改變各列的先後順序。如果需要調整列的順序,可以將數據文件加載後,再使用reorderColumns!函數。

最後,使用loadText函數,並配置schema參數,導入文本文件中指定的列。

tmpTB=loadText(filename=dataFilePath,schema=schemaTB);

查看錶中前5行,只導入了所需的列:

select top 5 * from tmpTB

symbol date       open   high  close volume turnover
------ ---------- ------ ----- ----- ------ ----------
000001 2018.01.02 9.31E7 13.35 13.35 13     2.003635E6
000001 2018.01.02 9.32E7 13.37 13.33 13     867181
000001 2018.01.02 9.33E7 13.32 13.32 13     903894
000001 2018.01.02 9.34E7 13.35 13.35 13     1.012E6
000001 2018.01.02 9.35E7 13.35 13.35 13     1.601939E6

2.5 跳過文本數據的前若干行

在數據導入時,若需跳過文件前n行(可能為文件説明),可指定skipRows參數為n。由於描述文件的説明通常不會非常冗長,因此這個參數的取值最大為1024。本教程講述的4個數據加載函數均支持skipRows參數。

下例中,通過loadText函數導入數據文件,並且查看該文件導入以後表的總行數,以及前5行的內容。

dataFilePath="/home/data/candle_201801.csv"
tmpTB=loadText(filename=dataFilePath)
select count(*) from tmpTB;

count
-----
5040

select top 5 * from tmpTB;

symbol exchange cycle tradingDay date       time     open  high  low   close volume  turnover   unixTime
------ -------- ----- ---------- ---------- -------- ----- ----- ----- ----- ------- ---------- -------------
000001 SZSE     1     2018.01.02 2018.01.02 93100000 13.35 13.39 13.35 13.38 2003635 2.678558E7 1514856660000
000001 SZSE     1     2018.01.02 2018.01.02 93200000 13.37 13.38 13.33 13.33 867181  1.158757E7 1514856720000
000001 SZSE     1     2018.01.02 2018.01.02 93300000 13.32 13.35 13.32 13.35 903894  1.204971E7 1514856780000
000001 SZSE     1     2018.01.02 2018.01.02 93400000 13.35 13.38 13.35 13.35 1012000 1.352286E7 1514856840000
000001 SZSE     1     2018.01.02 2018.01.02 93500000 13.35 13.37 13.35 13.37 1601939 2.140652E7 1514856900000

指定skipRows參數取值為1000,跳過文本文件的前1000行導入文件:

tmpTB=loadText(filename=dataFilePath,skipRows=1000)
select count(*) from tmpTB;

count
-----
4041

select top 5 * from tmpTB;

col0   col1 col2 col3       col4       col5      col6  col7  col8  col9  col10  col11      col12
------ ---- ---- ---------- ---------- --------- ----- ----- ----- ----- ------ ---------- -------------
000001 SZSE 1    2018.01.08 2018.01.08 101000000 13.13 13.14 13.12 13.14 646912 8.48962E6  1515377400000
000001 SZSE 1    2018.01.08 2018.01.08 101100000 13.13 13.14 13.13 13.14 453647 5.958462E6 1515377460000
000001 SZSE 1    2018.01.08 2018.01.08 101200000 13.13 13.14 13.12 13.13 700853 9.200605E6 1515377520000
000001 SZSE 1    2018.01.08 2018.01.08 101300000 13.13 13.14 13.12 13.12 738920 9.697166E6 1515377580000
000001 SZSE 1    2018.01.08 2018.01.08 101400000 13.13 13.14 13.12 13.13 469800 6.168286E6 1515377640000
請注意:如上例所示,在跳過前n行進行導入時,若數據文件的第一行是列名,改行會作為第一行被略過。

在上面的例子中,文本文件指定skipRows參數導入以後,由於表示列名的第一行被跳過,列名變成了默認列名:col1,col2等等。若需要保留列名而又指定跳過前n行,可先通過extractTextSchema函數得到文本文件的schema,在導入時指定schema參數:

schema=extractTextSchema(dataFilePath)
tmpTB=loadText(filename=dataFilePath,schema=schema,skipRows=1000)
select count(*) from tmpTB;

count
-----
4041

select top 5 * from tmpTB;

symbol exchange cycle tradingDay date       time      open  high  low   close volume turnover   unixTime
------ -------- ----- ---------- ---------- --------- ----- ----- ----- ----- ------ ---------- -------------
000001 SZSE     1     2018.01.08 2018.01.08 101000000 13.13 13.14 13.12 13.14 646912 8.48962E6  1515377400000
000001 SZSE     1     2018.01.08 2018.01.08 101100000 13.13 13.14 13.13 13.14 453647 5.958462E6 1515377460000
000001 SZSE     1     2018.01.08 2018.01.08 101200000 13.13 13.14 13.12 13.13 700853 9.200605E6 1515377520000
000001 SZSE     1     2018.01.08 2018.01.08 101300000 13.13 13.14 13.12 13.12 738920 9.697166E6 1515377580000
000001 SZSE     1     2018.01.08 2018.01.08 101400000 13.13 13.14 13.12 13.13 469800 6.168286E6 1515377640000
  1. 並行導入數據

3.1 單個文件多線程載入內存

ploadText函數可將一個文本文件以多線程的方式載入內存。該函數與loadText函數的語法是一致的,區別在於,ploadText函數可以快速載入大型文件,並且生成內存分區表。它充分利用了多核CPU來並行載入文件,並行程度取決於服務器本身CPU核數量和節點的localExecutors配置。

下面比較loadText函數與ploadText函數導入同一個文件的性能。

首先通過腳本生成一個4GB左右的文本文件:

filePath="/home/data/testFile.csv"
appendRows=100000000
t=table(rand(100,appendRows) as int,take(string('A'..'Z'),appendRows) as symbol,take(2010.01.01..2018.12.30,appendRows) as date,rand(float(100),appendRows) as float,00:00:00.000 + rand(86400000,appendRows) as time)
t.saveText(filePath);

分別通過loadTextploadText來載入文件。本例所用節點是6核12超線程的CPU。

timer loadText(filePath);
Time elapsed: 12629.492 ms

timer ploadText(filePath);
Time elapsed: 2669.702 ms

結果顯示在此配置下,ploadText的性能是loadText的4.5倍左右。

3.2 多文件並行導入

在大數據應用領域,數據導入往往不只是一個或兩個文件的導入,而是數十個甚至數百個大型文件的批量導入。為了達到更好的導入性能,建議儘量以並行方式導入批量的數據文件。

loadTextEx函數可將文本文件導入指定的數據庫中,包括分佈式數據庫,本地磁盤數據庫或內存數據庫。由於DolphinDB的分區表支持併發讀寫,因此可以支持多線程導入數據。使用loadTextEx將文本數據導入到分佈式數據庫,具體實現為將數據先導入到內存,再由內存寫入到數據庫,這兩個步驟由同一個函數完成,以保證高效率。

下例展示如何將磁盤上的多個文件批量寫入到DolphinDB分區表中。首先,在DolphinDB中執行以下腳本,生成100個文件,共約778MB,包括1千萬條記錄。

n=100000
dataFilePath="/home/data/multi/multiImport_"+string(1..100)+".csv"
for (i in 0..99){
    trades=table(sort(take(100*i+1..100,n)) as id,rand(`IBM`MSFT`GM`C`FB`GOOG`V`F`XOM`AMZN`TSLA`PG`S,n) as sym,take(2000.01.01..2000.06.30,n) as date,10.0+rand(2.0,n) as price1,100.0+rand(20.0,n) as price2,1000.0+rand(200.0,n) as price3,10000.0+rand(2000.0,n) as price4,10000.0+rand(3000.0,n) as price5)
    trades.saveText(dataFilePath[i])
};

創建數據庫和表:

login(`admin,`123456)
dbPath="dfs://DolphinDBdatabase"
db=database(dbPath,VALUE,1..10000)
tb=db.createPartitionedTable(trades,`tb,`id);

DolphinDB的cut函數可將一個向量中的元素分組。下面調用cut函數將待導入的文件路徑進行分組,再調用submitJob函數,為每個線程分配寫入任務,批量導入數據。

def writeData(db,file){
   loop(loadTextEx{db,`tb,`id,},file)
}
parallelLevel=10
for(x in dataFilePath.cut(100/parallelLevel)){
    submitJob("loadData"+parallelLevel,"loadData",writeData{db,x})
};
請注意:DolphinDB的分區表不允許多個線程同時向一個分區寫數據。上例中,每個文件中的分區列(id列)取值不同,因此不會造成多個線程寫入同一個分區的情況。在設計分區表的併發讀寫時,請確保不會有多個線程同時寫入同一分區。

通過getRecentJobs函數可以取得當前本地節點上最近n個批處理作業的狀態。使用select語句計算並行導入批量文件所需時間,得到在6核12超線程的CPU上耗時約1.59秒。

select max(endTime) - min(startTime) from getRecentJobs() where jobId like "loadData"+string(parallelLevel)+"%";

max_endTime_sub
---------------
1590

執行以下腳本,將100個文件單線程順序導入數據庫,記錄所需時間,耗時約8.65秒。

timer writeData(db, dataFilePath);
Time elapsed: 8647.645 ms

結果顯示在此配置下,並行開啓10個線程導入速度是單線程導入的5.5倍左右。

查看數據表中的記錄條數:

select count(*) from loadTable("dfs://DolphinDBdatabase", `tb);

count
------
10000000
  1. 導入數據庫前的預處理

在將數據導入數據庫之前,若需要對數據進行復雜的處理,例如日期和時間數據類型的強制轉換,填充空值等,可以在調用loadTextEx函數時指定transform參數。tansform參數接受一個函數作為參數,並且要求該函數只能接受一個參數。函數的輸入是一個未分區的內存表,輸出也是一個未分區的內存表。需要注意的是,只有loadTextEx函數提供transform參數。

4.1 指定日期和時間數據的數據類型

4.1.1 將數值類型表示的日期和時間轉化為指定類型

數據文件中表示時間的數據可能是整型或者長整型,而在進行數據分析時,往往又需要將這類數據強制轉化為時間類型的格式導入並存儲到數據庫中。針對這種場景,可通過loadTextEx函數的transform參數為文本文件中的日期和時間列指定相應的數據類型。

首先,創建分佈式數據庫和表。

login(`admin,`123456)
dataFilePath="/home/data/candle_201801.csv"
dbPath="dfs://DolphinDBdatabase"
db=database(dbPath,VALUE,2018.01.02..2018.01.30)
schemaTB=extractTextSchema(dataFilePath)
update schemaTB set type="TIME" where name="time"
tb=table(1:0,schemaTB.name,schemaTB.type)
tb=db.createPartitionedTable(tb,`tb1,`date);

自定義函數foo,用於對數據進行預處理,並返回處理過後的數據表。

def foo(mutable t){
    return t.replaceColumn!(`time,time(t.time/10))
}
請注意:在自定義函數體內對數據進行處理時,請儘量使用本地的修改(帶有!的函數)來提升性能。

調用loadTextEx函數,並且指定transform參數,系統會對文本文件中的數據執行transform參數指定的函數,即foo函數,再將得到的結果保存到數據庫中。

tmpTB=loadTextEx(dbHandle=db,tableName=`tb1,partitionColumns=`date,filename=dataFilePath,transform=foo);

查看錶內前5行數據。可見time列是以TIME類型存儲,而不是文本文件中的INT類型:

select top 5* from loadTable(dbPath,`tb1);

symbol exchange cycle tradingDay date       time               open  high  low   close volume  turnover   unixTime
------ -------- ----- ---------- ---------- ------------------ ----- ----- ----- ----- ------- ---------- -------------
000001 SZSE     1     2018.01.02 2018.01.02 02:35:10.000000000 13.35 13.39 13.35 13.38 2003635 2.678558E7 1514856660000
000001 SZSE     1     2018.01.02 2018.01.02 02:35:20.000000000 13.37 13.38 13.33 13.33 867181  1.158757E7 1514856720000
000001 SZSE     1     2018.01.02 2018.01.02 02:35:30.000000000 13.32 13.35 13.32 13.35 903894  1.204971E7 1514856780000
000001 SZSE     1     2018.01.02 2018.01.02 02:35:40.000000000 13.35 13.38 13.35 13.35 1012000 1.352286E7 1514856840000
000001 SZSE     1     2018.01.02 2018.01.02 02:35:50.000000000 13.35 13.37 13.35 13.37 1601939 2.140652E7 1514856900000

4.1.2 為文本文件中的日期和時間相關列指定數據類型

另一種與日期和時間列相關的處理是,文本文件中日期以DATE類型存儲,在導入數據庫時希望以MONTH的形式存儲。這種情況也可通過loadTextEx函數的transform參數轉換該日期列的數據類型,步驟與上述過程一致。

login(`admin,`123456)
dbPath="dfs://DolphinDBdatabase"
db=database(dbPath,VALUE,2018.01.02..2018.01.30)
schemaTB=extractTextSchema(dataFilePath)
update schemaTB set type="MONTH" where name="tradingDay"
tb=table(1:0,schemaTB.name,schemaTB.type)
tb=db.createPartitionedTable(tb,`tb1,`date)
def fee(mutable t){
    return t.replaceColumn!(`tradingDay,month(t.tradingDay))
}
tmpTB=loadTextEx(dbHandle=db,tableName=`tb1,partitionColumns=`date,filename=dataFilePath,transform=fee);

查看錶內前5行數據。可見tradingDay列是以MONTH類型存儲,而不是文本文件中的DATE類型:

select top 5* from loadTable(dbPath,`tb1);

symbol exchange cycle tradingDay date       time     open  high  low   close volume  turnover   unixTime
------ -------- ----- ---------- ---------- -------- ----- ----- ----- ----- ------- ---------- -------------
000001 SZSE     1     2018.01M   2018.01.02 93100000 13.35 13.39 13.35 13.38 2003635 2.678558E7 1514856660000
000001 SZSE     1     2018.01M   2018.01.02 93200000 13.37 13.38 13.33 13.33 867181  1.158757E7 1514856720000
000001 SZSE     1     2018.01M   2018.01.02 93300000 13.32 13.35 13.32 13.35 903894  1.204971E7 1514856780000
000001 SZSE     1     2018.01M   2018.01.02 93400000 13.35 13.38 13.35 13.35 1012000 1.352286E7 1514856840000
000001 SZSE     1     2018.01M   2018.01.02 93500000 13.35 13.37 13.35 13.37 1601939 2.140652E7 1514856900000

4.2 對錶內數據填充空值

transform參數支持調用DolphinDB的內置函數,當內置函數要求多個參數時,我們可以使用部分應用將多參數函數轉換為一個參數的函數。例如,調用nullFill!函數對文本文件中的空值進行填充。

db=database(dbPath,VALUE,2018.01.02..2018.01.30)
tb=db.createPartitionedTable(tb,`tb1,`date)
tmpTB=loadTextEx(dbHandle=db,tableName=`pt,partitionColumns=`date,filename=dataFilePath,transform=nullFill!{,0});
  1. 使用Map-Reduce自定義數據導入

DolphinDB支持使用Map-Reduce自定義數據導入,將數據按行進行劃分,並將劃分後的數據通過Map-Reduce導入到DolphinDB。

可使用textChunkDS函數將文件劃分為多個小文件數據源,再通過mr函數寫入到數據庫中。在調用mr將數據存入數據庫前,用户還可進行靈活的數據處理,從而實現更復雜的導入需求。

5.1 將文件中的股票和期貨數據存儲到兩個不同的數據表

在DolphinDB中執行以下腳本,生成一個大小約為1GB的數據文件,其中包括股票數據和期貨數據。

n=10000000
dataFilePath="/home/data/chunkText.csv"
trades=table(rand(`stock`futures,n) as type, rand(`IBM`MSFT`GM`C`FB`GOOG`V`F`XOM`AMZN`TSLA`PG`S,n) as sym,take(2000.01.01..2000.06.30,n) as date,10.0+rand(2.0,n) as price1,100.0+rand(20.0,n) as price2,1000.0+rand(200.0,n) as price3,10000.0+rand(2000.0,n) as price4,10000.0+rand(3000.0,n) as price5,10000.0+rand(4000.0,n) as price6,rand(10,n) as qty1,rand(100,n) as qty2,rand(1000,n) as qty3,rand(10000,n) as qty4,rand(10000,n) as qty5,rand(10000,n) as qty6)
trades.saveText(dataFilePath);

分別創建用於存放股票數據和期貨數據的分佈式數據庫和表:

login(`admin,`123456)
dbPath1="dfs://DolphinDBTickDatabase"
dbPath2="dfs://DolphinDBFuturesDatabase"
db1=database(dbPath1,VALUE,`IBM`MSFT`GM`C`FB`GOOG`V`F`XOM`AMZN`TSLA`PG`S)
db2=database(dbPath2,VALUE,2000.01.01..2000.06.30)
tb1=db1.createPartitionedTable(trades,`stock,`sym)
tb2=db2.createPartitionedTable(trades,`futures,`date);

定義函數,用於劃分數據,並將數據寫入到不同的數據庫。

def divideImport(tb, mutable stockTB, mutable futuresTB)
{
    tdata1=select * from tb where type="stock"
    tdata2=select * from tb where type="futures"
    append!(stockTB, tdata1)
    append!(futuresTB, tdata2)
}

再通過textChunkDS函數劃分文本文件,以300MB為單位進行劃分,文件被劃分成了4部分。

ds=textChunkDS(dataFilePath,300)
ds;

(DataSource<readTableFromFileSegment, DataSource<readTableFromFileSegment, DataSource<readTableFromFileSegment, DataSource<readTableFromFileSegment)

調用mr函數,指定數據源將文件導入到數據庫中。由於map函數(由mapFunc參數指定)只接受一個表作為參數,這裏我們使用部分應用將多參數函數轉換為一個參數的函數。

mr(ds=ds, mapFunc=divideImport{,tb1,tb2}, parallel=false);
請注意,這裏每個小文件數據源可能包含相同分區的數據。DolphinDB不允許多個線程同時對相同分區進行寫入,因此要將mr函數的parallel參數設置為false,否則會拋出異常。

查看2個數據庫中表的前5行,股票數據庫中均為股票數據,期貨數據庫中均為期貨數據。

stock表:

select top 5 * from loadTable("dfs://DolphinDBTickDatabase", `stock);

type  sym  date       price1    price2     price3      price4       price5       price6       qty1 qty2 qty3 qty4 qty5 qty6
----- ---- ---------- --------- ---------- ----------- ------------ ------------ ------------ ---- ---- ---- ---- ---- ----
stock AMZN 2000.02.14 11.224234 112.26763  1160.926836 11661.418403 11902.403305 11636.093467 4    53   450  2072 9116 12
stock AMZN 2000.03.29 10.119057 111.132165 1031.171855 10655.048121 12682.656303 11182.317321 6    21   651  2078 7971 6207
stock AMZN 2000.06.16 11.61637  101.943971 1019.122963 10768.996906 11091.395164 11239.242307 0    91   857  3129 3829 811
stock AMZN 2000.02.20 11.69517  114.607763 1005.724332 10548.273754 12548.185724 12750.524002 1    39   270  4216 8607 6578
stock AMZN 2000.02.23 11.534805 106.040664 1085.913295 11461.783565 12496.932604 12995.461331 4    35   488  4042 6500 4826

futures表:

select top 5 * from loadTable("dfs://DolphinDBFuturesDatabase", `futures);

type    sym  date       price1    price2     price3      price4       price5       price6       qty1 qty2 qty3 qty4 qty5 ...
------- ---- ---------- --------- ---------- ----------- ------------ ------------ ------------ ---- ---- ---- ---- ---- ---
futures MSFT 2000.01.01 11.894442 106.494131 1000.600933 10927.639217 10648.298313 11680.875797 9    10   241  524  8325 ...
futures S    2000.01.01 10.13728  115.907379 1140.10161  11222.057315 10909.352983 13535.931446 3    69   461  4560 2583 ...
futures GM   2000.01.01 10.339581 112.602729 1097.198543 10938.208083 10761.688725 11121.888288 1    1    714  6701 9203 ...
futures IBM  2000.01.01 10.45422  112.229537 1087.366764 10356.28124  11829.206165 11724.680443 0    47   741  7794 5529 ...
futures TSLA 2000.01.01 11.901426 106.127109 1144.022732 10465.529256 12831.721586 10621.111858 4    43   136  9858 8487 ...
n=10000000
dataFilePath="/home/data/chunkText.csv"
trades=table(rand(`IBM`MSFT`GM`C`FB`GOOG`V`F`XOM`AMZN`TSLA`PG`S,n) as sym,sort(take(2000.01.01..2000.06.30,n)) as date,10.0+rand(2.0,n) as price1,100.0+rand(20.0,n) as price2,1000.0+rand(200.0,n) as price3,10000.0+rand(2000.0,n) as price4,10000.0+rand(3000.0,n) as price5,10000.0+rand(4000.0,n) as price6,rand(10,n) as qty1,rand(100,n) as qty2,rand(1000,n) as qty3,rand(10000,n) as qty4, rand(10000,n) as qty5, rand(1000,n) as qty6)
trades.saveText(dataFilePath);

5.2 快速加載大文件首尾部分數據

可使用textChunkDS將大文件劃分成多個小的數據源(chunk),然後加載首尾兩個數據源。在DolphinDB中執行以下腳本生成數據文件:

n=10000000
dataFilePath="/home/data/chunkText.csv"
trades=table(rand(`IBM`MSFT`GM`C`FB`GOOG`V`F`XOM`AMZN`TSLA`PG`S,n) as sym,sort(take(2000.01.01..2000.06.30,n)) as date,10.0+rand(2.0,n) as price1,100.0+rand(20.0,n) as price2,1000.0+rand(200.0,n) as price3,10000.0+rand(2000.0,n) as price4,10000.0+rand(3000.0,n) as price5,10000.0+rand(4000.0,n) as price6,rand(10,n) as qty1,rand(100,n) as qty2,rand(1000,n) as qty3,rand(10000,n) as qty4, rand(10000,n) as qty5, rand(1000,n) as qty6)
trades.saveText(dataFilePath);

再通過textChunkDS函數劃分文本文件,以10MB為單位進行劃分。

ds=textChunkDS(dataFilePath, 10);

調用mr函數,加載首尾兩個chunk的數據。因為這兩個chunk的數據非常小,加載速度非常快。

head_tail_tb = mr(ds=[ds.head(), ds.tail()], mapFunc=x->x, finalFunc=unionAll{,false});

查看head_tail_tb表中的記錄數以及前5條記錄。因為數據是隨機生成,記錄數可能每次會略有不同,前5行的數據也會跟下面顯示的不同。

select count(*) from head_tail_tb;

count
------
192262

查看錶的前5行數據:

select top 5 * from head_tail_tb;

sym  date       price1    price2     price3      price4       price5       price6       qty1 qty2 qty3 qty4 qty5 qty6
---- ---------- --------- ---------- ----------- ------------ ------------ ------------ ---- ---- ---- ---- ---- ----
IBM  2000.01.01 10.978551 114.535418 1163.425635 11827.976468 11028.01038  10810.987825 2    51   396  6636 9403 937
MSFT 2000.01.01 11.776656 106.472172 1138.718459 10720.778545 10164.638399 11348.744314 9    79   691  533  5669 72
FB   2000.01.01 11.515097 118.674854 1153.305462 10478.6335   12160.662041 13874.09572  3    29   592  2097 4103 113
MSFT 2000.01.01 11.72034  105.760547 1139.238066 10669.293733 11314.226676 12560.093619 1    99   166  2282 9167 483
TSLA 2000.01.01 10.272615 114.748639 1043.019437 11508.695323 11825.865846 10495.364306 6    43   95   9433 6641 490
  1. 其它注意事項

6.1 不同編碼的數據的處理

由於DolphinDB的字符串採用UTF-8編碼,加載的文件必須是UTF-8編碼。若為其它形式的編碼,可以在導入以後進行轉化。DolphinDB提供了convertEncode、fromUTF8和toUTF8函數,用於導入數據後對字符串編碼進行轉換。

例如,使用convertEncode函數轉換表tmpTB中的exchange列的編碼:

dataFilePath="/home/data/candle_201801.csv"
tmpTB=loadText(filename=dataFilePath, skipRows=0)
tmpTB.replaceColumn!(`exchange, convertEncode(tmpTB.exchange,"gbk","utf-8"));

6.2 數值類型的解析

本教程第1節介紹了DolphinDB在導入數據時的數據類型自動解析機制,本節講解數值類型數據的解析。在數據導入時,若指定數據類型為數值類型(包括CHAR,SHORT,INT,LONG,FLOAT和DOUBLE),則系統能夠識別以下幾種形式的數據:

  • 數字表示的數值,例如:123
  • 以逗號分隔的數字表示的數值,例如:100,000
  • 帶有小數點的數字表示的數值,即浮點數,例如:1.231
  • 科學計數法表示的數值,例如:1.23E5

DolphinDB在導入時會會自動忽略數字前後的字母及其他符號,如果沒有出現任何數字,則解析為NULL值。下面結合例子具體説明。

首先,執行以下腳本,創建一個文本文件。

dataFilePath="/home/data/testSym.csv"
prices1=["2131","$2,131", "N/A"]
prices2=["213.1","$213.1", "N/A"]
totals=["2.658E7","-2.658e7","2.658e-7"]
tt=table(1..3 as id, prices1 as price1, prices2 as price2, totals as total)
saveText(tt,dataFilePath);

創建的文本文件中,price1和price2列中既有數字,又有字符。若不指定schema參數導入數據,DolphinDB會將price1和price2列均識別為SYMBOL類型:

tmpTB=loadText(dataFilePath)
tmpTB;

id price1 price2 total
-- ------ ------ --------
1  2131   213.1  2.658E7
2  $2,131 $213.1 -2.658E7
3  N/A    N/A    2.658E-7

tmpTB.schema().colDefs;

name   typeString typeInt comment
------ ---------- ------- -------
id     INT        4
price1 SYMBOL     17
price2 SYMBOL     17
total  DOUBLE     16

若分別指定price1和price2列為INT和FLOAT類型,DolphinDB在導入時會會自動忽略數字前後的字母及其他符號。如果沒有出現任何數字,則解析為NULL值。

schemaTB=table(`id`price1`price2`total as name, `INT`INT`FLOAT`DOUBLE as type)
tmpTB=loadText(dataFilePath,,schemaTB)
tmpTB;

id price1 price2     total
-- ------ ---------- --------
1  2131   213.100006 2.658E7
2  2131   213.100006 -2.658E7
3                    2.658E-7

6.3 自動脱去文本外的雙引號

在CSV文件中,有時候會用雙引號來處理文本和數值中含有的特殊字符(譬如分隔符)的字段。DolphinDB處理這樣的數據時,會自動脱去文本外的雙引號。下面結合例子具體説明。

首先生成示例數據。生成的文件中,num列數據為使用三位分節法表示的數值。

dataFilePath="/home/data/testSym.csv"
tt=table(1..3 as id,  [""500"",""3,500"",""9,000,000""] as num)
saveText(tt,dataFilePath);

導入數據並查看錶內數據,DolphinDB自動脱去了文本外的雙引號。

tmpTB=loadText(dataFilePath,,schemaTB)
tmpTB;

id num
-- -------
1  500
2  3500
3  9000000

附錄

本教程的例子中使用的數據文件: candle_201801.csv。

Add a new 評論

Some HTML is okay.