使用SQL查询elasticsearch

发布时间 2023-12-06 12:55:31作者: 伊丽莎白菜

Elasticsearch 的 Query DSL 太难写了,所以我放弃啦!

SQL REST API

kibana DevTools

直接POST /_sql端点即可调试。示例:

POST /_sql?format=csv
{
  "query": "SELECT * from \"portal-page-view-*\" LIMIT 3",
  "time_zone": "Asia/Shanghai"
}

Java

借助于org.elasticsearch.client.RestClient,即可请求es rest端点,但响应报文需要自己处理。示例:

import cn.hutool.core.text.csv.CsvUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.RandomUtil;
import com.alibaba.fastjson.JSON;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

/**
 * Example tests that seed page-view documents into Elasticsearch and read them back through
 * the SQL REST endpoint ({@code POST /_sql?format=csv}).
 *
 * <p>The tests talk to a real Elasticsearch node configured in {@link #setUp()}.
 */
public class ESTests {

  private static final String PAGE_VIEW_INDEX_PREFIX = "portal-page-view-";
  private static final String ES_DATETIME_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSSZ";
  private static final DateTimeFormatter INDEX_SUFFIX_DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy.MM.dd");
  /** Value sent as the {@code time_zone} field of every SQL request. */
  private static final String SQL_TIME_ZONE = "Asia/Shanghai";
  private static RestHighLevelClient restHighLevelClient;

  /** Builds the shared client with basic-auth credentials. */
  @BeforeAll
  static void setUp() {
    String esHost = "127.0.0.1";
    int esPort = 9200;
    String esUsername = "user";
    String esPassword = "password";

    final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
    credentialsProvider.setCredentials(AuthScope.ANY,
            new UsernamePasswordCredentials(esUsername, esPassword));
    RestClientBuilder restClientBuilder = RestClient.builder(new HttpHost(esHost, esPort))
            .setHttpClientConfigCallback(httpClientBuilder -> {
              httpClientBuilder.disableAuthCaching();
              return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
            });
    restHighLevelClient = new RestHighLevelClient(restClientBuilder);
  }

  /**
   * Closes the shared client so its underlying HTTP resources are released.
   *
   * @throws IOException if closing the client fails
   */
  @AfterAll
  static void tearDown() throws IOException {
    // setUp() may have failed before the client was assigned.
    if (restHighLevelClient != null) {
      restHighLevelClient.close();
    }
  }

  /**
   * Seeds test data: for each of the last 200 days, indexes a random number of page views
   * (count drawn from {@code RandomUtil.randomInt(3, 20)}) into that day's index.
   */
  @Test
  void initData() {
    LocalDateTime now = LocalDateTime.now();
    int sum = 0;
    for (int i = 0; i < 200; i++) {
      int count = RandomUtil.randomInt(3, 20);
      LocalDateTime dateTime = now.minusDays(i);
      Date date = Date.from(dateTime.atZone(ZoneId.systemDefault()).toInstant());
      // One index per day; the name does not change inside the inner loop.
      String indexName = PAGE_VIEW_INDEX_PREFIX + dateTime.format(INDEX_SUFFIX_DATE_FORMATTER);
      for (int j = 0; j < count; j++) {
        PageView pageView = new PageView(IdUtil.fastUUID(), date);
        IndexRequest indexRequest = new IndexRequest(indexName);
        indexRequest.source(JSON.toJSONStringWithDateFormat(pageView, ES_DATETIME_PATTERN), XContentType.JSON);
        Assertions.assertDoesNotThrow(() -> restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT));
        System.out.println("插入数据成功,序号:" + sum++);
      }
    }
  }

  /**
   * Queries today's page views through the SQL endpoint and checks that every returned row
   * has both columns populated.
   *
   * @throws IOException if the HTTP request fails
   */
  @Test
  void testSelect() throws IOException {
    // Plain SQL: JSON escaping of the quotes is done by sqlRequestBody, not by hand.
    String sql = "SELECT id,createTime FROM \"portal-page-view-*\" WHERE createTime > CURRENT_DATE";
    List<PageView> pageViewList = queryForList(sqlRequestBody(sql), PageView.class);
    Assertions.assertFalse(pageViewList.isEmpty());
    // Assert on each row directly so a missing value is reported as an assertion failure
    // rather than a NoSuchElementException from Optional.get().
    for (PageView pageView : pageViewList) {
      Assertions.assertNotNull(pageView.getId());
      Assertions.assertNotNull(pageView.getCreateTime());
    }
  }

  /**
   * Serialises a SQL statement into the JSON body expected by {@code /_sql}.
   *
   * @param sql the SQL text, unescaped
   * @return a JSON object with {@code query} and {@code time_zone} fields
   */
  private static String sqlRequestBody(String sql) {
    Map<String, Object> body = new LinkedHashMap<>();
    body.put("query", sql);
    body.put("time_zone", SQL_TIME_ZONE);
    return JSON.toJSONString(body);
  }

  /**
   * Runs a SQL request and maps the CSV response onto beans.
   *
   * @param jsonEntity the JSON request body for {@code /_sql}
   * @param clazz the bean type each CSV row is converted to
   * @param <T> the bean type
   * @return the rows read from the response
   * @throws IOException if the HTTP request fails
   */
  private <T> List<T> queryForList(String jsonEntity, Class<T> clazz) throws IOException {
    String responseBody = postForResponseBody(jsonEntity);
    return CsvUtil.getReader().read(responseBody, clazz);
  }

  /**
   * Posts the body to {@code /_sql?format=csv} via the low-level client and returns the raw
   * response text.
   *
   * @param jsonEntity the JSON request body
   * @return the response body as a string
   * @throws IOException if the HTTP request fails
   */
  private String postForResponseBody(String jsonEntity) throws IOException {
    Request request = new Request("POST", "/_sql");
    request.addParameter("format", "csv");
    request.setJsonEntity(jsonEntity);
    Response response = restHighLevelClient.getLowLevelClient().performRequest(request);
    // UTF-8 is only the fallback; a charset declared by the response still takes precedence.
    return EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
  }

  /**
   * Sample entity.
   */
  static class PageView {
    private String id;
    private Date createTime;

    public PageView(String id, Date createTime) {
      this.id = id;
      this.createTime = createTime;
    }

    public PageView() {
    }

    public String getId() {
      return id;
    }

    public void setId(String id) {
      this.id = id;
    }

    public Date getCreateTime() {
      return createTime;
    }

    public void setCreateTime(Date createTime) {
      this.createTime = createTime;
    }
  }

}

其他

  1. Elasticsearch SQL支持JDBC(驱动包org.elasticsearch.plugin:x-pack-sql-jdbc),是不是很牛很方便,但免费版的es不支持,需要付费;
  2. SQL查询查不到文档_id,只能查到_source内容,可以通过_source内添加额外唯一标识解决;
  3. 通过/_sql/translate端点,可以将SQL查询翻译为普通的Query DSL,参考: SQL Translate API