Flink mysql-cdc连接器参数

发布时间 2023-12-28 17:20:47作者: 黑水滴

一、背景

通过Flink同步mysql到iceberg中,任务一直在运行中,但是在目标表看不到数据。经排查发现job manager一直在做切片,日志如下:

2023-12-28 16:58:36.251 [snapshot-splitting] INFO com.ververica.cdc.connectors.mysql.source.assigners.ChunkSplitter [] - ChunkSplitter has split 600 chunks for table
表的主键应该是使用不当变得非常大,最小值14415117481467904,最大值199932809068940288。真实数据只有4 7890 9615(4亿多)
job manager一直在计算分片超一个小时,默认每个分片8096条,所以计算非常缓慢。
flink计算出表为非均匀分布方式同步(日志中use unevenly-sized chunks),因非均匀分布方式会逐个扫描分片范围,可通过调整单分片大小来提高分片速度。
 

二、全量阶段核心设计

1、数据切片划分

全量阶段数据读取方式为分布式读取,会先对当前表数据按主键划分成多个Chunk,后续子任务读取Chunk 区间内的数据。根据主键列是否为自增整数类型,对表数据划分为均匀分布的Chunk及非均匀分布的Chunk。

(1)均匀分布方式(数值类型)

满足主键列自增且类型为整数类型(int,bigint,decimal)。查询出主键列的最小值,最大值,按 chunkSize 大小将数据均匀划分,因为主键为整数类型,根据当前chunk 起始位置、chunkSize 大小,直接计算 chunk 的结束位置。

注意:最新版本均匀分布的触发条件不再依赖主键列是否自增,要求主键列卫整数类型且根据 max(id) - min(id)/rowcount 计算出数据分布系数,只有分布系数 <= 配置的分布系数 (evenly-distribution.factor 默认为 1000.0d) 才会进行数据均匀划分。

//  计算主键列数据区间
select min(`order_id`), max(`order_id`) from demo_orders;

//  将数据划分为 chunkSize 大小的切片
chunk-0: [min,start + chunkSize)
chunk-1: [start + chunkSize, start + 2chunkSize)
.......
chunk-last: [max,null)

(2)非均匀分布方式(字符串或时间戳等其它类型)

主键列非自增或者类型为非整数类型。主键为非数值类型,每次划分需要对未划分的数据按主键进行升序排列,取出前 chunkSize 的最大值为当前 chunk 的结束位置。

注意:最新版本非均匀分布触发条件为主键列为非整数类型,或者计算出的分布系数 (distributionFactor) > 配置的分布系数 (evenly-distribution.factor)。

// 未拆分的数据排序后,取 chunkSize 条数据取最大值,作为切片的终止位置。
chunkend = SELECT MAX(`order_id`) FROM (
        SELECT `order_id`  FROM `demo_orders` 
        WHERE `order_id` >= [前一个切片的起始位置] 
        ORDER BY `order_id` ASC 
        LIMIT   [chunkSize]  
    ) AS T

2、全量切片数据读取

Flink 将表数据划分为多个 Chunk,子任务在不加锁的情况下,并行读取 Chunk 数据。因为全程无锁在数据分片读取过程中,可能有其他事务对切片范围内的数据进行修改,此时无法保证数据一致性。因此,在全量阶段 Flink 使用快照记录读取 + Binlog 数据修正的方式来保证数据的一致性。

 

三、mysql-cdc连接器参数

