动态

详情 返回 返回

IK 字段級別詞典的升級之路 - 动态 详情

背景知識:詞庫的作用

IK 分詞器是一款基於詞典匹配的中文分詞器,其準確性和召回率與 IK 使用的詞庫也有不小的關係。

這裏我們先了解一下詞典匹配法的作用流程:

  1. 預先準備一個大規模的詞典,用算法在文本中尋找詞典裏的最長匹配項。這種方法實現簡單且速度快。
  2. 但面臨歧義切分和未登錄詞挑戰:同一序列可能有不同切分方式(例如“北京大學生”可以切成“北京大學/生”或“北京/大學生”),需要規則或算法消除歧義;
  3. 而詞典中沒有的新詞(如網絡流行語、人名等)無法正確切分。

可以看到詞庫是詞元產生的比對基礎,一個完善的中文詞庫能大大提高分詞器的準確性和召回率。

IK 使用的詞庫是中文中常見詞彙的合集,完善且豐富,ik_smart 和 ik_max_word 也能滿足大部分中文分詞的場景需求。但是針對一些專業的場景,比如醫藥這樣的行業詞庫、電商搜索詞、新聞熱點詞等,IK 是很難覆蓋到的。這時候就需要使用者自己去維護自定義的詞庫了。

IK 的自定義詞庫加載方式

IK 本身也支持自定義詞庫的加載和更新的,但是隻支持一個集羣使用一個詞庫。

這裏主要的制約因素是,詞庫對象與 ik 的中文分詞器執行對象是一一對應的關係。

image.png

這導致了 IK 的詞庫面對不同中文分詞場景時較低的靈活性,使用者並不能做到字段級別的詞庫加載。並且基於文件或者 http 協議的詞庫加載方式也需要不小的維護成本。

字段級別詞庫的加載

鑑於上述的背景問題,INFINI lab 加強了 IK 的詞庫加載邏輯,做到了字段級別的詞庫加載。同時將自定義詞庫的加載方式由外部文件/遠程訪問改成了內部索引查詢。

主要邏輯如圖:

image.png

這裏 IK 多中文詞庫的加載優化主要基於 IK 可以加載多詞類對象(即下面這段代碼)的靈活性,將原來遍歷一個 CJK 詞類對象修改成遍歷多個 CJK 詞類對象,各個自定義詞庫可以附着在 CJK 詞庫對象上實現不同詞庫的分詞。

do{
    //遍歷子分詞器
    for(ISegmenter segmenter : segmenters){
        segmenter.analyze(context);
    }
    //字符緩衝區接近讀完,需要讀入新的字符
    if(context.needRefillBuffer()){
          break;
    }
}

對默認詞庫的新增支持

對於默認詞庫的修改,新版 IK 也可以通過寫入詞庫索引方式支持,只要將 dict_key 設置為 default 即可。

POST .analysis_ik/_doc
{
  "dict_key": "default",
  "dict_type": "main_dicts",
  "dict_content":"楊樹林"
}

效率測試

測試方案 1:單條測試

測試方法:寫入一條數據到默認 ik_max_word 和自定義詞庫,查看是否有明顯的效率差距

  1. 創建測試索引,自定義一個包括默認詞庫的 IK 分詞器
