一、建表优化
1、iceberg表支持更新操作。
文档:https://iceberg.apache.org/docs/latest/configuration/
功能描述:因v1只支持insert,如果有更新场景,则需要建表时指定format为V2版本
参数:'format-version'='2'
CREATE TABLE data_lake_ods.test3 (
`id` int ,
`empcode` STRING
) USING iceberg
TBLPROPERTIES(
'format-version'='2'
);
2、建表时设置metadata.json保留版本个数
功能描述:每次插入数据都会生成一个metadata文件,插入次数太多会影响查询,所以设置保留版本个数
详细介绍和测试文档:Iceberg元数据合并-metadata.json文件
CREATE TABLE data_lake_ods.test3 (
`id` int ,
`empcode` STRING
) USING iceberg
TBLPROPERTIES(
'format-version'='2'
,'write.metadata.delete-after-commit.enabled'='true'
,'write.metadata.previous-versions-max'='3'
);
--插入和更新数据
insert into table iceberg_test.test3 values (1,"code1");
update iceberg_test.test3 set empcode='code2' where id=1;
3、删表时指定清理方式-定制
功能描述:默认drop table 不会清理hdfs数据,使用官方 DROP TABLE spark_catalog.db.sample PURGE时会清理数据,但是还留存【表/data】、【表/metadata】文件。
因社区主干分支不支持,需要使用特定的包结合社区1.3.1代码合并,其他版本也可以。
使用方式:
!!!使用优化后的icebrg包
在建表时需要开启'table.drop.base-path.enabled'='true'
删表时:DROP TABLE spark_catalog.iceberg_test.test3 PURGE
修改包:iceberg-spark-runtime-3.2_2.12-1.3.1.jar
社区提交代码:https://github.com/apache/iceberg/pull/1839/files
官方ddl文档:https://iceberg.apache.org/docs/1.3.1/spark-ddl/
备注:iceberg0.13.1 需要使用SparkCatalog才可以删除数据,iceberg1.3.1使用SparkCatalog、SparkSessionCatalog都可以操作。
CREATE TABLE iceberg_test.test3 (
`id` int ,
`empcode` STRING
) USING iceberg
TBLPROPERTIES(
'format-version'='2'
,'write.metadata.delete-after-commit.enabled'='true'
,'write.metadata.previous-versions-max'='3'
,'table.drop.base-path.enabled'='true'
);
二、元数据治理-存储过程
官网文档:https://iceberg.apache.org/docs/latest/spark-procedures/
1、合并小文件(spark-sql)
详细测试使用文档:Iceberg小文件合并测试
如果是大表,则先执行max-file-group-size-bytes=1的把删除文件合并,max-concurrent-file-group-rewrites设置为maxExecutors个数
CALL spark_catalog.system.rewrite_data_files(
table => 'iceberg_test.order_info1',
options => map(
'max-concurrent-file-group-rewrites','15',
'max-file-group-size-bytes','1',
'rewrite-all','true'
)
);
|
然后再执行,开始真正合并小文件(分组大小1GB)
CALL spark_catalog.system.rewrite_data_files(
table => 'iceberg_test.order_info1',
options => map(
'max-concurrent-file-group-rewrites','1',
'max-file-group-size-bytes','1073741824',
'target-file-size-bytes','67108864',
'rewrite-all','true'
)
);
其它优化参数
(1)rewrite-job-order=bytes-asc
说明:根据该值强制指定重写作业顺序
bytes-asc:则首先重写最小的作业组。
bytes-desc:则首先重写最大的作业组。
files-asc:则首先重写文件最少的作业组。
files-desc:则首先重写文件最多的作业组。
none(默认):则按照计划的顺序重写作业组(无特定顺序)。
(2)target-file-size-bytes
说明:目标输出文件大小
默认值:536870912(512 MB)
可以修改成:67108864(64MB)
2、删除过期快照(合并小文件后文件还不会清理,需要执行删除过期快照命令,这样才真正删除数据文件)
CALL spark_catalog.system.expire_snapshots(table => 'iceberg_test.order_info1', older_than => TIMESTAMP '2023-12-07 10:40:00.000');
参考文章:
1、Spark 合并 Iceberg 小文件内存溢出问题定位和解决方案
https://xie.infoq.cn/article/50259945d7663d7194a5e2763
2、通过flink、spark优化iceberg表小文件项目