Flink CDC 同步 demo

发布时间 2023-11-09 14:12:24作者: iyiluo
  1. 运行 docker-compose.yml 搭建数据库源,官方 mysql 样例数据源无法启动,改用其他 mysql 镜像
version: '2.1'

services:
  postgres:
    image: debezium/example-postgres:1.1
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_PASSWORD=1234
      - POSTGRES_DB=postgres
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres


  mysql:
    image: mysql
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=123456
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw


  elasticsearch:
    image: elastic/elasticsearch:7.6.0
    environment:
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - discovery.type=single-node
    ports:
      - "9200:9200"
      - "9300:9300"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536


  kibana:
    image: elastic/kibana:7.6.0
    ports:
      - "5601:5601"

  1. 在 maven 仓库中没有找到 flink-1.8.0 版本的 elasticsearch 包,改用下载 flink-1.7.0 版本,bin 目录没有 bat 文件,需要上传到 linux 环境执行

  2. 服务器安装 java-11-openjdk,java 8 版本运行会报错,java 17 版本运行 flink-1.7.0 也会报反射相关的错误,所有用官方推荐的 java 11 版本

  3. 默认安装的 mysql 需要先修改时区,不然同步数据时会报错。时区永久生效需要写入 my.cnf

SET GLOBAL time_zone = 'Asia/Shanghai';
  1. 修改 flink 配置 conf/flink-conf.yaml,web 端默认监听 127.0.0.1,改成 0.0.0.0
rest.bind-address: 0.0.0.0
  1. 下载 jar 包,放入到 flink lib 目录下
flink-sql-connector-postgres-cdc-2.4.2.jar
flink-sql-connector-mysql-cdc-2.4.2.jar
flink-sql-connector-elasticsearch7-3.0.1-1.17.jar
  1. 启动 flink
./start-cluster.sh
./sql-client.sh
  1. 录入官方数据库样例数据

  2. flink 创建表,官方给的 shipments 样例数据会报错, shipments 需要添加一行

'slot.name' = 'flink'
  1. 其他步骤参考官方教程,可以顺利同步数据到 elasticsearch

参考链接:
基于 Flink CDC 构建 MySQL 和 Postgres 的 Streaming ETL