Recompiling Kyuubi 1.6.1 so that submitting Flink SQL streaming jobs no longer blocks

Published: 2023-10-07 16:14:18 Author: 爱睡懒觉的我

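Kyuubi 1.6.1 does not support Flink SQL well: submitting a streaming job blocks the process. Fixing this defect requires modifying the source code and rebuilding Kyuubi.

Kyuubi version to build: kyuubi-1.6.1-incubating

Target Flink version: flink-1.14.4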

1. Download the kyuubi-1.6.1-incubating source code and import it into IntelliJ IDEA

git clone -b v1.6.1-incubating https://github.com/apache/kyuubi.git

2. Open the top-level pom.xml in the project root directory and change a few properties

Set the flink.version and flink.module.scala.suffix properties as follows:

<flink.version>1.14.4</flink.version>
<flink.module.scala.suffix>_2.12</flink.module.scala.suffix>
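If you prefer to script this edit rather than change the file by hand in IDEA, the shell sketch below rewrites the value of the first <flink.version> and <flink.module.scala.suffix> element it finds. It assumes each element sits on a single line and that the first occurrence is the one in the top-level <properties> block; any later copies (for example inside Maven profiles) are left untouched. The helper name set_first_property is hypothetical and not part of Kyuubi.

```shell
#!/usr/bin/env bash
# Sketch: apply the step-2 property changes to the top-level pom.xml.
# Assumption: the first <flink.version> / <flink.module.scala.suffix> element
# in the file is the one in the top-level <properties> block.
set -euo pipefail

# Replace the value of the first <name>...</name> element found in a file.
set_first_property() {
  local name="$1" value="$2" file="$3" tmp
  tmp="$(mktemp)"
  awk -v name="$name" -v value="$value" '
    !done {
      otag = "<" name ">"; ctag = "</" name ">"
      s = index($0, otag); e = index($0, ctag)
      if (s > 0 && e > s) {
        # keep text before the opening tag and from the closing tag onwards
        $0 = substr($0, 1, s - 1) otag value substr($0, e)
        done = 1
      }
    }
    { print }
  ' "$file" > "$tmp"
  cat "$tmp" > "$file"   # overwrite in place, keeping the file permissions
  rm -f "$tmp"
}

# Usage, from the repository root:
#   set_first_property flink.version 1.14.4 pom.xml
#   set_first_property flink.module.scala.suffix _2.12 pom.xml
```

Matching on plain strings with awk's index() avoids having to escape the dots in the property names, which a sed regex would need.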

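3. Modify the class org.apache.kyuubi.engine.flink.operation.ExecuteStatement

Find the runOperation method and change it as shown below. For INSERT statements (CatalogSinkModifyOperation), the modified method returns the ID of the submitted job as the result instead of building the result set from the TableResult: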

  // Uses CatalogSinkModifyOperation (package org.apache.flink.table.operations in
  // Flink 1.14); add that import at the top of the file if it is not already there.
  private def runOperation(operation: Operation): Unit = {
    // FLINK-24461 executeOperation method changes the return type
    // from TableResult to TableResultInternal
    val executeOperation = DynMethods.builder("executeOperation")
      .impl(executor.getClass, classOf[String], classOf[Operation])
      .build(executor)
    val result = executeOperation.invoke[TableResult](sessionId, operation)
    jobId = result.getJobClient.asScala.map(_.getJobID)
    // return jobId when submit success and table.dml-sync is false
    if (operation.getClass == classOf[CatalogSinkModifyOperation]) {
      // INSERT statements: return a single "jobId" column right away
      // instead of building the result set from the TableResult
      resultSet = ResultSet.builder
        .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
        .columns(Column.physical("jobId", DataTypes.STRING()))
        .data(Array(Row.of(jobId.map(_.toHexString).getOrElse("null"))))
        .build
    } else {
      // all other operations: build the result set from the TableResult
      resultSet = ResultSet.fromTableResult(result)
    }
  }
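The new branch refers to CatalogSinkModifyOperation, which in Flink 1.14 lives in org.apache.flink.table.operations; if ExecuteStatement.scala does not import it, compilation fails. The shell sketch below is a quick pre-build check for that import. The helper name has_sink_import and the file path in the usage comment (the usual location of ExecuteStatement.scala in the Kyuubi source tree) are assumptions, so adjust them to your checkout. The check only recognizes single-line imports.

```shell
#!/usr/bin/env bash
# Sketch: check that a Scala source file imports CatalogSinkModifyOperation.
set -euo pipefail

has_sink_import() {
  local file="$1"
  # Accepts a direct import, a grouped import such as
  # {CatalogSinkModifyOperation, Operation}, or a wildcard import of the package.
  grep -Eq '^import org\.apache\.flink\.table\.operations\.(.*CatalogSinkModifyOperation|_)' "$file"
}

# Usage, from the repository root (the path below is an assumption):
#   f=externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
#   has_sink_import "$f" || echo "missing: import org.apache.flink.table.operations.CatalogSinkModifyOperation"
```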

4. Modify the scalastyle-config.xml file

scalastyle-config.xml is located in the project root directory. Find the following configuration:

<check level="error" class="org.scalastyle.scalariform.ImportOrderChecker" enabled="true">
    <parameters>
        <parameter name="groups">java,scala,3rdParty,kyuubi</parameter>
        <parameter name="group.java">javax?\..*</parameter>
        <parameter name="group.scala">scala\..*</parameter>
        <parameter name="group.3rdParty">(?!org\.apache\.kyuubi\.).*</parameter>
        <parameter name="group.kyuubi">org\.apache\.kyuubi\..*</parameter>
    </parameters>
</check>

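Change the enabled attribute to false. When it is true, the build checks the import order of every class, which can easily make the build fail (for example, when a newly added import is not placed in the expected group order). After the change, the configuration looks like this: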

<check level="error" class="org.scalastyle.scalariform.ImportOrderChecker" enabled="false">
    <parameters>
        <parameter name="groups">java,scala,3rdParty,kyuubi</parameter>
        <parameter name="group.java">javax?\..*</parameter>
        <parameter name="group.scala">scala\..*</parameter>
        <parameter name="group.3rdParty">(?!org\.apache\.kyuubi\.).*</parameter>
        <parameter name="group.kyuubi">org\.apache\.kyuubi\..*</parameter>
    </parameters>
</check>
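The same change can be made with a sed edit that touches only the <check> line naming ImportOrderChecker, leaving every other scalastyle rule enabled. This is a sketch that assumes the class and enabled attributes sit on the same line, as in the configuration above; disable_import_order_check is a hypothetical helper name. A less invasive alternative, not tested here, would be to keep the checker on and place any new import in the java, scala, 3rdParty, kyuubi group order it expects.

```shell
#!/usr/bin/env bash
# Sketch: set enabled="false" on the ImportOrderChecker rule only.
set -euo pipefail

disable_import_order_check() {
  local file="$1" tmp
  tmp="$(mktemp)"
  # The address restricts the substitution to the line naming ImportOrderChecker.
  sed '/class="org\.scalastyle\.scalariform\.ImportOrderChecker"/ s/enabled="true"/enabled="false"/' \
    "$file" > "$tmp"
  cat "$tmp" > "$file"   # overwrite in place, keeping the file permissions
  rm -f "$tmp"
}

# Usage, from the repository root:
#   disable_import_order_check scalastyle-config.xml
```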

5. Rebuild

On Linux, run the following command to build the distribution:

cd kyuubi && ./build/dist --tgz --spark-provided --flink-provided --hive-provided

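When the build finishes, the packaged distribution (a .tgz archive, because of the --tgz option) can be found in the project root directory.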
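To confirm that the rebuilt Flink SQL engine actually ended up in the distribution, the sketch below lists the .tgz files in the project root and prints the archive entries whose names contain kyuubi-flink-sql-engine. The helper names are hypothetical, and matching on that jar name is an assumption about how the engine module is packaged. The exact archive name and internal layout depend on the build.

```shell
#!/usr/bin/env bash
# Sketch: locate the distribution tarball(s) and show the packaged Flink engine jar.
set -euo pipefail

# List *.tgz files directly under the given directory (default: current dir).
find_dist_tgz() {
  find "${1:-.}" -maxdepth 1 -type f -name '*.tgz' | sort
}

# Print archive entries whose names contain "kyuubi-flink-sql-engine";
# returns non-zero when there is no such entry.
flink_engine_entries() {
  tar -tzf "$1" | grep 'kyuubi-flink-sql-engine'
}

# Usage, from the repository root after ./build/dist has finished:
#   for t in $(find_dist_tgz .); do echo "$t"; flink_engine_entries "$t"; done
```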