Hudi Study Notes 5 - Hudi Configuration Analysis (1)

Hudi's official configuration docs: https://hudi.apache.org/docs/configurations. Source-code analysis shows that the config option hoodie.payload.ordering.field has been deprecated and replaced by hoodie.datasource.write.precombine.field.

ConfigProperty

ConfigProperty aggregates HoodieConfig: its inferFunction field (see below) can derive a config value from other configs held in a HoodieConfig.

// https://github.com/apache/hudi/blob/master/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigProperty.java
public class ConfigProperty<T> implements Serializable {
  private final String key; // config key
  private final T defaultValue; // default value for the config
  private final String docOnDefaultValue; // documentation about the default value
  private final String doc; // documentation for the config
  private final Option<String> sinceVersion; // version in which the config was introduced
  private final Option<String> deprecatedVersion; // version in which the config was deprecated
  private final Set<String> validValues; // allowed values for the config
  private final boolean advanced; // whether this is an advanced config
  private final String[] alternatives; // alternative (fallback) keys

  // provide the ability to infer config value based on other configs
  private final Option<Function<HoodieConfig, Option<T>>> inferFunction;
}
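
A minimal sketch of that aggregation, assuming the builder exposes a withInferFunction method matching the inferFunction field above (the two option keys below are made up for illustration):

import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.util.Option;

public class InferDemo {
  // Hypothetical source option.
  static final ConfigProperty<String> SOURCE_FIELD = ConfigProperty
      .key("hoodie.demo.source.field")
      .defaultValue("ts")
      .withDocumentation("Demo-only option.");

  // Hypothetical derived option: when it is not set explicitly, its value is inferred
  // from SOURCE_FIELD by running the function against the surrounding HoodieConfig.
  static final ConfigProperty<String> DERIVED_FIELD = ConfigProperty
      .key("hoodie.demo.derived.field")
      .noDefaultValue()
      .withInferFunction(cfg -> Option.ofNullable(cfg.getString(SOURCE_FIELD)))
      .withDocumentation("Demo-only option inferred from hoodie.demo.source.field.");
}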

HoodieConfig

HoodieConfig is the base class of all configurations and provides the common get/set interface.

// https://github.com/apache/hudi/blob/master/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
public class HoodieConfig implements Serializable {
  // Body elided: the config values are kept in a properties map and exposed through
  // accessors such as getString(ConfigProperty) and setValue(ConfigProperty, String).
}
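
A small usage sketch of that get/set interface, assuming getString and setValue are among the accessors (the option below is demo-only, not a real Hudi config):

import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;

public class HoodieConfigDemo {
  // Demo-only option.
  static final ConfigProperty<String> DEMO_FIELD = ConfigProperty
      .key("hoodie.demo.field")
      .defaultValue("ts")
      .withDocumentation("Demo-only option.");

  public static void main(String[] args) {
    HoodieConfig config = new HoodieConfig();
    config.setValue(DEMO_FIELD, "event_time");          // set
    System.out.println(config.getString(DEMO_FIELD));   // get -> event_time
  }
}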

HoodieWriteConfig

// https://github.com/apache/hudi/blob/master/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
// Related docs: https://hudi.apache.org/docs/configurations
public class HoodieWriteConfig extends HoodieConfig {
  // preCombine field config (hoodie.datasource.write.precombine.field)
  public static final ConfigProperty<String> PRECOMBINE_FIELD_NAME = ConfigProperty
    .key("hoodie.datasource.write.precombine.field")
    .defaultValue("ts") // default value
    .withDocumentation("Field used in preCombining before actual write. When two records have the same key value, "
      + "we will pick the one with the largest value for the precombine field, determined by Object.compareTo(..)");

  // Payload config (hoodie.datasource.write.payload.class)
  public static final ConfigProperty<String> WRITE_PAYLOAD_CLASS_NAME = ConfigProperty
    .key("hoodie.datasource.write.payload.class")
    .defaultValue(OverwriteWithLatestAvroPayload.class.getName())
    .markAdvanced()
    .withDocumentation("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting. "
       + "This will render any value set for PRECOMBINE_FIELD_OPT_VAL in-effective");
}
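
A hedged sketch of setting the two options above programmatically. The builder calls (newBuilder/withPath/withProps) follow the pattern in Hudi's Java-client examples and are an assumption here; DefaultHoodieRecordPayload is used only as an example of a non-default payload class:

import java.util.Properties;
import org.apache.hudi.config.HoodieWriteConfig;

Properties props = new Properties();
// Reference the keys via key() instead of hard-coding the option strings.
props.setProperty(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "event_time");
props.setProperty(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key(),
    "org.apache.hudi.common.model.DefaultHoodieRecordPayload");

// Assumed builder usage, as in Hudi's Java-client quickstart.
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath("/tmp/hudi_demo_table") // demo path
    .withProps(props)
    .build();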

HoodiePayloadConfig

The implementation of HoodiePayloadConfig shows that the config option hoodie.payload.ordering.field has been deprecated and replaced by hoodie.datasource.write.precombine.field.

// https://github.com/apache/hudi/blob/master/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
public class HoodiePayloadConfig extends HoodieConfig {
  public static final ConfigProperty<String> EVENT_TIME_FIELD = ConfigProperty
    .key(PAYLOAD_EVENT_TIME_FIELD_PROP_KEY) // i.e. hoodie.payload.event.time.field
    .defaultValue("ts")
    .markAdvanced()
    .withDocumentation("Table column/field name to derive timestamp associated with the records. This can "
      + "be useful for e.g, determining the freshness of the table.");

  public static final ConfigProperty<String> PAYLOAD_CLASS_NAME = ConfigProperty
    .key("hoodie.compaction.payload.class")
    .defaultValue(OverwriteWithLatestAvroPayload.class.getName())
    .markAdvanced()
    .withDocumentation("This needs to be same as class used during insert/upserts. Just like writing, compaction also uses "
      + "the record payload class to merge records in the log against each other, merge again with the base file and "
      + "produce the final record to be written after compaction.");

  // hoodie.payload.ordering.field is deprecated,
  // superseded by hoodie.datasource.write.precombine.field.
  /** @deprecated Use {@link HoodieWriteConfig#PRECOMBINE_FIELD_NAME} and its methods instead */
  @Deprecated
  public static final ConfigProperty<String> ORDERING_FIELD = ConfigProperty
    .key(PAYLOAD_ORDERING_FIELD_PROP_KEY) // i.e. hoodie.payload.ordering.field
    .defaultValue("ts")
    .markAdvanced()
    .withDocumentation("Table column/field name to order records that have the same key, before "
      + "merging and writing to storage.");

  /** @deprecated Use {@link #PAYLOAD_CLASS_NAME} and its methods instead */
  @Deprecated
  public static final String DEFAULT_PAYLOAD_CLASS = PAYLOAD_CLASS_NAME.defaultValue();

  /** @deprecated Use {@link #PAYLOAD_CLASS_NAME} and its methods instead */
  @Deprecated
  public static final String PAYLOAD_CLASS_PROP = PAYLOAD_CLASS_NAME.key();
}

// Common class used by both the payload classes and HoodiePayloadConfig
// https://github.com/apache/hudi/blob/master/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePayloadProps.java
public class HoodiePayloadProps {
  public static final String PAYLOAD_ORDERING_FIELD_PROP_KEY = "hoodie.payload.ordering.field";
  public static final String PAYLOAD_EVENT_TIME_FIELD_PROP_KEY = "hoodie.payload.event.time.field";
  public static final String PAYLOAD_IS_UPDATE_RECORD_FOR_MOR = "hoodie.is.update.record.for.mor";
}
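
These keys end up in the Properties handed to the payload when records are merged; a minimal sketch (event_time is an arbitrary field name, and new code should prefer hoodie.datasource.write.precombine.field on the write config, as noted above):

import java.util.Properties;
import org.apache.hudi.common.model.HoodiePayloadProps;

Properties payloadProps = new Properties();
// Deprecated payload-level ordering field, still honored for backward compatibility.
payloadProps.setProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, "event_time");
// Field used to derive the event time of a record (see HoodiePayloadConfig above).
payloadProps.setProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY, "event_time");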

FlinkOptions

FlinkOptions remaps some Hudi configs to Flink-style option keys, e.g. payload.class is used in place of hoodie.datasource.write.payload.class.

// https://github.com/apache/hudi/blob/master/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
public class FlinkOptions extends HoodieConfig {
  public static final ConfigOption<String> OPERATION = ConfigOptions
    .key("write.operation")
    .stringType()
    .defaultValue(WriteOperationType.UPSERT.value()) // default is upsert
    .withDescription("The write operation, that this write should do");

  @AdvancedConfig
  public static final ConfigOption<String> PAYLOAD_CLASS_NAME = ConfigOptions
    .key("payload.class")
    .stringType()
    .defaultValue(EventTimeAvroPayload.class.getName())
    .withFallbackKeys("write.payload.class", HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key()) // i.e. hoodie.datasource.write.payload.class
    .withDescription("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting.\n"
      + "This will render any value set for the option in-effective");

  /**
   * Flag to indicate whether to drop duplicates before insert/upsert.
   * By default false to gain extra performance.
   */
  @AdvancedConfig
  public static final ConfigOption<Boolean> PRE_COMBINE = ConfigOptions
    .key("write.precombine")
    .booleanType()
    .defaultValue(false) // default is false
    .withDescription("Flag to indicate whether to drop duplicates before insert/upsert.\n"
      + "By default these cases will accept duplicates, to gain extra performance:\n"
      + "1) insert operation;\n"
      + "2) upsert for MOR table, the MOR table deduplicate on reading");

  public static final ConfigOption<String> RECORD_KEY_FIELD = ConfigOptions
    // RECORDKEY_FIELD_NAME is defined in hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java
    .key(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()) // hoodie.datasource.write.recordkey.field
    .stringType()
    .defaultValue("uuid") // default value
    .withDescription("Record key field. Value to be used as the `recordKey` component of `HoodieKey`.\n"
      + "Actual value will be obtained by invoking .toString() on the field value. Nested fields can be specified using "
      + "the dot notation eg: `a.b.c`");
}
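
A sketch of setting these options on a Flink Configuration via the standard Configuration.set(ConfigOption, value) API (the concrete values are only examples):

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;

Configuration conf = new Configuration();
conf.set(FlinkOptions.OPERATION, "upsert");      // write.operation
conf.set(FlinkOptions.PRE_COMBINE, true);        // write.precombine
conf.set(FlinkOptions.RECORD_KEY_FIELD, "id");   // hoodie.datasource.write.recordkey.field
conf.set(FlinkOptions.PAYLOAD_CLASS_NAME,        // payload.class
    "org.apache.hudi.common.model.EventTimeAvroPayload");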

KeyGeneratorOptions

// https://github.com/apache/hudi/blob/master/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java
// Hudi maintains keys (record key + partition path) for uniquely identifying a particular record.
// This config allows developers to setup the Key generator class that will extract these out of incoming records.
public class KeyGeneratorOptions extends HoodieConfig {
  public static final ConfigProperty<String> RECORDKEY_FIELD_NAME = ConfigProperty
      .key("hoodie.datasource.write.recordkey.field")
      .noDefaultValue()
      .withDocumentation("Record key field. Value to be used as the `recordKey` component of `HoodieKey`.\n"
          + "Actual value will be obtained by invoking .toString() on the field value. Nested fields can be specified using\n"
          + "the dot notation eg: `a.b.c`");
}
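
A small sketch of supplying the record-key field; nested fields use dot notation, as the documentation above states (order.id is just an example field):

import java.util.Properties;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;

Properties keyGenProps = new Properties();
// Nested record key: the id field inside an order struct.
keyGenProps.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "order.id");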