通过StreamSets实现SQLServer实时更新数据至ElasticSearch

发布时间 2023-08-06 19:30:11作者: 一克猫

前言

  网上许多关于StreamSets增量更新的教程几乎都是单单INSERT操作,这使得目标数据库会出现重复数据,而实际需求上我们往往更多是需要INSERTUPDATE操作,利用SQL ServerTIMESTAMP(时间戳)特性,可以很容易实现这一点。

源数据库配置

  需要明白一点,在SQL Server中的TIMESTAMP和时间无关,每次对INSERTUPDATE操作,对于TIMESTAMP列所在的行中的值均会更新。

  将时间戳字段LastTimestamp作为偏移量填入Offset Column处,偏移量初始值Initial Offset设为0。

1.png

时间戳处理

  由于ElaticSearch没有TIMESTAMP或相似的类型,故作了转换处理,即上图的BIGINT类型,而直接将转换后的数据映射到目标数据库却会报错,我暂时不知道怎么解决,就通过Field Remover做个移除。

2.png

目标数据库配置

  注意Default Operation需要选择UPDATE with doc_as_upsert

3.png