SchemaRegestry组件原生的类和方法无法实现flink消费kafka的数据动态调整schema的情况--未彻底解决

发布时间 2023-04-19 15:08:16作者: 贾彤

0、前提知识储备

Conflurent公司的SchemaRegestry组件的基本了解和使用

一、背景:

0.组件版本

flink:1.14

1.链路调整情况

原先链路:oracle-->OGG-->kafka-->flink-->数据库\湖\仓

实现链路:oracle-->OGG-->kafka(搭配conflurent公司的SchemaRegestry组件使用)-->flink-->数据库\湖\仓

2.链路调整缘由:

原链路中的kafka存储的数据格式是avro,每次源端oracle表做schema变更时,下游的相关程序都需要做停程序处理,费时费力,对运维不友好。

预期链路加入SchemaRegestry组件,它天然对avro格式数据支持,并且可以实现动态调整源端schema但不会要求程序手动停止。可以有效解耦链路的上下游,更加灵活,减少运维操作。

3.约束条件

目前项目组负责OGG和kafka的日常维护,flink程序及后续链路由其他项目组(包括我们自己的项目组)独立开发。因为SchemaRegestry组件是由我们负责引入的,在kafka之后链路上的项目组都需要做相应变更,所以需要我们项目组出一个样例代码(此代码逻辑已经实现并全链路跑通)。为使对下游代码改造的影响最小,要求个项目组在flink消费者程序中引入我们编写的反序列化代码,而不是实现自定义的SourceFunction。

二、目前困境

1.通过上面的背景可知,下游在改造时仍需要引入我们自定义的反序列化类,虽然这个类也是对flink原生类的一种具体实现。但是这种方式仍然不太友好,但是当前原生API不支持,找了官网和社区也没有发现有效的解决办法,大家都是在原生基础上按需进行封装。

2. 1.14版本的flink官网中存在AvroDeserializationSchema.forGeneric(...)这种方式,但是需要传入静态schema。目前schema是不确定的,需要根据消费信息中存储的id值去拿到对应的schema版本,这样也存在一个问题:初始化未进行消费时是无法拿到schema的,并且你也无法去解析消费信息对象去动态拿到id值进而拿到对应的schema。这样就成了一个死结。

三、目前的解决办法

1.对原生的反序列化类KafkaDeserializationSchema<GenericRecord>进行封装,实现只传入schemaregestryURL不需要传入schema就可以进行反序列化的操作,将schema变更和消费程序进行解耦。

四、未来优化方向

1.在引入SchemaRegestry组件后,优化flink消费kafka中的avro格式数据的方法,解耦schema的变更和程序运行之间的联系,确保flink消费程序可以实现:在初始化链路中无对应topic的数据时不会报错;当链路中存在积压数据的情况下,依然可以进行schema的变更,程序按照kafka的offset值顺序消费(先消费旧数据再消费新数据),并且在新旧数据的连接处可以自动实现schema的转换。

2.按照上述第一条的描述,可能需要程序在消费每条数据时拿到消息自带的id号并和缓存的schema进行比对,比对成功则反序列化数据;对比不成功则按照新id值重新获取schema文件并进行缓存,再重复上面的步骤。这样可能消费速率会比较慢,影响整个链路运行性能,具体影响多少需要进行仔细测试才能知道。