es修正query

发布时间: 2023-04-08 01:34:13 | 作者: NAVYSUMMER
def es_mapping2dict(mapping):
    """Collapse an Elasticsearch mapping into a nested ``{field: type}`` dict.

    Only the ``properties`` section of ``mapping`` is read. For each property
    whose definition is a dict:

    * a definition that itself contains ``properties`` is converted
      recursively and stored under the property name;
    * otherwise, when it has a ``type``, that type is stored under the
      property name, and every entry of a dict-valued ``fields`` section
      that has a ``type`` is stored under ``"<property>.<sub-field>"``.

    Non-dict input, a mapping without ``properties``, and definitions
    without ``type`` contribute nothing.

    Returns:
        A dict whose values are type strings or nested dicts of the same
        shape.
    """
    flattened = {}
    if not isinstance(mapping, dict) or "properties" not in mapping:
        return flattened

    for name, spec in mapping.get("properties").items():
        if not isinstance(spec, dict):
            continue

        # Object field: descend and keep the nesting.
        if "properties" in spec:
            flattened[name] = es_mapping2dict(spec)
            continue

        # Leaf definitions without a type are ignored entirely,
        # including any multi-fields they may declare.
        if "type" not in spec:
            continue
        flattened[name] = spec.get("type")

        sub_fields = spec.get("fields")
        if isinstance(sub_fields, dict):
            for sub_name, sub_spec in sub_fields.items():
                if "type" in sub_spec:
                    flattened[f"{name}.{sub_name}"] = sub_spec.get("type")

    return flattened


def data2single_dict(source, parent_name: str = ""):
    """Flatten nested dicts/lists/tuples into a single-level dict.

    Keys of the result are dotted paths: dict keys are joined with ``.`` and
    sequence elements contribute their positional index, e.g.
    ``{"a": {"b": [10, 20]}}`` becomes ``{"a.b.0": 10, "a.b.1": 20}``.

    Args:
        source: The value to flatten. A dict, list or tuple is descended
            into; any other value is treated as a leaf.
        parent_name: Path prefix for everything found in ``source``. When it
            is empty, top-level dict keys are used unchanged and top-level
            sequence indexes are used as strings.

    Returns:
        A flat dict mapping each path to its leaf value. A leaf passed
        directly as ``source`` is returned as ``{parent_name: source}``.
        Empty containers contribute no entries.
    """
    if isinstance(source, dict):
        children = source.items()
    elif isinstance(source, (list, tuple)):
        # Index every element, at every depth. Previously a list nested
        # directly inside another list reused the parent's path for all of
        # its elements, so they silently overwrote one another.
        children = ((f"{index}", item) for index, item in enumerate(source))
    else:
        return {parent_name: source}

    result = {}
    for key, value in children:
        column_name = f"{parent_name}.{key}" if parent_name else key
        result.update(data2single_dict(value, column_name))
    return result


_KEYWORD_SUFFIX = ".keyword"
# Queries that need the exact (keyword) field when one exists.
_EXACT_QUERIES = ("term", "terms", "range", "wildcard")
# Full-text queries, which are pointed at the analyzed (non-keyword) field.
_ANALYZED_QUERIES = ("match", "match_phrase", "match_phrase_prefix")


def _strip_keyword(field):
    """Return ``field`` without a trailing ``.keyword``; non-strings pass through."""
    if isinstance(field, str) and field.endswith(_KEYWORD_SUFFIX):
        return field[:-len(_KEYWORD_SUFFIX)]
    return field


def _amend_exact_field(field, amend_dict):
    """Pick the field name an exact-value query should target.

    A name without ``.keyword`` gains the suffix when ``amend_dict`` lists
    ``<name>.keyword`` with type ``keyword``. A name that already has the
    suffix loses it when ``amend_dict`` has no entry for it.
    """
    if not isinstance(field, str):
        return field
    if not field.endswith(_KEYWORD_SUFFIX):
        candidate = f"{field}{_KEYWORD_SUFFIX}"
        return candidate if amend_dict.get(candidate) == "keyword" else field
    if amend_dict.get(field) is None:
        return field[:-len(_KEYWORD_SUFFIX)]
    return field


def _amend_multi_match_field(field):
    """Strip ``.keyword`` from a multi_match field, keeping any ``^boost`` part."""
    if not isinstance(field, str):
        return field
    name, caret, boost = field.partition("^")
    return f"{_strip_keyword(name)}{caret}{boost}"


