【Flink系列二十一】深入理解 JVM的类型加载约束,解决 Flink 类型加载冲突问题的通用方法

发布时间 2023-12-29 12:25:40作者: 一杯半盏

class ByteArrayDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer

Debugging Classloading

类似的 X cannot be cast to X exceptions

如何理解这类异常?

这类异常可以归纳为类型异常,按个人有限经验,现象分为两种常见情况:

  • 类型赋值检查:不能 assign、或者 is not instance
  • 类型转换检查:不能 cast

都是因为同一种机制。JVM的类加载器,默认按照双亲委派模型进行加载。我们一般编写的应用程序,不会打破这个机制。

类型加载顺序反转

像 Tomcat,Flink 这类运行时类型框架(Runtime Framework),尤其是具备 plugin 特性时,考虑到一些便利性,好处是允许用户的库能够覆盖框架自身的一些库。

Flink 的文档指出:大多数情况下都能正常工作,然而某些时候,免不了遇到类型冲突。

对 JVM 类型系统的理解

  1. 在不满足双亲委派的情况下,不同的类加载器能够同时分别加载同一个字节码文件。
  2. 这两个类加载器所对应的 class 被认为不是同一种类型。

参考链接:
Loading Constraints

5.3.4 主要内容概括

检查约束
在5.3.1,5.3.2,5.3.3 阶段(也就是各种类加载器对类进行加载的过程),JVM会记录这个类被哪个类加载器加载。

When a class or interface C = <N1, L1> makes a symbolic reference to a field or method of another class or interface D = <N2, L2>, the symbolic reference includes a descriptor specifying the type of the field, or the return and argument types of the method. It is essential that any type name N mentioned in the field or method descriptor denote the same class or interface when loaded by L1 and when loaded by L2.

当class或者interface C = <N1, L1> 符号引用了 class或者interface D = <N2, L2> 的 field 或者 method,符号引用包含了一个描述符,标明了 field 的类型,或者方法形参类型和返回值类型。基本地,当任何类型 N 出现在 field 或者 method 的描述符中,且存在相同的 class 或者 interface,被 L1加载 或者 L2加载。

The situations described here are the only times at which the Java Virtual Machine checks whether any loading constraints have been violated. A loading constraint is violated if, and only if, all the following four conditions hold:

  • There exists a loader L such that L has been recorded by the Java Virtual Machine as an initiating loader of a class C named N.

  • There exists a loader L' such that L' has been recorded by the Java Virtual Machine as an initiating loader of a class C ' named N.

  • The equivalence relation defined by the (transitive closure of the) set of imposed constraints implies NL = NL'.

  • C ≠ C '.

这里描述的情形,仅发生在当 JVM 检查是否存在违反加载约束时。当且仅当同时满足下列四种条件时,违反加载约束:

  1. 存在一个 L,被记录为类 C 的初始加载器,命名为 N。
  2. 存在一个 L', 被记录为类 C' 的初始加载器,命名为 N。
  3. 强制约束(传递闭包)所定义的等价性关系隐含着 NL = NL'。
  4. C ≠ C '

老美有点啰嗦,咱可以“简单”理解:

对于咱眼里的某个类型,他被两个类加载器分别加载了,但是咱是要求他们能互相赋值的,这样就违反了类型加载约束。

再看错误

Caused by: org.apache.kafka.common.KafkaException: 
  class org.apache.kafka.common.serialization.ByteArrayDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer

这个时候就很明显了,我们可以理解为:

某个类里面符号引用了一个Deserializer,这个类被 parent loader 加载了,同时 ByteArrayDeserializer 被另一个 Flink UserCode Classloader (child loader)加载了
然后某个地方,对他俩进行了赋值或者类型转换,违反了类型加载约束

结论

网络上关于这类问题的解决办法很简单,改变 flink 的 classloader 加载优先级策略。

官方指出的方法 classloader.resolve-order=parent-first

这里记录另一个方法。

细粒度调整类型加载顺序

classloader.parent-first-patterns.additional=org.apache.commons.collections
它改变了 common-collections 库的加载顺序。

classloader.parent-first-patterns.additional=org.apache.kafka
它改变了 kafka-clients 库的加载顺序。

注意: classloader.parent-first-patterns.additional 为正确写法,classloader.parent-first-patterns-additional 为错误写法。

加深理解

实际上,为什么这个问题这么常见?

查看 org.apache.kafka.common.serialization.Deserializer 这个类被谁符号引用了就能知道答案。

举例说明:

这个类已知被 flink-kafka-connector 中的某个类符号引用了。

KafkaRecordDeserializationSchema 看 import 就知道:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Deserializer;