# Suffix of the keyword multi-field that this module adds to or strips from
# field names.
_KEYWORD_SUFFIX = ".keyword"

# Clauses whose field names are pointed at the ".keyword" sub-field when the
# mapping has one.
_TERM_LEVEL_CLAUSES = ("term", "terms", "range", "wildcard")

# Clauses whose field names always have a trailing ".keyword" removed.
_FULL_TEXT_CLAUSES = ("match", "match_phrase", "match_phrase_prefix")


def es_mapping2dict(mapping):
    """Convert an Elasticsearch mapping into a nested ``{field: type}`` dict.

    Entries that carry their own ``"properties"`` become nested dicts (built
    recursively). Entries with a ``"type"`` map to that type, and each typed
    sub-field under ``"fields"`` is recorded as a sibling ``"<field>.<sub>"``
    key. Entries with neither ``"properties"`` nor ``"type"`` are skipped.

    Args:
        mapping: A mapping dict with a top-level ``"properties"`` key.

    Returns:
        The nested dict; empty when ``mapping`` is not a dict or has no
        dict-valued ``"properties"``.
    """
    mapping_dict = {}
    if not isinstance(mapping, dict):
        return mapping_dict
    properties = mapping.get("properties")
    if not isinstance(properties, dict):
        return mapping_dict

    for name, spec in properties.items():
        if not isinstance(spec, dict):
            continue
        if "properties" in spec:
            mapping_dict[name] = es_mapping2dict(spec)
            continue
        if "type" not in spec:
            continue
        mapping_dict[name] = spec.get("type")
        sub_fields = spec.get("fields")
        if isinstance(sub_fields, dict):
            for sub_name, sub_spec in sub_fields.items():
                # Guard against malformed sub-field specs (non-dict values).
                if isinstance(sub_spec, dict) and "type" in sub_spec:
                    mapping_dict[f"{name}.{sub_name}"] = sub_spec.get("type")
    return mapping_dict


def data2single_dict(source, parent_name: str = ""):
    """Flatten nested dicts/lists/tuples into one dict with dotted keys.

    Dict keys and sequence indexes are joined with ``"."``; for example
    ``{"a": {"b": [1, 2]}}`` becomes ``{"a.b.0": 1, "a.b.1": 2}``.

    Args:
        source: The value to flatten. A scalar is returned as
            ``{parent_name: source}``.
        parent_name: Prefix for every generated key.

    Returns:
        A flat dict mapping dotted paths to leaf values.
    """
    result = {}
    if isinstance(source, dict):
        for k, v in source.items():
            column_name = f"{parent_name}.{k}" if parent_name else k
            if isinstance(v, dict):
                result.update(data2single_dict(v, column_name))
            elif isinstance(v, (list, tuple)):
                for i, vv in enumerate(v):
                    result.update(data2single_dict(vv, f"{column_name}.{i}"))
            else:
                result[column_name] = v
    elif isinstance(source, (list, tuple)):
        for i, v in enumerate(source):
            column_name = f"{parent_name}.{i}" if parent_name else f"{i}"
            if isinstance(v, dict):
                result.update(data2single_dict(v, column_name))
            elif isinstance(v, (list, tuple)):
                # Index each nested element; reusing column_name for all of
                # them would make later elements overwrite earlier ones.
                for j, vv in enumerate(v):
                    result.update(data2single_dict(vv, f"{column_name}.{j}"))
            else:
                result[column_name] = v
    else:
        result[parent_name] = source
    return result


def _strip_keyword_suffix(field):
    """Return ``field`` without a trailing ".keyword" (non-strings pass through)."""
    if isinstance(field, str) and field.endswith(_KEYWORD_SUFFIX):
        return field[:-len(_KEYWORD_SUFFIX)]
    return field


def _amend_exact_field(field, amend_dict):
    """Add or remove the ".keyword" suffix so ``field`` matches ``amend_dict``.

    A name without the suffix gains it when ``"<field>.keyword"`` is mapped as
    ``"keyword"``; a name with the suffix loses it when that exact name is not
    in ``amend_dict``. Non-string values are returned unchanged.
    """
    if not isinstance(field, str):
        return field
    if field.endswith(_KEYWORD_SUFFIX):
        if amend_dict.get(field) is None:
            return field[:-len(_KEYWORD_SUFFIX)]
        return field
    candidate = f"{field}{_KEYWORD_SUFFIX}"
    if amend_dict.get(candidate) == "keyword":
        return candidate
    return field


