【Java】ElasticSearch 在项目里的应用

发布时间: 2023-09-22 17:09:38  作者: emdzz

一、前言:

好久没写笔记了,最近忙一个项目,用到ES查询,以往的笔记写ES都是搭建环境,用Kibana玩一玩

这次是直接调用API操作了,话不多说,进入主题

 

二、环境前提:

公司用的还是纯ElasticSearch的API库,并没有Spring-Data-ES的包装

ElasticSearch版本是7.3.1

这是封装的包:

<!-- es start -->
<!-- In-house wrapper artifact around the Elasticsearch client libraries. -->
<dependency>
	<groupId>cn.ymcd.comm</groupId>
	<artifactId>comm-elasticsearch</artifactId>
	<version>1.0.3</version>
</dependency>

然后看下里面的ES依赖:

<!-- Elasticsearch libraries declared inside comm-elasticsearch; every artifact is pinned to 7.3.0. -->
<!-- NOTE(review): the accompanying notes mention a 7.3.1 server; confirm a 7.3.0 client is intended. -->

<!-- High-level REST client (provides RestHighLevelClient). -->
<dependency>
  <groupId>org.elasticsearch.client</groupId>
  <artifactId>elasticsearch-rest-high-level-client</artifactId>
  <version>7.3.0</version><!--$NO-MVN-MAN-VER$-->
</dependency>

<!-- Elasticsearch core artifact. -->
<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch</artifactId>
  <version>7.3.0</version><!--$NO-MVN-MAN-VER$-->
</dependency>

<!-- Transport client artifact. -->
<dependency>
  <groupId>org.elasticsearch.client</groupId>
  <artifactId>transport</artifactId>
  <version>7.3.0</version><!--$NO-MVN-MAN-VER$-->
</dependency>

<!-- Netty 4 transport plugin client artifact. -->
<dependency>
  <groupId>org.elasticsearch.plugin</groupId>
  <artifactId>transport-netty4-client</artifactId>
  <version>7.3.0</version><!--$NO-MVN-MAN-VER$-->
</dependency>

<!-- Low-level REST client (provides RestClient / RestClientBuilder). -->
<dependency>
  <groupId>org.elasticsearch.client</groupId>
  <artifactId>elasticsearch-rest-client</artifactId>
  <version>7.3.0</version><!--$NO-MVN-MAN-VER$-->
</dependency>

  

包源码只有一个客户端类:

 

类似JDBC的连接,提供主机,账户密码信息,调用客户端对象方法获取连接资源

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package cn.ymcd.comm.elasticsearch;

import cn.ymcd.comm.base.log.LogFactory;
import cn.ymcd.comm.base.log.YmcdLogger;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

/**
 * Spring-managed factory for Elasticsearch {@link RestHighLevelClient} instances.
 *
 * <p>Connection settings are injected from the {@code elasticsearch.*} properties. When
 * {@code elasticsearch.cluster} is non-blank it is read as a comma-separated host list and takes
 * precedence over {@code elasticsearch.host-name} / {@code elasticsearch.port}.
 *
 * <p>Every call to {@link #getEsClient()} builds a brand-new client and overwrites {@link #client}
 * with it, so {@link #close()} only ever closes the most recently built client. Callers are
 * expected to close the client they obtained themselves (for example with try-with-resources).
 */
@Configuration
@Component("esClient")
public class EsRestClient implements AutoCloseable {
    private final YmcdLogger logger = LogFactory.getLogger(this.getClass());
    /** Host of the single node; used only when {@link #cluster} is blank. */
    @Value("${elasticsearch.host-name}")
    private String hostName;
    /** Port of the single node; defaults to 9200. */
    @Value("${elasticsearch.port:9200}")
    private int port;
    /** Comma-separated host list, each entry in a form accepted by {@link HttpHost#create(String)}. */
    @Value("${elasticsearch.cluster:}")
    private String cluster;
    /** Basic-auth user name; credentials are applied only when user name and password are both set. */
    @Value("${elasticsearch.userName:}")
    private String userName;
    /** Basic-auth password. */
    @Value("${elasticsearch.password:}")
    private String password;
    /** The client created by the latest {@link #getEsClient()} call, or {@code null} before the first call. */
    protected RestHighLevelClient client;