PUT my-index-000001
{
  "settings": {
    "number_of_shards": 3,
    "analysis": {
      "analyzer": {
        "my_custom_analyzer": {
          "type": "custom",
          "tokenizer": "my_tokenizer"
        }
      },
      "tokenizer": {
        "my_tokenizer": {

          "type": "ik_max_word",
          "custom_dict_enable": true,
          "load_default_dicts":true,
          "lowcase_enable": true,
          "dict_key": "test_dic"
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "test_ik": {
        "type": "text",
        "analyzer": "my_custom_analyzer"
      }
    }
  }
}
  1. 將該詞庫重複默認詞庫的內容
POST .analysis_ik/_doc
{
  "dict_key": "test_dic",
  "dict_type": "main_dicts",
  "dict_content":"""xxxx  #詞庫內容
  """
}

# debug 日誌
[2025-07-09T16:37:43,112][INFO ][o.w.a.d.Dictionary       ] [ik-1] Loaded 275909 words from main_dicts dictionary for dict_key: test_dic
  1. 測試默認詞庫和自定義詞庫的分詞效率
GET my-index-000001/_analyze
{
  "analyzer": "my_custom_analyzer",
  "text":"自強不息,楊樹林"
}

GET my-index-000001/_analyze
{
  "analyzer": "ik_max_word",
  "text":"自強不息,楊樹林"
}

圖片

圖片

打開 debug 日誌,可以看到自定義分詞器在不同的詞庫找到了 2 次“自強不息”

...
[2025-07-09T16:52:22,937][INFO ][o.w.a.c.CN_QuantifierSegmenter] [ik-1] 當前掃描詞元[息]不需要啓動量詞掃描
[2025-07-09T16:52:22,937][INFO ][o.w.a.c.CJKSegmenter     ] [ik-1] >>> WORD FOUND [自強不息] from dict [default]
[2025-07-09T16:52:22,937][INFO ][o.w.a.c.CJKSegmenter     ] [ik-1] >>> WORD FOUND [不息] from dict [default]
[2025-07-09T16:52:22,937][INFO ][o.w.a.c.CJKSegmenter     ] [ik-1] >>> WORD FOUND [自強不息] from dict [test_dic]
[2025-07-09T16:52:22,937][INFO ][o.w.a.c.CJKSegmenter     ] [ik-1] >>> WORD FOUND [不息] from dict [test_dic]
[2025-07-09T16:52:22,937][INFO ][o.w.a.c.CN_QuantifierSegmenter] [ik-1] 當前掃描詞元[,]不需要啓動量詞掃描
...

而默認詞庫只有一次

...
[2025-07-09T16:54:22,618][INFO ][o.w.a.c.CN_QuantifierSegmenter] [ik-1] 當前掃描詞元[息]不需要啓動量詞掃描
[2025-07-09T16:54:22,618][INFO ][o.w.a.c.CJKSegmenter     ] [ik-1] >>> WORD FOUND [自強不息] from dict [default]
[2025-07-09T16:54:22,618][INFO ][o.w.a.c.CJKSegmenter     ] [ik-1] >>> WORD FOUND [不息] from dict [default]
[2025-07-09T16:54:22,618][INFO ][o.w.a.c.CN_QuantifierSegmenter] [ik-1] 當前掃描詞元[,]不需要啓動量詞掃描
...

測試方案 2:持續寫入測試

測試方法:在 ik_max_word 和自定義詞庫的索引裏,分別持續 bulk 寫入,查看總體寫入延遲。

測試索引:

# ik_max_word索引
PUT ik_max_test
{
  "mappings": {
    "properties": {
      "chapter": {
        "type": "keyword"
      },
      "content": {
        "type": "text",
        "analyzer": "ik_max_word"
      },
      "paragraph_id": {
        "type": "keyword"
      },
      "random_field": {
        "type": "text"
      },
      "timestamp": {
        "type": "keyword"
      },
      "word_count": {
        "type": "integer"
      }
    }
  },
  "settings": {
    "index": {
      "number_of_shards": "1",
      "number_of_replicas": "0"
    }
  }
}

# 自定義詞庫索引
PUT ik_custom_test
{
  "mappings": {
    "properties": {
      "chapter": {
        "type": "keyword"
      },
      "content": {
        "type": "text",
        "analyzer": "my_custom_analyzer"
      },
      "paragraph_id": {
        "type": "keyword"
      },
      "random_field": {
        "type": "text"
      },
      "timestamp": {
        "type": "keyword"
      },
      "word_count": {
        "type": "integer"
      }
    }
  },
  "settings": {
    "index": {
      "number_of_shards": "1",
      "analysis": {
        "analyzer": {
          "my_custom_analyzer": {
            "type": "custom",
            "tokenizer": "my_tokenizer"
          }
        },
        "tokenizer": {
          "my_tokenizer": {
            "load_default_dicts": "true",
            "type": "ik_max_word",
            "dict_key": "test_dic",
            "lowcase_enable": "true",
            "custom_dict_enable": "true"
          }
        }
      },
      "number_of_replicas": "0"
    }
  }
}

這裏利用腳本循環寫入了一段《四世同堂》的文本,比較相同次數下,兩次寫入的總體延遲。

測試腳本內容如下:

#!/usr/bin/env python3

# -_- coding: utf-8 -_-

"""
四世同堂中文內容隨機循環寫入 Elasticsearch 腳本
目標:生成指定 bulk 次數的索引內容
"""

import random
import time
import json
from datetime import datetime
import requests
import logging
import os
import argparse
import urllib3

# 配置日誌

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(**name**)

class ESDataGenerator:
def **init**(self, es_host='localhost', es_port=9200, index_name='sisitontang_content',
target_bulk_count=10000, batch_size=1000, use_https=False, username=None, password=None, verify_ssl=True):
"""
初始化 ES 連接和配置
"""
protocol = 'https' if use_https else 'http'
self.es_url = f'{protocol}://{es_host}:{es_port}'
self.index_name = index_name
self.target_bulk_count = target_bulk_count # 目標 bulk 次數
self.batch_size = batch_size
self.check_interval = 1000 # 每 1000 次 bulk 檢查一次進度

        # 設置認證信息
        self.auth = None
        if username and password:
            self.auth = (username, password)
            logger.info(f"使用用户名認證: {username}")

        # 設置請求會話
        self.session = requests.Session()
        if self.auth:
            self.session.auth = self.auth

        # 處理HTTPS和SSL證書驗證
        if use_https:
            self.session.verify = False  # 始終禁用SSL驗證以避免證書問題
            logger.info("警告:已禁用SSL證書驗證(適合開發測試環境)")
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

            # 設置SSL適配器以處理連接問題
            from requests.adapters import HTTPAdapter
            from urllib3.util.retry import Retry

            # 配置重試策略
            retry_strategy = Retry(
                total=3,
                backoff_factor=1,
                status_forcelist=[429, 500, 502, 503, 504],
            )

            adapter = HTTPAdapter(max_retries=retry_strategy)
            self.session.mount("https://", adapter)

            # 設置更寬鬆的SSL上下文
            self.session.verify = False

        logger.info(f"ES連接地址: {self.es_url}")

        # 創建索引映射
        self.create_index()

    def create_index(self):
        """創建索引和映射"""
        mapping = {
            "mappings": {
                "properties": {
                    "chapter": {"type": "keyword"},
                    "content": {"type": "text", "analyzer": "ik_max_word"},
                    "timestamp": {"type": "date"},
                    "word_count": {"type": "integer"},
                    "paragraph_id": {"type": "keyword"},
                    "random_field": {"type": "text"}
                }
            }
        }

        try:
            # 檢查索引是否存在
            response = self.session.head(f"{self.es_url}/{self.index_name}")
            if response.status_code == 200:
                logger.info(f"索引 {self.index_name} 已存在")
            else:
                # 創建索引
                response = self.session.put(
                    f"{self.es_url}/{self.index_name}",
                    headers={'Content-Type': 'application/json'},
                    json=mapping
                )
                if response.status_code in [200, 201]:
                    logger.info(f"創建索引 {self.index_name} 成功")
                else:
                    logger.error(f"創建索引失敗: {response.status_code} - {response.text}")
        except Exception as e:
            logger.error(f"創建索引失敗: {e}")

    def load_text_content(self, file_path='sisitontang.txt'):
        """
        從文件加載《四世同堂》的完整文本內容
        如果文件不存在,則返回擴展的示例內容
        """
        if os.path.exists(file_path):
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read()
                    logger.info(f"從文件 {file_path} 加載了 {len(content)} 個字符的文本內容")
                    return content
            except Exception as e:
                logger.error(f"讀取文件失敗: {e}")

        # 如果文件不存在,返回擴展的示例內容
        logger.info("使用內置的擴展示例內容")
        return self.get_extended_sample_content()

    def get_extended_sample_content(self):
        """
        獲取擴展的《四世同堂》示例內容
        """
        content = """
        小羊圈衚衕是北平城裏的一個小衚衕。它不寬,可是很長,從東到西有一里多路。在這條衚衕裏,從東邊數起,有個小茶館,幾個小門臉,和一羣小房屋。小茶館的斜對面是個較大的四合院,院子裏有幾棵大槐樹。這個院子就是祁家的住所,四世同堂的大家庭就在這裏度過了最困難的歲月。

        祁老人是個善良的老頭兒,雖然年紀大了,可是還很有精神。他的一生見證了太多的變遷,從清朝的衰落到民國的建立,再到現在的戰亂,他都以一種達觀的態度面對着。他的兒子祁天佑是個教書先生,為人正直,在衚衕裏很有威望。祁家的兒媳婦韻梅是個賢惠的女人,把家裏打理得井井有條,即使在最困難的時候,也要維持着家庭的尊嚴。

        錢默吟先生是個有學問的人,他的詩寫得很好,可是性格有些古怪。他住在衚衕深處的一個小院子裏,平時很少出門,只是偶爾到祁家坐坐,和祁天佑聊聊古今。他對時局有着自己獨特的見解,但更多的時候,他選擇在自己的小天地裏尋找精神的慰藉。戰爭的殘酷現實讓這個文人感到深深的無力,但他依然堅持着自己的文人氣節。

        小順子是個活潑的孩子,他每天都在衚衕裏跑來跑去,和其他的孩子們一起玩耍。他的笑聲總是能感染到周圍的人,讓這個古老的衚衕充滿了生機。即使在戰爭的陰霾下,孩子們依然保持着他們的天真和快樂,這或許就是生活的希望所在。小順子不懂得大人們的煩惱,他只是簡單地享受着童年的快樂。

        李四大爺是個老實人,他在衚衕裏開了個小雜貨鋪。雖然生意不大,但是童叟無欺,街坊鄰居們都願意到他這裏買東西。他的妻子是個能幹的女人,把小鋪子管理得很好。在那個物資匱乏的年代,能夠維持一個小鋪子的經營已經很不容易了。李四大爺經常幫助鄰居們,即使自己的生活也不寬裕。

        衚衕裏的生活是平靜的,每天清晨,人們就開始忙碌起來。有的人挑着水桶去井邊打水,有的人牽着羊去街上賣奶,有的人挑着菜擔子去菜市場。這種平靜的生活在戰爭來臨之前是那麼的珍貴,人們都珍惜着這樣的日子。鄰里之間相互照顧,孩子們在院子裏玩耍,老人們在門口曬太陽聊天。

        冠曉荷是個複雜的人物,他有文化,也有野心。在日本人佔領北平的時候,他選擇了與敵人合作,這讓衚衕裏的人們都看不起他。但是他的妻子還是個好人,只是被丈夫連累了。冠曉荷的選擇代表了那個時代一部分知識分子的軟弱和妥協,他們在民族大義和個人利益之間選擇了後者。

        春天來了,衚衕裏的槐樹發芽了,小鳥們在枝頭歌唱。孩子們在院子裏玩耍,老人們在門口曬太陽。這樣的日子讓人感到温暖和希望。即使在最黑暗的時期,生活依然要繼續,人們依然要保持對美好未來的希望。春天的到來總是能夠給人們帶來新的希望和力量。

        戰爭的陰雲籠罩着整個城市,衚衕裏的人們也感受到了壓力。有的人選擇了抗爭,有的人選擇了妥協,有的人選擇了逃避。每個人都在用自己的方式應對這個艱難的時代。祁瑞宣面臨着痛苦的選擇,他既不願意與日本人合作,也不敢公開反抗,這種內心的煎熬讓他備受折磨。

        老舍先生用他細膩的筆觸描繪了衚衕裏的眾生相,每個人物都有自己的特點和命運。他們的喜怒哀樂構成了這部偉大作品的豐富內涵。從祁老爺子的達觀,到祁瑞宣的痛苦,從韻梅的堅強,到冠曉荷的墮落,每個人物都是那個時代的縮影。

        在那個動盪的年代,普通人的生活是不容易的。他們要面對戰爭的威脅,要面對生活的困難,要面對道德的選擇。但是他們依然堅強地活着,為了家人,為了希望。即使在最困難的時候,人們依然保持着對美好生活的嚮往。

        衚衕裏的鄰里關係是複雜的,有友好的,也有矛盾的。但是在大的困難面前,大家還是會相互幫助。這種鄰里之間的温情是中華民族傳統文化的重要組成部分。在那個特殊的年代,這種人與人之間的温情顯得更加珍貴。

        祁瑞宣是個有理想的青年,他受過良好的教育,有自己的抱負。但是在日本人佔領期間,他的理想和現實之間產生了尖鋭的矛盾。他不願意做漢奸,但是也不能完全抵抗。這種內心的矛盾和痛苦是那個時代很多知識分子的真實寫照。

        小妞子是個可愛的孩子,她的天真無邪給這個沉重的故事增添了一絲亮色。她不懂得大人們的複雜心理,只是簡單地生活着,快樂着。孩子們的天真和快樂在那個黑暗的年代顯得格外珍貴,它們代表着生活的希望和未來。

        程長順是個樸實的人,他沒有什麼文化,但是有自己的原則和底線。他不願意向日本人低頭,寧願過艱苦的生活也要保持自己的尊嚴。他的堅持代表了中國人民不屈不撓的精神,即使在最困難的時候也不願意妥協。

        衚衕裏的生活節奏是緩慢的,人們有時間去觀察周圍的變化,去思考生活的意義。這種慢節奏的生活在今天看來是珍貴的,它讓人們有機會去體驗生活的細節。在那個年代,即使生活艱難,人們依然能夠從平凡的日常中找到樂趣。

        老二是個有個性的人,他不願意受約束,喜歡自由自在的生活。但是在戰爭年代,這種個性給他帶來了麻煩,也給家人帶來了擔憂。他的反叛精神在某種程度上代表着年輕一代對傳統束縛的反抗,但在那個特殊的時代,這種反抗往往會帶來意想不到的後果。

        衚衕裏的四合院是北京傳統建築的代表,它們見證了一代又一代人的生活。每個院子裏都有自己的故事,每個房間裏都有自己的記憶。這些古老的建築承載着深厚的歷史文化底藴,即使在戰爭的破壞下,依然堅強地屹立着。

        在《四世同堂》這部作品中,老舍先生不僅描繪了個人的命運,也反映了整個民族的命運。小衚衕裏的故事其實就是大中國的縮影。每個人物的遭遇都代表着那個時代某一類人的命運,他們的選擇和結局反映了整個民族在那個特殊歷史時期的精神狀態。

        戰爭結束了,但是人們心中的創傷需要時間來癒合。衚衕裏的人們重新開始了正常的生活,但是那段艱難的經歷永遠不會被忘記。歷史的教訓提醒着人們珍惜和平,珍惜現在的美好生活。四世同堂的故事將永遠流傳下去,成為後人瞭解那個時代的重要窗口。
        """
        return content.strip()

    def split_text_randomly(self, text, min_length=100, max_length=200):
        """
        將文本按100-200字的隨機長度進行分割
        """
        # 清理文本,移除多餘的空白字符
        text = ''.join(text.split())

        segments = []
        start = 0

        while start < len(text):
            # 隨機選擇段落長度
            segment_length = random.randint(min_length, max_length)
            end = min(start + segment_length, len(text))

            segment = text[start:end]
            if segment.strip():  # 確保段落不為空
                segments.append(segment.strip())

            start = end

        return segments

    def generate_random_content(self, base_content):
        """
        基於基礎內容生成隨機變化的內容
        """
        # 隨機選擇一個基礎段落
        base_paragraph = random.choice(base_content)

        # 隨機添加一些變化
        variations = [
            "在那個年代,",
            "據説,",
            "人們常常説,",
            "老一輩人總是提到,",
            "歷史記錄顯示,",
            "根據回憶,",
            "有人説,",
            "大家都知道,",
            "傳説中,",
            "眾所周知,"
        ]

        endings = [
            "這就是當時的情況。",
            "這樣的事情在那個年代很常見。",
            "這個故事至今還在流傳。",
            "這是一個值得回憶的故事。",
            "這樣的經歷讓人難以忘懷。",
            "這就是老北京的生活。",
            "這種精神值得我們學習。",
            "這個時代已經過去了。",
            "這樣的生活現在已經很難看到了。",
            "這是歷史的見證。"
        ]

        # 隨機組合內容
        if random.random() < 0.3:
            content = random.choice(variations) + base_paragraph
        else:
            content = base_paragraph

        if random.random() < 0.3:
            content += random.choice(endings)

        return content

    def generate_document(self, text_segments, doc_id):
        """基於文本段落生成一個文檔"""
        # 隨機選擇一個文本段落
        content = random.choice(text_segments)

        # 生成隨機的額外字段以增加文檔大小
        random_field = ''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789', k=random.randint(100, 500)))

        doc = {
            "chapter": f"第{random.randint(1, 100)}章",
            "content": content,
            "timestamp": datetime.now(),
            "word_count": len(content),
            "paragraph_id": f"para_{doc_id}",
            "random_field": random_field
        }

        return doc

    def get_index_size_gb(self):
        """獲取索引大小(GB)"""
        try:
            response = self.session.get(f"{self.es_url}/_cat/indices/{self.index_name}?bytes=b&h=store.size&format=json")
            if response.status_code == 200:
                data = response.json()
                if data and len(data) > 0:
                    size_bytes = int(data[0]['store.size'])
                    size_gb = size_bytes / (1024 * 1024 * 1024)
                    return size_gb
            return 0
        except Exception as e:
            logger.error(f"獲取索引大小失敗: {e}")
            return 0

    def bulk_insert(self, documents):
        """批量插入文檔使用HTTP bulk API"""
        # 構建bulk請求體
        bulk_data = []
        for doc in documents:
            # 添加action行
            action = {"index": {"_index": self.index_name}}
            bulk_data.append(json.dumps(action))
            # 添加文檔行
            bulk_data.append(json.dumps(doc, ensure_ascii=False, default=str))

        # 每行以換行符結束,最後也要有換行符
        bulk_body = '\n'.join(bulk_data) + '\n'

        try:
            response = self.session.post(
                f"{self.es_url}/_bulk",
                headers={'Content-Type': 'application/x-ndjson'},
                data=bulk_body.encode('utf-8'),
                timeout=30  # 添加超時設置
            )

            if response.status_code == 200:
                result = response.json()
                # 檢查是否有錯誤
                if result.get('errors'):
                    error_count = 0
                    error_details = []
                    for item in result['items']:
                        if 'error' in item.get('index', {}):
                            error_count += 1
                            error_info = item['index']['error']
                            error_details.append(f"類型: {error_info.get('type')}, 原因: {error_info.get('reason')}")

                    if error_count > 0:
                        logger.warning(f"批量插入有 {error_count} 個錯誤")
                        # 打印前5個錯誤的詳細信息
                        for i, error in enumerate(error_details[:5]):
                            logger.error(f"錯誤 {i+1}: {error}")
                        if len(error_details) > 5:
                            logger.error(f"... 還有 {len(error_details)-5} 個類似錯誤")
                return True
            else:
                logger.error(f"批量插入失敗: HTTP {response.status_code} - {response.text}")
                return False
        except requests.exceptions.SSLError as e:
            logger.error(f"SSL連接錯誤: {e}")
            logger.error("建議檢查ES集羣的SSL配置或使用 --no-verify-ssl 參數")
            return False
        except requests.exceptions.ConnectionError as e:
            logger.error(f"連接錯誤: {e}")
            logger.error("請檢查ES集羣地址和端口是否正確")
            return False
        except requests.exceptions.Timeout as e:
            logger.error(f"請求超時: {e}")
            logger.error("ES集羣響應超時,可能負載過高")
            return False
        except Exception as e:
            logger.error(f"批量插入失敗: {e}")
            logger.error(f"錯誤類型: {type(e).__name__}")
            return False

    def run(self):
        """運行數據生成器"""
        start_time = time.time()
        start_datetime = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        logger.info(f"開始生成數據,開始時間: {start_datetime},目標bulk次數: {self.target_bulk_count}")

        # 加載文本內容
        text_content = self.load_text_content()

        # 將文本分割成100-200字的段落
        text_segments = self.split_text_randomly(text_content, min_length=100, max_length=200)
        logger.info(f"分割出 {len(text_segments)} 個文本段落")

        doc_count = 0
        bulk_count = 0
        bulk_times = []  # 記錄每次bulk的耗時

        while bulk_count < self.target_bulk_count:
            # 生成批量文檔
            documents = []
            for i in range(self.batch_size):
                doc = self.generate_document(text_segments, doc_count + i)
                documents.append(doc)

            # 記錄單次bulk開始時間
            bulk_start = time.time()

            # 批量插入
            if self.bulk_insert(documents):
                bulk_end = time.time()
                bulk_duration = bulk_end - bulk_start
                bulk_times.append(bulk_duration)

                doc_count += self.batch_size
                bulk_count += 1

                # 定期檢查和報告進度
                if bulk_count % self.check_interval == 0:
                    current_size = self.get_index_size_gb()
                    avg_bulk_time = sum(bulk_times[-self.check_interval:]) / len(bulk_times[-self.check_interval:])
                    logger.info(f"已完成 {bulk_count} 次bulk操作,插入 {doc_count} 條文檔,當前索引大小: {current_size:.2f}GB,最近{self.check_interval}次bulk平均耗時: {avg_bulk_time:.3f}秒")

            # 避免過於頻繁的插入
            #time.sleep(0.01)  # 減少延遲,提高測試速度

        end_time = time.time()
        end_datetime = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        total_duration = end_time - start_time

        # 計算統計信息
        final_size = self.get_index_size_gb()
        avg_bulk_time = sum(bulk_times) / len(bulk_times) if bulk_times else 0
        total_docs_per_sec = doc_count / total_duration if total_duration > 0 else 0
        bulk_per_sec = bulk_count / total_duration if total_duration > 0 else 0

        logger.info(f"數據生成完成!")
        logger.info(f"開始時間: {start_datetime}")
        logger.info(f"結束時間: {end_datetime}")
        logger.info(f"總耗時: {total_duration:.2f}秒 ({total_duration/60:.2f}分鐘)")
        logger.info(f"總計完成: {bulk_count} 次bulk操作")
        logger.info(f"總計插入: {doc_count} 條文檔")
        logger.info(f"最終索引大小: {final_size:.2f}GB")
        logger.info(f"平均每次bulk耗時: {avg_bulk_time:.3f}秒")
        logger.info(f"平均bulk速率: {bulk_per_sec:.2f}次/秒")
        logger.info(f"平均文檔寫入速率: {total_docs_per_sec:.0f}條/秒")

def main():
"""主函數"""
parser = argparse.ArgumentParser(description='四世同堂中文內容寫入 Elasticsearch 腳本')
parser.add_argument('--host', default='localhost', help='ES 主機地址 (默認: localhost)')
parser.add_argument('--port', type=int, default=9200, help='ES 端口 (默認: 9200)')
parser.add_argument('--index', required=True, help='索引名稱 (必填)')
parser.add_argument('--bulk-count', type=int, default=1000, help='目標 bulk 次數 (默認: 10000)')
parser.add_argument('--batch-size', type=int, default=1000, help='每次 bulk 的文檔數量 (默認: 1000)')
parser.add_argument('--https', action='store_true', help='使用 HTTPS 協議')
parser.add_argument('--username', help='ES 用户名')
parser.add_argument('--password', help='ES 密碼')
parser.add_argument('--no-verify-ssl', action='store_true', help='禁用 SSL 證書驗證(默認已禁用)')

    args = parser.parse_args()

    protocol = "HTTPS" if args.https else "HTTP"
    auth_info = f"認證: {args.username}" if args.username else "無認證"
    ssl_info = "禁用SSL驗證" if args.https else ""
    logger.info(f"開始運行腳本,參數: {protocol}://{args.host}:{args.port}, 索引={args.index}, bulk次數={args.bulk_count}, {auth_info} {ssl_info}")

    try:
        generator = ESDataGenerator(
            args.host,
            args.port,
            args.index,
            args.bulk_count,
            args.batch_size,
            args.https,
            args.username,
            args.password,
            not args.no_verify_ssl  # 傳入verify_ssl參數,但實際上總是False
        )
        generator.run()
    except KeyboardInterrupt:
        logger.info("用户中斷了程序")
    except Exception as e:
        logger.error(f"程序運行出錯: {e}")
        logger.error(f"錯誤類型: {type(e).__name__}")

if **name** == "**main**":
main()

根據腳本中的測試文本添加的詞庫如下:

POST .analysis_ik/\_doc
{
"dict_type": "main_dicts",
"dict_key": "test_dic",
"dict_content": """祁老人
祁天佑
韻梅
祁瑞宣
老二
錢默吟
小順子
李四大爺
冠曉荷
小妞子
程長順
老舍
李四大爺
小羊圈衚衕
北平城
衚衕
小茶館
小門臉
小房屋
四合院
院子
祁家
小院子
雜貨鋪
小鋪子
井邊
街上
菜市場
門口
枝頭
城市
房間
北京
清朝
民國
戰亂
戰爭
日本人
抗戰
大槐樹
槐樹
小鳥
羊
門臉
房屋
水桶
菜擔子
鋪子
老頭兒
兒子
教書先生
兒媳婦
女人
大家庭
孩子
孩子們
街坊鄰居
妻子
老人
文人
知識分子
青年
漢奸
歲月
一生
變遷
衰落
建立
態度
威望
尊嚴
學問
詩
性格
時局
見解
小天地
精神
慰藉
現實
無力
氣節
笑聲
生機
陰霾
天真
快樂
希望
煩惱
童年
生意
生活
物資
年代
經營
日子
鄰里
文化
野心
敵人
選擇
軟弱
妥協
民族大義
個人利益
温暖
時期
未來
力量
壓力
抗爭
逃避
方式
時代
煎熬
折磨
筆觸
眾生相
人物
特點
命運
喜怒哀樂
內涵
達觀
痛苦
堅強
墮落
縮影
威脅
困難
道德
家人
嚮往
關係
矛盾
温情
傳統文化
組成部分
理想
教育
抱負
佔領
寫照
亮色
心理
原則
底線
節奏
意義
細節
樂趣
個性
約束
麻煩
擔憂
反叛精神
束縛
反抗
後果
建築
代表
故事
記憶
歷史文化底藴
破壞
作品
創傷
經歷
教訓
和平
窗口
清晨
春天
內心
玩耍
聊天
曬太陽
歌唱
合作
打水
賣奶
幫助
"""
}

進行 2 次集中寫入的記錄如下:

# ik_max_test
2025-07-13 20:15:33,294 - INFO - 開始時間: 2025-07-13 19:45:07
2025-07-13 20:15:33,294 - INFO - 結束時間: 2025-07-13 20:15:33
2025-07-13 20:15:33,294 - INFO - 總耗時: 1825.31秒 (30.42分鐘)
2025-07-13 20:15:33,294 - INFO - 總計完成: 1000 次bulk操作
2025-07-13 20:15:33,294 - INFO - 總計插入: 1000000 條文檔
2025-07-13 20:15:33,294 - INFO - 最終索引大小: 0.92GB
2025-07-13 20:15:33,294 - INFO - 平均每次bulk耗時: 1.790秒
2025-07-13 20:15:33,294 - INFO - 平均bulk速率: 0.55次/秒
2025-07-13 20:15:33,294 - INFO - 平均文檔寫入速率: 548條/秒

# ik_custom_test
2025-07-13 21:17:47,309 - INFO - 開始時間: 2025-07-13 20:44:03
2025-07-13 21:17:47,309 - INFO - 結束時間: 2025-07-13 21:17:47
2025-07-13 21:17:47,309 - INFO - 總耗時: 2023.53秒 (33.73分鐘)
2025-07-13 21:17:47,309 - INFO - 總計完成: 1000 次bulk操作
2025-07-13 21:17:47,309 - INFO - 總計插入: 1000000 條文檔
2025-07-13 21:17:47,309 - INFO - 最終索引大小: 0.92GB
2025-07-13 21:17:47,309 - INFO - 平均每次bulk耗時: 1.986秒
2025-07-13 21:17:47,309 - INFO - 平均bulk速率: 0.49次/秒
2025-07-13 21:17:47,309 - INFO - 平均文檔寫入速率: 494條/秒

可以看到,有一定損耗,自定義詞庫詞典的效率是之前的 90%。

相關閲讀

  • [IK 字段級別詞典升級:IK reload API
    ](https://infinilabs.cn/blog/2025/ik-field-level-dictionarys-2/)
  • [Easysearch 新功能: IK 字段級別詞典
    ](https://infinilabs.cn/blog/2025/ik-field-level-dictionarys/)

關於 IK Analysis

IK Analysis 插件集成了 Lucene IK 分析器,並支持自定義詞典。它支持 Easysearch\Elasticsearch\OpenSearch 的主要版本。由 INFINI Labs 維護並提供支持。

該插件包含分析器:ik_smart 和 ik_max_word,以及分詞器:ik_smart 和 ik_max_word

開源地址:https://github.com/infinilabs/analysis-ik

作者:金多安,極限科技(INFINI Labs)搜索運維專家,Elastic 認證專家,搜索客社區日報責任編輯。一直從事與搜索運維相關的工作,日常會去挖掘 ES / Lucene 方向的搜索技術原理,保持搜索相關技術發展的關注。
原文:https://infinilabs.cn/blog/2025/ik-field-level-dictionarys-3/
user avatar cuicui_623c4b541e91e 头像 thinking80s 头像
点赞 2 用户, 点赞了这篇动态!
点赞

Add a new 评论

Some HTML is okay.