Flink: "java.lang.NumberFormatException: For input string" error

Scenario:

A Flink job reads data from one Hudi table and writes it into another Hudi table.
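
For reference, a minimal sketch of this kind of job using the Flink Table API; the table names, paths, and schema here are hypothetical:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HudiToHudi {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Source Hudi table (hypothetical path and schema).
        tEnv.executeSql(
                "CREATE TABLE src_table (id BIGINT PRIMARY KEY NOT ENFORCED, ds BIGINT) WITH ("
                + " 'connector' = 'hudi',"
                + " 'path' = 'hdfs:///hudi/src_table',"
                + " 'table.type' = 'MERGE_ON_READ',"
                + " 'read.streaming.enabled' = 'true')");

        // Sink Hudi table (hypothetical).
        tEnv.executeSql(
                "CREATE TABLE dst_table (id BIGINT PRIMARY KEY NOT ENFORCED, ds BIGINT) WITH ("
                + " 'connector' = 'hudi',"
                + " 'path' = 'hdfs:///hudi/dst_table',"
                + " 'table.type' = 'MERGE_ON_READ')");

        // Copy the source table into the sink table.
        tEnv.executeSql("INSERT INTO dst_table SELECT id, ds FROM src_table");
    }
}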

Error stack:

java.lang.NumberFormatException: For input string: "test_table"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Long.parseLong(Long.java:589)
	at java.lang.Long.valueOf(Long.java:803)
	at org.apache.hudi.util.DataTypeUtils.resolvePartition(DataTypeUtils.java:137)
	at org.apache.hudi.table.format.mor.MergeOnReadInputFormat.lambda$getReader$0(MergeOnReadInputFormat.java:317)
	at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
	at org.apache.hudi.table.format.mor.MergeOnReadInputFormat.getReader(MergeOnReadInputFormat.java:306)
	at org.apache.hudi.table.format.mor.MergeOnReadInputFormat.getFullSchemaReader(MergeOnReadInputFormat.java:292)
	at org.apache.hudi.table.format.mor.MergeOnReadInputFormat.open(MergeOnReadInputFormat.java:209)
	at org.apache.hudi.source.StreamReadOperator.processSplits(StreamReadOperator.java:163)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:835)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:784)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:955)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:934)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:748)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:569)
	at java.lang.Thread.run(Thread.java:748)
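
The bottom frames are ordinary Long parsing: Long.valueOf delegates to Long.parseLong, which rejects any non-numeric input. The table name "test_table" has ended up where a numeric partition value was expected, so the core failure reproduces in one line:

public class Repro {
    public static void main(String[] args) {
        // Same bottom frames as the stack above: Long.valueOf -> Long.parseLong.
        Long.valueOf("test_table"); // java.lang.NumberFormatException: For input string: "test_table"
    }
}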

Relevant source code (org.apache.hudi.util.DataTypeUtils):

/**
 * Resolves the partition path string into value obj with given data type.
 */
public static Object resolvePartition(String partition, DataType type) {
    if (partition == null) {
      return null;
    }

    LogicalTypeRoot typeRoot = type.getLogicalType().getTypeRoot();
    switch (typeRoot) {
      case CHAR:
      case VARCHAR:
        return partition;
      case BOOLEAN:
        return Boolean.parseBoolean(partition);
      case TINYINT:
        return Integer.valueOf(partition).byteValue();
      case SMALLINT:
        return Short.valueOf(partition);
      case INTEGER:
        return Integer.valueOf(partition);
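      // The stack above fails in the branch below (DataTypeUtils.java:137):
      // a BIGINT partition field sends the raw path segment through Long.valueOf.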
      case BIGINT:
        return Long.valueOf(partition);
      case FLOAT:
        return Float.valueOf(partition);
      case DOUBLE:
        return Double.valueOf(partition);
      case DATE:
        return LocalDate.parse(partition);
      case TIMESTAMP_WITHOUT_TIME_ZONE:
        return LocalDateTime.parse(partition);
      case DECIMAL:
        return new BigDecimal(partition);
      default:
        throw new RuntimeException(
            String.format(
                "Can not convert %s to type %s for partition value", partition, type));
    }
}
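
Given this code, the failure is easy to trigger directly: with a partition field declared as BIGINT, resolvePartition takes the Long.valueOf branch. A minimal sketch, assuming Flink's DataTypes factory and the Hudi version quoted above are on the classpath:

import org.apache.flink.table.api.DataTypes;
import org.apache.hudi.util.DataTypeUtils;

public class ResolvePartitionDemo {
    public static void main(String[] args) {
        // A numeric path segment resolves fine for a BIGINT partition field.
        System.out.println(DataTypeUtils.resolvePartition("20220904", DataTypes.BIGINT())); // 20220904

        // A non-numeric segment, such as the table name, hits the BIGINT
        // branch and throws the NumberFormatException from the stack above.
        DataTypeUtils.resolvePartition("test_table", DataTypes.BIGINT());
    }
}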

Root cause:

The table was created in Flink without declaring any partition, yet when Flink read the table a value was set for "hoodie.datasource.write.partitionpath.field". As a result, the reader treats a path segment (here the table name "test_table") as a partition value and tries to parse it into the declared BIGINT type.
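
Either side can be fixed, as long as the two ends agree. A sketch of both options (the DDL, schema, and path are hypothetical):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ConsistentPartitioning {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Option 1: if the data really is partitioned by ds, declare the
        // partition when creating the table, so path layout and metadata agree.
        tEnv.executeSql(
                "CREATE TABLE test_table (id BIGINT PRIMARY KEY NOT ENFORCED, ds BIGINT)"
                + " PARTITIONED BY (ds) WITH ("
                + " 'connector' = 'hudi',"
                + " 'path' = 'hdfs:///hudi/test_table')");

        // Option 2: if the table is unpartitioned, simply do not set
        // 'hoodie.datasource.write.partitionpath.field' when reading it.
    }
}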

Query result in Spark SQL (note that _hoodie_partition_path is empty, i.e. the table really is unpartitioned):

_hoodie_partition_path | _hoodie_file_name                                                    | ds
                       | 00000005-ac62-4771-8f59-974c1bb1fc27_0-1-0_20230511185221213.parquet | 20220904