Java Spring Boot 集成 elasticsearch6.8.x

发布时间 2023-12-23 16:06:32作者: 进击的davis

在全文搜索领域,毫无疑问,当下 elasticsearch 应用广泛,其优势自不必说,凭借全文快速搜索,可以在短时内实现大数据量的查询。

今天学习下在 Spring Boot 中集成 elasticsearch 开发,这里主要展示可以怎么用,至于开发人员想通过 ElasticsearchORM 封装,也可以参考下面的示例。

环境:

  • Spring Boot: 3.1.6
  • JDK:17
  • elasticsearch:6.8.23

依赖

在添加依赖时需要注意,服务端的 es 是什么版本,我们的客户端依赖中也请使用同样的版本,别问为啥,问就是版本兼容性。

<!-- Elasticsearch High Level REST Client (provides RestHighLevelClient) -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>6.8.23</version>
</dependency>
<!-- Elasticsearch core library; pinned to the same version as the REST client above -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>6.8.23</version>
</dependency>
<!-- Lombok: supplies @Data and the constructor annotations used by the example classes -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.30</version>
</dependency>

自定义配置及读取

配置文件

application.yaml

# Custom Elasticsearch connection settings, bound by prefix "elasticsearch".
elasticsearch:
  # Node address (masked placeholder; replace with the real host).
  host: 172.xx.xx.xx
  port: 9200
  user: admin
  password: password
  # URI scheme handed to the HTTP client (http or https).
  scheme: http

配置读取

配置类

package com.example.springbootesdemo.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * Connection settings for the Elasticsearch cluster, bound from the
 * {@code elasticsearch.*} keys of the application configuration.
 *
 * <p>Getters and setters are generated by Lombok's {@code @Data}; only
 * {@link #toString()} is written by hand so that the password is never rendered.
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "elasticsearch")
public class ESConfig {
    /** Host name or IP address of the Elasticsearch node. */
    private String host;

    /** Port of the Elasticsearch HTTP endpoint. */
    private int port;

    /** URI scheme used to reach the node, e.g. {@code http} or {@code https}. */
    private String scheme;

    /** User name for authenticating against the cluster. */
    private String user;

    /** Password for {@link #user}; treated as a secret and never printed. */
    private String password;

    /**
     * Renders the settings for diagnostics.
     *
     * <p>The password is replaced by a fixed mask (or {@code <unset>} when it is
     * missing) so that logging this object cannot leak the credential.
     *
     * @return a human-readable summary of the connection settings
     */
    @Override
    public String toString() {
        String maskedPassword = (password == null || password.isEmpty()) ? "<unset>" : "******";
        return String.format(
                "elasticsearch{scheme=%s, host=%s, port=%d, user=%s, password=%s}",
                scheme, host, port, user, maskedPassword);
    }
}

es客户端

package com.example.springbootesdemo.config;

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that exposes a {@link RestHighLevelClient} bean built
 * from the settings in {@link ESConfig}.
 */
@Configuration
public class ElasticSearchClientConfig {
    @Autowired
    private ESConfig es;

    /**
     * Builds the Elasticsearch client for the configured host, port and scheme.
     *
     * <p>When a non-blank user is configured, HTTP basic-auth credentials
     * (user + password) are registered on the underlying HTTP client; without
     * a user the client connects unauthenticated.
     *
     * @return the client bean registered as {@code restHighLevelClient}
     */
    @Bean(name = "restHighLevelClient")
    public RestHighLevelClient restHighLevelClient() {
        RestClientBuilder builder = RestClient.builder(
                new HttpHost(es.getHost(), es.getPort(), es.getScheme()));

        String user = es.getUser();
        if (user != null && !user.isBlank()) {
            // Apply the configured credentials; previously they were read but never used.
            CredentialsProvider credentials = new BasicCredentialsProvider();
            credentials.setCredentials(
                    AuthScope.ANY,
                    new UsernamePasswordCredentials(user, es.getPassword()));
            builder.setHttpClientConfigCallback(
                    httpClient -> httpClient.setDefaultCredentialsProvider(credentials));
        }

        return new RestHighLevelClient(builder);
    }
}