def amend_query_keyword(query, amend_dict: dict):
    """Rewrite field names in a query so ".keyword" usage matches the mapping.

    * ``term`` / ``terms`` / ``range`` / ``wildcard`` / ``exists``: use the
      ".keyword" sub-field when ``amend_dict`` has one, and drop a ".keyword"
      suffix that ``amend_dict`` does not know.
    * ``match`` / ``match_phrase`` / ``match_phrase_prefix`` /
      ``multi_match``: always drop a trailing ".keyword".
    * Any other key is processed recursively.

    Dicts are modified in place (and also returned); lists are rebuilt.

    Args:
        query: A query dict, a list of query dicts, or a leaf value.
        amend_dict: Flat ``{dotted_field_name: type}`` dict, as produced by
            ``data2single_dict(es_mapping2dict(mapping))``.

    Returns:
        The amended query.
    """
    if isinstance(query, list):
        return [amend_query_keyword(q, amend_dict) for q in query]
    if not isinstance(query, dict):
        return query

    # Iterate over a snapshot because entries are reassigned inside the loop.
    for key, value in list(query.items()):
        is_clause_body = isinstance(value, dict)
        if key in _TERM_LEVEL_CLAUSES and is_clause_body:
            query[key] = {
                _amend_exact_field(field, amend_dict): body
                for field, body in value.items()
            }
        elif key in _FULL_TEXT_CLAUSES and is_clause_body:
            query[key] = {
                _strip_keyword_suffix(field): body
                for field, body in value.items()
            }
        elif key == "multi_match" and is_clause_body:
            amended = {
                "query": value.get("query"),
                "fields": [
                    _strip_keyword_suffix(field)
                    for field in (value.get("fields") or [])
                ],
            }
            # Keep every other option (type, operator, ...) instead of
            # silently discarding it.
            amended.update(
                (k, v) for k, v in value.items() if k not in amended
            )
            query[key] = amended
        elif key == "exists" and is_clause_body:
            field = value.get("field")
            # A clause without a string "field" is left untouched rather than
            # raising AttributeError on None.
            if isinstance(field, str):
                amended = {"field": _amend_exact_field(field, amend_dict)}
                amended.update(
                    (k, v) for k, v in value.items() if k not in amended
                )
                query[key] = amended
        else:
            query[key] = amend_query_keyword(value, amend_dict)
    return query


def amend_query(query, mapping):
    """Amend ``query`` so its ".keyword" field names agree with ``mapping``.

    Args:
        query: The query body; its dicts are modified in place.
        mapping: The index mapping (a dict with a ``"properties"`` key).

    Returns:
        The amended query.
    """
    dict_mapping = es_mapping2dict(mapping)
    single_dict = data2single_dict(dict_mapping)
    return amend_query_keyword(query, single_dict)


# Demo: runs at import time and amends ``query`` in place.
mapping = {
    "properties": {
        "basic": {
            "properties": {
                "establish_date": {
                    "type": "date"
                },
            }
        },
        "name": {
            "type": "text",
            "fields": {
                "keyword": {
                    "type": "keyword",
                    "ignore_above": 256
                }
            }
        }
    }
}

query = {
    "query": {
        "bool": {
            "must": [
                {"wildcard": {
                    "name": {
                        "value": "*北京*"
                    }
                }},
                {
                    "term": {
                        "name": {
                            "value": "北京百度科技有限公司"
                        }
                    }
                },
                {
                    "terms": {
                        "name": ["北京百度科技有限公司"]
                    }
                },
                {
                    "match": {
                        "name.keyword": "北京"
                    }
                },
                {
                    "range": {
                        "basic.establish_date.keyword": {
                            "gte": "2001-01-01",
                            "lte": "2023-12-31"
                        }
                    }
                },
                {
                    "exists": {
                        "field": "name"
                    }
                }
            ]
        }
    },
    "track_total_hits": True
}

print(amend_query(query, mapping))