【20230424】logstash生产开发总结汇总

发布时间 2023-04-25 10:41:56作者: 王双颖

logstash 生产开发总结汇总

一、基础开发

简单的启动脚本

#!/bin/sh

pjname=test1
conf_file=../conf/${pjname}.conf
data_path=../path/${pjname}
log_file=../log/${pjname}.log
rm -rf ${data_path}
rm -rf ${log_file}

nohup /opt/app/logstash/bin/logstash  -f ${conf_file} --path.data=${data_path} >> ${log_file} &

字段过滤

  • drop
if '"project":"workWeiXin"' not in [message] {
drop {}
}

解析Json嵌套

  • ruby
 filter {
  ruby {
    path => "/home/streaming/kafkaToAds/wsy/kafka2Oracle_cover.rb"
    script_params => {"message" => "%{message}"}
    remove_field => ["message"]
  }
 }

  • kafka2Oracle_cover.rb
def register(params)
  @message = params["message"]
end

def filter(event)
  require 'json'
  txt = event.get('message')
  begin
    obj1 = JSON.parse(txt)
    txt2 = obj1["CONTANT"]
    type1 = obj1['MSG_TYPE']
    type2 = obj1['MSG_SUBTYPE']
    ensure
    if type1 = 'PROD_RESERV_ORDER' and type1 ! = nil
      if type2 = 'ADD_ORDER' and type2 ! = nil
        txt2.each do |k,v|
          if v != ''
            event.set(k,v)
          end
        end
        event.set("MSG_TYPR",type1)
        event.set("MSG_SUBTYPE",type2)
        return [event]
      end
    end
    return []
  end
end

时间转换类

1、@timestamp 时区相差8小时问题

  • 下面重定向也可以将业务时间替换@timestamp
  ruby {
    code => "event.set('timestamp',event.get('@timestamp').time.localtime + 8*60*60)"
  }

  date {
    match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"] #匹配timestamp字段
    target => "@timestamp"  #将匹配到的数据写到@timestamp字段中
  }

template模板【mapping】

  • zhfx_recent.json
curl -u elastic:123456 -H "Content-Type: application/json" -XPUT node01:7029/_template/recent_tem -d '
{
  "index_patterns": ["recent*"],
  "order":0,
  "settings":{
    "number_of_shards":1,
    "number_of_replicas":15
  },
  "mappings":{
    "dynamic":"strict",
    "_source":{"enabled":true},
    "properties":{
      "tradedate":{"type":"integer"},
      "recent":{"type":"keyword"},
      "end_date":{"type":"keyword"}
    }
  }
  
}

'

二、程序开发及部署

1、conf配置文件【核心】

我将它拆分成三个部分来写 input、filter、output

input

  • 将文件读完程序结束【实际生产使用,用于读离线csv文件】
input {
   stdin{}
}
  • 读csv文件 测试使用
input {
  file {
    path => "/home/es/details_cp.csv"
    start_position => beginning
  }
}
  • 读kafka【生产实时使用】
input {
  kafka {
    bootstrap_servers => ["node01:9092,node02:9092,node03:9092"] #kafka地址
    auto_offset_reset => "latest" # earliest表示从头消费kafka数据,latest表示从当前位置开始消费
    group_id => "qw_details_topic_20220427_2357"
    consumer_threads => 3 #消费kafka线程数
    topics => ["event_topic"]
    #优化参数
    fetch_max_wait_ms => "1000"
    fetch_min_bytes => "1000000"
  }
}

filter

  • 过滤实时数据
