Logstash-input-jdbc安装
logstash-input-jdbc插件是logstash 的一个个插件,使用ruby语言开发。所以要先安装ruby,也是为了好使用ruby中的gem安装插件,下载地址: https://rubyinstaller.org/downloads/,下载下来之后,进行安装
然后修改gem的源,使用以下命令查看gem源
gem sources -l
删除默认的源
gem sources --remove https://rubygems.org/
添加新的源
gem sources -a http://gems.ruby-china.org/ gem sources -l
更改成功,还的修改Gemfile的数据源地址。步骤如下:
gem install bundler bundle config mirror.https://rubygems.org https://gems.ruby-china.org
然后就是安装logstash-input-jdbc,在logstash-5.5.1/bin目录下,执行安装命令
.\logstash-plugin.bat install logstash-input-jdbc
全量数据同步
input { jdbc { jdbc_driver_library => "./config/mysql-connector-java-5.1.39.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" # 数据库相关配置 jdbc_connection_string => "jdbc:ip:port/jdcomm?characterEncoding=UTF-8&useSSL=false" jdbc_user => "root" jdbc_password => "password" statement => "SELECT * FROM blwjxb_z" jdbc_paging_enabled => "true" jdbc_page_size => "50000" schedule => "*/10 * * * *" } } filter { json { source => "message" remove_field => ["message"] } } output { stdout { codec => rubydebug } elasticsearch { hosts => "localhost" #将mysql数据加入myindex索引下,会自动创建 index => "myindex" # 自增ID 需要关联的数据库中有有一个id字段,对应索引的id号 document_id => "%{id}" } }
增量数据同步
input { jdbc { jdbc_driver_library => "./config/mysql-connector-java-5.1.39.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" # 数据库相关配置 jdbc_connection_string => "jdbc:ip:port/jdcomm?characterEncoding=UTF-8&useSSL=false" jdbc_user => "root" jdbc_password => "password" statement => "SELECT * FROM blwjxb_z where id > :sql_last_value" #使用其它字段追踪,而不是用时间 use_column_value => true #追踪的字段 tracking_column => id record_last_run => true #上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值 last_run_metadata_path => "./config/station_parameter.txt" jdbc_paging_enabled => "true" jdbc_page_size => "50000" schedule => "* * * * *" } } filter { json { source => "message" remove_field => ["message"] } } output { stdout { codec => rubydebug } elasticsearch { hosts => "localhost" #将mysql数据加入myindex索引下,会自动创建 index => "myindex" # 自增ID 需要关联的数据库中有有一个id字段,对应索引的id号 document_id => "%{id}" } }
思路就是每次脚本定时执行的时候会去找id>station_parameter.txt中设置的数值,每次增量数据同步后,station_parameter.txt中的数值会自动更新。
官网地址
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html