    public EsRestClient() {
    }

    /**
     * Builds a new high-level client from the injected settings and remembers it in {@link #client}.
     *
     * @return a newly created client; the caller owns it and should close it when done
     * @throws IllegalArgumentException if the cluster property yields no usable host or contains
     *                                  an entry {@link HttpHost#create(String)} cannot parse
     */
    public RestHighLevelClient getEsClient() {
        RestClientBuilder builder;
        if (StringUtils.isNotBlank(this.cluster)) {
            this.logger.debug("connect to cluster server...");
            // HttpHost.create rejects text containing blanks, so "a:9200, b:9200" used to fail.
            // Trim every entry and drop empty ones produced by stray commas.
            List<HttpHost> esHosts = Arrays.stream(this.cluster.split(","))
                    .map(String::trim)
                    .filter(StringUtils::isNotEmpty)
                    .map(HttpHost::create)
                    .collect(Collectors.toList());
            builder = RestClient.builder(esHosts.toArray(new HttpHost[0]));
        } else {
            this.logger.debug("connect to single node server...");
            builder = RestClient.builder(new HttpHost(this.hostName, this.port));
        }

        if (StringUtils.isNotBlank(this.userName) && StringUtils.isNotBlank(this.password)) {
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(this.userName, this.password));
            builder.setHttpClientConfigCallback(
                    httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
        }

        this.client = new RestHighLevelClient(builder);
        this.logger.debug("connected to server!");
        return this.client;
    }

    /**
     * Closes the most recently created client, if any. Failures are logged and not rethrown.
     */
    @Override
    public void close() {
        if (this.client == null) {
            return;
        }
        this.logger.debug("close es client...");
        try {
            this.client.close();
        } catch (Exception e) {
            this.logger.error("close es client error!", e);
        }
    }
}

 

三、API封装:

封装了,但是没完全封装

1、我要做一个翻页查询都没有,还得我自己加上去整一个,麻了

2、有提供一个ES的索引名称注解和泛型声明,为什么返回类型没有一个按泛型返回的,还得是自己写

package cn.ymcd.perception.common.service;

import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.sort.SortBuilder;

import java.util.List;

/**
 * Base contract for querying Elasticsearch indices.
 *
 * @param <T> entity type used by the entity-based methods to resolve the target index
 * @projectName: perception-task-server
 * @author: panx
 * @date: 2023-09-12 10:04
 * @version: 1.0
 */
public interface IEsBaseService<T> {

    /**
     * Basic lookup: fetches the data for a single id.
     *
     * @param entity entity used to resolve the index
     * @param id     id to look up
     * @return the matching data as a string
     * @author panx
     * @createTime 2023/9/12 0012 17:14
     */
    String getById(T entity, String id);

    /**
     * Fetches the data for several ids at once.
     *
     * @param entity entity used to resolve the index
     * @param list   ids to look up
     * @return the raw search response
     * @author panx
     * @createTime 2023/9/12 0012 17:14
     */
    SearchResponse findByIdsList(T entity, List<String> list);

    /**
     * Queries all data in the index, one window at a time.
     *
     * @param from   offset of the first hit to return
     * @param size   maximum number of hits to return
     * @param entity entity used to resolve the index
     * @return the hits of the requested window
     * @author panx
     * @createTime 2023/9/12 0012 17:15
     */
    SearchHit[] queryAll(int from, int size, T entity);

    /**
     * Queries an index by condition.
     *
     * @param indexName        index to search
     * @param page             paging information; may be absent
     * @param boolQueryBuilder query conditions
     * @param highlightBuilder highlight settings; may be absent
     * @param sortBuilder      sort rule; may be absent
     * @return the raw search response
     * @author panx
     * @createTime 2023/9/12 0012 17:15
     */
    SearchResponse whereQuery(String indexName, Page page, BoolQueryBuilder boolQueryBuilder, HighlightBuilder highlightBuilder, SortBuilder sortBuilder);


