接上篇:如何在项目中实现ES查询功能?

发布时间 2023-07-10 15:40:09作者: 码农小凡

大家好,之前我们教大家如何将 MySQL 数据同步到 ES。

这篇文章在技术派项目中实现 ES 查询功能。

不多说上文章目录:

 

01 背景

在 SpringBoot 整合 ES 中,有两种常见方法,一种是 ElasticsearchRestTemplate,另一种是 RestHighLevelClient。

ElasticsearchRestTemplate 是 ES 基于 Spring 集成用法,但是用法比较单一,有些复杂查询无法完成;另一种 RestHighLevelClient 是 ES 原生客户端,适合各种场景查询。

综合考虑下来技术派采用后者 ES 原生客户端 RestHighLevelClient。

02 构造客户端

2.1 引入依赖

在 paicoding-service 模块中的 pom 文件中引入 es 和 es-client 相关依赖。

<!--引入es-high-level-client相关依赖  start-->
<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch</artifactId>
  <version>6.8.2</version>
</dependency>

<dependency>
  <groupId>org.elasticsearch.client</groupId>
  <artifactId>elasticsearch-rest-client</artifactId>
  <version>6.8.2</version>
</dependency>
<dependency>
  <groupId>org.elasticsearch.client</groupId>
  <artifactId>elasticsearch-rest-high-level-client</artifactId>
  <version>6.8.2</version>
</dependency>

2.2 配置文件

在 paicoding-web 模块中 src->main->resource-env->dev->application-dal.yml 中配置 ES 相关配置,配置如下所示:

# elasticsearch配置
elasticsearch:
  # 是否开启ES?本地启动如果没有安装ES,可以设置为false关闭ES
  open: true
  # es集群名称
  clusterName: elasticsearch
  hosts: 127.0.0.1:9200
  userName: elastic
  password: elastic
  # es 请求方式
  scheme: http
  # es 连接超时时间
  connectTimeOut: 1000
  # es socket 连接超时时间
  socketTimeOut: 30000
  # es 请求超时时间
  connectionRequestTimeOut: 500
  # es 最大连接数
  maxConnectNum: 100
  # es 每个路由的最大连接数
  maxConnectNumPerRoute: 100

注意 yml 格式,千万别出错。

 

配置的具体介绍我就不再重复介绍了,在 yml 的注解中已经详细的标注了,请小伙伴们详细看看哦。

2.3 配置类

配置类其实也就是交由 Spring 去创建 RestHighLevelClient 客户端,配置类代码如下所示:

package com.github.paicoding.forum.service.config.es;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * es配置类
 *
 * @author ygl
 * @since 2023-05-25
 **/
@Slf4j
@Data
@Configuration
@ConfigurationProperties(prefix = "elasticsearch")
public class ElasticsearchConfig {

    // 是否开启ES
    private Boolean open;

    // es host ip 地址(集群)
    private String hosts;

    // es用户名
    private String userName;

    // es密码
    private String password;

    // es 请求方式
    private String scheme;

    // es集群名称
    private String clusterName;

    // es 连接超时时间
    private int connectTimeOut;

    // es socket 连接超时时间
    private int socketTimeOut;

    // es 请求超时时间
    private int connectionRequestTimeOut;

    // es 最大连接数
    private int maxConnectNum;

    // es 每个路由的最大连接数
    private int maxConnectNumPerRoute;


    /**
     * 如果@Bean没有指定bean的名称,那么这个bean的名称就是方法名
     */
    @Bean(name = "restHighLevelClient")
    public RestHighLevelClient restHighLevelClient() {

        // 此处为单节点es
        String host = hosts.split(":")[0];
        String port = hosts.split(":")[1];
        HttpHost httpHost = new HttpHost(host, Integer.parseInt(port));

        // 构建连接对象
        RestClientBuilder builder = RestClient.builder(httpHost);

        // 设置用户名、密码
        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName, password));

        // 连接延时配置
        builder.setRequestConfigCallback(requestConfigBuilder -> {
            requestConfigBuilder.setConnectTimeout(connectTimeOut);
            requestConfigBuilder.setSocketTimeout(socketTimeOut);
            requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeOut);
            return requestConfigBuilder;
        });
        // 连接数配置
        builder.setHttpClientConfigCallback(httpClientBuilder -> {
            httpClientBuilder.setMaxConnTotal(maxConnectNum);
            httpClientBuilder.setMaxConnPerRoute(maxConnectNumPerRoute);
            httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
            return httpClientBuilder;
        });

        return new RestHighLevelClient(builder);
    }
}

其实配置类中也就是构造 RestHighLevelClient 客户端。注意是放在 paicoding-service 模块中。

 

注意:在 yml 中的 elasticsearch.hosts 中千万不要加 https 前缀,只需要 127.0.0.1:9200 即可(我在这里翻车过);否则在配置类中 new HttpHost() 会出错。

03 查询 ES

在整合之前我先来说下大致逻辑:

  • 根据查询条件去ES中查询出数据;
  • 从查询出所有数据中取出 id,然后将 id 去 add 进 ids 中;
  • 拿到 ids 后去 MySQL 数据库中捞数据。

这一套流程下来,要比根据查询条件进行 like% 值 % 不走索引查询效率性能好的多。

然后开始编写实现代码,代码实现及过程如下所示:

 

04 兼容 MySQL

考虑到有的小伙伴没有安装 ES 和 Canal,为了让代码能够查询首页时还是走 MySQL 数据库,我这边特意做了兼容,即使没有安装 ES 等也是可以正常运行。

首先我在 yml 中增加了是否开启 ES 值。

 

然后在 Service 层将 open 值注入。

 

最后在代码层面业务层实现。

 

05 代码仓库

为了方便大家学习功能演变的过程,每个模块都会单独开个分支,代码分支、代码 Diff 如下:

  • 代码分支:feature/mysql_es_20230523
  • 代码 Diff:https://github.com/itwanger/paicoding/pull/51/files

建议大家先看代码 Diff,这样你就知道所有的改动,非常便于自己学习。

 

06 总结

我们再回顾一下整体执行流程:

 

写到这里,就结束了,是不是满满的干货呢?这两篇文章,从原理到实践,手把手教你如何将 MySQL 同步到 ES,并应用到实际场景中,如果你的项目也需要用到该场景,基本可以直接照搬。