Converting AND/OR/NOT logical expressions into Elasticsearch queries with pyparsing 3.x (qbit)

Preface

  • Tech stack
Python    3.11
pyparsing 3.2.3
lark      1.2.2
loguru    0.7.2

Example

  • Test code
# encoding: utf-8
# author: qbit
# date: 2024-04-23
# summary: convert AND/OR/NOT logical expressions into Elasticsearch queries

import json
import pyparsing as pp
from loguru import logger

line = 'owner=x_111 AND doc_type=%x%_222 OR author=x_333 OR organ=x_444 AND (NOT pub_year=x_555)'

operator = (
            pp.Literal(r'=x_') |            # exact match
            pp.Literal(r'=%x%_')            # substring match (wildcard on both sides)
        )
field = pp.Word(pp.alphanums + '_')
value = pp.Word(pp.alphanums)
exprGroup: pp.Group = pp.Group(field("field") + operator("operator") + value("value"))
# Keyword matches the whole word only; Word('AND') would accept any run of the letters A, N, D
logicAND = pp.Keyword('AND')('logic')
logicOR = pp.Keyword('OR')('logic')
logicNOT = pp.Keyword('NOT')('logic')

exprForward = pp.infix_notation(
    exprGroup("Expr"),
    [
        # operators are listed from highest to lowest precedence: NOT, then AND, then OR
        (logicNOT, 1, pp.OpAssoc.RIGHT, ),
        (logicAND, 2, pp.OpAssoc.LEFT, ),
        (logicOR, 2, pp.OpAssoc.LEFT, ),
    ]
).set_results_name("Result", True)

result: pp.ParseResults = exprForward.parse_string(line, parse_all=True)
logger.debug(f"result list: \n{json.dumps(result.as_list(), indent=4)}")

def list2dsl(lst):
    r''' Recursively convert the list produced by pyparsing into Elasticsearch DSL '''
    if (len(lst) == 1) and isinstance(lst[0], list):     # the list holds a single nested list
        return list2dsl(lst[0])
    if lst[0] == 'NOT':
        return {
            'bool': {
                'must_not': list2dsl(lst[1])
            }
        }

    match lst[1]:
        case 'AND':
            # every operand of the AND chain becomes a "must" clause
            return {
                'bool': {
                    'must': [list2dsl(item) for item in lst if item != 'AND']
                }
            }
        case 'OR':
            # every operand of the OR chain becomes a "should" clause
            return {
                'bool': {
                    'should': [list2dsl(item) for item in lst if item != 'OR']
                }
            }
        case r'=x_':
            # query_string takes an object whose "query" key holds the query text
            return {
                'query_string': {
                    'query': f"{lst[0]}:{lst[2]}"
                }
            }
        case r'=%x%_':
            return {
                'query_string': {
                    'query': f"{lst[0]}:*{lst[2]}*"
                }
            }
        case _:
            raise ValueError(f"unsupported operator: {lst[1]!r}")

esdsl = json.dumps(list2dsl(result.as_list()), indent=4)
logger.debug(f"es dsl: \n {esdsl}")
logger.debug(f"line: {line}")
  • Output list
[
    [
        [
            [
                "owner",
                "=x_",
                "111"
            ],
            "AND",
            [
                "doc_type",
                "=%x%_",
                "222"
            ]
        ],
        "OR",
        [
            "author",
            "=x_",
            "333"
        ],
        "OR",
        [
            [
                "organ",
                "=x_",
                "444"
            ],
            "AND",
            [
                "NOT",
                [
                    "pub_year",
                    "=x_",
                    "555"
                ]
            ]
        ]
    ]
]
  • Output DSL
 {
    "bool": {
        "should": [
            {
                "bool": {
                    "must": [
                        {
                            "query_string": {
                                "query": "owner:111"
                            }
                        },
                        {
                            "query_string": {
                                "query": "doc_type:*222*"
                            }
                        }
                    ]
                }
            },
            {
                "query_string": {
                    "query": "author:333"
                }
            },
            {
                "bool": {
                    "must": [
                        {
                            "query_string": {
                                "query": "organ:444"
                            }
                        },
                        {
                            "bool": {
                                "must_not": {
                                    "query_string": {
                                        "query": "pub_year:555"
                                    }
                                }
                            }
                        }
                    ]
                }
            }
        ]
    }
}
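
The nested list encodes operator precedence purely through nesting, so it can be sanity-checked without an Elasticsearch cluster. The following is a minimal sketch and not part of the original code: `matches` is a hypothetical helper that evaluates the parsed list against a plain dict, treating `=x_` as string equality and `=%x%_` as substring containment. That is only an approximation of how the generated queries behave on an analyzed field.

```python
def matches(node, doc):
    """Evaluate a parsed expression list against a dict (hypothetical helper)."""
    # Unwrap a list that only holds one nested list, e.g. the outermost [[...]] layer
    if len(node) == 1 and isinstance(node[0], list):
        return matches(node[0], doc)
    if node[0] == 'NOT':
        return not matches(node[1], doc)
    op = node[1]
    if op == 'AND':
        return all(matches(item, doc) for item in node if item != 'AND')
    if op == 'OR':
        return any(matches(item, doc) for item in node if item != 'OR')
    # Otherwise the node is a leaf term: [field, operator, value]
    field, _, value = node
    actual = str(doc.get(field, ''))
    if op == '=x_':
        return actual == value          # exact match
    if op == '=%x%_':
        return value in actual          # substring match
    raise ValueError(f'unsupported operator: {op!r}')


# The same structure as the "Output list" shown above
parsed = [[
    [['owner', '=x_', '111'], 'AND', ['doc_type', '=%x%_', '222']],
    'OR',
    ['author', '=x_', '333'],
    'OR',
    [['organ', '=x_', '444'], 'AND', ['NOT', ['pub_year', '=x_', '555']]],
]]

print(matches(parsed, {'author': '333'}))                     # True
print(matches(parsed, {'organ': '444', 'pub_year': '555'}))   # False
```

The second document fails because `organ=x_444` is only accepted together with `NOT pub_year=x_555`, which shows that AND binds tighter than OR in the parsed structure.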

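lark implementation

  • April 2025: pyparsing turned out to be too slow when parsing complex, deeply nested expressions; switching to lark made parsing much faster
  • Example of a deeply nested expression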

    (doc_type=x_pvls1o2r) AND ((((((((collection=x_pjywkacw) OR (collection=x_pogw2kli)) OR (collection=x_pgj4ygwz)) OR (collection=x_p0ly5jmy)) OR (collection=x_pedp4esz)) OR (collection=x_paghniy0)) OR (collection=x_pzckpz0n)))
  • lark code example

    # encoding: utf-8
    # author: qbit
    # date: 2025-04-14
    # summary: convert AND/OR/NOT logical expressions into Elasticsearch queries (using the lark library)
    
    import json
    from lark import Lark, Transformer
    from loguru import logger
    
    line = "owner=x_111 AND doc_type=%x%_222 OR (author=x_333 OR author=x_331) OR organ=x_444 AND (NOT pub_year=x_555)"
    
    # Same logic as the pyparsing implementation
    grammar = r"""
      ?start: expr
    
      ?expr: or_expr
    
      ?or_expr: and_expr
              | or_expr "OR" and_expr  -> or_op
    
      ?and_expr: not_expr
               | and_expr "AND" not_expr  -> and_op
    
      ?not_expr: atom
               | "NOT" atom  -> not_op
    
      ?atom: term
           | "(" expr ")"
    
      term: FIELD OPERATOR VALUE
    
      FIELD: /[a-zA-Z0-9_]+/
      OPERATOR: "=x_" | "=%x%_"
      VALUE: /[a-zA-Z0-9]+/
    
      %import common.WS
      %ignore WS
    """
    
    
    class LogicExprTransformer(Transformer):
        # Transformer that builds the same nested-list result as the pyparsing version
        def term(self, children):
            field, op, val = children
            return [field.value, op.value, val.value]

        def _join(self, op, children):
            # Join the two operands with `op`; an operand that is already
            # a chain of the same operator is spliced in instead of nested
            result = []
            for index, child in enumerate(children):
                if index:
                    result.append(op)
                if isinstance(child, list) and len(child) > 2 and child[1] == op:
                    # the operand is already an `op` expression: flatten it
                    result.extend(child)
                else:
                    # the operand is a plain term or a different kind of expression
                    result.append(child)
            return result

        def and_op(self, children):
            return self._join("AND", children)

        def or_op(self, children):
            return self._join("OR", children)

        def not_op(self, children):
            return ["NOT", children[0]]
    
    
    # create the parser
    parser = Lark(grammar, parser="lalr", transformer=LogicExprTransformer())
    # parse the expression
    result = parser.parse(line)
    logger.debug(f"result list: \n{json.dumps(result, indent=4)}")
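
To observe how parsing time grows with nesting depth, expressions shaped like the deeply nested sample can be generated at arbitrary depth and fed to either parser. This is a minimal sketch under stated assumptions: `make_nested_expr` and `best_time` are hypothetical helpers, the values `v0`, `v1`, ... are made up, and no timings are claimed here.

```python
import time


def make_nested_expr(depth, field='collection'):
    """Build an expression shaped like the nested sample, with `depth` OR-ed terms."""
    chain = f'({field}=x_v0)'
    for i in range(1, depth):
        # each extra term wraps everything so far in one more pair of parentheses
        chain = f'({chain} OR ({field}=x_v{i}))'
    return f'(doc_type=x_pvls1o2r) AND ({chain})'


def best_time(parse, expr, repeat=3):
    """Best wall-clock time in seconds of parse(expr) over `repeat` runs."""
    timings = []
    for _ in range(repeat):
        start = time.perf_counter()
        parse(expr)
        timings.append(time.perf_counter() - start)
    return min(timings)


if __name__ == '__main__':
    for depth in (2, 4, 8):
        expr = make_nested_expr(depth)
        # `len` is only a stand-in so the sketch runs on its own; pass a real parser instead
        print(depth, expr.count('('), best_time(len, expr))
```

To compare the two implementations, pass `lambda s: exprForward.parse_string(s, parse_all=True)` for pyparsing and `parser.parse` for lark as the `parse` argument, using the objects defined in the listings above.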

Related resources

  • https://pypi.org/project/pyparsing/
  • https://pypi.org/project/boolean-parser
  • https://pypi.org/project/parsimonious/
  • https://pypi.org/project/sympy/

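Articles

  • PyParsing official documentation: https://pyparsing-docs.readthedocs.io/en/latest/
  • Lark official documentation: https://lark-parser.readthedocs.io/en/latest/index.html
  • Learning pyparsing (cnblogs)
  • Replacing regular expressions: building your own parser with pyparsing (Zhihu)
  • Quickly building an interpreter with Pyparsing | a hands-on search query syntax (Zhihu)
  • parsing logical expression with pyparsing
  • Pyparsing in practice (Zhihu)
  • pyparsing 2.x example
This article is from qbit snap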