Clearing Up Hudi's precombine.field

Published: 2023-05-10 09:54:48  Author: -见

Across different sources you will come across four confusingly similar precombine.field configuration keys:

  • precombine.field

  • write.precombine.field

  • hoodie.table.precombine.field

  • hoodie.datasource.write.precombine.field

Are they exactly the same, or how are they related?

  • hoodie.datasource.write.precombine.field

HoodieWriteConfig.java

public class HoodieWriteConfig extends HoodieConfig {
  public static final ConfigProperty<String> PRECOMBINE_FIELD_NAME = ConfigProperty
      .key("hoodie.datasource.write.precombine.field")
      .defaultValue("ts")
      .withDocumentation("Field used in preCombining before actual write. When two records have the same key value, "
          + "we will pick the one with the largest value for the precombine field, determined by Object.compareTo(..)");
}
  • hoodie.table.precombine.field

HoodieTableConfig.java

/**
 * Configurations on the Hoodie Table like type of ingestion,
 * storage formats, hive table name etc Configurations are loaded from hoodie.properties,
 * these properties are usually set during
 * initializing a path as hoodie base path and never changes during the lifetime of a hoodie table.
 *
 * @see HoodieTableMetaClient
 * @since 0.3.0
 */
public class HoodieTableConfig extends HoodieConfig {
  public static final ConfigProperty<String> PRECOMBINE_FIELD = ConfigProperty
      .key("hoodie.table.precombine.field")
      .noDefaultValue()
      .withDocumentation("Field used in preCombining before actual write. By default, when two records have the same key value, "
          + "the largest value for the precombine field determined by Object.compareTo(..), is picked.");
}
  • precombine.field

This key is specific to Flink SQL and cannot be used with Spark SQL or other engines. The same applies to write.precombine.field.

FlinkOptions.java

/**
 * Hoodie Flink config options.
 *
 * <p>It has the options for Hoodie table read and write. It also defines some utilities.
 */
@ConfigClassProperty(name = "Flink Options",
    groupName = ConfigGroups.Names.FLINK_SQL,
    description = "Flink jobs using the SQL can be configured through the options in WITH clause."
        + " The actual datasource level configs are listed below.")
public class FlinkOptions extends HoodieConfig {
  public static final String NO_PRE_COMBINE = "no_precombine";
  public static final ConfigOption<String> PRECOMBINE_FIELD = ConfigOptions
      .key("precombine.field")
      .stringType()
      .defaultValue("ts")
      // HoodieWriteConfig.PRECOMBINE_FIELD_NAME is hoodie.datasource.write.precombine.field
      .withFallbackKeys("write.precombine.field", HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key())
      .withDescription("Field used in preCombining before actual write. When two records have the same\n"
          + "key value, we will pick the one with the largest value for the precombine field,\n"
          + "determined by Object.compareTo(..)");
}

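The definition of PRECOMBINE_FIELD above shows that precombine.field, write.precombine.field and hoodie.datasource.write.precombine.field all configure the same Flink option. precombine.field is the primary key, and the other two are registered as fallback keys through withFallbackKeys, so a value set under any of the three names, including the underlying hoodie.datasource.write.precombine.field, ends up in the same place.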
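The lookup order implied by withFallbackKeys can be sketched in plain Java. This is a minimal simulation under stated assumptions, not Flink's ConfigOption implementation: the class PrecombineFallbackDemo and its resolve method are hypothetical names, and the sketch assumes the primary key is checked first, then the fallback keys in declaration order, then the default value.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the option lookup; it does not use Flink classes.
final class PrecombineFallbackDemo {
  static final String PRIMARY_KEY = "precombine.field";
  static final List<String> FALLBACK_KEYS =
      List.of("write.precombine.field", "hoodie.datasource.write.precombine.field");
  static final String DEFAULT_VALUE = "ts";

  // Assumed order: primary key, then fallback keys as declared, then the default.
  static String resolve(Map<String, String> options) {
    if (options.containsKey(PRIMARY_KEY)) {
      return options.get(PRIMARY_KEY);
    }
    for (String fallback : FALLBACK_KEYS) {
      if (options.containsKey(fallback)) {
        return options.get(fallback);
      }
    }
    return DEFAULT_VALUE;
  }

  public static void main(String[] args) {
    // Only the long key is set, so the lookup reaches it through the fallback list.
    System.out.println(resolve(Map.of("hoodie.datasource.write.precombine.field", "update_time")));
    // Nothing is set, so the default applies.
    System.out.println(resolve(Map.of()));
  }
}
```

In this sketch, setting any one of the three names yields the same resolved value, which is the sense in which the three keys are interchangeable in a Flink SQL WITH clause.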

  • write.precombine.field

Fully equivalent to precombine.field.

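Nothing so far shows how hoodie.table.precombine.field relates to the other three. In practice it names the same field: when a Flink job does not set the precombine option itself, the value persisted in the table config is copied into it, as the implementation in HoodieTableFactory.java shows.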

/**
 * Hoodie data source/sink factory.
 */
public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
  /**
   * Supplement the table config options if not specified.
   */
  private void setupTableOptions(String basePath, Configuration conf) {
    StreamerUtil.getTableConfig(basePath, HadoopConfigurations.getHadoopConf(conf))
        .ifPresent(tableConfig -> {
          // HoodieTableConfig.RECORDKEY_FIELDS is hoodie.table.recordkey.fields
          // FlinkOptions.RECORD_KEY_FIELD is hoodie.datasource.write.recordkey.field
          if (tableConfig.contains(HoodieTableConfig.RECORDKEY_FIELDS)
              && !conf.contains(FlinkOptions.RECORD_KEY_FIELD)) {
            conf.setString(FlinkOptions.RECORD_KEY_FIELD, tableConfig.getString(HoodieTableConfig.RECORDKEY_FIELDS));
          }
          // HoodieTableConfig.PRECOMBINE_FIELD is hoodie.table.precombine.field
          // FlinkOptions.PRECOMBINE_FIELD is precombine.field, with fallback keys write.precombine.field and hoodie.datasource.write.precombine.field
          if (tableConfig.contains(HoodieTableConfig.PRECOMBINE_FIELD)
              && !conf.contains(FlinkOptions.PRECOMBINE_FIELD)) {
            conf.setString(FlinkOptions.PRECOMBINE_FIELD, tableConfig.getString(HoodieTableConfig.PRECOMBINE_FIELD));
          }
          if (tableConfig.contains(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE)
              && !conf.contains(FlinkOptions.HIVE_STYLE_PARTITIONING)) {
            conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, tableConfig.getBoolean(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE));
          }
        });
  }
}
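The "supplement if not specified" rule in setupTableOptions can be illustrated with plain maps. The following is a rough sketch rather than Hudi code: TableOptionSupplementDemo and supplement are hypothetical names, and it assumes that the "already set by the job" check covers the primary key as well as its two fallback keys.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration; maps stand in for HoodieTableConfig and the Flink Configuration.
final class TableOptionSupplementDemo {
  static final String TABLE_KEY = "hoodie.table.precombine.field";
  static final List<String> JOB_KEYS = List.of(
      "precombine.field", "write.precombine.field", "hoodie.datasource.write.precombine.field");

  // Returns a copy of the job options, filled in from the table config only when the job left the option unset.
  static Map<String, String> supplement(Map<String, String> tableConfig, Map<String, String> jobOptions) {
    Map<String, String> result = new HashMap<>(jobOptions);
    boolean setByJob = JOB_KEYS.stream().anyMatch(result::containsKey);
    if (tableConfig.containsKey(TABLE_KEY) && !setByJob) {
      result.put(JOB_KEYS.get(0), tableConfig.get(TABLE_KEY));
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, String> tableConfig = Map.of(TABLE_KEY, "update_time");
    // The job sets nothing, so the persisted table value is copied in.
    System.out.println(supplement(tableConfig, Map.of()));
    // The job sets the option explicitly, so the table value is not copied.
    System.out.println(supplement(tableConfig, Map.of("write.precombine.field", "event_time")));
  }
}
```

The point of the design is that an explicit job setting is never overwritten, while a job that omits the option inherits whatever was recorded when the table was created.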
  • Summary

precombine.field and write.precombine.field can be used only with Flink SQL.

HoodieConfig.java

/**
 * This class deals with {@link ConfigProperty} and provides get/set functionalities.
 */
public class HoodieConfig implements Serializable {
  public <T> String getString(ConfigProperty<T> configProperty) {
    Option<Object> rawValue = getRawValue(configProperty);
    return rawValue.map(Object::toString).orElse(null);
  }

  private <T> Option<Object> getRawValue(ConfigProperty<T> configProperty) {
    if (props.containsKey(configProperty.key())) {
      // A value was found under the key itself
      return Option.ofNullable(props.get(configProperty.key()));
    }

    // No value under the key; iterate over all deprecated alternative keys
    for (String alternative : configProperty.getAlternatives()) {
      if (props.containsKey(alternative)) {
        LOG.warn(String.format("The configuration key '%s' has been deprecated "
                + "and may be removed in the future. Please use the new key '%s' instead.",
            alternative, configProperty.key()));
        return Option.ofNullable(props.get(alternative));
      }
    }
    return Option.empty();
  }
}
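To see why the short keys do not work outside Flink SQL, the lookup above can be replayed with plain maps. This is a simplified sketch, not the Hudi class: AlternativesLookupDemo is a hypothetical name, java.util.Optional stands in for Hudi's Option, the deprecation warning is omitted, and the sketch assumes, as in the HoodieWriteConfig excerpt quoted earlier, that hoodie.datasource.write.precombine.field declares no alternatives.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical replay of the key-then-alternatives lookup using plain collections.
final class AlternativesLookupDemo {
  static Optional<String> getRawValue(Map<String, String> props, String key, List<String> alternatives) {
    if (props.containsKey(key)) {
      return Optional.ofNullable(props.get(key));
    }
    // Only keys explicitly registered as alternatives are consulted.
    for (String alternative : alternatives) {
      if (props.containsKey(alternative)) {
        return Optional.ofNullable(props.get(alternative));
      }
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    String key = "hoodie.datasource.write.precombine.field";
    // The Flink-style short key is not an alternative here, so the lookup finds nothing.
    System.out.println(getRawValue(Map.of("precombine.field", "update_time"), key, List.of()).isPresent());
    // The full key is found directly.
    System.out.println(getRawValue(Map.of(key, "update_time"), key, List.of()).isPresent());
  }
}
```

Under these assumptions, a value supplied as precombine.field is simply never seen by this lookup, which matches the summary that the short names are meaningful only on the Flink SQL side.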