filter {
  ruby {
    path => "/home/common/wsy/logstash_project/rb/qw_details_kafka2es.rb"
    script_params => {"message" => "%{message}"}
    remove_field => ["message"]
  }

   ruby {
      code => "event.set('time',Time.at((event.get('time').to_i/1000)).strftime('%Y-%m-%d %H:%m:%S'))"
   }

        mutate {
          copy => {
            "time"  =>  "copytime"
          }
        }

   date {
     match => ["copytime","YYYY-MM-dd HH:mm:ss"]
     target => "copytime"
   }

   ruby {
      code => "event.set('date',Time.at((event.get('copytime').to_i)).strftime('%Y%m%d'))"
   }

   mutate {
      convert => ["browse_time","integer"]
      convert => ["date","integer"]
    }
   mutate {
     remove_field => ["@timestamp","shop_id","open_id","path","shop_name","mobile","union_id","user_type","strict_dynamic_mapping_exception","host","tags","@version","$screen_height","$network_type","$os","$browser_version","$manufacturer","$os_version","$lib_version","$country","$city","$timezone_offset","$ip","title","$screen_width","platform_type","$is_first_day","studio_name","$app_id","$lib","$is_login_id","$province","$model","$latest_scene","$browser","product_name","$latest_share_url_path", "$latest_share_distinct_id", "$share_distinct_id","$latest_share_method","$is_first_time","$latest_share_depth","$scene","$share_url_path","$share_depth","$share_method","$url_path", "$url_query","copytime","$referrer","$title","$url","$app_version","$brand","$app_version","$url"]
    }

}
  • 过滤离线数据 【以处理qw_details.csv离线文件为例】
filter {
  csv {
    separator => "|$|"
  }

   ruby {
      code => "event.set('check_in_time',Time.at((event.get('check_in_time').to_i/1000)).strftime('%Y-%m-%d %H:%m:%S'))"
   }

   ruby {
      code => "event.set('check_out_time',Time.at((event.get('check_out_time').to_i/1000)).strftime('%Y-%m-%d %H:%m:%S'))"
   }

   date {
     match => ["date","YYYY-MM-dd HH:mm:ss"]
     target => "date"
   }

   ruby {
      code => "event.set('date',Time.at((event.get('date').to_i)).strftime('%Y%m%d'))"
   }

    mutate {
        convert => {"corp_id" => "string"}
        convert => {"qw_staff_id" => "string"}
        convert => {"studio_id" => "string"}
        convert => {"event" => "string"}
        convert => {"page_name" => "string"}
        convert => {"seq" => "string"}       
        convert => {"date" => "integer"}
    }
   mutate {
     remove_field => ["message","@timestamp","path","host","@version"]
    }
}

output

  • 输出到控制台【一般用于测试】
output {
   stdout { codec => rubydebug }
}
  • 输出到ES

    • 离线读csv到ES,每天传参数启动

      output {
        elasticsearch {
          hosts => ["node01:7920"]
          index => "qw_details-${date}"
          user => "elastic"
          password => "123456"
        }
      }
      
    • 实时读kafka到ES,一只运行,需要自动创建索引

      output {
        elasticsearch {
          hosts => ["node01:7920"]
          index => "qw_details-%{date}"
          user => "elastic"
          password => "123456"
        }
      }
      

2、rb过滤处理核心

需求:过滤条件

project = 'workWeiXin' 
and
(
(page_name in ('资讯','产品') and event in ('jhsd_resource_detail','$MPShare'))
or (page_name='首页' and event='$MPShare')
or event = '$MPLaunch'
)
  • kafka2es_qw_details.rb 【实时kafka数据过滤处理】
def register(params)
  @message = params["message"]
end

def filter(event)
  require 'json'
  txt = event.get('message')
  begin
    obj1 = JSON.parse(txt)
    txt2 = obj1['properties']
    type1 = obj1['event']
    type2 = obj1['properties']['page_name']
    time = obj1['time']
    type3 = obj1['project']
    ensure
    if type3 == 'workWeiXin' and(type1 == '$MPLaunch' and type1 != nil) or 
      (type1 == '$MPShare' and type2 == '首页'  and type2 != nil  and type1 != nil) or 
      ( (type1 == 'jhsd_resource_detail' or type1 == '$MPShare' ) and (type2 == '资讯' or type2 == '产品' )  and type2 != nil  and type1 != nil )
      txt2.each do |k,v|  
        if v != ''
          event.set(k,v)
          end
        end
      event.set('event',type1)
      event.set('time',time)
      return [event]
    end
    return []
    end
  end

3、积累常见问题

1、jps不出来es

问题样式:

bash: jps:command not found ...

1.要知道jps跟jdk有关,也就是跟自己安装的java相关

2.知道自己java的安装路径 whereis java

$which java
/usr/bin/java

再找到/usr/bin/java的超链接位置发现还是超链接

$ls -lrt /usr/bin/java
lrwxrwxrwx 1 root root 22 Jul 27 11:43 /usr/bin/java -> /etc/alternatives/java

再来一次,发现最终位置

