logstash同步oracle到es

发布时间 2023-11-01 18:10:00作者: slnngk

环境:

DB:11.2.0.4

 

同步的表

create table tb_test (
  id number primary key,
  name varchar(32) not null,
  f_int number,
  f_dou number(10,2),
  f_flo number(9,2)
);



insert into tb_test (id,name,f_int,f_dou,f_flo) values(1,'name1',100,123.12,16.26);
insert into tb_test (id,name,f_int,f_dou,f_flo) values(2,'name2',200,323.12,26.46);
insert into tb_test (id,name,f_int,f_dou,f_flo) values(3,'name3',300,423.12,36.36);
insert into tb_test (id,name,f_int,f_dou,f_flo) values(4,'name4',400,623.12,46.56);
insert into tb_test (id,name,f_int,f_dou,f_flo) values(5,'name5',500,723.12,56.66);
insert into tb_test (id,name,f_int,f_dou,f_flo) values(6,'name5',500,723.12,56.66);
insert into tb_test (id,name,f_int,f_dou,f_flo) values(7,'name5',500,723.12,56.66);

 

配置文件

[root@host135 config]# more sync_ora2es_byid.conf 
#logstash输入配置
input {
  #jdbc输入配置,用来指定mysql中需要同步的数据查询SQL及同步周期
  jdbc {

    jdbc_driver_library => "/soft/ojdbc/ojdbc8.jar"
    jdbc_driver_class => "Java::oracle.jdbc.OracleDriver"
    jdbc_connection_string => "jdbc:oracle:thin:@192.168.1.7:1521:orcl"
    jdbc_user => "hxl"
    jdbc_password => "oracle"

    # 是否开启分页
    jdbc_paging_enabled => true
    # 是否开启记录上次追踪的结果,也就是上次更新的时间,这个会记录到 last_run_metadata_path 的文件
    use_column_value => true
    #用来控制增量更新的字段,一般是自增id或者创建,更新时间,注意这里要采用sql语句中select采用的字段别名
    tracking_column => "id"
    # tracking_column 对应字段的类型
    tracking_column_type => "numeric"
    # 设置定时任务间隔  含义:分、时、天、月、年,全部为*默认含义为每分钟跑一次任务,这里设置为每5分钟同步一次
    schedule => "*/5 * * * * *"
    #同步数据的查询sql语句
    statement => "select id,name,f_int,f_dou,f_flo from tb_test where id>:sql_last_value"
  }
}

#logstash输入数据的字段匹配和数据过滤
filter {
  mutate {
    copy => { "id" => "[@metadata][_id]"}
    remove_field => ["@version","@timestamp"]
  }
}

#logstash输出配置
output {
  #采用stdout可以将同步数据输出到控制台,主要是调试阶段使用
  stdout { codec =>  "rubydebug"}

  #指定输出到ES的具体索引
  elasticsearch {
      hosts => ["http://192.168.1.134:19200"]
      user => "elastic"
      password => "elastic"
      index => "index_ora_tb_test"
      document_id => "%{[@metadata][_id]}"
  }
}

 

jdbc驱动下载ojdbc8.jar

https://www.oracle.com/cn/database/technologies/appdev/jdbc-downloads.html

 

好像ojdbc11.jar不支持,我这里的logstash版本是:6.8.5