debezium同步mysql数据至kafka(未完待续)

发布时间 2023-06-23 22:02:32作者: virtualzzf

实验环境

全部部署于本地虚拟机

1 mysql

参考 官方文档 和 根据官方示例镜像(debezium/example-mysql,mysql版本为8.0.32)

1.1 创建用户

官方镜像里一共有三个账号
debezium:connect用户
mysqluser:普通用户
replicator:用于主从?

设置命令

create user 'debezium'@'%' identified by "dbz";
create user 'mysqluser'@'%' identified by "mysqlpw";
create user 'replicator'@'%' identified by "replicator";
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%';
GRANT USAGE ON *.* TO 'mysqluser'@'%';
GRANT ALL PRIVILEGES ON `inventory`.* TO 'mysqluser'@'%';
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'replicator'@'%';
flush privileges;

查询命令

select host,user from mysql.user;
show grants for `debezium`@`%`;

1.2 设置binlog

镜像配置文件

/etc/mysql/conf.d/mysql.cnf

[mysqld]
skip-host-cache
skip-name-resolve
user=mysql
symbolic-links=0
# ----------------------------------------------
# Enable the binlog for replication & CDC
# ----------------------------------------------
# Enable binary replication log and set the prefix, expiration, and log format.
# The prefix is arbitrary, expiration can be short for integration tests but would
# be longer on a production system. Row-level info is required for ingest to work.
# Server ID is required, but this will vary on production systems
server-id = 223344
log_bin = mysql-bin
expire_logs_days = 1
binlog_format = row

default_authentication_plugin = mysql_native_password

查询命令

SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::" FROM performance_schema.global_variables WHERE variable_name='log_bin';

1.3 使能GTIDs(实际镜像中未开启)

查询命令

SHOW VARIABLES LIKE 'gtid_mode';

1.4 配置会话定时器(实际镜像中为默认值 28800)

查询命令

show variables like 'interactive_timeout';
show variables like 'wait_timeout';

1.5 Enabling query log events(实际镜像中未开启)

查询命令

show variables like 'binlog_rows_query_log_events';

1.6 确认binlog row value options

查询命令

确保不为“PARTIAL_JSON”即可

show global variables where variable_name = 'binlog_row_value_options';

配置命令

set @@global.binlog_row_value_options="" ;

2 启动kafka

参考博文
单节点kafka部署笔记

3 启动connector

下载docker镜像

docker pull quay.io/debezium/postgres