性能测试-Oceanus 测试FLink mysql到Iceberg性能

发布时间 2024-01-02 10:07:10作者: 黑水滴

一、任务依赖信息

1、mysql测试库信息

地址:127.0.0.1、gomysql_bdg_test

库:bdg_test

表:order_info1

2、iceberg库

hive地址:thrift://127:7004

catalog-name:spark_catalog

Format版本:v2

库:iceberg_test

表:order_info1

3、kafka信息

地址:腾讯kafka

topic:test_wangshida_order

group:read-test-wang

分区数:10

4、任务信息

全量增量同步:test_mysql_iceberg_order_info3

mysql到kafka:test_mysql_kafka_order

kafka到iceberg:test_kafka_iceberg_order

 

二、元数据信息

1、mysql建表

CREATE TABLE `order_info1` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `order_number` bigint(20) NOT NULL COMMENT '订单号,由外部提供(全局唯一)',
  `user_id` bigint(20) NOT NULL COMMENT '下单的用户Id student_number'
PRIMARY KEY (`id`),
  UNIQUE KEY `orderInfoNumberUniqueIndex` (`order_number`),
  KEY `idx_create_time` (`create_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 ROW_FORMAT=DYNAMIC COMMENT='订单信息'
 

2、iceberg建表

CREATE TABLE iceberg_test.order_info1(
  `id` bigint
  ,`order_number` bigint
  ,`user_id` bigint
)USING iceberg
TBLPROPERTIES('format-version'='2','property-version'='2','write.upsert.enabled'='true')
 

3、kafka信息

CREATE TABLE test_kafka_order(
    `id` bigint
    ,`order_number` bigint
    ,`user_id` bigint
) WITH (
    -- 定义 Kafka 参数
    'connector' = 'kafka',
    'topic' = 'test_wangshida_order',
    'scan.startup.mode' = 'earliest-offset', -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets / timestamp 的任何一种
    'properties.bootstrap.servers' = '127:9092',  -- 替换为您的 Kafka 连接地址
    'properties.group.id' = 'read-test-wang', -- 必选参数, 一定要指定 Group ID
    -- 定义数据格式 (JSON 格式)
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',  -- 如果设置为 false, 则遇到缺失字段不会报错。
    'json.ignore-parse-errors' = 'true'    -- 如果设置为 true,则忽略任何解析报错。
);
 

 

三、任务配置信息

1、引用jar包

2、高级参数

pipeline.max-parallelism: 2048
security.kerberos.login.principal: bdg_app@EMR
security.kerberos.login.keytab: bdg_app.keytab
security.kerberos.login.conf: krb5.conf
containerized.taskmanager.env.HADOOP_USER_NAME: bdg_app
containerized.master.env.HADOOP_USER_NAME: bdg_app
rest.flamegraph.enabled: true

四、任务运行信息

1、全量同步性能

同步表
同步数据量
TaskManager规格
并发
全量花费时间
速率
mysqlcpu使用率
TaskManager内存使用率
延迟时间
order_info1 2 6119 6181 1CU 2 3小时26分

高峰2.42万条/s

低峰2.19万条/s

3.5% 88% checkpoint设置一分钟情况下,一般下个checkpoint就可以写入,偶尔2个checkpoint
order_info1 2 6119 6181 0.5CU 20 37分

高峰14.24万条/s

低峰12.01万条/s

22.35% 83%

checkpoint设置一分钟情况下,一般下个checkpoint就可以写入,偶尔2个checkpoint

测试1条、10条、100条、1千条、1万条、10万条

order_info1 2 6119 6181 0.5CU 48 19分

高峰30.11万条/s

低峰21.05万条/s

57.2% 80%  

资源情况如下,全量同步完成后可以缩减资源运行。可以全量阶段加资源,全量完成后保存checkpoint,然后缩减资源重启任务。

 

2、增量同步性能

数据表
更新数据量
TM规格
并发
花费时间
速率
mysql cpu使用率
备注
order_info1 1000 0000 2CU 1 6分钟

高峰4.05万条/s

低峰4.01万条/s

波动 0%  因使用binlog更新,在更新阶段对mysql无影响,cpu等mysql指标无波动。

3、更新数据性能

数据表
更新数据量
TM规格
并发
花费时间
速率
mysql cpu使用率
备注
order_info1 1000 0000 2CU 1 7分钟

高峰6.10万条/s

低峰6.03万条/s

波动 0% 

1、更新操作是先删再增,统计数据实际2000万条数据操作。

2、因使用binlog更新,在更新阶段对mysql无影响,cpu等mysql指标无波动。

3、腾讯binlog消费速度没限制,阿里的有做限制,大多网络和磁盘io是瓶颈。

4、flink cdc读取binlog是单线程读取,对应Oceanus的1CU(1核4G)且只能使用一核。 分配过多的CU和并行度对速度提升较少。因写入iceberg也需要一些资源,建议增量阶段分配2CU 1并行度即可。提给Oceanus团队尝试优化Flink cdc期望有些逻辑可以并行处理。

 

order_info1 1000 0000 0.5CU 1 -

高峰0.86万条/s

低峰0.73万条/s

波动 0% 

 

test_wangshida_order(kafka) 2100 0000 2CU 10 3分钟内

高峰29.17万条/s

低峰4.17万条/s

波动 0% 

1、数据源数kafka开启10并行度,消费数据更新iceberg表。更新速度最高可达29万,3分钟内可以处理2000万数据。iceberg可以并行更新。

 

 

五、参数调优

修改参数:ALTER TABLE order_info3 SET TBLPROPERTIES('write.distribution-mode'='hash');

1、写入参数介绍

属性

默认值

描述

write.format.default parquet 表的默认文件格式。parquet、avro、orc
write.delete.format.default 数据文件格式 表的默认删除文件格式。parquet、avro、orc
write.parquet.row-group-size-bytes 128 MB Row Group大小,最细数据块大小
write.parquet.page-size-bytes 1 MB Page最小存储单元
write.parquet.compression-codec gzip parquet压缩编解码器。zstd, brotli, lz4, gzip, snappy, uncompressed
write.parquet.compression-level null parquet压缩等级
write.avro.compression-codec gzip Avro压缩编解码器。gzip(deflate with 9 level), zstd, snappy, uncompressed
write.avro.compression-level null Avro压缩等级
write.metadata.compression-codec none 元数据压缩编码。none、gzip
write.target-file-size-bytes 512MB 控制生成的文件的大小
write.delete.target-file-size-bytes 64MB 控制生成的删除文件的大小
write.distribution-mode none 写数据的分布方式。none不处理;Hash按分区键散列;range如果表有SortOrder则按分区键或排序键范围分布
write.delete.distribution-mode hash 写删除数据时分布方式
write.update.distribution-mode hash 写更新数据时分布方式
write.merge.distribution-mode none 写合并数据时分布方式
write.metadata.delete-after-commit.enabled false 是否在提交后删除最旧跟踪版本元数据文件
write.metadata.previous-versions-max 100 提交后删除之前保留的旧版本元数据文件最大值
write.data.path table location+/data 数据文件的基本位置
write.metadata.path table location+/metadata 元数据文件的基本位置
write.delete.mode copy-on-write 用于删除命令的模式:写时复制copy-on-write或读时合并merge-on-read(仅限v2)
write.update.mode copy-on-write 用于更新命令的模式:写时复制copy-on-write或读时合并merge-on-read(仅限v2)
write.merge.mode copy-on-write 用于合并命令的模式:写时复制copy-on-write或读时合并merge-on-read(仅限v2)
write.merge.isolation-level serializable 合并命令的隔离级别:serializable、snapshot

 

2、表行为属性

属性

默认值

描述

属性

默认值

描述

commit.retry.num-retries 4 在失败之前重试提交的次数
commit.retry.min-wait-ms 100 重试提交之前等待的最小时间(以毫秒为单位)
commit.retry.max-wait-ms 60000(1 min) 重试提交前等待的最大时间(以毫秒为单位)
commit.retry.total-timeout-ms 1800000 (30 min) 提交的总重试超时时间(以毫秒计)
commit.status-check.num-retries 3 由于提交状态未知导致提交失败之前,连接丢失后检查提交是否成功的次数
commit.status-check.min-wait-ms 1000(1s) 重新尝试状态检查之前等待的最小时间(以毫秒为单位)
commit.status-check.max-wait-ms 60000 (1 min) 在重新尝试状态检查之前等待的最大时间(以毫秒为单位)
commit.status-check.total-timeout-ms 1800000 (30 min) 提交状态检查必须成功的总超时时间,以毫秒为单位
commit.manifest.target-size-bytes 8388608 (8 MB) 合并清单文件时的目标大小
commit.manifest.min-count-to-merge 100 合并前要累积的最少清单数
commit.manifest-merge.enabled true 控制是否在写时自动合并清单
history.expire.max-snapshot-age-ms 432000000 (5 days) 快照到期时保留在表及其所有分支上的默认最大快照年龄
history.expire.min-snapshots-to-keep 1 快照到期时保留在表及其所有分支上的默认最小快照数
history.expire.max-ref-age-ms Long.MAX_VALUE (永久) 对于除主分支以外的快照引用,在快照过期时要保留的快照引用的默认最大年龄。主干永不过期。

其它属性参考官网地址

 

六、测试结论

checkpoint设置1分钟场景下,一般情况下一个checkpoint就写入
1、一次写入10条,则下个checkpoint就进入
2、连续1分钟写一次,下个checkpoint就进入
3、删除数据,下个checkpoint就逻辑删除

 

七、更新数据

1、更新数据
update order_info1 set purchase_info_id='test6' where id>10000000 and id<=10100000;
update order_info1 set purchase_info_id='test6' where id>10100000 and id<=10200000;
update order_info1 set purchase_info_id='test6' where id>10200000 and id<=10300000;
update order_info1 set purchase_info_id='test6' where id>10300000 and id<=10400000;
update order_info1 set purchase_info_id='test6' where id>10400000 and id<=10500000;
update order_info1 set purchase_info_id='test6' where id>10500000 and id<=10600000;
update order_info1 set purchase_info_id='test6' where id>10600000 and id<=10700000;
update order_info1 set purchase_info_id='test6' where id>10700000 and id<=10800000;
update order_info1 set purchase_info_id='test6' where id>10800000 and id<=10900000;
update order_info1 set purchase_info_id='test6' where id>10900000 and id<=11000000;
2、插入数据
insert into order_info1 (order_number,user_id,sub_clazz_number,clazz_number,course_number,purchased_minutes,follow_order,order_status,time_expire,purchase_info_id,paid_time,pay_channel,create_time,original_price,spike_price_id,voucher_price,user_voucher_id,manual_price,price,refund_price,refund_time,order_type,parent_order_number,update_time,user_address_id,pay_number,coupon_price,activity_number,course_original_price,course_price,activity_type,activity_ext_info,activity_price,business_type,replay_limit,biz_line,app_version,app_id,service_period) SELECT id as order_number,user_id,sub_clazz_number,clazz_number,course_number,purchased_minutes,follow_order,order_status,time_expire,purchase_info_id,paid_time,pay_channel,create_time,original_price,spike_price_id,voucher_price,user_voucher_id,manual_price,price,refund_price,refund_time,order_type,parent_order_number,update_time,user_address_id,pay_number,coupon_price,activity_number,course_original_price,course_price,activity_type,activity_ext_info,activity_price,business_type,replay_limit,biz_line,app_version,app_id,service_period from order_info1 where id>10000000 and id<=12000000;

八、参考文档

1、iceberg官网-建表

https://iceberg.apache.org/docs/latest/configuration/

2、iceberg官网-flink写入

https://iceberg.apache.org/docs/latest/flink-connector/