Apache Hudi 使用指南

发布时间 2023-09-27 09:27:27作者: LittleDonkey

欢迎参观我的博客,一个Vue 与 SpringBoot结合的产物:https://poetize.cn

原文链接:https://poetize.cn/article?id=31

Apache Hudi 中文官网

Maven

<dependency>
    <groupId>org.apache.hudi</groupId>
    <artifactId>hudi-flink1.14-bundle</artifactId>
    <version>0.13.0</version>
</dependency>
CREATE TABLE user (
    id STRING primary key,
    name STRING,
    ts TIMESTAMP,
    `partition` STRING
)
PARTITIONED BY (`partition`)
WITH (
    'connector' = 'hudi',
    'path' = 'hdfs://ip:9003/data/hudi/user',
    'hive_sync.enable' = 'false',
    'hive_sync.db' = 'ods',
    'hive_sync.table' = 'user',
    'hive_sync.mode' = 'hms',
    'hive_sync.jdbc_url' = 'jdbc:hive2://ip:10000',
    'hive_sync.metastore.uris' = 'thrift://ip:9083',
    'hive_sync.username' = 'root',
    'hive_sync.password' = 'aaa',
    'table.type' = 'MERGE_ON_READ',
    'write.operation' = 'upsert',
    'hoodie.datasource.write.recordkey.field' = 'id',
    'precombine.field' = 'ts',
    'changelog.enabled' = 'true',
    'read.streaming.enabled' = 'true',
    'read.start-commit' = 'earliest',
    'read.streaming.check-interval' = '10'
)

DataStream

/**
 * Source
 */
String targetTable = "user";
String basePath = "hdfs://ip:9003/data/hudi/user";

Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.PATH.key(), basePath);
options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
options.put(FlinkOptions.READ_START_COMMIT.key(), "yyyyMMddHHmmss");
options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());

HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
    .column("id STRING")
    .column("name STRING")
    .column("ts TIMESTAMP")
    .column("`partition` STRING")
    .pk("id")
    .partition("partition")
    .options(options);

DataStream<RowData> rowDataDataStream = builder.source(env);
rowDataDataStream.print();


/**
 * Sink
 */
String targetTable = "user";
String basePath = "hdfs://ip:9003/data/hudi/user";

Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.PATH.key(), basePath);
options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts");

DataStream<RowData> dataStream = env.addSource(...);
HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
    .column("id STRING")
    .column("name STRING")
    .column("ts TIMESTAMP")
    .column("`partition` STRING")
    .pk("id")
    .partition("partition")
    .options(options);

builder.sink(dataStream, false);

Hudi Hive

  • COPY_ON_WRITE:会生成原表名,可以提供实时视图查询以及增量视图查询。
  • MERGE_ON_READ:映射为两张Hive外部表原表名_ro(ro表)和原表名_rt(rt表)。ro表提供读优化视图查询,rt表提供实时视图查询以及增量视图查询。

使用 Hive 查询 Hudi 表前,需要通过 set 命令设置hive.input.format

增量查询还需要 set 命令额外设置3个参数:

set hoodie.tableName.consume.mode=INCREMENTAL;
set hoodie.tableName.consume.start.timestamp=commitTime;
set hoodie.tableName.consume.max.commits=3;

| 参数名 | 描述 |
| :-: | :-: | :-: |
| hoodie.tableName.consume.mode | Hudi表的查询模式。增量查询 :INCREMENTAL。非增量查询:不设置或者设为SNAPSHOT
| hoodie.tableName.consume.start.timestamp | Hudi表增量查询起始时间
| hoodie.tableName.consume.max.commits | Hudi表基于hoodie.tableName.consume.start.timestamp之后要查询的增量commit次数。例如:设置为3时,增量查询从指定的起始时间之后commit 3次的数据;设为-1时,增量查询从指定的起始时间之后提交的所有数据

COW 表查询

实时视图
设置 hive.input.format 为以下两个之一:

org.apache.hadoop.hive.ql.io.HiveInputFormat;
org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;

增量视图
除了要设置 hive.input.format,还需要设置上述的3个增量查询参数,且增量查询语句中必须添加 where 关键字并将 _hoodie_commit_time > 'startCommitTime' 作为过滤条件(这地方主要是hudi的小文件合并会把新旧commit的数据合并成新数据,hive是没法直接从parquet文件知道哪些是新数据哪些是老数据):

set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
set hoodie.hudicow.consume.mode=INCREMENTAL;
set hoodie.hudicow.consume.max.commits=-1;
set hoodie.hudicow.consume.start.timestamp=xxx;
select * from hudicow where _hoodie_commit_time > 'xxx'

MOR 表查询

实时视图
设置了 hive.input.format 之后,即可查询到Hudi源表的最新数据:

set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;

读优化视图
ro表全称 read oprimized table,对于 MOR 表同步的 xxx_ro 表,只暴露压缩后的 parquet。其查询方式和COW表类似。设置完 hiveInputFormat 之后 和普通的 Hive 表一样查询即可。

增量视图
这个增量查询针对的rt表,不是ro表。同 COW 表的增量查询类似:

set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;	//这地方指定为HoodieCombineHiveInputFormat
set hoodie.hudimor.consume.mode=INCREMENTAL;
set hoodie.hudimor.consume.max.commits=-1;
set hoodie.hudimor.consume.start.timestamp=xxx;
select * from hudimor_rt where _hoodie_commit_time > 'xxxx'

批量插入

write.operation = bulk_insert
满足快照数据导入需求。如果快照数据来自其他数据源,请使用该模式快速将快照数据导入Hudi。
bulk_insert消除了序列化和数据合并,跳过重复数据删除,因此用户需要保证数据的唯一性。

索引引导

index.bootstrap.enabled = true
如果已经通过bulk_insert批量插入到Hudi中。 用户可以实时插入,并使用索引引导功能确保数据不重复。
启用索引引导后,Hudi表中的剩余记录将一次性加载到Flink状态。

使用方法:

  • CREATE TABLE创建与Hudi表对应的语句,并设置index.bootstrap.enabled = true
  • 设置Flink检查点故障容限:在flink-conf.yaml配置execution.checkpointing.tolerable-failed-checkpoints = n
  • 等待第一个检查点成功,指示索引引导已完成。
  • 索引引导完成后,用户可以退出并保存保存点。
  • 重新启动作业,设置index.bootstrap.enable = false

注意:

  • 索引引导程序是阻塞的,因此在索引引导期间无法完成检查点。
  • 索引引导程序由输入数据触发,用户需要确保每个分区中至少有一个记录。
  • 第一个成功的检查点表示索引引导已完成。从检查点恢复时,无需再次加载索引。