Setting up Debezium with Docker to sync MySQL data changes

Published: 2023-10-24 17:07:18  Author: 大卫小东 (Sheldon)

Debezium is a CDC (change data capture) product developed by Red Hat. Like Alibaba's Canal, it works by syncing the MySQL binlog, though it is somewhat more powerful. For brevity, it is referred to as dbz below.


Running dbz depends on several other components. First there must be a MySQL database whose binlog (plus a snapshot of the current data) dbz reads. The changes are then published to Kafka; the piece that talks to Kafka is called the connector (running inside Kafka Connect). Kafka itself depends on ZooKeeper. For convenience we also add kafdrop, a web UI for browsing Kafka. Rather than creating each container by hand with docker, we use docker-compose.

Reference: https://github.com/debezium/debezium-examples/blob/main/tutorial/README.md

The complete docker-compose.yml file is as follows:

version: '2'
services:
  zookeeper:
    image: quay.io/debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
     - 2181:2181
     - 2888:2888
     - 3888:3888
    networks:
      - kafka_network
  kafka:
    image: quay.io/debezium/kafka:${DEBEZIUM_VERSION}
    ports:
     - 9092:9092
     - 29092:29092
     - 29093:29093
    networks:
      - kafka_network
    links:
     - zookeeper
    environment:
      ZOOKEEPER_CONNECT: zookeeper:2181
      ALLOW_PLAINTEXT_LISTENER: "yes"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL_SAME_HOST:PLAINTEXT,EXTERNAL_DIFFERENT_HOST:PLAINTEXT
      KAFKA_LISTENERS: EXTERNAL_SAME_HOST://:29092,EXTERNAL_DIFFERENT_HOST://:29093,INTERNAL://:9092
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL_SAME_HOST://localhost:29092,EXTERNAL_DIFFERENT_HOST://10.136.184.253:29093
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
  kafdrop:
    image: obsidiandynamics/kafdrop
    restart: "no"
    environment:
      KAFKA_BROKERCONNECT: "kafka:9092"
      JVM_OPTS: "-Xms16M -Xmx512M -Xss180K -XX:-TieredCompilation -XX:+UseStringDeduplication -noverify"
    ports:
      - 9000:9000
    depends_on:
      - kafka
    networks:
      - kafka_network
  mysql:
    image: quay.io/debezium/example-mysql:${DEBEZIUM_VERSION}
    ports:
     - 3306:3306
    networks:
      - kafka_network
    environment:
     - MYSQL_ROOT_PASSWORD=debezium
     - MYSQL_USER=mysqluser
     - MYSQL_PASSWORD=mysqlpw
  connect:
    image: quay.io/debezium/connect:${DEBEZIUM_VERSION}
    ports:
     - 8083:8083
    links:
     - kafka
     - mysql
    networks:
      - kafka_network
    environment:
     - BOOTSTRAP_SERVERS=kafka:9092
     - GROUP_ID=1
     - CONFIG_STORAGE_TOPIC=my_connect_configs
     - OFFSET_STORAGE_TOPIC=my_connect_offsets
     - STATUS_STORAGE_TOPIC=my_connect_statuses
networks:
  kafka_network:
    name: kafka_docker_example_net

This file is adapted from the official GitHub example: https://github.com/debezium/debezium-examples/blob/main/tutorial/docker-compose-mysql.yaml.
The main differences are:

  • Kafka's exposed ports. The example exposes only 9092, but I configured three. With only 9092 open, Kafka is reachable only from inside the Docker network. Adding 29092 lets you connect and consume from the local host, so 29092 must be open. If other machines (ones that can reach your IP) should also be able to consume messages from the Kafka inside Docker, they go through 29093, so open it as needed.
  • Kafka's environment variables. The example sets only the ZooKeeper address; to support the listeners on 29092 and 29093 I added more settings, which you can trim as needed.
  • kafdrop. The example doesn't include it; we add it ourselves.
  • networks. Assigns these services to a named network; the default network would also work.

As of this writing (2023-10-24) the dbz version is 2.1 (adjust according to https://github.com/debezium/debezium-examples/blob/main/tutorial/README.md#using-mysql), so before bringing the stack up we set the environment variable:

export DEBEZIUM_VERSION=2.1

Then run docker-compose up -d in the directory containing docker-compose.yml; after a few minutes everything should be up:

docker-compose up -d
[+] Building 0.0s (0/0)
[+] Running 5/5
 ✔ Container debezium-zookeeper-1           Running                        0.0s
 ✔ Container 64813a30b3e3_debezium-mysql-1  Running                        0.0s
 ✔ Container debezium-kafka-1               Running                        0.0s
 ✔ Container debezium-kafdrop-1             Running                        0.0s
 ✔ Container debezium-connect-1             Started                        0.7s

Once running, connector status is available on port 8083 of the local machine, and kafdrop on port 9000.
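With the stack up, the Kafka Connect REST API can be probed from the host. A quick sketch (assumes the default ports from the compose file above and a running stack):

```shell
# Kafka Connect worker version info
curl -s http://localhost:8083/
# List registered connectors (empty until we POST one below)
curl -s http://localhost:8083/connectors
# After registering a connector, inspect its status (replace <name>):
# curl -s http://localhost:8083/connectors/<name>/status
```

kafdrop needs no API calls; just open http://localhost:9000 in a browser.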


Next, connect to the database. We need a user that can read the binlog (assuming, of course, that binlog is enabled on the database); this user must have the REPLICATION SLAVE and REPLICATION CLIENT privileges. If you connect as root, it already has full privileges, so there is nothing extra to grant.
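If you want a dedicated user instead of root, a sketch of the grants looks like this (the user name and password here are placeholders; per Debezium's MySQL connector documentation, SELECT, RELOAD, and SHOW DATABASES are also needed for the initial snapshot):

```sql
-- Hypothetical user; replace the name and password as needed.
CREATE USER 'debezium'@'%' IDENTIFIED BY 'your-password';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
  ON *.* TO 'debezium'@'%';
FLUSH PRIVILEGES;
```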

Then POST the connector registration request to port 8083, like so:

curl --location --request POST 'http://0.0.0.0:8083/connectors' \
--header 'Accept: application/json' \
--header 'User-Agent: Apifox/1.0.0 (https://apifox.com)' \
--header 'Content-Type: application/json' \
--data-raw '{
    "name": "fastpass_latest-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "localhost",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "PQEXQPm7DDXEYtw4Qs6Xh3mXJgv4vCy5",
        "database.server.id": "6500",
        "snapshot.locking.mode": "none",
        "table.include.list":"fastpass.admin_etcd_config",
        "topic.prefix": "latest1",
        "database.include.list": "fastpass",
        "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
        "schema.history.internal.kafka.topic": "schema-changes.fastpass_latest"
    }
}'

This request is adapted from https://github.com/debezium/debezium-examples/blob/main/tutorial/register-mysql.json. The main differences are:

  • snapshot.locking.mode. I set this to none so no lock is taken; otherwise dbz locks the database, finishes the snapshot, and only then releases the lock and starts syncing the binlog. The trade-off is that without the lock, changes made between the snapshot and the start of binlog streaming may be lost.
  • table.include.list. The example doesn't restrict which tables to sync, which syncs every table in the database; here we sync only one table.

Now just write a program to consume the messages. The default topic name is topic.prefix, database, and table joined by dots, e.g. latest1.fastpass.admin_etcd_config.
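Whatever Kafka client you consume with, each record carries Debezium's change-event envelope, whose payload holds "op", "before", "after", and "source" fields. A minimal, self-contained sketch of decoding one such event (the sample record below is made up for illustration; a real consumer would read it from the latest1.fastpass.admin_etcd_config topic):

```python
import json

def summarize_change(event_json: str) -> str:
    """Return a one-line summary of a Debezium change event envelope."""
    payload = json.loads(event_json).get("payload", {})
    # Debezium op codes: c=create, u=update, d=delete, r=snapshot read
    op = {"c": "INSERT", "u": "UPDATE", "d": "DELETE", "r": "SNAPSHOT"}.get(
        payload.get("op"), "UNKNOWN")
    source = payload.get("source", {})
    table = f'{source.get("db")}.{source.get("table")}'
    return f'{op} on {table}: {payload.get("before")} -> {payload.get("after")}'

# Hypothetical event, shaped like one from the connector registered above
sample = json.dumps({
    "payload": {
        "op": "u",
        "source": {"db": "fastpass", "table": "admin_etcd_config"},
        "before": {"id": 1, "value": "old"},
        "after": {"id": 1, "value": "new"},
    }
})
print(summarize_change(sample))
```

For a delete event, "after" is null (and Debezium also emits a tombstone record with a null value, which a real consumer should be prepared to skip).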