    /**
     * Paged query that returns the hits converted to the requested entity type.
     *
     * @author OnCloud9
     * @date 2023/9/14 13:36
     * @param tClass           entity class; also used to resolve the index
     * @param page             paging information
     * @param boolQueryBuilder query conditions
     * @param highlightBuilder highlight settings
     * @param sortBuilder      sort rule
     * @param <Entity>         entity type of the returned records
     * @return the page populated with records and total
     */
    <Entity> Page<Entity> pageQuery(Class<Entity> tClass, Page page, BoolQueryBuilder boolQueryBuilder, HighlightBuilder highlightBuilder, SortBuilder sortBuilder);

    /**
     * Builds the highlight settings for the given fields.
     *
     * @param fields names of the fields to highlight
     * @return the highlight builder
     * @author panx
     * @createTime 2023/9/12 0012 17:15
     */
    HighlightBuilder highlightBuilder(String... fields);

    /**
     * Builds a sort rule.
     *
     * @param fieldSort name of the field to sort by
     * @param isDesc    whether to sort in descending order
     * @return the sort builder
     * @author panx
     * @createTime 2023/9/12 0012 17:15
     */
    SortBuilder getSortBuilder(String fieldSort, Boolean isDesc);

    /**
     * Reads the highlighted value of a field from a hit.
     *
     * @param hit   search hit to read from
     * @param field name of the highlighted field
     * @return the highlighted content
     * @author panx
     * @createTime 2023/9/12 0012 17:15
     */
    String getHighlightContent(SearchHit hit, String field);

    /**
     * Fetches all data matching a query by following its scroll cursor.
     *
     * @param response            response to start from
     * @param restHighLevelClient client used for the follow-up requests
     * @return the collected hit batches
     * @author panx
     * @createTime 2023/9/12 0012 17:16
     */
    List<SearchHits> getAllData(SearchResponse response, RestHighLevelClient restHighLevelClient);

    /**
     * Counts the documents in an index that match a query.
     *
     * @param indexName        index to count in
     * @param boolQueryBuilder query conditions
     * @return the number of matching documents
     * @author panx
     * @createTime 2023/9/12 0012 17:16
     */
    long getCount(String indexName, BoolQueryBuilder boolQueryBuilder);

    /**
     * Resolves the index name from the entity's annotation.
     *
     * @param entity entity to resolve the index for
     * @return the index name
     * @author panx
     * @createTime 2023/9/12 0012 17:16
     */
    String getIndexName(T entity);

    /**
     * Aggregation query over the whole index.
     *
     * @author OnCloud9
     * @date 2023/9/17 17:10
     * @param tClass             entity class used to resolve the index
     * @param aggregationBuilder aggregation to run
     * @param resultName         name under which the aggregation result is looked up
     * @param <Entity>           entity type
     * @return the term buckets of the named aggregation
     */
    <Entity> List<Terms.Bucket>  getAggregationQuery(Class<Entity> tClass, AggregationBuilder aggregationBuilder, String resultName);

    /**
     * Aggregation query restricted by a condition.
     *
     * @author OnCloud9
     * @date 2023/9/18 15:40
     * @param tClass             entity class used to resolve the index
     * @param boolQueryBuilder   query conditions
     * @param aggregationBuilder aggregation to run
     * @param resultName         name under which the aggregation result is looked up
     * @param <Entity>           entity type
     * @return the term buckets of the named aggregation
     */
    <Entity> List<Terms.Bucket>  getConditionAggregationQuery(
            Class<Entity> tClass,
            BoolQueryBuilder boolQueryBuilder,
            AggregationBuilder aggregationBuilder,
            String resultName
    );
}

  

首先是实体的索引名称注解:

我们项目的mysql表名 直接对应到es的索引名上,数据来源也是mysql推到es上面,统一规范了

package cn.ymcd.perception.base;

import java.lang.annotation.*;

/**
 * Declares the Elasticsearch index an annotated type maps to.
 *
 * <p>Retained at runtime, applicable to types only, and inherited by subclasses.
 *
 * @author gaof
 * @createTime 2020/3/17 17:05
 * @version: 1.0
 */
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface EsIndex {

    /**
     * Name of the index.
     *
     * @return the index name
     * @author gaof
     * @createTime 2020/3/17 17:06
     * @version: 1.0
     */
    String indexName();

}

  

 