后面的测试中会用到下面的 User bean:

package com.example.springbootesdemo.model;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Sample document model used by the Elasticsearch examples.
 *
 * <p>Lombok generates the accessors plus a no-args constructor and an
 * all-args constructor taking {@code (name, age, hobbies)} in field order.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {
    /** Display name of the user. */
    private String name;

    /** Age of the user. */
    private Integer age;

    /** Free-form list of hobby labels. */
    private String[] hobbies;
}

集成使用示例

索引操作

package com.example.springbootesdemo;

import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.io.IOException;

/**
 * Index-level examples: existence check, creation (default and with explicit
 * settings) and deletion through the {@link RestHighLevelClient}.
 *
 * <p>Every test propagates {@link IOException} so that a failed cluster call
 * fails the test instead of being printed and ignored.
 */
@SpringBootTest
public class EsIndexTest {
    @Autowired
    private RestHighLevelClient restHighLevelClient;

    /**
     * Checks whether an index exists and prints the result.
     *
     * @throws IOException if the request to the cluster fails
     */
    @Test
    public void testIndexIsExist() throws IOException {
        String indexName = "demo";
        // 1. build the request
        GetIndexRequest req = new GetIndexRequest(indexName);
        // 2. execute it through the client
        boolean exists = restHighLevelClient.indices().exists(req, RequestOptions.DEFAULT);
        System.out.printf("%s exist: %b%n", indexName, exists);
    }

    /**
     * Creates an index with default settings.
     *
     * @throws IOException if the request to the cluster fails
     */
    @Test
    public void testIndexCreate() throws IOException {
        // 1. build the request
        String indexName = "demo1222";
        CreateIndexRequest req = new CreateIndexRequest(indexName);
        // 2. execute it; an I/O failure now fails the test rather than being swallowed
        CreateIndexResponse resp = restHighLevelClient.indices().create(req, RequestOptions.DEFAULT);
        System.out.println("create resp: " + resp.isAcknowledged());
    }

    /**
     * Creates an index whose settings and (empty) mappings are assembled with
     * an {@link XContentBuilder}.
     *
     * @throws IOException if building the body or calling the cluster fails
     */
    @Test
    public void testIndexCreateUseXContent() throws IOException {
        XContentBuilder builder = XContentFactory.jsonBuilder();
        // Build settings and mappings; every startObject needs a matching endObject.
        builder.startObject()

                .startObject("settings")
                .field("priority", 80)
                .field("number_of_shards", 1)
                .field("number_of_replicas", 0)
                .endObject()

                .startObject("mappings")
                .endObject()

                .endObject();

        CreateIndexRequest request = new CreateIndexRequest("demo1223");
        request.source(builder);

        CreateIndexResponse response = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);

        System.out.println("isAcknowledge: " + response.isAcknowledged());
    }

    /**
     * Deletes indices by name pattern.
     *
     * @throws IOException if the request to the cluster fails
     */
    @Test
    public void testIndexDelete() throws IOException {
        // NOTE: wildcard pattern - targets every index whose name starts with "demo",
        // not only the ones created by the tests above.
        String index = "demo*";
        DeleteIndexRequest req = new DeleteIndexRequest(index);
        AcknowledgedResponse resp = restHighLevelClient.indices().delete(req, RequestOptions.DEFAULT);
        System.out.println("delete result: " + resp.isAcknowledged());
    }
}

文档操作

package com.example.springbootesdemo;

import com.example.springbootesdemo.model.User;
import net.minidev.json.JSONValue;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.io.IOException;

/**
 * Single-document examples (index, get, update, delete) against one fixed
 * document in one fixed index.
 */
