Logstash全量、增量数据同步

发布时间 2024-01-10 09:59:41作者: 粒子先生

Logstash-input-jdbc安装

logstash-input-jdbc插件是logstash 的一个个插件,使用ruby语言开发。所以要先安装ruby,也是为了好使用ruby中的gem安装插件,下载地址: https://rubyinstaller.org/downloads/,下载下来之后,进行安装

然后修改gem的源,使用以下命令查看gem源

 
gem sources -l

删除默认的源

 
gem sources --remove https://rubygems.org/

添加新的源

 
gem sources -a http://gems.ruby-china.org/
gem sources -l

更改成功,还的修改Gemfile的数据源地址。步骤如下:

 
gem install bundler
bundle config mirror.https://rubygems.org https://gems.ruby-china.org

然后就是安装logstash-input-jdbc,在logstash-5.5.1/bin目录下,执行安装命令

.\logstash-plugin.bat install logstash-input-jdbc
 

全量数据同步

input {
  jdbc {
    jdbc_driver_library => "./config/mysql-connector-java-5.1.39.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # 数据库相关配置
    jdbc_connection_string => "jdbc:ip:port/jdcomm?characterEncoding=UTF-8&useSSL=false"
    jdbc_user => "root"
    jdbc_password => "password"
    statement => "SELECT * FROM blwjxb_z"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
    schedule => "*/10 * * * *"
  }
}
  
filter {
   json {
        source => "message"
        remove_field => ["message"]
    }
}
  
output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    hosts => "localhost"
    #将mysql数据加入myindex索引下,会自动创建
    index => "myindex"
    # 自增ID 需要关联的数据库中有有一个id字段,对应索引的id号
    document_id => "%{id}"
  }       
}

 

增量数据同步

input {
  jdbc {
    jdbc_driver_library => "./config/mysql-connector-java-5.1.39.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # 数据库相关配置
    jdbc_connection_string => "jdbc:ip:port/jdcomm?characterEncoding=UTF-8&useSSL=false"
    jdbc_user => "root"
    jdbc_password => "password"
    statement => "SELECT * FROM blwjxb_z where id > :sql_last_value"
    #使用其它字段追踪,而不是用时间
    use_column_value => true
    #追踪的字段
    tracking_column => id
    record_last_run => true
    #上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值
    last_run_metadata_path => "./config/station_parameter.txt"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
    schedule => "* * * * *"
  }
}
  
filter {
   json {
        source => "message"
        remove_field => ["message"]
    }
}
  
output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    hosts => "localhost"
    #将mysql数据加入myindex索引下,会自动创建
    index => "myindex"
    # 自增ID 需要关联的数据库中有有一个id字段,对应索引的id号
    document_id => "%{id}"
  }       
}

思路就是每次脚本定时执行的时候会去找id>station_parameter.txt中设置的数值,每次增量数据同步后,station_parameter.txt中的数值会自动更新。

官网地址

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html