Hudi's Flink Configuration Options (1)

Published: 2023-05-09 15:49:54  Author: -见

Terminology

  • FallbackKeys

Fallback keys can be understood as aliases: when the specified key is not present, the fallback keys are looked up instead. Here, "key" refers to a configuration option's name.
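The lookup order can be sketched as follows. This is a simplified illustration of the fallback mechanism, not Flink's actual `ConfigOption` implementation; the `resolve` method and `FallbackKeyDemo` class are hypothetical names for this sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified sketch of fallback-key resolution: try the primary key first,
// then each fallback key in order, and finally fall back to the default value.
public class FallbackKeyDemo {
  static String resolve(Map<String, String> conf, String key,
                        String defaultValue, String... fallbackKeys) {
    if (conf.containsKey(key)) {
      return conf.get(key);
    }
    for (String fallback : fallbackKeys) {
      if (conf.containsKey(fallback)) {
        return conf.get(fallback);
      }
    }
    return defaultValue;
  }

  public static void main(String[] args) {
    Map<String, String> conf = new LinkedHashMap<>();
    // Only the fallback key is set, so its value wins over the default.
    conf.put("hoodie.index.type", "BUCKET");
    System.out.println(resolve(conf, "index.type", "FLINK_STATE", "hoodie.index.type")); // BUCKET
  }
}
```

Note that the primary key always takes precedence: a fallback key is consulted only when the primary key is absent.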

Related Source Code

  • FlinkOptions
// https://github.com/apache/hudi/blob/master/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
// Hoodie Flink config options
// It has the options for Hoodie table read and write. It also defines some utilities.
public class FlinkOptions extends HoodieConfig {
  // ------------------------------------------------------------------------
  //  Index Options
  // ------------------------------------------------------------------------

  public static final ConfigOption<String> INDEX_TYPE = ConfigOptions
      .key("index.type")
      .stringType()
      .defaultValue(HoodieIndex.IndexType.FLINK_STATE.name()) // defaults to the FLINK_STATE index type
      .withFallbackKeys(HoodieIndexConfig.INDEX_TYPE.key()) // hoodie.index.type
      .withDescription("Index type of Flink write job, default is using state backed index.");

  public static final String NO_PRE_COMBINE = "no_precombine";
  public static final ConfigOption<String> PRECOMBINE_FIELD = ConfigOptions
      .key("precombine.field")
      .stringType()
      .defaultValue("ts")
      .withFallbackKeys("write.precombine.field", HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key())
      .withDescription("Field used in preCombining before actual write. When two records have the same\n"
          + "key value, we will pick the one with the largest value for the precombine field,\n"
          + "determined by Object.compareTo(..)");
}
  • HoodieTableFactory
// https://github.com/apache/hudi/blob/3dcd7573fa26556af83cc81b108ae57cc363145c/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
/**
 * Hoodie data source/sink factory.
 */
public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
  /**
   * Supplement the table config options if not specified.
   */
  // Set the table options
  private void setupTableOptions(String basePath, Configuration conf) {
    StreamerUtil.getTableConfig(basePath, HadoopConfigurations.getHadoopConf(conf))
        .ifPresent(tableConfig -> {
          // If hoodie.table.recordkey.fields is set in the table config and
          // hoodie.datasource.write.recordkey.field is not set in the Flink configuration,
          // then hoodie.datasource.write.recordkey.field takes the value of hoodie.table.recordkey.fields.
          // hoodie.table.recordkey.fields has no default value (effectively an empty string).
          if (tableConfig.contains(HoodieTableConfig.RECORDKEY_FIELDS)
              && !conf.contains(FlinkOptions.RECORD_KEY_FIELD)) {
            conf.setString(FlinkOptions.RECORD_KEY_FIELD, tableConfig.getString(HoodieTableConfig.RECORDKEY_FIELDS));
          }
		  
          // If hoodie.table.precombine.field is set in the table config and
          // precombine.field is not set in the Flink configuration,
          // then precombine.field takes the value of hoodie.table.precombine.field.
          // hoodie.table.precombine.field has no default value (effectively an empty string).
          if (tableConfig.contains(HoodieTableConfig.PRECOMBINE_FIELD)
              && !conf.contains(FlinkOptions.PRECOMBINE_FIELD)) {
            conf.setString(FlinkOptions.PRECOMBINE_FIELD, tableConfig.getString(HoodieTableConfig.PRECOMBINE_FIELD));
          }
		  
          // If HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE is set in the table config and
          // FlinkOptions.HIVE_STYLE_PARTITIONING is not set in the Flink configuration,
          // then FlinkOptions.HIVE_STYLE_PARTITIONING takes the value from the table config.
          // Its default value is false.
          if (tableConfig.contains(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE)
              && !conf.contains(FlinkOptions.HIVE_STYLE_PARTITIONING)) {
            conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, tableConfig.getBoolean(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE));
          }
        });
  }
}
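The supplement logic above can be reduced to one rule: a value from the table's persisted config fills in a Flink option only when the user has not set that option. The following is a minimal sketch of that rule using plain maps instead of Hudi/Flink classes; `supplement` and `SetupTableOptionsDemo` are hypothetical names for this illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the supplement logic in setupTableOptions: a value from the
// table's hoodie.properties is copied into the Flink configuration only
// when the corresponding Flink option is not already set.
public class SetupTableOptionsDemo {
  static void supplement(Map<String, String> flinkConf,
                         Map<String, String> tableConf,
                         String tableKey, String flinkKey) {
    if (tableConf.containsKey(tableKey) && !flinkConf.containsKey(flinkKey)) {
      flinkConf.put(flinkKey, tableConf.get(tableKey));
    }
  }

  public static void main(String[] args) {
    Map<String, String> flinkConf = new HashMap<>();
    Map<String, String> tableConf = new HashMap<>();
    tableConf.put("hoodie.table.precombine.field", "event_ts");

    // Missing Flink option: the table config value is copied over.
    supplement(flinkConf, tableConf, "hoodie.table.precombine.field", "precombine.field");
    System.out.println(flinkConf.get("precombine.field")); // event_ts

    // A user-specified Flink option is never overwritten.
    flinkConf.put("precombine.field", "ts");
    tableConf.put("hoodie.table.precombine.field", "other");
    supplement(flinkConf, tableConf, "hoodie.table.precombine.field", "precombine.field");
    System.out.println(flinkConf.get("precombine.field")); // ts
  }
}
```

This mirrors the `tableConfig.contains(...) && !conf.contains(...)` guard that each branch of `setupTableOptions` applies.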

Options with Fallback Keys

  • An option can have multiple fallback keys. In the table below, a slash "/" separates different fallback keys; any one of them can be used.
| Flink option | Fallback option(s) | Default | Purpose |
|---|---|---|---|
| table.type | hoodie.table.type | COPY_ON_WRITE | specifies the table type |
| index.type | hoodie.index.type | FLINK_STATE | specifies the index type |
| precombine.field | write.precombine.field / hoodie.datasource.write.precombine.field | ts | specifies the precombine (merge) field; Hudi also has hoodie.table.precombine.field |
| payload.class | write.payload.class | EventTimeAvroPayload | specifies the payload class name |
| record.merger.impls | hoodie.datasource.write.record.merger.impls | HoodieAvroRecordMerger | specifies the record merger implementation classes |
| record.merger.strategy | hoodie.datasource.write.record.merger.strategy | eeb8d96f-b1e4-49fd-bbf8-28ac514178e5 | specifies the record merger strategy ID |
| cdc.enabled | hoodie.table.cdc.enabled | false | enables change data capture (CDC) |
| cdc.supplemental.logging.mode | hoodie.table.cdc.supplemental.logging.mode | | specifies the CDC supplemental logging mode |
| metadata.enabled | hoodie.metadata.enable | true | enables the metadata table |
| hive_sync.enabled | hive_sync.enable | false | enables Hive synchronization |