| 日期 | 作者 | 版本 | 備註 |
|---|---|---|---|
| 2020-07-02 | dingbin | v1.0 | |
1. ElasticSearch集羣構建索引
上一篇文章Elasticsearch7.8詳盡使用指南(一):ElasticSearch集羣部署實踐中我們講述了ElasticSearch集羣部署方法。Es集羣已經部署好了,接下來我們要新建索引,並展示如何構建全量和增量數據到索引中去的詳盡實踐方法論。
注意:
本文所述設計的所有的程序安裝包和相關源代碼均提供下載:es7.8package.rar 提取碼: x4gg
解壓後內容如下圖:(其中esproj.zip是本文開源的es構建全量或實時索引的相關java源代碼)
![]()
本文針對項目的測試數據假定來源都存儲在mysql數據庫中。如下圖所示:
數據庫data庫中的position表中共計有91644條數據。我們以此91644條數據為例逐步demo實踐如果將該表中所有數據構建到es索引中去。
1.1. 新建索引和配置mapping
1.1.1. Es restful API
接下來我們在敍述es新建索引和配置mapping的過程中會大量地用到es提供的restful API。以下羅列最常用的es restful API:
查看es集羣健康狀態:curl http://192.168.0.110:19200/_cat/health?
查看es集羣結點:curl http://192.168.0.110:19200/_cat/nodes?v
可見有3個節點es-node1,es-node2和es-node3 。
查看es集羣有哪些索引:curl http://192.168.0.110:19200/_cat/indices?v
可見es目前沒有任何索引
在elasticsearch7.x以前的版本中,可以這麼理解: es也相當於一個數據庫,都可以存儲數據。我們對照mysql存儲數據需要先創建database和table,相應地,es存儲數據之前,也要創建index和type,我們簡單感性地認為index和type相當於mysql的database和table。顯然同mysql創建table一樣,我們要定義索引數據的schema,在es中有特定的術語叫做mapping,相當於mysql的table定義。但是這樣的理解在es7.x後不成立了!!!!
如上圖所示,elasticsearch 7.x版本之後已經徹底去除了type的概念,elasticsearch7默認不在支持指定索引類型,默認索引類型是_doc。index下面不再允許設置type。直接給某個index設置mapping即可。相當於index就是mysql數據庫中的dabase.table了。一個index唯一對應一個 mapping且不再支持type是es 7.x以後的正確現狀。更詳細的細節可參考:
https://www.elastic.co/guide/en/elasticsearch/reference/current/removal-of-types.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping.html#mapping
假定對應position招聘職位表數據,我們新建的es的index名稱叫:position。
創建索引名稱為position: curl -XPUT http://192.168.0.110:19200/position?pretty
再次查看索引:curl http://192.168.0.110:19200/_cat/indices?v
可見名稱叫position的index已經被創建,它的docs.count為0,目前沒有任何文檔即數據。
刪除索引job: curl -XDELETE http://192.168.0.110:19200/position?pretty
接下來要為position 索引庫新建一個mapping。下面重點敍述es的mapping即映射。
1.1.2. ES映射(mapping)
查看指定index的mapping命令:
curl -XGET http://192.168.0.110:19200/position/_mapping?pretty
可見此時索引沒有任何mapping。Mapping為空。
查詢所有index中的mapping: curl -XGET http://192.168.0.110:19200/_all/_mapping?pretty
Es7.x 版本下為position這個index新建一個mapping的restful API語法有如下兩種合法的形式:
1) 事先不單獨創建index,而將index和mapping一次性在一個命令中創建,語法如下:
curl -XPUT http://192.168.0.110:19200/position/?pretty -H 'content-Type:application/json' -d ' {
"mappings":{
"properties":{
"title":{
"type":"text"
},
"description":{
"type":"text"
},
"price":{
"type":"double"
},
"onSale":{
"type":"boolean"
},
"type":{
"type":"integer"
},
"createDate":{
"type":"date"
}
}
}
}'
2) 分階段創建:先單獨創建index, 再基於此index創建mapping,語法如下:
curl -XPUT http://192.168.0.110:19200/position1/_mappings?pretty -H 'content-Type:application/json' -d ' {
"properties":{
"title":{
"type":"text"
},
"description":{
"type":"text"
},
"price":{
"type":"double"
},
"onSale":{
"type":"boolean"
},
"type":{
"type":"integer"
},
"createDate":{
"type":"date"
}
}
}'
以上兩種方式我們建議方式1)一次性創建index和mapping。簡單明瞭。
最後再次查看現在的index和mapping:
查看index: curl http://192.168.0.110:19200/_cat/indices?v
再次執行查看maping: curl -XGET http://192.168.0.110:19200/_all/_mapping?pretty
至此,我們敍述瞭如何創建索引和其對應的字段類型定義mapping。這裏描述的是創建靜態mapping的方式。後面我們將利用更加便利的索引模板的方式來創建mapping。
這裏先刪除postion和position1兩個index如下:
接下來敍述一下es的字段數據類型。
1.1.3. 字段數據類型
Es支持非常多的字段數據類型:
(以下敍述的都是es7.x尤其是es7.8支持的字段數據類型,低版本的es跟這個可能有些許差別,要注意!)
- text:默認會進行分詞,支持模糊查詢(5.x之後版本string類型已廢棄,請大家使用text)。
- keyword:不進行分詞;keyword類型默認開啓doc_values來加速聚合排序操作,佔用了大量磁盤io 如非必須可以禁用doc_values。
- number:如果只有過濾場景 用不到range查詢的話,使用keyword性能更佳,另外數字類型的doc_values比字符串更容易壓縮。
- array:es不需要顯示定義數組類型,只需要在插入數據時用'[]'表示即可,'[]'中的元素類型需保持一致。
- range:對數據的範圍進行索引;目前支持 number range、date range 、ip range。
- boolean: 只接受true、false 也可以是字符串類型的“true”、“false”
- date:支持毫秒、根據指定的format解析對應的日期格式,內部以long類型存儲。
- geo_point:存儲經緯度數據對。
- ip:將ip數據存儲在這種數據類型中,方便後期對ip字段的模糊與範圍查詢。
- nested:嵌套類型,一種特殊的object類型,存儲object數組,可檢索內部子項。
1.1.3.1. 核心數據類型
核心數據類型如上圖所示,其中需要特別説明的是:
string類的text和keyword區別一定要重視:把一個string字段指定為text類型説明該字段內容是要經過中文分詞器進行分詞,然後切分成多個term用於全文檢索的。於此相對,如果一個string字段被指定為keyword,就説明內部建索引過程不會對該字段文本內容進行分詞,它會作為一個整體被建到索引正排裏去。
1.1.3.2. 地理數據類型
該數據類型主要針對支持地理座標數據進行快速經緯度查詢而支持的一種特殊的數據類型。
1.1.3.3. 複合數據類型
1.1.3.4. 數組類型
1.1.3.5. 多字段類型
更詳細的內容細節可參見網頁:
https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html
1.1.4. 利用索引模板定義和新建索引
本節是我們推薦的創建索引和索引定義mapping的方式。因為它更加便利。本文采用索引模板index templates的方式來配置索引的schema。es索引可使用預定義的模板進行創建,這個模板稱作Index templates。注意:索引模板支隊所有新建立的索引有效,對已經存在的索引無效。模板設置包括settings和mappings,通過模式匹配的方式使得多個索引重用一個模板。關於索引模板的更詳細細節可參考網頁:https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates-v1.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/dynamic-templates.html#match-mapping-type
雖然es7.8引入了新的複合索引模板,但仍然支持傳統的索引模板。本文使用傳統的索引模板格式(legacy index template )。
1.1.4.1. match_mapping_type:被探測到的數據類型
在索引模板中有一個非常重要的field就是:match_mapping_type,它代表文檔字段值被字面探測識別成的數據類型,它的規則如下:
1) true 或false 被自動探測識別為boolean類型;
2) 當date_detection開關打開時,一些符合特定日期格式字符串將會被自動探測識別為date類型;
3) 所有帶有小數部分的數字被自動探測識別為double類型。(注意不是float類型)
4) 不帶有小數部分的數字被自動識別為long類型;(注意不是integer類型。)
5) 對於對象類型統一識別為object類型;
6) 最重要的,所有字符串統一識別為string類型;
7) *代表任意類型。
上圖是一個示例。
此外,_all在7.x版本已經被copy_to所代替,可用於滿足特定場景。copy_to將字段數值拷貝到目標字段,實現類似_all的作用。
注意:copy_to的目標字段不出現在_source中
當date_detection被設置為true(默認)時,凡是string類型且符合
strict_date_optional_time設置的日期格式的字段都被識別為類型date。
strict_date_optional_time默認格式如下:
"strict_date_optional_time":"yyyy/MM/dd HH:mm:ss Z||yyyy/MM/dd Z"
date格式可以在put mapping的時候用 format 參數指定,如果不指定的話,則啓用默認格式,是"strict_date_optional_time||epoch_millis"。這表明只接受符合"strict_date_optional_time"格式的字符串值,或者long型數字。
1.1.4.2. match/unmatch/match_pattern:如何匹配與排除匹配
match表徵文檔的字段名稱匹配什麼樣的模式,是一種過濾選項。默認以通配符的形式匹配:即*或?等表示匹配含義。unmatch在上述match匹配的結果集中構建排除集。
如果想支持正則表達式匹配方式,則加上match_pattern:regex即可。一個示例如下圖所示:
1.1.4.3. 索引別名與0停機時間
一般來説重新索引過程中的會遇到的一個比較頭疼的問題是必須更新你的應用,來使用另一個索引名。索引別名正是用來解決這個問題的!
索引別名就像一個快捷方式或軟連接, 可以指向一個或多個索引, 也可以給任何需要索引名的API 使用。別名帶給我們極大的靈活性,允許我們做到:
這裏有兩種管理別名的途徑: _alias 用於單個操作, _aliases 用於原子化多個操作。
在這一章中, 我們假設你的應用採用一個叫 my_index 的索引。 而事實上, my_index 是一個指向當前真實索引的別名。真實的索引名將包含一個版本號: my_index_v1 , my_index_v2 等等。開始, 我們創建一個索引 my_index_v1 , 然後將別名 my_index 指向它:
開始, 我們創建一個索引 my_index_v1 , 然後將別名 my_index 指向它:
<1> 創建索引 my_index_v1 。
<2> 將別名 my_index 指向 my_index_v1 。
你可以檢測這個別名指向哪個索引:
或哪些別名指向這個索引:
兩者都將返回下列值:
然後, 我們決定修改索引中一個字段的映射。 當然我們不能修改現存的映射, 索引我們需要重新索引數據。 首先, 我們創建有新的映射的索引 my_index_v2 。
然後我們從將數據從 my_index_v1 遷移到 my_index_v2 。一旦我們認為數據已經被正確的索引了, 我們就將別名指向新的索引。
別名可以指向多個索引, 所以我們需要在新索引中添加別名的同時從舊索引中刪除它。 這個操作需要原子化, 所以我們需要用 _aliases 操作:
這樣,你的應用就從舊索引遷移到了新的,而沒有停機時間。
提示:
即使你認為現在的索引設計已經是完美的了,當你的應用在生產 環境使用時,還是有可能在今後有一些改變的。所以請做好準備:在應用中使用別名而不是索引。然後你就可以在任何時候重建索引。別名的開銷很小,應當廣泛使用。
Elasticsearch的別名,就類似數據庫的視圖。
創建別名:
我們為索引my_index創建一個別名my_index_alias,這樣我們對my_index_alias的操作就像對my_index的操作一樣
POST /_aliases
{
"actions": [
{
"add": {
"index": "my_index",
"alias": "my_index_alias"
}
}
]
}
別名不僅僅可以關聯一個索引,它能聚合多個索引
我們為索引my_index_1 和 my_index_2 創建一個別名my_index_alias,這樣對my_index_alias的操作(僅限讀操作),會操作my_index_1和my_index_2,類似於聚合了my_index_1和my_index_2.我們是不能對my_index_alias進行寫操作,當有多個索引時alias,不能區分到底操作哪一個
POST /_aliases
{
"actions": [
{
"add": {
"index": "my_index_1",
"alias": "my_index_alias"
}
},
{
"add": {
"index": "my_index_2",
"alias": "my_index_alias"
}
}
]
}
GET /my_index_alias/_search
{
}
創建filtered的別名:
例如對於同一個index,我們給不同人看到不同的數據,
如my_index有個字段是team,team字段記錄了該數據是那個team的。team之間的數據是不可見的。
POST /_aliases
{
"actions": [
{
"add": {
"index": "my_index",
"alias": "my_index__teamA_alias",
"filter":{
"term":{
"team":"teamA"
}
}
}
},
{
"add": {
"index": "my_index",
"alias": "my_index__teamB_alias",
"filter":{
"term":{
"team":"teamB"
}
}
}
},
{
"add": {
"index": "my_index",
"alias": "my_index__team_alias"
}
}
]
}
GET /my_index__teamA_alias/_search 只能看到teamA的數據
GET /my_index__teamB_alias/_search 只能看到teamB的數據
GET /my_index__team_alias/_search 既能看到teamA的,也能看到teamB的數據
因此在索引模板中我們強烈建議引入索引別名,以應對未來在生產環境中一定會發生的索引升級。
下圖是索引模板中設置索引別名的一個示例:
好了,到目前為止,關於索引模板的預備知識我們基本都講述完畢了。下面開始實戰過程:
為position招聘職位表數據定義一個索引模板。
1.1.4.4. 實戰:為position數據庫表數據定義動態索引模板
1.1.4.4.1. Mysql字段處理
CREATE TABLE `position` (
`id` varchar(64) NOT NULL COMMENT '緙栧彿',
`recruitment_id` varchar(64) NOT NULL COMMENT '鎷涜仒緙栧彿',
`position_name` varchar(256) DEFAULT NULL COMMENT '鑱屼綅鍚嶇О',
`student_type` varchar(256) DEFAULT NULL COMMENT '瀛﹀巻',
`student` varchar(255) DEFAULT NULL,
`majorName` longtext,
`major` longtext COMMENT '涓撲笟',
`demand_number` varchar(8) DEFAULT NULL COMMENT '闂傚倸媧犻崑鎾存叏閻熸澘鈧嘲效婢舵劕鏋?',
`position_description` longtext COMMENT '鑱屼綅鎻忚堪',
`city` text,
`cityName` text,
`college` varchar(500) DEFAULT NULL,
`sut1` varchar(255) DEFAULT NULL,
`major_standard` text COMMENT '標準專業',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC COMMENT='招聘職位信息';
以上是招聘職位信息的 mysql數表定義。
為了實現對該數據表實時數據的索引增量,我們統一為所有業務表再 新增一個字段:_updatetime,類型為bigint,並且同時創建該字段的mysql 索引idx_for_updatetime,建索引的目的是為了快速查找定位。該字段存儲前台發送來的對該條招聘職位信息改動(第一次叫創建)的時間戳,記錄為自1970.1.1日0時0分0秒到現在經歷的毫秒數,其值類似於1594508968000。
接下來,對position 插入最後一列字段為_updatetime,值統一設定為0,mysql語句如下:
ALTER TABLE position ADD COLUMN _updatetime BIGINT NOT NULL COMMENT 'updated timestamp in epoch milliseconds' after major_standard;
為_updatetime建立索引,sql執行如下:
ALTER TABLE position ADD INDEX idx_for_updatetime(_updatetime) ;
1.1.4.4.2. Es 索引mapping數據格式設計
以上表格中除了紅色標註“是”的項目需要文本分詞以支持全文索引,其他文本不需要分詞。大部分中文分詞器在檢索時選擇ik_max_word,在檢索時選擇分詞組件:ik_smart。對應如下:
建索引時分詞模塊:"analyzer": "ik_max_word"
檢索時分詞模塊: "search_analyzer": "ik_smart"
_updatetime字段es格式為date,format 為epoch_millis。
對於major和city 這兩個字段,雖然是文本類型,但他們的取值特點如下:
可見他們都是以逗號分隔的多個部分,每個部分具有完整意義不能再被細分詞。所以我們對這兩個字段單獨採用逗號分詞器:
1.1.4.4.3. 以動態索引模板創建索引和mapping
根據前節分析,我們可以很容易寫出招聘職位信息表數據的動態索引模板,如下:
{
"index_patterns":[
"collegejob_*"
],
"order":0,
"settings":{
"number_of_shards":5,
"number_of_replicas":1,
"analysis":{
"analyzer":{
"comma":{
"pattern":",",
"type":"pattern"
}
}
}
},
"mappings":{
"_source":{
"enabled":true
},
"dynamic":"true",
"date_detection":false,
"numeric_detection":true,
"properties":{
"id":{
"type":"keyword"
},
"recruitment_id":{
"type":"keyword"
},
"demand_number":{
"type":"text",
"index":"true",
"analyzer":"ik_max_word",
"search_analyzer":"ik_smart",
"fields":{
"raw":{
"type":"keyword",
"index":true,
"ignore_above":1024
}
}
},
"major":{
"type":"text",
"analyzer":"comma",
"search_analyzer":"comma"
},
"city":{
"type":"text",
"analyzer":"comma",
"search_analyzer":"comma"
},
"updatetime":{
"type":"date",
"format":"epoch_millis"
}
},
"dynamic_templates":[
{
"string_fields":{
"match":"*",
"match_mapping_type":"string",
"mapping":{
"type":"text",
"index":"true",
"analyzer":"ik_max_word",
"search_analyzer":"ik_smart",
"fields":{
"raw":{
"type":"keyword",
"index":true,
"ignore_above":32766
}
}
}
}
}
]
},
"aliases":{
"{index}-alias":{
}
}
}
上圖是對文字版的動態索引模板的主要內容的簡要説明(可能跟文字版有些許出入,以最終的文字版為準)。
新添加動態索引模板的es restfulAPI為:
curl -XPUT http://192.168.0.110:19200/_template/collegejob_template_1?pretty -H 'content-Type:application/json' -d '
{STATEMENT} '
將該語句中的{STATEMENT} 替換為上文 文字版的 動態索引模板內容即可。
執行結果為:
接下來查詢剛才新建的索引模板,執行命令:
curl -XGET http://192.168.0.110:19200/_template/collegejob_template_1?pretty
可見能查到剛才新建的collegejob_template_1索引模板了。
刪除索引模板的命令是:
curl -XDELETE http://192.168.0.110:19200/_template/collegejob_template_1?pretty
最後根據這個動態索引模板,我們創建一個索引名稱叫做:collegejob_position_v20200712_1.
注意index名稱必須要能匹配到上面我們已經創建好的動態索引模板中的index_patterns中的值的pattern。
RestfulAPI: curl -XPUT http://192.168.0.110:19200/collegejob_position_v20200712_1?pretty
再次查看索引:curl http://192.168.0.110:19200/_cat/indices?v
查看索引collegejob_position_v20200712_1的mapping:
curl http://192.168.0.110:19200/collegejob_position_v20200712_1/_mappings?pretty
可見該index已經被正確關聯到我們預期的mapping上了。
至此,索引和mapping已經創建成功了。接下來開始構建全量索引數據和增量索引數據。
1.2. 構建全量索引數據
1.2.1. 準備測試數據
position ---招聘職位表,共計91644條測試數據。為了模擬分別構建全量索引數據和構建增量索引數據的過程,同時也模擬演示生產環境下數據產生與處理的過程,我們對測試數據做如下處理:
1)全量和增量數據的區分字段是_updatetime(_updatetime字段是個bigint型數據,存儲自1970.1.1 0:0:0 至今的毫秒數,要求所有業務數據表都必須有此字段和定義);
2)假定從當前時刻開始建全量索引索引,記錄開始建全量索引的當前時刻,記錄為S時刻:2020年 07月 12日 星期日 09:51:09 CST, 轉換成_updatetime的數據格式為:1594518666324,注意是一個13位長整數。記住這一時刻很重要。後面構建索引時也需要這個分界時間戳S。
3)編寫sql語句實現對position 表中一半數據的_updattime設置為早於時刻S的隨機位於時間段[1577808000000, S],(1577808000000為2020.1.1) 另一半數據的_updattime設置為晚於時刻S的隨機位於時間段[S,1594522719000]。(1594522719000 為S+1小時時刻)。
Mysql生成在i ≤ R ≤ j 這個範圍得到一個隨機整數R ,公式為:FLOOR(i + RAND() * (j – i + 1))
結果如下:
update position set _updatetime=FLOOR(1577808000000+RAND() * (1594518666324-1577808000000 + 1)) limit 45822 ;
update position set _updatetime=FLOOR(1594522719000+RAND() * (1594522719000-1594518666324 + 1)) where _updatetime=0 ;
接下來查詢驗證一下更新時間戳早於和晚於時刻S的數據條數:
select count(1) from position where _updatetime < 1594518666324 ;
select count(1) from position where _updatetime > 1594518666324 ;
現在表中一半數據早於時刻S,另一半數據晚於時刻S。
接下來構建mysql2es的全量索引。
1.2.2. Zookeeper迅速安裝部署方法
在後面章節全量和增量構建索引階段為了提高構建索引的效率和保證高可用,我們經常用的一個實現思路是以多機器多進程代替單機單進程。而對於多機器多進程,不可避免涉及多進程之間工作進度的協同,需要用到分佈式一致性的工具。建議使用zookeeper。比如分佈式鎖等。本節迅速簡述一下zookeeper的安裝部署和使用方法。
注意:安裝zookeeper之前需要在每台機器上安裝好jdk,建議安裝至少jdk1.8及以上版本。本文安裝的是jdk1.8。
1.2.2.1. Zookeeper技術簡介
ZooKeeper是一個分佈式的,開放源碼的分佈式應用程序協調服務,是Google的Chubby一個開源的實現,是Hadoop和Hbase的重要組件。它是一個為分佈式應用提供一致性服務的軟件,提供的功能包括:配置維護、域名服務、分佈式同步、組服務等。
ZooKeeper的目標就是封裝好複雜易出錯的關鍵服務,將簡單易用的接口和性能高效、功能穩定的系統提供給用户。
ZooKeeper包含一個簡單的原語集,提供Java和C的接口。
ZooKeeper代碼版本中,提供了分佈式獨享鎖、選舉、隊列的接口,其中分佈鎖和隊列有Java和C兩個版本,選舉只有Java版本。
1) 原理
ZooKeeper是以Fast Paxos算法為基礎的,Paxos 算法存在活鎖的問題,即當有多個proposer交錯提交時,有可能互相排斥導致沒有一個proposer能提交成功,而Fast Paxos作了一些優化,通過選舉產生一個leader (領導者),只有leader才能提交proposer,具體算法可見Fast Paxos。因此,要想弄懂ZooKeeper首先得對Fast Paxos有所瞭解。
ZooKeeper的基本運轉流程:
- 1、選舉Leader。
- 2、同步數據。
- 3、選舉Leader過程中算法有很多,但要達到的選舉標準是一致的。
- 4、Leader要具有最高的執行ID,類似root權限。
- 5、集羣中大多數的機器得到響應並follow選出的Leader。
2) 特點
在Zookeeper中,znode是一個跟Unix文件系統路徑相似的節點,可以往這個節點存儲或獲取數據。如果在創建znode時Flag設置為EPHEMERAL,那麼當創建這個znode的節點和Zookeeper失去連接後,這個znode將不再存在在Zookeeper裏,Zookeeper使用Watcher察覺事件信息。當客户端接收到事件信息,比如連接超時、節點數據改變、子節點改變,可以調用相應的行為來處理數據。Zookeeper的Wiki頁面展示瞭如何使用Zookeeper來處理事件通知,隊列,優先隊列,鎖,共享鎖,可撤銷的共享鎖,兩階段提交。
那麼Zookeeper能做什麼事情呢,簡單的例子:假設我們有20個搜索引擎的服務器(每個負責總索引中的一部分的搜索任務)和一個總服務器(負責向這20個搜索引擎的服務器發出搜索請求併合並結果集),一個備用的總服務器(負責當總服務器宕機時替換總服務器),一個web的cgi(向總服務器發出搜索請求)。搜索引擎的服務器中的15個服務器提供搜索服務,5個服務器正在生成索引。這20個搜索引擎的服務器經常要讓正在提供搜索服務的服務器停止提供服務開始生成索引,或生成索引的服務器已經把索引生成完成可以提供搜索服務了。使用Zookeeper可以保證總服務器自動感知有多少提供搜索引擎的服務器並向這些服務器發出搜索請求,當總服務器宕機時自動啓用備用的總服務器。
1.2.2.2. Zookeeper集羣快速搭建
從zookeeper官網https://zookeeper.apache.org/releases.html 下載當前最新版本的zookeeper3.6.1。
分佈式zookeeper(簡稱zk)集羣至少要求運行在3台或以上服務器上。本文講述是基於安裝在3台vmware虛擬機上,各虛擬機機器結點如下表:
本節使用的所有vmware虛擬機配置均為CPU:8核,內存6G,硬盤足夠。
根據我們一貫部署分佈式服務的做法:
1) 先創建zk用户和zk組;
groupadd zk
vim /etc/group 會發現最後一行有zk用户組
adduser -g zk zk
創建zk用户,同時加入zk用户組,自動創建zk的homedir為/home/zk
vim /etc/passwd 可以看到最後一行是zk用户。
passwd zk
為zk用户新設立密碼
將zk用户加入sudo權限
注意:本步驟非必須,可選。
chmod +w /etc/sudoers
vim /etc/sudoers
添加如下行:
然後再chmod -w /etc/sudoers
2) 創建zk服務的basedir:/opt/zk
chown -R zk:zk /opt/zk
3) 在/opt/zk下分別創建app data logs temp分別作為zk的app/data/logs/temp 目錄。
4) 配置zk
解壓zk壓縮包文件:apache-zookeeper-3.6.1-bin.tar.gz 到/opt/zk/app目錄下:
在cent7a機器上執行:
cd /opt/zk/app/apache-zookeeper-3.6.1-bin/conf
mv zoo_sample.cfg zoo.cfg
vim zoo.cfg 修改如下:
其中clientPort 2181是客户端連接zk集羣的端口,dataDir和dataLogDir分別是數據目錄和日誌目錄。文件最後的3行是用於zk集羣互聯。
server.A = B:C:D
A:zookeeper服務器的序號,即第幾號服務器.
注意這個序號要與zookeeper的myid保持一致
B:服務器的 IP 地址
C:服務器跟隨者follower與集羣中的 Leader 服務器交換信息的端口
D:如果集羣中的 Leader 服務器宕機,需要一個端口通信重新進行選舉,選出一個新的 Leader。這個端口就是用來做leader選舉的端口
注意server.1/server.2/server.3 中的1/2/3是zk 結點的序號,不同結點必須不能相同。
直接將此zoo.cfg一行不用修改原樣拷貝到cent7b和cent7c機器上相同目錄下。
接下來在cent7a的datadir即:/opt/zk/data下新創建myid文件,並寫入1:
同樣地,在cent7b的datadir即:/opt/zk/data下新創建myid文件,並寫入2:
在cent7c的datadir即:/opt/zk/data下新創建myid文件,並寫入3:
至此,zk集羣配置結束。啓動zk集羣之前不要忘記開放3台機器上2181/2888/3888 三個端口:
systemctl start firewalld
firewall-cmd --zone=public --add-port=2181/tcp --permanent
firewall-cmd --zone=public --add-port=2888/tcp --permanent
firewall-cmd --zone=public --add-port=3888/tcp --permanent
firewall-cmd --reload
Zk集羣主要操作命令如下:
- 服務端命令
在所有機器上執行:
/opt/zk/app/apache-zookeeper-3.6.1-bin/bin/zkServer.sh start/stop/status/restart ##啓動/停止/查詢狀態/重啓 zk服務
可見zk集羣成功,1個leader和2個follower。
- 客户端命令
在所有機器上執行:
/opt/zk/app/apache-zookeeper-3.6.1-bin/bin/zkCli.sh ##連接本地服務器,默認是2181端口
/opt/zk/app/apache-zookeeper-3.6.1-bin/bin/zkCli.sh -server ip:port ##連接指定zk服務器和端口
1.2.2.3. 交互式命令行使用
ZooKeeper是通過客户端腳本來操作的。客户端腳本:zkCli.sh,存放在ZooKeeper的bin目錄下。
默認連接本地的ZooKeeper服務器:#zkCli.sh
連接指定的ZooKeeper服務器:#zkCli.sh –server Server IP:port
在cent7a上運行:
/opt/zk/app/apache-zookeeper-3.6.1-bin/bin/zkCli.sh -server 192.168.0.112:2181 ,顯示如下:
執行:/opt/zk/app/apache-zookeeper-3.6.1-bin/bin/zkCli.sh -server 192.168.0.112:2181
此時進入zookeeper系統的交互模式。
此時鍵入h或help命令,可以看到交互模式下支持的命令選項,如下圖:
命令行工具的一些簡單操作如下:
1) 顯示根目錄下、文件: ls / 使用 ls 命令來查看當前 ZooKeeper 中所包含的內容
2) 顯示根目錄下、文件: ls2 / 查看當前節點數據並能看到更新次數等數據
3) 創建文件,並設置初始內容: create /zk "test" 創建一個新的 znode節點“ zk ”以及與它關聯的字符串
4) 獲取文件內容: get /zk 確認 znode 是否包含我們所創建的字符串
5) 修改文件內容: set /zk "zkbak" 對 zk 所關聯的字符串進行設置
6) 刪除文件: delete /zk 將剛才創建的 znode 刪除
7) 退出客户端: quit
8) 幫助命令: help
1.2.2.4. Java API 使用
Zookeeper提供了豐富的java api 。後續可直接在附件的java工程中詳見。
1.2.2.5. Zookeeper可視化工具 ZooInspector使用
Zookeeper 有很多可視化工具,其中一個輕便易用的工具是ZooInspector. 程序包是:ZooInspector.zip (該文件附於項目交付清單中) 。解壓後 直接在windows上雙擊 ZooInspector/build/ zookeeper-dev-ZooInspector.jar 即可打開圖形界面如下:
鍵入上面搭建好的zookeeper集羣192.168.0.112:2181 即可進入可視化界面:
1.2.3. Mysql2Es構建全量索引
1.2.3.1. 架構設計
構建從mysql2es的全量索引數據基於一個基本的假設:_updatetime字段值只會不變或增大,永遠不會減小。
構建全量索引之初,我們鎖定當前時刻,即上節中的時刻S。我們要做的就是編程實現select 表中所有_updattime <= 時刻S的數據,然後add進入到es索引collegejob_position_v20200712_1(前節已經創建好)中去即可。
實現上述編程邏輯並不十分困難,但仍然需要商榷以下細節:
1) 如果全量數據規模十分龐大,單進程程序進行mysql選擇數據並且構建全量索引到ES的過程可能會比較耗時,如果提高效率?可採用的方案是多進程實現。可以單擊多進程,也可以多機多進程。當然,如果你覺得你的數據量小,沒必要多進程時,我們的方案也可以適應。以上就是我們具體構建全量索引的重要原則:多進程構建全量索引。
2) 多進程實現時首先會遇到多個進程同時select mysql庫,如何解決數據衝突? 要避免進程A已經select 出去的數據絕對不能被進程B再次select!如何解決?
解決方案是:分佈式鎖(Distributed locks)。分佈式鎖的主流實現有很多,本文采用zookeeper實現。關於zookeeper集羣安裝部署和簡單實用本文前節中已經敍述過。
3) 最後一個細節需要知道的是,考慮到全量構建索引數據規模可能很大,全量階段向es添加文檔構建索引採用ES提供的bulk API,以提高構建效率。有關es bulk API更詳細的説明可參考:https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html 。
1.2.3.2. Java編程實現
構建全量和增量索引數據的java編程實現工程都在附件文件:esproj.zip。其項目工程目錄如下圖:
配置文件:
- c3p0-config.xml : 連接數據庫配置文件;
- log4j.properties: 日誌配置文件
- es.properties: es集羣連接信息配置文件
- esbuilder-config.properties: es索引構建配置文件(包含全量構建和增量(實時)構建配置)
- EsFullBuilder 是全量索引構建主類;
- EsInstBuilder 是增量(實時)索引構建主類;
- EsRrecuritmentPositionSearchDemo 是招聘職位信息檢索ES JAVA api demo。
下面重點解釋一下esbuilder-config.properties文件的細節:
- es.fullbuilder.version :一次全量構建索引的版本號,必須能區別於不同次構建
- es.fullbuilder.dbname:數據庫名
- es.fullbuilder.dbname:數據庫表名
- es.fullbuilder.datasource:數據庫中數據源範圍的限定,必須是逗號分隔的3項,如:0,1594518666324,5000,含義是:_updatetime起始時間戳,_updatetime終止時間戳,每次從mysql最多選取多少條數據。0,1594518666324表示(0, 1594518666324],左開右閉,select:_updatetime > 0 and _updatetime <=1594518666324 limit 5000。1594518666324是13位數字,表示1970年1月1日到現在的毫秒數,即前文所述的時刻S。
- es.fullbuilder.datasource.primarykey:數據庫中主鍵字段名;
- es.fullbuilder.sql:sql語句中select 後面 by前面的部分,常見的配置是*,表示全部字段都要往ES中建立索引。需要主要一點:默認數據庫中字段名和es索引中字段名相同,如果某個字段不同名稱,比如數據庫中字段name1要建到es索引中字段名為name2,那麼這裏的sql應當為:name1 as name2。
- es.fullbuilder.zkconnectinfo:zookeeper連接信息。注意:esfullbuilder是支持多機器分佈式多進程並行構建索引,構建進度等信息需要通過zk來存取。本程序依賴zookeeper服務。
- es.fullbuilder.esindexname:es索引名稱;
- es.fullbuilder.bulkactions:es bulk processor批量建索引參數:達到多少個request就往es發送給Es;
- es.fullbuilder.bulksizemb: es bulk processor批量建索引參數:達到多少個字節就往es發送;
- es.fullbuilder.bulkconcurrentrequests:es bulk processor批量建索引參數:開啓多少個線程來建索引;
- es.fullbuilder.bulkawaitminute:es bulk processor批量建索引參數:最長等待多少分鐘就退出builder。
編譯和運行方法:
在主目錄下執行:mvn clean install -U 即可。會生成:
1)lib目錄;
2)config目錄;
3)esbuilder.jar。
其中lib是依賴庫,config是包含esbuilder-config.properties等全部需要的配置文件。Esbuilder.jar是最終的jar包。
運行時,必須保證以上3個目錄和文件在同一個目錄下。
執行 java -cp esbuilder.jar com.freedom.es.es.main. EsFullBuilder 即可。
因為支持多機器分佈式部署,建議使用時,在多個機器上同時運行。
本文時間是在cent7a cent7b cent7c 3台機器上同時運行上述命令,執行如下:
1) 初始:
初始時由上圖可見,es中collegejob_position_v20200712_1索引中文檔數量為0.
由上圖可見,初始時zk目錄為空。(zookeepr目錄忽略)。
2) 如上圖,下面在3個機器上同時執行java -cp esbuilder.jar com.freedom.es.es.main.EsFullBuilder 命令,結果如下:
由上圖可見,3個EsFullBuilder進程在3台機器上同時並行執行 建全量索引的過程。多台機器並行執行可顯著提高構建索引效率,尤其是對於大規模數據更為明顯和必要。
上圖是執行過程中zk上樹節點的圖。能看到每個EsFullBuider會以自己所在機器IP:workdir的名稱註冊到zk上nodes節點下。data節點存放數據庫_updatetime時間戳依次被處理後的剩餘時間段範圍。
3) 幾分鐘以後,執行結束。通過下面日誌可到3個進程都把能從數據庫獲取的數據都獲取了。執行結果去zk上看,如下:
所有的實例心跳結點都已經消失了,説明全部進程都退出了。每個實例上面的內容都是SUCCESS,表明全部都成功了。
再看下es上索引情況:
可見文檔數量已經是45822條了。
再去數據庫上查詢一下:
數據庫查詢得出的記錄條數也是45822,與ES索引文檔數量一致,可見全量索引構建正確。
至此,全量索引構建過程全部結束。接下來到了構建增量實時索引過程。
1.3. 構建增量索引數據
構建增量索引是相對於構建全量索引而言的,一般指響應線上服務實時增加的文檔流數據而構建的實時索引。
增量構建索引的數據源是一張mysql表,表示線上依次發生的實時流水數據,假定這張表的名稱後綴都含有_instflow(實時流水的意思)。
對應招聘職位這一業務場景,如下創建其流水錶:
CREATE TABLE `position_instflow` (
`id` varchar(64) NOT NULL COMMENT '文檔id',
`doctype` int(11) NOT NULL COMMENT '文檔類型,增刪改分別對應數字0,1,2',
`sequenceby3` int(11) NOT NULL COMMENT '文檔id相對於總數3的序號',
`sequenceby5` int(11) NOT NULL COMMENT '文檔id相對於總數5的序號',
`sequenceby7` int(11) NOT NULL COMMENT '文檔id相對於總數7的序號',
`_updatetime` bigint(20) NOT NULL COMMENT 'updated timestamp in epoch milliseconds',
`handled` int(11) NOT NULL COMMENT '該文檔是否被處理過',
PRIMARY KEY (`id`) USING BTREE,
KEY `idx_for_sequenceby3` (`sequenceby3`),
KEY `idx_for_sequenceby5` (`sequenceby5`),
KEY `idx_for_sequenceby7` (`sequenceby7`),
KEY `idx_for_updatetime` (`_updatetime`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='招聘職位信息流水錶';
其中:
- id 是文檔id,這裏是varchar類型,可以是int類型,類型定義必須與全量數據表中的主鍵定義一致。數據也要一致。
- doctype表示對流水數據的修改類型,取值0,1,2分別表示增,刪,修改。
- sequenceby3 表示當前文檔主鍵id被hash成hash數值後求餘3後的餘數,sequenceby5,sequenceby7類型,表示求餘5和7後的餘數。該字段設計的思想是為了支持文檔id分段:餘數相同的文檔落到同一個段上去,被同一個EsInstBuilder處理。這樣設計就支持了多機器分佈式多進程實時構建索引,能顯著提高構建實時索引的效率。該字段預先就存好,並構建數據庫索引,查詢時速度能非常快。
- _updatetime 是實時文檔的更新時間戳,其內容必須與全量索引中的一致。
- handled 字段表示是否被處理過,0未處理,1已處理。
- 最後注意,為了加速mysql查詢,idx_for_sequenceby3,idx_for_sequenceby5,idx_for_sequenceby7,idx_for_updatetime 都建了mysql索引。
如果説按照文檔id分段是顯著提高了實時索引構建效率,引入一個id段內支持多機器分佈式多進程EsInstBuilder的策略,則顯著提高了實時索引構建系統的安全性和可靠性。因為每個id段有多個EsInstBuilder進程,且都分佈在多個機器上,如果其中的1個進程掛了,還有其它進程,並不影響實時構建流程。這兩個策略是我們本文實時構建索引系統設計的精華和靈魂,也是亮點。
因為我們只存儲了idx_for_sequenceby3、idx_for_sequenceby5、idx_for_sequenceby7,即表明我們目前只希望支持id分3/5/7段,更多的其他分段方案只需要修改流水錶即可支持。本文實踐以id分3段演示。
增量構建索引過程的配置文件仍然是同全量構建索引一個文件,如上圖所示。
各參數解釋如下:
上圖是構建es索引的java 工程示意圖,EsInstBuilder是主類和入口。Esbuilder-config.properties同上文所述,是配置文件。
編譯方法還是mvn clean install -U 。
程序運行方法是:仍然將config、lib、esbuider.jar 3個文件或目錄平行放置在某個目錄下。然後運行java -cp esbuilder.jar com.freedom.es.es.main. EsInstBuilder 即可。
接下來,我們以id分3段,即es.instbuilder.buildercount設置為3,es.instbuilder.buildersequence分別為0,1,2。注意,同一個id段內我們設置了2個進程,共同消費同一個id段內的實時文檔。部署分佈如下圖所示:
上圖可見3個分段,buildersequence分別要設置成為0,1,2,同一個分段內有2個進程。
再看看zk上樹形結構分佈如下圖所示:
目前是空的。
再看看ES索引文檔情況,如下圖:
這還是之前構建完全量索引後的45822篇文檔。
目前流水錶是空的。
接下來,我們分別在3個機上的以命令:java -cp esbuilder.jar com.freedom.es.es.main. EsInstBuilder 啓動6個EsInstBuilder進程,如下圖:
可見6個進程中,同屬同一id分段內的2個進程在同時爭搶1把分佈式鎖,獲得鎖的進程就去消費流水錶中的流水數據,往ES中新建實時索引。
再看看這時候的zk上樹形結點:
由上圖可見,zk上目前有6個節點代表每個進程的心跳結點,在分別共同爭搶3把鎖,有序地進行着消費流水數據的過程。因為目前流水錶中是空的,所以6個進程就在空轉。
接下來為了模擬線上實時數據,我們利用java工程中的com.freedom.es.util.TmpInstImportData主類的功能來從全量數據表中抽取所有_updatetime>1594518666324時刻S的所有文檔,動態每個40毫秒插入一條到實時流水錶中:position_instflow。
執行java -cp esbuilder.jar com.freedom.es.util. TmpInstImportData ,它會每40毫秒插入一條實時數據到流水錶中,如下圖所示:
可見這邊在插入流水數據過程中了。
再看看EsInstBuilder這邊的日誌,如下圖:
可見都分別在有序地消費流水數據了。注意圖中的耗時10秒是程序為了演示過程特地sleep 10秒每一輪,實際中消費流水速度都是非常快的。
再看看流水錶中:
流水錶中的handled字段為0表示沒有被消費,為1表示已經被消費了。圖中全部為1可見這些流水數據都被建入到ES索引中去了。
再看看此時ES索引中文檔的情況,如下圖所示:
可見es索引中文檔數據在增加,從45822已經增加到了52481,這個數字還在不斷增大。
生產環境下,上述6個EsInstBuilder要一直啓動着,永遠不能停止,以保證遠遠不斷消費線上實時來的流水數據,再構建到ES索引中去。這個過程是沒有盡頭的。
至此,構建增量索引過程已經完畢。接下來,也是最後,我們給出一個java api 查詢es服務的API demo,供生產環境使用。
1.4. ES查詢JAVA API demo
Es提供了豐富的java API 用於搜索查詢。上圖是java工程中給出的一個簡單的java API demo,它展示了檢索 position_description中含有“自然語言”這個詞的檢索召回結果。可見一共有匹配52條,要求召回了top10條。
更詳細豐富的ES java search API使用方法,可根據需要參見官方ES文檔:https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_search_apis.html