注解的获取方式,因为做ES的CRUD都需要知晓是在哪个索引下操作

所以每个方法必定需要索引的参数传入,

同事非要从对象实例反射去找注解,我给他加了个重载,直接从Class对象上读注解就行了

这里说下接口上的泛型声明很不好,因为我在写业务的时候发现并不是只有一个索引要操作,需要多个索引操作,这个泛型声明限定死了

所以我写的方法都改用方法泛型,这样调用的时候才支持不同的业务实体

/**
 * Resolves the index name for an entity instance from the {@link EsIndex} annotation on its
 * runtime class.
 *
 * @param entity entity instance; must not be {@code null}
 * @return the non-blank index name declared on the entity's class
 * @throws ElasticsearchException if {@code entity} is {@code null}, its class has no
 *                                {@link EsIndex} annotation, or the declared name is blank
 */
@Override
public String getIndexName(T entity) {
    if (entity == null) {
        logger.error("实体对象为空,无法获取索引名称");
        throw new ElasticsearchException("实体对象为空,无法获取索引名称");
    }
    // Single source of truth: the Class-based overload performs the annotation lookup.
    return getIndexName(entity.getClass());
}

/**
 * Resolves the index name from the {@link EsIndex} annotation on the given class.
 *
 * @param tClass   annotated entity class; must not be {@code null}
 * @param <Entity> entity type
 * @return the non-blank index name declared on the class
 * @throws ElasticsearchException if {@code tClass} is {@code null}, carries no {@link EsIndex}
 *                                annotation, or the declared name is blank
 */
public <Entity> String getIndexName(Class<Entity> tClass) {
    if (tClass == null) {
        logger.error("实体类为空,无法获取索引名称");
        throw new ElasticsearchException("实体类为空,无法获取索引名称");
    }
    EsIndex index = tClass.getAnnotation(EsIndex.class);
    if (index == null) {
        // Previously this surfaced as a bare NullPointerException with no hint about the cause.
        String message = "实体类 " + tClass.getName() + " 缺少 @EsIndex 注解";
        logger.error(message);
        throw new ElasticsearchException(message);
    }
    String indexName = index.indexName();
    if (StringUtils.isBlank(indexName)) {
        logger.error("注解索引名称为空");
        throw new ElasticsearchException("注解indexName(索引名称)为空");
    }
    return indexName;
}

  

其它的就是如何操作API了

这里重点说下PageQuery这个方法,要解决几个问题:

1、怎么接收返回的结果,ES的结果叫命中对象,放在一个数组里面,存的是JSON串

  这里把入参的实体类Class对象交给JSON工具,把命中的JSON串反序列化成实体对象就行了,这里用的FastJson

2、解决翻页问题,这里我不想冗余代码了,所以直接在同事写的whereQuery基础上套参写

  在查询前算好from + size的参数值(from = (页码 - 1) × 每页条数),返回的结果集塞回Page翻页对象交出去,

  可以不传Page对象,那我默认认为调用者需要查询全部记录,就按一般索引支持的最大记录数翻页

3、索引翻页问题,因为在2上面说过,索引存在一个最大记录数的限制(index.max_result_window,默认10000),有可能这个索引存了一万五千条数据,但是翻页查询只能翻到前一万条数据

  在这个封装的工具方法中可以使用getCount方法获取真实的总记录数,也可以通过查询响应的命中对象获取总共的命中数量

  解决的方法可以参考下链接: https://zhuanlan.zhihu.com/p/489562200

  无非就三种, 1 调参数加大、2 Scroll滚动查询、3 SearchAfter标记查询

  而在我的业务场景就是把ES数据带到功能上,要翻页查询,经理说不能调参数,后面两种办法又不能实现分页功能

  所以折中的办法就是不调整,查到1万位置,默认认为用户不需要再看后面的内容

/**
 * Runs a term query on the {@code id} field of the entity's index and returns the source of
 * the matching document as a JSON string.
 *
 * <p>When several hits come back, the source of the last one iterated is returned. An
 * {@link IOException} is logged and leaves the result at whatever was read so far.
 *
 * @param entity entity used to resolve the index name
 * @param id     value to match against the {@code id} field
 * @return the hit's source JSON, or an empty string when nothing matched
 */
