使用logstash同步mysql到ES

发布时间: 2023-11-01 15:13:10  作者: slnngk

环境:
OS:Centos 7
es:6.8.5
logstash:6.8.5
mysql:5.7

 

1.mysql创建表

create table tb_es (
  id bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键id',
  name varchar(32) not null,
  f_int int,
  f_dou double(10,2),
  f_flo float(9,2),
  create_time timestamp not null default current_timestamp,
  update_time timestamp not null default current_timestamp on update current_timestamp,
  primary key (id)
);

 

2.写入测试数据

insert into tb_es (name,f_int,f_dou,f_flo,create_time,update_time) values('name1',100,123.12,16.26,now(),now());
insert into tb_es (name,f_int,f_dou,f_flo,create_time,update_time) values('name2',200,323.12,26.46,now(),now());
insert into tb_es (name,f_int,f_dou,f_flo,create_time,update_time) values('name3',300,423.12,36.36,now(),now());
insert into tb_es (name,f_int,f_dou,f_flo,create_time,update_time) values('name4',400,623.12,46.56,now(),now());
insert into tb_es (name,f_int,f_dou,f_flo,create_time,update_time) values('name5',500,723.12,56.66,now(),now());

 

3.logstash配置文件

vi sync_mysql2es.conf

input {
  #jdbc输入配置,用来指定mysql中需要同步的数据查询SQL及同步周期
  jdbc {
    jdbc_driver_library => "/soft/mysql-connector-java-5.1.49.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.1.14:13306/db_hxl"
    jdbc_user => root
    jdbc_password => mysql
    # 是否开启分页
    jdbc_paging_enabled => true
    # 是否使用 tracking_column 指定字段的值作为 :sql_last_value(为false时 :sql_last_value 是上次执行查询的时间),这个值会记录到 last_run_metadata_path 的文件
    use_column_value => true
    #用来控制增量更新的字段,一般是自增id或者创建,更新时间,注意这里要采用sql语句中select采用的字段别名
    tracking_column => "unix_ts_in_secs"
    # tracking_column 对应字段的类型
    tracking_column_type => "numeric"
    # 设置定时任务间隔,使用cron语法  5个字段的含义:分、时、日、月、星期,全部为*表示每分钟跑一次任务;注意这里写了6个字段,此时第一个字段表示秒,所以下面的设置实际是每5秒同步一次,如果需要每5分钟同步一次请写成 */5 * * * *
    schedule => "*/5 * * * * *"
    #同步数据的查询sql语句
    statement => "select *, unix_timestamp(update_time) as unix_ts_in_secs from tb_es where (unix_timestamp(update_time) > :sql_last_value AND update_time < NOW()) ORDER BY update_time ASC"
  }
}

#logstash输入数据的字段匹配和数据过滤
filter {
  mutate {
    copy => { "id" => "[@metadata][_id]"}
    remove_field => ["@version", "@timestamp","unix_ts_in_secs"]
  }
}

#logstash输出配置
output {
  #采用stdout可以将同步数据输出到控制台,主要是调试阶段使用
  stdout { codec =>  "rubydebug"}

  #指定输出到ES的具体索引
  elasticsearch {
      hosts => ["http://192.168.1.134:19200"]
      user => "elastic"
      password => "elastic"
      index => "index_tb_es"
      document_id => "%{[@metadata][_id]}"
  }
}

 

4.执行同步
/opt/logstash-6.8.5/bin/logstash -f /opt/logstash-6.8.5/config/sync_mysql2es.conf

 

5.查看同步的数据

查看index

[root@host135 soft]# curl -u elastic:elastic -H "Content-Type: application/json" -X GET 'http://192.168.1.134:19200/_cat/indices?v'
health status index              uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   index_tb_es        fLGPfjfWSfCH0WLsn0176w   5   1          5            0     20.3kb         20.3kb
yellow open   vaccination_report mD_OEHpHQNeT7-S7wMhP9A   5   1     147693            0     52.7mb         52.7mb
green  open   .security-6        KUv3Qpw5Qg-bDQ6kE0csHQ   1   0          6            0     19.5kb         19.5kb

 

查看index数据

curl -u elastic:elastic -H "Content-Type: application/json" -XPOST '192.168.1.134:19200/index_tb_es/_search?pretty' -d '
{
"query": { "match_all": {} },
"size":10
}'

 

查看mapping

[root@host135 logstash-6.8.5]# curl -u elastic:elastic -H "Content-Type: application/json" -XGET "http://192.168.1.134:19200/index_tb_es/_mappings?pretty=true"
{
  "index_tb_es" : {
    "mappings" : {
      "doc" : {
        "properties" : {
          "create_time" : {
            "type" : "date"
          },
          "f_dou" : {
            "type" : "float"
          },
          "f_flo" : {
            "type" : "float"
          },
          "f_int" : {
            "type" : "long"
          },
          "id" : {
            "type" : "long"
          },
          "name" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "update_time" : {
            "type" : "date"
          }
        }
      }
    }
  }
}

 

6.重新做全量同步

每次做完同步后,:sql_last_value的值(本例中是最后一条记录的unix_ts_in_secs)会记录到文件.logstash_jdbc_last_run,该文件是隐藏文件,位置可以通过last_run_metadata_path指定,也可以使用find查找(通配符需要加引号,避免被shell提前展开)
[root@host135 config]# find / -name '*logstash_jdbc_last_run*'
/root/.logstash_jdbc_last_run

先停止logstash,删除该文件后再重新启动logstash,即可重新做全量同步(logstash运行期间删除该文件不会生效,内存中的值会被重新写回文件)

 

##################提前创建好mapping的同步#########################

1.提前创建好空的index

curl -u elastic:elastic -X PUT "192.168.1.134:19200/index_tb_es01?pretty" -H 'Content-Type: application/json' -d'
{}
'

 

2.创建mapping

6.8.5版本的type默认值为doc,所以我们这里使用doc

curl -u elastic:elastic -H 'Content-Type: application/json' -XPOST "http://192.168.1.134:19200/index_tb_es01/doc/_mapping?pretty" -d ' 
{
        "properties" : {
          "create_time" : {
            "type" : "date",
            "format" : "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
          },
          "f_dou" : {
            "type" : "float"
          },
          "f_flo" : {
            "type" : "float"
          },
          "f_int" : {
            "type" : "long"
          },
          "id" : {
            "type" : "long"
          },
          "name" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "update_time" : {
            "type" : "date",
            "format" : "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
          }
        }
}'

 

执行同步发现报错误:

"reason"=>"failed to parse field [create_time] of type [date] in document with id '8'", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"Invalid format: \"2023-11-01T01:19:38.000Z\" is malformed at \"T01:19:38.000Z\""}}}}}

 

解决办法:
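logstash会把mysql的timestamp字段转换成ISO8601格式(如2023-11-01T01:19:38.000Z),与mapping中定义的format(yyyy-MM-dd HH:mm:ss)不匹配,所以在查询sql中使用date_format把日期字段格式化成字符串,别名保持与mapping中的字段名一致,如下:
date_format(create_time,'%Y-%m-%d %H:%i:%S') AS create_time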

 

修改同步的配置文件:

[root@host135 config]# more sync_mysql2es.conf 
#logstash输入配置
input {
  #jdbc输入配置,用来指定mysql中需要同步的数据查询SQL及同步周期
  jdbc {
    jdbc_driver_library => "/soft/mysql-connector-java-5.1.49.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.1.14:13306/db_hxl"
    jdbc_user => root
    jdbc_password => mysql
    # 是否开启分页
    jdbc_paging_enabled => true
    # 是否使用 tracking_column 指定字段的值作为 :sql_last_value(为false时 :sql_last_value 是上次执行查询的时间),这个值会记录到 last_run_metadata_path 的文件
    use_column_value => true
    #用来控制增量更新的字段,一般是自增id或者创建,更新时间,注意这里要采用sql语句中select采用的字段别名
    tracking_column => "unix_ts_in_secs"
    # tracking_column 对应字段的类型
    tracking_column_type => "numeric"
    # 设置定时任务间隔,使用cron语法  5个字段的含义:分、时、日、月、星期,全部为*表示每分钟跑一次任务;注意这里写了6个字段,此时第一个字段表示秒,所以下面的设置实际是每5秒同步一次,如果需要每5分钟同步一次请写成 */5 * * * *
    schedule => "*/5 * * * * *"
    #同步数据的查询sql语句
    statement => "select id,name,f_int,f_dou,f_flo,date_format(create_time,'%Y-%m-%d %H:%i:%S') as create_time,date_format(update_time,'%Y-%m-%d %H:%i:%S') as update_time, unix_timestamp(update_time) as unix_ts_in_secs from tb_es where (unix_timestamp(update_time) > :sql_last_value AND update_time < NOW()) ORDER BY update_time ASC"
  }
}

#logstash输入数据的字段匹配和数据过滤
filter {
  mutate {
    copy => { "id" => "[@metadata][_id]"}
    remove_field => ["@version","@timestamp", "unix_ts_in_secs"]
  }
}

#logstash输出配置
output {
  #采用stdout可以将同步数据输出到控制台,主要是调试阶段使用
  stdout { codec =>  "rubydebug"}

  #指定输出到ES的具体索引
  elasticsearch {
      hosts => ["http://192.168.1.134:19200"]
      user => "elastic"
      password => "elastic"
      index => "index_tb_es01"
      document_id => "%{[@metadata][_id]}"
  }
}