参数
说明
是否必填
备注
connector
源表类型
固定值为 mysql-cdc
hostname
MySQL 数据库的 IP 地址或者 Hostname
-
port
MySQL 数据库服务的端口号
默认值为3306
username
MySQL 数据库服务的用户名
有特定权限(包括 SELECT、RELOAD、SHOW DATABASES、REPLICATION SLAVE 和 REPLICATION CLIENT)的 MySQL 用户
password
MySQL 数据库服务的密码
-
database-name
MySQL 数据库名称
数据库名称支持正则表达式以读取多个数据库的数据
table-name
MySQL 表名
表名支持正则表达式以读取多个表的数据
server-id
数据库客户端的一个 ID
该 ID 必须是 MySQL 集群中全局唯一的。建议针对同一个数据库的每个作业都设置不同的 ID 范围值,例如5400-5405。默认会随机生成一个6400 - Integer.MAX_VALUE 的值
server-time-zone
数据库在使用的会话时区
例如 Asia/Shanghai,该参数控制了 MySQL 中的 TIMESTAMP 类型如何转成 STRING 类型
append-mode
开启 append 流模式
Flink1.13及以上版本支持, 例如:将 mysql-cdc 数据以 append 的方式同步到 hive
filter-duplicate-pair-records
过滤未在 Flink DDL 语句中定义的源表字段变更记录
例如 MySQL 源表有 a, b, c, d 四个字段,而用户在 Flink SQL 建表时只定义了 a, b 两个字段;开启该参数后,仅涉及 c 或 d 字段的变更记录会被忽略,不会输出到下游,可减少计算量和处理压力
scan.lastchunk.optimize.enable
对全量阶段的最后一个分片做重划分
如果全量同步期间,源表持续有大量写入和变更,则可能导致最后一个分片过大,引起 TaskManager OOM 崩溃重启。 开启本功能后(值设置为 true),Flink 会自动将过大的最后一个分片分成若干的小分片,提升作业的稳定性
debezium.min.row.count.to.stream.results
当表的条数大于该值时,会使用分批读取模式
默认值为1000。Flink 采用以下方式读取 MySQL 源表数据:
全量读取:直接将整个表的数据读取到内存里。优点是速度快,缺点是会消耗对应大小的内存,如果源表数据量非常大,可能会有 OOM 风险
分批读取:分多次读取,每次读取一定数量的行数,直到读取完所有数据。优点是读取数据量比较大的表没有 OOM 风险,缺点是读取速度相对较慢
debezium.snapshot.fetch.size
在 Snapshot 阶段,每次读取 MySQL 源表数据行数的最大值
仅当分批读取模式时,该参数生效
debezium.skipped.operations
需要过滤的 oplog 操作。操作包括 c 表示插入,u 表示更新,d 表示删除。默认情况下,不跳过任何操作,以逗号分隔
-
scan.incremental.snapshot.enabled
增量快照
默认为 true
scan.incremental.snapshot.chunk.size
当读取表的快照时,表快照捕获的表的块大小(行数)
默认为 8096
scan.lazy-calculate-splits.enabled
全量阶段JM中数据分片懒加载避免数据量太大,分片数据太多导致JM OOM
默认为 true
scan.newly-added-table.enabled
动态加表
默认为 false
scan.split-key.mode
联合主键作为 splitkey 的模式
取值为 default / specific;其中 default 为默认逻辑,采用联合主键的第一个字段作为 split key;设置为 specific 需要设置 scan.split-key.specific-column 指定联合主键中的某个字段
scan.split-key.specific-column
指定联合主键中某个字段作为 splitkey
当 scan.split-key.mode 为 specific 时必填。取值为联合主键中某个字段名
scan.startup.mode
MySQL CDC 消费者可选的启动模式
合法的模式为 "initial"(默认),"earliest-offset","latest-offset","specific-offset" 和 "timestamp"
scan.startup.specific-offset.file
在 "specific-offset" 启动模式下,启动位点的 binlog 文件位置
-
scan.startup.specific-offset.pos
在 "specific-offset" 启动模式下,启动位点的 binlog 文件位置
-
scan.startup.specific-offset.gtid-set
在 "specific-offset" 启动模式下,启动位点的 GTID 集合
-
scan.startup.timestamp-millis
在 "timestamp" 启动模式下,启动时间的毫秒时间戳
-
scan.startup.specific-offset.skip-events
在指定的启动位点后需要跳过的事件数量
-
scan.startup.specific-offset.skip-rows
在指定的启动位点后需要跳过的数据行数量
-
connect.timeout
尝试连接到 MySQL 数据库服务器后在超时之前等待的最长时间
默认 30s
connect.max-retries
建立MySQL连接尝试最大的次数
默认 3
connection.pool.size
连接池大小
默认 20
jdbc.properties.*
自定义JDBC URL参数,例如:'jdbc.properties.useSSL' = 'false'
默认 20
heartbeat.interval
发送心跳事件的时间间隔,用于跟踪最新可用的binlog偏移量, 一般用于解决慢表的问题(更新缓慢的数据表)
默认 20
debezium.*
Debezium 属性参数
从更细粒度控制 Debezium 客户端的行为。例如'debezium.snapshot.mode' = 'never',详情请参见 配置属性