优化-iceberg调参优化

发布时间 2024-01-02 09:36:56作者: 黑水滴

一、建表优化

1、iceberg表支持更新操作。

文档:https://iceberg.apache.org/docs/latest/configuration/

功能描述:因v1只支持insert,如果有更新场景,则需要建表时指定format为V2版本

参数:'format-version'='2'

CREATE TABLE data_lake_ods.test3 (
`id` int ,
`empcode` STRING
) USING iceberg
TBLPROPERTIES(
'format-version'='2'
);

2、建表时设置metadata.json保留版本个数

功能描述:每次插入数据都会生成一个metadata文件,插入次数太多会影响查询,所以设置保留版本个数

详细介绍和测试文档:Iceberg元数据合并-metadata.json文件

CREATE TABLE data_lake_ods.test3 (
`id` int ,
`empcode` STRING
) USING iceberg
TBLPROPERTIES(
'format-version'='2'
,'write.metadata.delete-after-commit.enabled'='true'
,'write.metadata.previous-versions-max'='3'
);
--插入和更新数据
insert into table iceberg_test.test3 values (1,"code1");
update iceberg_test.test3 set empcode='code2' where id=1;

3、删表时指定清理方式-定制

功能描述:默认drop table 不会清理hdfs数据,使用官方 DROP TABLE spark_catalog.db.sample PURGE时会清理数据,但是还留存【表/data】、【表/metadata】文件。

因社区主干分支不支持,需要使用特定的包结合社区1.3.1代码合并,其他版本也可以。

使用方式:

        !!!使用优化后的icebrg包

        在建表时需要开启'table.drop.base-path.enabled'='true'

        删表时:DROP TABLE spark_catalog.iceberg_test.test3 PURGE

修改包:iceberg-spark-runtime-3.2_2.12-1.3.1.jar

社区提交代码:https://github.com/apache/iceberg/pull/1839/files

官方ddl文档:https://iceberg.apache.org/docs/1.3.1/spark-ddl/

备注:iceberg0.13.1 需要使用SparkCatalog才可以删除数据,iceberg1.3.1使用SparkCatalog、SparkSessionCatalog都可以操作。

CREATE TABLE iceberg_test.test3 (
`id` int ,
`empcode` STRING
) USING iceberg
TBLPROPERTIES(
'format-version'='2'
,'write.metadata.delete-after-commit.enabled'='true'
,'write.metadata.previous-versions-max'='3'
,'table.drop.base-path.enabled'='true'
);

二、元数据治理-存储过程

官网文档:https://iceberg.apache.org/docs/latest/spark-procedures/

1、合并小文件(spark-sql)

详细测试使用文档:Iceberg小文件合并测试

如果是大表,则先执行max-file-group-size-bytes=1的把删除文件合并,max-concurrent-file-group-rewrites设置为maxExecutors个数

CALL spark_catalog.system.rewrite_data_files(
table => 'iceberg_test.order_info1',
options => map(
  'max-concurrent-file-group-rewrites','15',
  'max-file-group-size-bytes','1',
  'rewrite-all','true'
  )
);
 

然后再执行,开始真正合并小文件(分组大小1GB)

CALL spark_catalog.system.rewrite_data_files(
table => 'iceberg_test.order_info1',
options => map(
  'max-concurrent-file-group-rewrites','1',
  'max-file-group-size-bytes','1073741824',
  'target-file-size-bytes','67108864',
  'rewrite-all','true'
  )
);

其它优化参数

(1)rewrite-job-order=bytes-asc
说明:根据该值强制指定重写作业顺序
    bytes-asc:则首先重写最小的作业组。
    bytes-desc:则首先重写最大的作业组。
    files-asc:则首先重写文件最少的作业组。
    files-desc:则首先重写文件最多的作业组。
    none(默认):则按照计划的顺序重写作业组(无特定顺序)。
(2)target-file-size-bytes
说明:目标输出文件大小
默认值:536870912(512 MB)
可以修改成:67108864(64MB)

2、删除过期快照(合并小文件后文件还不会清理,需要执行删除过期快照命令,这样才真正删除数据文件)

CALL spark_catalog.system.expire_snapshots(table => 'iceberg_test.order_info1', older_than => TIMESTAMP '2023-12-07 10:40:00.000');

 

 

 

 

 

参考文章:

1、Spark 合并 Iceberg 小文件内存溢出问题定位和解决方案

https://xie.infoq.cn/article/50259945d7663d7194a5e2763

2、通过flink、spark优化iceberg表小文件项目

https://github.com/zhuxiaoshang/flink-be-god/blob/master/flink-iceberg/src/main/java/flink/iceberg/compaction/SparkCompaction.java