@Override
public String getById(T entity, String id) {
    SearchRequest searchRequest = new SearchRequest(getIndexName(entity));
    searchRequest.source(new SearchSourceBuilder().query(QueryBuilders.termQuery("id", id)));

    String source = "";
    try (RestHighLevelClient esClient = getEsClient()) {
        SearchResponse searchResponse = esClient.search(searchRequest, RequestOptions.DEFAULT);
        for (SearchHit current : searchResponse.getHits()) {
            source = current.getSourceAsString();
        }
    } catch (IOException e) {
        logger.error("根据id查询数据获取索引异常", e);
    }
    return source;
}
/**
 * Runs a terms query on the {@code id} field of the entity's index.
 *
 * @param entity entity used to resolve the index name
 * @param list   id values to match
 * @return the search response, or {@code null} when the search failed with an
 *         {@link IOException} (the failure is logged)
 */
@Override
public SearchResponse findByIdsList(T entity, List<String> list) {
    SearchRequest searchRequest = new SearchRequest(getIndexName(entity));
    searchRequest.source(new SearchSourceBuilder().query(QueryBuilders.termsQuery("id", list)));

    SearchResponse result = null;
    try (RestHighLevelClient esClient = getEsClient()) {
        result = esClient.search(searchRequest, RequestOptions.DEFAULT);
    } catch (IOException e) {
        logger.error("根据多个id查询数据异常", e);
    }
    return result;
}
/**
 * Returns one window of documents from the entity's index using a match-all query.
 *
 * @param from   offset of the first hit to return
 * @param size   maximum number of hits to return
 * @param entity entity used to resolve the index name
 * @return the hits of the requested window, or {@code null} when the search failed with an
 *         {@link IOException} (the failure is logged)
 */
@Override
public SearchHit[] queryAll(int from, int size, T entity) {
    SearchRequest searchRequest = new SearchRequest(getIndexName(entity));
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    // The original called the no-arg getter query(), which sets nothing; state the intent explicitly.
    searchSourceBuilder.query(QueryBuilders.matchAllQuery());
    searchSourceBuilder.from(from);
    searchSourceBuilder.size(size);
    searchRequest.source(searchSourceBuilder);

    SearchHit[] hitsArr = null;
    // try-with-resources is the only place the client is closed. The former explicit close()
    // closed the shared client field, which may belong to a concurrent caller, and then the
    // resource was closed a second time on leaving the block.
    try (RestHighLevelClient restHighLevelClient = getEsClient()) {
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        hitsArr = searchResponse.getHits().getHits();
    } catch (IOException e) {
        this.logger.error("查询数据失败", e);
    }
    return hitsArr;
}
@Override public SearchResponse whereQuery(String indexName, Page page, BoolQueryBuilder boolQueryBuilder, HighlightBuilder highlightBuilder, SortBuilder sortBuilder) { if (StringUtils.isBlank(indexName)) { return null; } SearchRequest searchRequest = new SearchRequest(indexName); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); // 设置查询条数 if (null != page) { Long current = page.getCurrent(); Long size = page.getSize(); searchSourceBuilder.from(current.intValue()); searchSourceBuilder.size(size.intValue()); } //设置需要排序的字段 if (null != sortBuilder) { searchSourceBuilder.sort(sortBuilder); } // 设置高亮,使用默认的highlighter高亮器 if (null != highlightBuilder) { searchSourceBuilder.highlighter(highlightBuilder); } searchSourceBuilder.query(boolQueryBuilder); try (RestHighLevelClient restHighLevelClient = getEsClient()) { searchRequest.source(searchSourceBuilder); logger.info("query ES where... indexName = " + indexName + ":" + searchSourceBuilder.toString()); return restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); } catch (Exception e) { logger.error("查询ES数据信息失败", e); } return null; }
/**
 * Paged query that deserializes every hit into {@code tClass} and returns the populated page.
 *
 * <p>When {@code page} is {@code null}, a first page of 10000 records is requested. The page's
 * {@code current} value is temporarily replaced by the document offset while the search runs,
 * because {@code whereQuery} reads it as the {@code from} offset. It is restored afterwards,
 * so the returned page still carries the page number the caller asked for.
 *
 * @param tClass           entity class; resolves the index and is the deserialization target
 * @param page             paging information (1-based page number), or {@code null}
 * @param boolQueryBuilder query to run
 * @param highlightBuilder highlight settings, or {@code null} for none
 * @param sortBuilder      sort rule, or {@code null} for none
 * @param <Entity>         entity type of the returned records
 * @return the page with records and total set; empty records and total 0 when the search failed
 */