$ls -lrt /etc/alternatives/java
lrwxrwxrwx 1 root root 35 Jul 27 11:43 /etc/alternatives/java -> /usr/java/jdk1.8.0_111/jre/bin/java

最后的这个jdk位置就是目前用的java的jdk位置

/usr/java/jdk1.8.0_111/(这个是我的,你用你自己的)

在.bashrc里面
加上一句
export JAVA_HOME=你的java安装路径

再生产中发现是系统本身的 我们这里将软连接改成es带的jdk

  • 在/etc/profile 中添加java 环境变量,并source /etc/profile
#修改环境变量
vim /etc/profile
export JAVA_HOME=/opt/jdk1.8.0_172
export PATH=:$PATH:$JAVA_HOME/bin
#刷新
source /etc/profile
#配置软连接
ln -s /opt/jdk1.8.0_172/bin/java java

2、日期转换为时间戳

数据类型 日志里一般都会有时间,格式如“2020-09-04 10:08:08”,怎么转成毫秒呢,格式如“1598609188959”?

    date{
         match => ["requestTimestamp","YYYY-MM-dd HH:mm:ss"]    
         target =>"requestTimestamp"
    }
    ruby{
        code => "event.set('requestTimestamp',event.get('requestTimestamp').to_i*1000)"
    }

  以上是时间格式为“YYYY-MM-dd HH:mm:ss”的情况,那么“YYYY-MM-dd HH:mm:ss SSS"的情况又如何呢?

    date{
         match => ["requestTimestamp","YYYY-MM-dd HH:mm:ss.SSS"]    
         target =>"requestTimestamp"
    }
    
    ruby{
        code => "event.set('requestTimestamp',(event.get('requestTimestamp').to_f.round(3)*1000).to_i)"
    }

截屏2022-05-01 22.51.24

3、将时间戳转换为日期

     ruby {
      code => "event.set('check_in_time',Time.at((event.get('check_in_time').to_i/1000)).strftime('%Y-%m-%d %H:%m:%S'))"
   }

4、字段切割

  • 切割字段 tradedate:20211105 ,取后四位:2021
grok {
		match => {"tradedate" => "(?><tradedate_year>(?<=....)(.{4}))"}
}
  • 切割字段 tradedate:20211105 ,取后两位位:05
grok {
		match => {"tradedate" => "(?><tradedate_year>(?<=......)(.{2}))"}
}
  • 切割字段 tradedate:20211105 ,取后前6位位:202111
mutate {
  gsub => ["tradedate","(?<=......)(.*)",""]
}

5、logstash 插件离线安装

执行下面命令进行安装。
./bin/logstash-plugin install file:///root/logstash-output-jdbc.zip

6、关于@timestamp 时差相差8小时问题

  ruby{
    code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
  }
  ruby{
    code => "event.set('@timestamp',event.get('timestamp'))"
  }
  mutate {
    remove_field => ["timestamp"]
  }

7、shell中截取字段

date=$1
var=$date
echo $date
date=${var:0:6}
echo $date

8、效率问题

先改完这个drop之后,你把这个rb都去掉,全部改成其他标准的filter组件。

你这个场景,要的数据,连万分之一都不到。

要先分析数据的

在过滤条件中添加

if  "workWeiXin" not in [message] {
	drop {}
}

9、数据有问题需要补当天数据

1、消费kafka获取满足当天条件的数据数据

/opt/app/kafka/bin/kafka-console-consumer.sh --topic qw_details_topic --bootstrap-server node01:9092,node02:9092,node03:9092 --from-beginning |grep --line-buffered '"project":"workWeiXin"' > wsy.json
/opt/app/kafka/bin/kafka-console-consumer.sh --topic qw_details_topic --bootstrap-server node01:9092,node02:9092,node03:9092 --from-beginning |grep --line-buffered '16496'   >wsy_bs_1.json

2、读消费的文件补数,我数据加了document_id ,直接覆盖补数

input {
  file {
    path => "/home/es/details_cp.csv"
    start_position => beginning
  }
}

三、脚本及调度

1、启动脚本

  • 测试文件是否正确
/home/common/wsy/logstash-7.8.1/bin/logstash -f /home/common/wsy/logstash_project/conf/qw_details_kafka2es.conf  --config.test_and_exit
实时启动脚本

需求:实时一张表一个启动脚本,离线3张数据导入一个启动脚本