@SpringBootTest
public class EsDocTest {
    /** Index that holds the sample document. */
    private static final String INDEX = "demo1222";

    /** Mapping type; still required by the 6.8.x request classes. */
    private static final String TYPE = "_doc";

    /** Id of the sample document shared by all tests. */
    private static final String DOC_ID = "10001";

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    /**
     * Indexes the sample user as a JSON document.
     *
     * @throws IOException if the request to the cluster fails
     */
    @Test
    public void testAddDoc() throws IOException {
        User alice = new User("Alice", 21, new String[]{"table tennis", "soccer"});
        String json = JSONValue.toJSONString(alice);

        // Index, type and id identify the document; the body is the serialized bean.
        IndexRequest indexRequest = new IndexRequest(INDEX)
                .type(TYPE)
                .id(DOC_ID)
                .source(json, XContentType.JSON);
        indexRequest.timeout("1s");

        IndexResponse indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
        System.out.println(indexResponse.toString());
        System.out.println(indexResponse.status());
    }

    /**
     * Fetches the sample document and prints the response and its source.
     *
     * @throws IOException if the request to the cluster fails
     */
    @Test
    public void testGetDoc() throws IOException {
        GetResponse found = restHighLevelClient.get(
                new GetRequest(INDEX, TYPE, DOC_ID), RequestOptions.DEFAULT);

        System.out.println(found);
        System.out.println(found.getSourceAsString());
    }

    /**
     * Updates the sample document with a partial document built from a bean.
     *
     * @throws IOException if the request to the cluster fails
     */
    @Test
    public void testUpdateDoc() throws IOException {
        User olderAlice = new User("Alice", 33, new String[]{"table tennis", "soccer"});

        // The serialized bean is sent as the partial document for the update.
        UpdateRequest updateRequest = new UpdateRequest(INDEX, TYPE, DOC_ID)
                .doc(JSONValue.toJSONString(olderAlice), XContentType.JSON);

        UpdateResponse updated = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
        System.out.println(updated.status() + " " + updated.getResult());
    }

    /**
     * Deletes the sample document.
     *
     * @throws IOException if the request to the cluster fails
     */
    @Test
    public void testDeleteDoc() throws IOException {
        DeleteResponse deleted = restHighLevelClient.delete(
                new DeleteRequest(INDEX, TYPE, DOC_ID), RequestOptions.DEFAULT);
        System.out.println(deleted.status() + " " + deleted.getResult());
    }
}

批量操作

package com.example.springbootesdemo;

import com.example.springbootesdemo.model.User;
import net.minidev.json.JSONValue;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.io.IOException;
import java.util.ArrayList;

/**
 * Bulk example: indexes a handful of sample users in a single bulk request.
 */
@SpringBootTest
public class EsBulkTest {

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    /**
     * Sends five index operations in one bulk call and prints whether any of
     * them failed.
     *
     * @throws IOException if the request to the cluster fails
     */
    @Test
    public void testBulkInsertDocs() throws IOException {
        // Sample documents to index.
        User[] batch = {
                new User("aaa", 11, new String[]{"111", "222"}),
                new User("bbb", 22, new String[]{"111", "222"}),
                new User("ccc", 33, new String[]{"111", "222"}),
                new User("ddd", 44, new String[]{"111", "222"}),
                new User("eee", 55, new String[]{"111", "222"}),
        };

        BulkRequest bulk = new BulkRequest();
        bulk.timeout("10s");

        // Document ids are consecutive, starting at 1000.
        final int firstId = 1000;
        for (int offset = 0; offset < batch.length; offset++) {
            IndexRequest item = new IndexRequest("demo1222")
                    .id(String.valueOf(firstId + offset))
                    .type("_doc")
                    .source(JSONValue.toJSONString(batch[offset]), XContentType.JSON);
            bulk.add(item);
        }

        BulkResponse outcome = restHighLevelClient.bulk(bulk, RequestOptions.DEFAULT);
        System.out.println(outcome.hasFailures());
    }
}