@Override
public <Entity> Page<Entity> pageQuery(Class<Entity> tClass, Page page, BoolQueryBuilder boolQueryBuilder, HighlightBuilder highlightBuilder, SortBuilder sortBuilder) {
    String indexName = getIndexName(tClass);
    if (Objects.isNull(page)) {
        page = new Page<>(1, 10000);
    }

    long pageNumber = page.getCurrent();
    // Page numbers below 1 would yield a negative offset, which Elasticsearch rejects.
    long offset = Math.max(0L, (pageNumber - 1L) * page.getSize());

    SearchResponse searchResponse;
    page.setCurrent(offset);
    try {
        searchResponse = whereQuery(indexName, page, boolQueryBuilder, highlightBuilder, sortBuilder);
    } finally {
        // Hand the page back with its page number, not the internal offset.
        page.setCurrent(pageNumber);
    }

    List<Entity> records = new ArrayList<>();
    long total = 0L;
    // whereQuery returns null when the search failed; treat that as an empty result, not an NPE.
    if (searchResponse != null) {
        SearchHits searchHits = searchResponse.getHits();
        total = searchHits.getTotalHits().value;
        for (SearchHit searchHit : searchHits) {
            String recordJson = searchHit.getSourceAsString();
            logger.info("recordJson " + recordJson);
            records.add(JSON.parseObject(recordJson, tClass));
        }
    }

    page.setRecords(records);
    page.setTotal(total);
    return page;
}
/**
 * Builds highlight settings that wrap matches in the given fields with a red {@code <span>}.
 *
 * @param fields names of the fields to highlight
 * @return the configured builder, or {@code null} when no fields were supplied
 */
@Override
public HighlightBuilder highlightBuilder(String... fields) {
    if (fields == null || fields.length == 0) {
        return null;
    }

    HighlightBuilder builder = new HighlightBuilder();
    for (int i = 0; i < fields.length; i++) {
        builder.field(fields[i]);
    }
    builder.preTags("<span style=\"color:red;\">");
    builder.postTags("</span>");
    return builder;
}
/**
 * Builds a field sort in the requested direction.
 *
 * @param fieldSort name of the field to sort by
 * @param isDesc    {@code TRUE} for descending; {@code FALSE} or {@code null} for ascending
 * @return the sort builder
 */
@Override
public SortBuilder getSortBuilder(String fieldSort, Boolean isDesc) {
    SortOrder direction = Boolean.TRUE.equals(isDesc) ? SortOrder.DESC : SortOrder.ASC;
    return SortBuilders.fieldSort(fieldSort).order(direction);
}
/**
 * Concatenates all highlight fragments a hit carries for one field.
 *
 * @param hit   search hit to read from
 * @param field name of the highlighted field
 * @return the joined fragments; an empty string when the hit has no highlight for the field;
 *         {@code null} when {@code field} is blank
 */
