Syncing ES data with Logstash (create the mapping on the destination first)

Published: 2023-10-26 11:19:12  Author: slnngk

Environment:
OS: CentOS 7
ES (source and destination): 6.8.5
Logstash: 6.8.5

Notes:
1. Keep the Logstash version as close to the ES version as possible.
2. We create the mapping (the "table structure") on the destination first. Without it, Elasticsearch's dynamic mapping infers each field's type from the incoming data as Logstash writes, and the inferred mapping may not match the source: a short field on the source can come out as long, and in some cases a text field is even inferred as long, which ultimately makes the sync fail.
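A quick way to see this dynamic-mapping drift for yourself (mapping_demo is a throwaway index name used only for this illustration): index a single document into a non-existent index and inspect the mapping ES generates.

curl -u elastic:elastic123 -H 'Content-Type: application/json' -XPOST "http://192.168.1.109:19200/mapping_demo/_doc?pretty" -d '
{"deleted": 0}'

curl -u elastic:elastic123 -XGET "http://192.168.1.109:19200/mapping_demo/_mapping?pretty"

The deleted field comes back as long: dynamic mapping maps JSON integers to long, never to short, which is exactly the drift described in note 2.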


1. View the indices on the source

[elasticsearch@localhost bin]$ curl -u elastic:elastic -X GET "192.168.1.108:19200/_cat/indices?v"
health status index           uuid                   pri rep docs.count docs.deleted store.size pri.store.size
green  open   .security-6     TBWap6LdRY6ALA4vDIagew   1   0          6            0     19.5kb         19.5kb
yellow open   app_message_all C04l_PP1RQyFPNBSf1cjjg   5   1    1000000            0    766.2mb        766.2mb


2. Get the mapping of the source index

[elasticsearch@localhost bin]$ curl -u elastic:elastic -H "Content-Type: application/json" -XGET "http://192.168.1.108:19200/app_message_all/_mappings?pretty=true"
{
  "app_message_all" : {
    "mappings" : {
      "_doc" : {
        "properties" : {
          "create_time" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "deleted" : {
            "type" : "long"
          },
          "extra" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "massive_type" : {
            "type" : "long"
          },
          "message" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "message_id" : {
            "type" : "long"
          },
          "message_type" : {
            "type" : "long"
          },
          "send_date" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "sender_seq_no" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "status" : {
            "type" : "long"
          },
          "title" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "user_id" : {
            "type" : "long"
          }
        }
      }
    }
  }
}



The index here is named app_message_all and its type is _doc.
Note: before 7.0 the type name could be customized; from 7.0 onward custom type names are no longer allowed and everything uniformly uses _doc.


3. Create the index and mapping on the destination
First create an empty index (no field definitions yet):

[elasticsearch@localhost bin]$ curl -u elastic:elastic123 -X PUT "192.168.1.109:19200/app_message_all?pretty" -H 'Content-Type: application/json' -d'
{}
'
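A PUT with an empty body gives the new index the cluster-default shard settings. If you also want to mirror the source's layout (5 primary shards, 1 replica, per the _cat/indices output in step 1), you can pass settings instead of the empty body; a sketch:

curl -u elastic:elastic123 -X PUT "192.168.1.109:19200/app_message_all?pretty" -H 'Content-Type: application/json' -d'
{
  "settings" : {
    "number_of_shards" : 5,
    "number_of_replicas" : 1
  }
}'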


Querying at this point returns no mapping information:
[elasticsearch@localhost bin]$ curl -u elastic:elastic123 -H "Content-Type: application/json" -XGET "http://192.168.1.109:19200/app_message_all/_mappings?pretty=true"


Now add the mapping. Only the contents of the source's properties object need to be copied over; make sure the braces stay balanced (drop the four trailing } from the source output).
The type specified here must be _doc to match the source; in version 6 the type name is still customizable.

curl -u elastic:elastic123 -H 'Content-Type: application/json' -XPOST "http://192.168.1.109:19200/app_message_all/_doc/_mapping?pretty" -d ' 
{
        "properties" : {
          "create_time" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "deleted" : {
            "type" : "long"
          },
          "extra" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "massive_type" : {
            "type" : "long"
          },
          "message" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "message_id" : {
            "type" : "long"
          },
          "message_type" : {
            "type" : "long"
          },
          "send_date" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "sender_seq_no" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "status" : {
            "type" : "long"
          },
          "title" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "user_id" : {
            "type" : "long"
          }
        }
}'
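As an aside, the two requests above can be collapsed into one: ES 6.x accepts the mapping inline when the index is created, nested under mappings and the type name. A sketch (only the deleted field is shown; paste the full properties block from step 2 in its place):

curl -u elastic:elastic123 -H 'Content-Type: application/json' -XPUT "http://192.168.1.109:19200/app_message_all?pretty" -d '
{
  "mappings" : {
    "_doc" : {
      "properties" : {
        "deleted" : { "type" : "long" }
      }
    }
  }
}'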


Now compare the source and destination mappings to confirm they match.

Source:
curl -u elastic:elastic -H "Content-Type: application/json" -XGET "http://192.168.1.108:19200/app_message_all/_mappings?pretty=true"

Destination:
curl -u elastic:elastic123 -H "Content-Type: application/json" -XGET "http://192.168.1.109:19200/app_message_all/_mappings?pretty=true"
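Eyeballing two large JSON responses is error-prone; an easy alternative is to save both to files and diff them (the file names here are arbitrary):

curl -u elastic:elastic -XGET "http://192.168.1.108:19200/app_message_all/_mappings?pretty=true" -o src_mapping.json
curl -u elastic:elastic123 -XGET "http://192.168.1.109:19200/app_message_all/_mappings?pretty=true" -o dst_mapping.json
diff src_mapping.json dst_mapping.json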


4. Logstash configuration file

[root@localhost config]# more sync_all_index.conf 
input {
    elasticsearch {
        hosts => ["http://192.168.1.108:19200"]
        index => "*,-.monitoring*,-.security*,-.kibana*" ## system indices are excluded; only app_message_all exists here, so * works, but you can also name indices explicitly
        user => "elastic"
        password => "elastic"
        size => 1000
        scroll => "1m"
        docinfo => true
    }
}
# The filter section is optional
filter {
  mutate {
    remove_field => ["@timestamp", "@version"]  # drop the fields Logstash adds by itself
  }
}

output {
    elasticsearch {
        hosts => ["http://192.168.1.109:19200"]
        user => "elastic"
        password => "elastic123"
        index => "%{[@metadata][_index]}"
        document_type => "%{[@metadata][_type]}"
        document_id => "%{[@metadata][_id]}"
    }
}
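Before launching the sync, you can have Logstash check the file for syntax errors; the -t flag (short for --config.test_and_exit) parses the config and exits without processing any events:

[root@localhost config]# /opt/logstash-6.8.5/bin/logstash -f /opt/logstash-6.8.5/config/sync_all_index.conf -t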


5. Run the sync
[root@localhost config]# /opt/logstash-6.8.5/bin/logstash -f /opt/logstash-6.8.5/config/sync_all_index.conf


Verification:
1. The field order of documents synced by Logstash does not match the source.
Querying the document with _id eVpxaYsBeUT5Hwt-LE-o on both ends shows the fields in a different order:
[elasticsearch@localhost bin]$ curl -u elastic:elastic -XGET 'http://192.168.1.108:19200/app_message_all/_doc/eVpxaYsBeUT5Hwt-LE-o?pretty'
[elasticsearch@localhost bin]$ curl -u elastic:elastic123 -XGET 'http://192.168.1.109:19200/app_message_all/_doc/eVpxaYsBeUT5Hwt-LE-o?pretty'
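Field order inside _source carries no meaning for ES, so a more useful check is that the document counts match on both sides; _cat/count gives a quick per-index total:

[elasticsearch@localhost bin]$ curl -u elastic:elastic -XGET "http://192.168.1.108:19200/_cat/count/app_message_all?v"
[elasticsearch@localhost bin]$ curl -u elastic:elastic123 -XGET "http://192.168.1.109:19200/_cat/count/app_message_all?v"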


2. The destination mapping may be created with fewer fields than the source; the sync then adds the missing fields automatically (dynamic mapping again). If you don't want such a field synced at all, remove it in the filter, e.g. dropping user_id:

filter {
  mutate {
    remove_field => ["@timestamp", "@version","user_id"]
  }
}
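If you would rather have unexpected fields rejected than silently added, a complementary option (standard ES behavior, not something the steps above rely on) is to set the destination mapping's dynamic property to strict; indexing a document with an unmapped field then fails with strict_dynamic_mapping_exception instead of widening the mapping:

curl -u elastic:elastic123 -H 'Content-Type: application/json' -XPOST "http://192.168.1.109:19200/app_message_all/_doc/_mapping?pretty" -d '
{
  "dynamic": "strict"
}'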