def amend_query_keyword(query, amend_dict: dict):
    """Fix ``.keyword`` usage on field names throughout a query DSL structure.

    The structure is walked recursively. Dicts are updated in place; lists
    are replaced by new lists. Clauses are rewritten as follows:

    * ``term`` / ``terms`` / ``range`` / ``wildcard`` and the ``field`` of
      ``exists``: ``.keyword`` is appended when ``amend_dict`` maps
      ``<field>.keyword`` to ``"keyword"``, and removed when the suffixed
      name is absent from ``amend_dict``.
    * ``match`` / ``match_phrase`` / ``match_phrase_prefix`` and the
      ``fields`` of ``multi_match``: a trailing ``.keyword`` is always
      removed (a ``^boost`` suffix on multi_match fields is preserved).

    All other options inside ``multi_match`` and ``exists`` are kept as they
    are, and nothing is invented for options that are missing. A clause
    whose body is not a dict is left to the generic recursion instead of
    raising.

    Args:
        query: A query DSL fragment (dict, list, or scalar).
        amend_dict: Flat ``{dotted field path: type}`` lookup.

    Returns:
        The amended fragment; for dict input this is the same object.
    """
    if isinstance(query, list):
        return [amend_query_keyword(item, amend_dict) for item in query]
    if not isinstance(query, dict):
        return query

    # Snapshot the items so reassigning values never interferes with iteration.
    for key, value in list(query.items()):
        if not isinstance(value, dict):
            query[key] = amend_query_keyword(value, amend_dict)
        elif key in _EXACT_QUERIES:
            query[key] = {
                _amend_exact_field(field, amend_dict): body
                for field, body in value.items()
            }
        elif key in _ANALYZED_QUERIES:
            query[key] = {_strip_keyword(field): body for field, body in value.items()}
        elif key == "multi_match":
            amended = dict(value)  # keep type/operator/boost/... untouched
            fields = value.get("fields")
            if isinstance(fields, (list, tuple)):
                amended["fields"] = [_amend_multi_match_field(f) for f in fields]
            query[key] = amended
        elif key == "exists":
            amended = dict(value)
            if "field" in value:
                amended["field"] = _amend_exact_field(value["field"], amend_dict)
            query[key] = amended
        else:
            query[key] = amend_query_keyword(value, amend_dict)
    return query


def amend_query(query, mapping):
    """Correct ``.keyword`` field usage in ``query`` according to ``mapping``.

    The mapping is first reduced to a nested field/type dict, then flattened
    to dotted paths, and that lookup is handed to ``amend_query_keyword``
    together with the query.

    Args:
        query: Query DSL structure to amend.
        mapping: Index mapping containing a ``properties`` section.

    Returns:
        Whatever ``amend_query_keyword`` returns for ``query``.
    """
    field_types = data2single_dict(es_mapping2dict(mapping))
    return amend_query_keyword(query, field_types)


# Demo input: a sample index mapping with one object field and one text field.
mapping = {
    "properties": {
        # Object field; flattens to the path "basic.establish_date" (type date).
        "basic": {
            "properties": {
                "establish_date": {
                    "type": "date"
                },

            }
        },
        # Text field with a keyword multi-field, i.e. both "name" (text) and
        # "name.keyword" (keyword) are known field paths.
        "name": {
            "type": "text",
            "fields": {
                "keyword": {
                    "type": "keyword",
                    "ignore_above": 256
                }
            }
        }
    }
}

# Demo input: a bool query whose clauses deliberately use the "wrong" variant
# of each field so that every rewrite rule of amend_query is exercised.
query = {
    "query": {
        "bool": {
            "must": [
                # wildcard/term/terms on "name": rewritten to "name.keyword".
                {"wildcard": {
                    "name": {
                        "value": "*北京*"
                    }
                }}, {
                    "term": {
                        "name": {
                            "value": "北京百度科技有限公司"
                        }
                    }
                }, {
                    "terms": {
                        "name": ["北京百度科技有限公司"]
                    }
                },
                # match on "name.keyword": the suffix is stripped to "name".
                {
                    "match": {
                        "name.keyword": "北京"
                    }
                },
                # range on a ".keyword" path the mapping does not define:
                # the suffix is stripped to "basic.establish_date".
                {
                    "range": {
                        "basic.establish_date.keyword": {
                            "gte": "2001-01-01",
                            "lte": "2023-12-31"
                        }
                    }
                }, {
                    # exists on "name": rewritten to "name.keyword".
                    "exists": {
                        "field": "name"
                    }
                }
            ]
        }
    },
    # Not a query clause; passes through unchanged.
    "track_total_hits": True
}

# Runs at import time: prints the amended query for the sample above.
print(amend_query(query, mapping))