实时明细表(qw_details)启动脚本

  • 明细表(qw_details)【从kafka读】

脚本名称:qw_details_kafka_start.sh

执行:./qw_details_kafka_start.sh

/home/common/wsy/logstash-7.8.1/bin/logstash -f /home/common/wsy/logstash_project/conf/qw_details_kafka2es.conf --path.data=/home/common/wsy/logstash_project/path/qw_details_kafka2es  > /home/common/wsy/logstash_project/log/qw_details_kafka2es.out 2>&1 &
离线启动脚本
  • 明细表(qw_details)、全量汇总表(qw_agg)、折线图-天(qw_line_day)、折线图-昨日时段(qw_line_lastday)【读csv文件】
  • 需求:需要做统一传参

脚本名称:qw_csv_start.sh

执行:./qw_csv_start.sh index date

  • index
    • qw_details
    • qw_agg
    • qw_line_day
    • qw_line_lastday
  • date 具体的日期

例如:./qw_csv_start.sh qw_agg 20220428

index=$1
date=$2
export date=$date
filepath="/home/common/wsy/data/${index}.csv"

rm -rf /home/common/wsy/logstash_project/path/${index}_csv/


/home/common/wsy/logstash-7.8.1/bin/logstash -f /home/common/wsy/logstash_project/conf/${index}_csv.conf    < $filepath   --path.data=/home/common/wsy/logstash_project/path/${index}_csv > /home/common/wsy/logstash_project/log/${index}_csv.out 2>&1 &

2、mapping模版脚本

样例:添加索引模版

脚本名称:qw_agg_mapping.json

权限:chmod 755 qw_agg_mapping.json

部署地址:/home/common/wsy/logstash_project/mapping

执行方式:$ ./qw_agg_mapping.json

curl -u elastic:123456 -H "Content-Type: application/json" -XPUT node01:7920/_template/qw_agg_tem -d '
{
  "index_patterns" : {"qw_agg*"},
  "order" : 1,
  "settings" : {
  		"number_of_shards" : 1,
  		"number_of_replicas": 1
  },
  "mappings" : {
  		"dynamic" : "strict",
  		"_source" : {"enabled" : true},
  		"properties" : {
  		  "corp_id": {"type": "keyword"}
  		}
  }
}
'

3、别名调度脚本

方案一

离线表导入需要每天传参调度修改别名

全量汇总表(qw_agg)

折线图-天(qw_line_day)

折线图-昨日时段(qw_line_lastday)

样例:添加和删除别名

删除将 add 改为 remove

脚本名称:qw_details_alias_add.json

部署地址:/home/common/wsy/logstash_project/alias

权限:chmod 755 qw_details_alias_add.json

执行方式:$ ./qw_details_alias_add.json 20220428

date=$1
echo "The date is ${date} "
export date=$date

curl -u elastic:123456 -H "Content-Type: application/json" -XPOST node01:7920/_aliases -d '
{
  "actions": [
    {
      "add": {
        "index": "qw_details-'${date}'",
       "alias": "qw_details"
      }
    }
  ]
}
'
方案二【最终版本】

领导要求将四个别名写成一个脚本,传入索引和日期

脚本名称:qiwei-index.sh

修改方式:qiwei-index.sh mode index date

  • mode
    • add 「添加别名」
    • remove 「删除别名」
  • index
    • qw_details
    • qw_agg
    • qw_line_day
    • qw_line_lastday
  • date 具体的日期

例如: ./qiwei-index.sh remove qw_details 20220428

部署地址:/home/common/wsy/logstash_project/alias

mode=$1
index=$2
date=$3
curl -u elastic:123456 -H "Content-Type: application/json" -XPOST node01:7920/_aliases -d '
{
  "actions": [
    {
      "'${mode}'": {
        "index": "‘${index}’-'${date}'",
       "alias": "‘${index}’"
      }
    }
  ]
}
'

4、索引生命周期脚本

https://www.cnblogs.com/wei325/p/16101258.html

https://blog.csdn.net/wangshuminjava/article/details/106430552

参考案例一

#/bin/bash

#设置定时任务(脚本路径需自行修改)
#crontab -e
#0 1 * * * /bin/bash /opt/data/zabbixclean.sh

#指定日期(7天前)
time=7
#指定IP端口
IP=XXXXXXX
#指定用户名密码
PASSWORD=XXXXXXXXX
#指定日志文件
log=/opt/data/zabbixclean.log

top=`curl -s  -XGET "http://$PASSWORD$IP/_cat/indices/?v"|grep uint|wc -l`

echo `date` >>$log
for((time=$time;$time<$top;time++));
do
date=`date -d "$time days ago" +%Y-%m-%d`
curl -H "Content-Type:application/json" -XDELETE http://$PASSWORD$IP/uint_$date   &>/dev/null
curl -H "Content-Type:application/json" -XDELETE http://$PASSWORD$IP/dbl_$date    &>/dev/null
curl -H "Content-Type:application/json" -XDELETE http://$PASSWORD$IP/log_$date    &>/dev/null
curl -H "Content-Type:application/json" -XDELETE http://$PASSWORD$IP/text_$date   &>/dev/null
curl -H "Content-Type:application/json" -XDELETE http://$PASSWORD$IP/str_$date    &>/dev/null

if [ $? -eq 0 ];then

  echo "The $date index clean completed" >>$log

fi
done

参考案例二

#/bin/bash
#指定日期(7天前)
DATA=`date -d "1 week ago" +%Y%m%d`

echo "开始清理  $DATA 索引"

#当前日期
time=`date`

#删除7天前的日志
curl --user account:pwd -XGET "http://your_ip:9200/_cat/indices/?v"|grep $DATA
if [ $? == 0 ];then
  curl --user account:pwd -XDELETE "http://your_ip:9200/*_${DATA}"
  echo "于 $time 清理 $DATA 索引!"
fi

这个脚本还是很简单的,我来介绍下:

1、第一句是脚本的开头,这个就不说了

2、首先确定你需要删除哪个时间以前的索引,我这里是删除的一周也就是7天前的,所以我这里DATA取的是当前时间一周以前的时间。

3、下面两句是为了打印,可以写也可以不写

4、查询匹配一周前的这个索引是否存在,这里需要注意添上你es的账号,密码,和部署es的机器ip,如果你没有设置es的账号和密码可以不加。

5、如果这个索引存在,那么就执行删除这个索引的操作,并进行打印,最后输出删除结果

最后你把这个脚本保存好,再设置一个定时脚本,定时脚本设置如下

0 1 * * * sh /data/shscript/ES-index-clear.sh > /data/shscript/log/es-index-clear.log

我这里是每天早上1点执行这个删除脚本,然后将执行的输出结果输出到日志文件中,就行了。

测试一

#/bin/bash

curl -u elastic:123456 -H "Content-Type:application/json" -XDELETE http://node01:7920/qw_details_`date -d "7 days ago" +%Y-%m-%d`   &>/dev/null


测试二

脚本名称:Index_lifecycle.sh

#/bin/bash
#指定日期(7天前)
DATA=`date -d "7 day ago" +%Y%m%d`
log=/opt/project/qiWei_project/kafka2ES/log/Index_lifecycle.log
echo "开始清理  $DATA 索引" >>$log

#当前日期
time=`date`

#删除7天前的日志
curl --user elastic:123456 -H "Content-Type:application/json" -XGET "http://node01:7920/_cat/indices/?v"|grep $DATA
if [ $? == 0 ];then
  curl --user elastic:123456  -H "Content-Type:application/json" -XDELETE http://node01:7920/qw_details-$DATA
  echo "于 $time 清理 qw_details-$DATA 索引!" >>$log
fi

生产

#/bin/bash

#当前日期
time=`date`

#指定日期(7天前)
DATA_qw_details=`date -d "7 ady ago" +%Y%m%d`
#删除7天前的日志
curl -u elastic:123456 -H "Content-Type: application/json"  -XGET "http://node01:7920/_cat/indices/?v"|grep $DATA_qw_details
if [ $? == 0 ];then
echo "开始清理  qw_details-$DATA_qw_details 索引"
  curl -u elastic:123456 -H "Content-Type: application/json" -XDELETE http://node01:7920/qw_details-$DATA_qw_details
  echo "于 $time 清理 $DATA_qw_details 索引!"
fi


DATA_qw_agg=`date -d "7 ady ago" +%Y%m%d`

#删除7天前的日志
curl -u elastic:123456 -H "Content-Type: application/json"  -XGET "http://node01:7920/_cat/indices/?v"|grep $DATA_qw_agg
if [ $? == 0 ];then
echo "开始清理  qw_details-$DATA_qw_agg 索引"
  curl -u elastic:123456 -H "Content-Type: application/json" -XDELETE http://node01:7920/qw_details-$DATA_qw_agg
  echo "于 $time 清理 $DATA_qw_agg 索引!"
fi

DATA_qw_line_day=`date -d "7 ady ago" +%Y%m%d`

#删除7天前的日志
curl -u elastic:123456 -H "Content-Type: application/json"  -XGET "http://node01:7920/_cat/indices/?v"|grep $DATA_qw_line_day
if [ $? == 0 ];then
echo "开始清理  qw_details-$DATA_qw_line_day 索引"
  curl -u elastic:123456 -H "Content-Type: application/json" -XDELETE http://node01:7920/qw_details-$DATA_qw_line_day
  echo "于 $time 清理 $DATA_qw_line_day 索引!"
fi

DATA_qw_line_lastday=`date -d "7 ady ago" +%Y%m%d`

#删除7天前的日志
curl -u elastic:123456 -H "Content-Type: application/json"  -XGET "http://node01:7920/_cat/indices/?v"|grep $DATA_qw_line_lastday
if [ $? == 0 ];then
echo "开始清理  qw_details-$DATA_qw_line_lastday 索引"
  curl -u elastic:123456 -H "Content-Type: application/json" -XDELETE http://node01:7920/qw_details-$DATA_qw_line_lastday
  echo "于 $time 清理 $DATA_qw_line_lastday 索引!"
fi


  • 执行
0 1 * * * sh /home/common/wsy/logstash_project/shell/Index_lifecycle.sh > /home/common/wsy/logstash_project/log/Index_lifecycle.log

5、调度执行参考

4351651820601_.pic

服务器:105.219/common

程序部署目录:

/home/common/wsy/logstash_project

程序启动

  • 实时
bin/qw_details_kafka_start.sh
  • 离线
bin/qw_csv_start.sh 索引名 日期
例子:
/home/common/wsy/logstash_project/bin/qw_csv_start.sh qw_agg 20220504

别名

实时离线可以统一更改

脚本名称:qiwei-index.sh

修改方式:qiwei-index.sh  mode index date

- mode
  - add 「添加别名」
  - remove 「删除别名」
- index
  - qw_details
  - qw_agg
  - qw_line_day
  - qw_line_lastday
- date 具体的日期

例如: ./qiwei-index.sh remove qw_details 20220428

部署地址:/home/common/wsy/logstash_project/alias

6、需求脚本修改

根据需求,还要对上面的脚本进行修改

1、别名,需要一个脚本启动所有的改别名的程序,调度只需要传日期,并且删除生命周期外的别名

2、启动,一个脚本启动所有的离线启动脚本,删除旧索引

原来 qw_csv_start.sh

index=$1
date=$2
export date=$date
filepath="/home/common/wsy/data/${index}.csv"

rm -rf /home/common/wsy/logstash_project/path/${index}_csv/
/home/common/wsy/logstash-7.8.1/bin/logstash -f /home/common/wsy/logstash_project/conf/${index}_csv.conf    < $filepath   --path.data=/home/common/wsy/logstash_project/path/${index}_csv > /home/common/wsy/logstash_project/log/${index}_csv.out 2>&1 &


更改

传入月时间

qiwei_csv_start.sh 202205

#!/bin/bash
cd /home/common/wsy/logstash_project/bin
date=$1
sh qw_csv_start.sh  qw_details $date
sh qw_csv_start.sh  qw_agg $date
sh qw_csv_start.sh  qw_line $date
sh qw_csv_start.sh  qw_line_lastdat $date

添加 document_id

8871651754768_.pic

1、重定向去掉,添加 echo $?

2、蒋修改别名和删除历史索引写入一个脚本,统一传参

四、常用查询命令

1、查看索引

curl -u elastic:sjjsZS9470_201 -XGET http://10.189.145.201:7920/_cat/indices?v

2、查看单个索引

curl -u elastic:sjjsZS9470_201 -XGET http://10.189.145.201:7920/索引名/_search

3、查看索引mapping

curl -u elastic:sjjsZS9470_201 -XGET http://10.189.145.201:7920/索引名/_mapping?pretty

4、查看单个索引别名

http://node01:7920/qw_details-20220411/_alias/*