条件查询与聚合

package com.example.springbootesdemo;

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.*;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.min.MinAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.min.ParsedMin;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
 * Search examples: a filtered, sorted query and a terms aggregation with a
 * nested min sub-aggregation.
 */
@SpringBootTest
public class EsSearchTest {
    @Autowired
    private RestHighLevelClient restHighLevelClient;

    /**
     * Runs a terms query on {@code name}, sorted by {@code age}, and prints the
     * total hit count followed by each hit's score, id and source fields.
     *
     * @throws IOException if the request to the cluster fails
     */
    @Test
    public void testSearch() throws IOException {
        SearchRequest request = new SearchRequest();
        // target index (several indices may be passed) and type
        request.indices("demo1222");
        request.types("_doc");

        // build the search body
        SearchSourceBuilder builder = new SearchSourceBuilder();

        builder.size(10000);
        builder.from(0);

        builder.sort("age");

        // return only name and age; hobbies is excluded from _source
        builder.fetchSource(new String[]{"name", "age"}, new String[]{"hobbies"});

        // A builder holds exactly one query: each query(...) call replaces the
        // previous one, so only the query that should run is set here. To match
        // every document, use QueryBuilders.matchAllQuery() instead.
        builder.query(QueryBuilders.termsQuery("name", new String[]{"aaa", "bbb", "ccc"}));

        // attach the body to the request
        request.source(builder);

        // send the request
        SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);

        // total matching documents
        System.out.println("total:" + response.getHits().getTotalHits());

        // print every hit
        SearchHit[] results = response.getHits().getHits();
        for (SearchHit result : results) {
            System.out.println("score: " + result.getScore() + " " + result.getId());
            Map<String, Object> source = result.getSourceAsMap();
            for (Map.Entry<String, Object> field : source.entrySet()) {
                System.out.println(field.getKey() + "--" + field.getValue());
            }
        }
    }

    /**
     * Groups documents by {@code name.keyword} and prints the minimum
     * {@code age} of each group.
     *
     * @throws IOException if the request to the cluster fails
     */
    @Test
    public void testAggSearch() throws IOException {
        SearchRequest request = new SearchRequest();
        // target index and type
        request.indices("demo1222");
        request.types("_doc");

        // build the search body
        SearchSourceBuilder builder = new SearchSourceBuilder();

        // size 0: no hits are requested, only aggregation results
        builder.size(0);

        // Terms aggregation on the keyword sub-field of the string field "name",
        // with a min(age) sub-aggregation computed per bucket.
        TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders.terms("indexName")
                .field("name.keyword")
                .size(10000);
        MinAggregationBuilder minAggregationBuilder = AggregationBuilders.min("minAge").field("age");
        termsAggregationBuilder.subAggregation(minAggregationBuilder);

        // attach the aggregation to the body, and the body to the request
        builder.aggregation(termsAggregationBuilder);
        request.source(builder);

        // send the request
        SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);

        Map<String, Aggregation> stringAggMap = response.getAggregations().asMap();
        System.out.println(stringAggMap.get("indexName"));

        ParsedStringTerms stringTerms = (ParsedStringTerms) stringAggMap.get("indexName");

        List<? extends Terms.Bucket> buckets = stringTerms.getBuckets();

        // print each bucket's key together with its min-age value
        for (Terms.Bucket bucket : buckets) {
            // getKeyAsString() avoids an unchecked cast of the Object key
            String name = bucket.getKeyAsString();

            Map<String, Aggregation> ageMap = bucket.getAggregations().asMap();

            ParsedMin ageAgg = (ParsedMin) ageMap.get("minAge");
            // the min value is reported as a double; truncate it for display
            long age = (long) ageAgg.getValue();

            System.out.printf("name: %s, age: %d%n", name, age);
        }
    }
}

参考: