ES7.3版本,批量添加,索引创建,索引判断

发布时间 2024-01-10 09:24:01 作者: 挽留匆匆的美丽
import com.link.risk.model.RiskTradeDetail;
import com.link.util.BeanBuilder;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.*;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 *
 * @className Es工具类
 * @author ysq
 * @date 2018-08-29
 */
public class EsUtils {

    protected static Logger logger = LoggerFactory.getLogger(EsUtils.class);

    private RestHighLevelClient restHighLevelClient;

    public EsUtils(RestHighLevelClient restHighLevelClient){
        this.restHighLevelClient = restHighLevelClient;
    }


    /**
     * 判断索引是否存在
     * @param restHighLevelClient
     * @param esIndex
     * @return true表示存在,false表示不存在
     */
    public Boolean checkIndexExists(String esIndex){
        try {
            return restHighLevelClient.indices().exists(new GetIndexRequest(esIndex), RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * ES 批量插入工具方法
     * @param restHighLevelClient
     * @return BulkProcessor
     */
    public BulkProcessor createBulkProcessor() {

        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                logger.info("【beforeBulk】批次[{}] 携带 {} 请求数量", executionId, request.numberOfActions());
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                if (!response.hasFailures()) {
                    logger.info("【afterBulk-成功】批量 [{}] 完成在 {} ms", executionId, response.getTook().getMillis());
                } else {
                    BulkItemResponse[] items = response.getItems();
                    for (BulkItemResponse item : items) {
                        if (item.isFailed()) {
                            logger.info("afterBulk-失败】批量 [{}] 出现异常的原因 : {}", executionId, item.getFailureMessage());
                            break;
                        }
                    }
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request,Throwable failure) {
                List<DocWriteRequest<?>> requests = request.requests();
                List<String> esIds = requests.stream().map(DocWriteRequest::id).collect(Collectors.toList());
                logger.error("【afterBulk-failure失败】es执行bluk失败,失败的esId为:{}", esIds, failure);
            }
        };

        BulkProcessor.Builder builder = BulkProcessor.builder(((bulkRequest, bulkResponseActionListener) -> {
            restHighLevelClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkResponseActionListener);
        }), listener);
        //到达10000条时刷新
        builder.setBulkActions(10000);
        //内存到达8M时刷新
        builder.setBulkSize(new ByteSizeValue(8L, ByteSizeUnit.MB));
        //设置的刷新间隔10s
        builder.setFlushInterval(TimeValue.timeValueSeconds(10));
        //设置允许执行的并发请求数。
        builder.setConcurrentRequests(8);
        //设置重试策略
        builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1), 3));
        return builder.build();
    }

    /**
     * 批量添加数据
     * @param list
     * @param esIndex
     * @param <T>
     * @return
     * @throws Exception
     */
    public <T> Integer instBulkList(List<T> list,String esIndex) throws Exception {

        //确认索引存在开始写入数据
        if(this.checkIndexExists(esIndex)){
            List<IndexRequest> indexRequests = new ArrayList<>();
            try {
                for (T t : list) {

                    Class<?> clazz = t.getClass();

                    Field nameField = clazz.getDeclaredField("id");
                    // 设置name字段可访问
                    nameField.setAccessible(true);
                    // 获取name字段的值
                    String idValue = (String) nameField.get(t);

                    if(StringUtils.isBlank(idValue)){
                        throw new Exception("写入文档的值必须指定id");
                    }

                    IndexRequest request = new IndexRequest();
                    Map<String,Object> map = com.link.util.BeanBuilder.beanToMap(t);
                    request.id(idValue); //文档唯一id
                    request.index(esIndex);
                    request.source(map);
                    indexRequests.add(request);
                }
                indexRequests.forEach(this.createBulkProcessor()::add);
            }catch (Exception e){
                logger.info("--------交易数据添加异常-------------");
                logger.info(e.getMessage());
                logger.debug("交易数据添加异常,错误:{}",e.getMessage());
                throw new Exception(e.getMessage());
            }
            return list.size();
        }
        return 0;
    }

    /**
     * 创建索引
     * @param esIndex 索引名称
     * @param properties 文档json
     * @return
     * @throws IOException
     */
    public boolean careteIndex(String esIndex,String properties) throws IOException {
        CreateIndexRequest request = new CreateIndexRequest(esIndex);

        //设置分片和副本数
        request.settings(Settings.builder()
                .put("index.number_of_shards", 5)
                .put("index.number_of_replicas", 1)
                .put("index.codec","best_compression"));

        request.mapping(properties, XContentType.JSON);

        CreateIndexResponse response = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);

        boolean acknowledged = response.isAcknowledged();
        boolean shardsAcknowledged = response.isShardsAcknowledged();

        if(acknowledged && shardsAcknowledged){
            logger.info("交易索引创建成功,acknowledged:{},shardsAcknowledged:{}",acknowledged,shardsAcknowledged);
            return true;
        }else{
            logger.info("交易索引创建失败,acknowledged:{},shardsAcknowledged:{}",acknowledged,shardsAcknowledged);
            return false;
        }
    }
}