@Override
public String getHighlightContent(SearchHit hit, String field) {
    if (StringUtils.isBlank(field)) {
        return null;
    }

    StringBuilder joined = new StringBuilder();
    HighlightField highlight = hit.getHighlightFields().get(field);
    if (highlight == null) {
        return joined.toString();
    }

    Text[] fragments = highlight.getFragments();
    if (fragments != null) {
        for (int i = 0; i < fragments.length; i++) {
            joined.append(fragments[i]);
        }
    }
    return joined.toString();
}
@Override public List<SearchHits> getAllData(SearchResponse response, RestHighLevelClient restHighLevelClient) { boolean succeeded = false; List<SearchHits> hitList = new ArrayList<>(); try { String scrollId = response.getScrollId(); SearchHits searchHits = response.getHits(); hitList.add(searchHits); // 根据游标查询所有数据 while (searchHits.getHits() != null && searchHits.getHits().length > 0) { SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId); scrollRequest.scroll(TimeValue.timeValueMillis(30)); response = restHighLevelClient.scroll(scrollRequest, RequestOptions.DEFAULT); scrollId = response.getScrollId(); searchHits = response.getHits(); hitList.add(searchHits); } // 查询完清除游标 ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); clearScrollRequest.addScrollId(scrollId); ClearScrollResponse clearScrollResponse = restHighLevelClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT); succeeded = clearScrollResponse.isSucceeded(); } catch (IOException e) { logger.error("统一方法索引查询所有数据发生异常", e); } if (!succeeded) { return new ArrayList<>(); } return hitList; }
/**
 * Counts the documents in an index that match a query.
 *
 * @param indexName        index to count in
 * @param boolQueryBuilder query to apply
 * @return the number of matching documents, or {@code 0} when the count request threw
 *         (the failure is logged)
 */
@Override
public long getCount(String indexName, BoolQueryBuilder boolQueryBuilder) {
    CountRequest request = new CountRequest(indexName);
    SearchSourceBuilder source = new SearchSourceBuilder();
    source.query(boolQueryBuilder);

    try (RestHighLevelClient esClient = getEsClient()) {
        request.source(source);
        return esClient.count(request, RequestOptions.DEFAULT).getCount();
    } catch (Exception e) {
        logger.error("统计ES数据失败", e);
    }
    return 0L;
}
/**
 * Runs a terms aggregation over the whole index of {@code tClass} and returns its buckets.
 *
 * <p>The request asks for zero hits, so only the aggregation result comes back.
 *
 * @param tClass             entity class used to resolve the index
 * @param aggregationBuilder aggregation to run
 * @param resultName         name under which the aggregation is looked up in the response
 * @param <Entity>           entity type
 * @return a mutable copy of the buckets, or an empty list when anything in the search or the
 *         result lookup threw (the failure is logged)
 */
@SuppressWarnings("Duplicates")
@Override
public <Entity> List<Terms.Bucket> getAggregationQuery(Class<Entity> tClass, AggregationBuilder aggregationBuilder, String resultName) {
    String indexName = getIndexName(tClass);
    SearchRequest request = new SearchRequest(indexName);

    SearchSourceBuilder source = new SearchSourceBuilder();
    source.aggregation(aggregationBuilder);
    source.size(0);

    try (RestHighLevelClient esClient = getEsClient()) {
        request.source(source);
        logger.info("query ES where... indexName = " + indexName + ":" + source.toString());
        SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
        Terms terms = response.getAggregations().get(resultName);
        List<Terms.Bucket> copy = new ArrayList<>(terms.getBuckets());
        return copy;
    } catch (Exception e) {
        logger.error("查询ES数据信息失败", e);
        return Collections.emptyList();
    }
}
@SuppressWarnings("Duplicates") @Override public <Entity> List<Terms.Bucket> getConditionAggregationQuery(Class<Entity> tClass, BoolQueryBuilder boolQueryBuilder, AggregationBuilder aggregationBuilder, String resultN String indexName = getIndexName(tClass); SearchRequest searchRequest = new SearchRequest(indexName); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.aggregation(aggregationBuilder); searchSourceBuilder.query(boolQueryBuilder); searchSourceBuilder.size(0); try (RestHighLevelClient restHighLevelClient = getEsClient()) { searchRequest.source(searchSourceBuilder); logger.info("query ES where... indexName = " + indexName + ":" + searchSourceBuilder.toString()); SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); Aggregations aggregations = searchResponse.getAggregations(); Terms terms = aggregations.get(resultName); List<? extends Terms.Bucket> buckets = terms.getBuckets(); List<Terms.Bucket> returnBuckets = new ArrayList<>(buckets.size()); returnBuckets.addAll(buckets); return returnBuckets; } catch (Exception e) { logger.error("查询ES数据信息失败", e); return Collections.emptyList(); } }