canal-1.1.5 使用指南

发布时间 2023-07-26 14:58:09作者: 彩虹栗子

1.canal下载

https://github.com/alibaba/canal/releases
canal-1.1.6 需要jdk11以上
canal-1.1.5 需要jdk8以上

2.canal使用 canal-1.1.5为例

2.1 开启binlog

在 my.cnf 中加入如下配置,重启 MySQL 服务。

log-bin=mysql-bin
binlog-format=ROW
server_id=1

给账户配权限

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'test'@'%' IDENTIFIED BY 'canal';
FLUSH PRIVILEGES;

2.2 deployer

通过指令解压
tar -zxvf canal.deployer-1.1.5.tar.gz
cd conf/example
修改配置文件instance.properties

#################################################
# Per-instance settings for the canal deployer (conf/example/instance.properties).
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info: address (host:port) of the master (source) database
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password: account on the master (source) database
# NOTE(review): the GRANT statement earlier in this guide gives the replication
# privileges to 'test'@'%' (password 'canal'), while root/password is configured
# here - confirm which account canal is meant to connect with.
canal.instance.dbUsername=root
canal.instance.dbPassword=password
canal.instance.defaultDatabaseName=test
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################

2.3 adapter

通过指令解压
tar -zxvf canal.adapter-1.1.5.tar.gz
cd conf/
修改配置文件application.yml
默认用TCP模式就行
srcDataSources 源数据库信息
canalAdapters 目标数据库信息

# Listening port of the adapter process.
# NOTE(review): presumably the Spring Boot server.port setting - confirm.
server:
  port: 8091
# Jackson serialization settings: date pattern, time zone, and which
# properties are included in the output.
spring:
  jackson:
    date-format: 'yyyy-MM-dd HH:mm:ss'
    time-zone: 'GMT+8'
    default-property-inclusion: 'non_null'

# Canal adapter settings. The guide uses the default tcp mode; the kafka,
# rocketMQ and rabbitMQ consumer settings below are left commented out.
canal.conf:
  mode: tcp # supported modes: tcp, kafka, rocketMQ, rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: -1
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    # NOTE(review): presumably host:port of the canal deployer started from
    # the deployer section of this guide - confirm.
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    #kafka.bootstrap.servers: 127.0.0.1:9092
    #kafka.enable.auto.commit: false
    #kafka.auto.commit.interval.ms: 1000
    #kafka.auto.offset.reset: latest
    #kafka.request.timeout.ms: 40000
    #kafka.session.timeout.ms: 30000
    #kafka.isolation.level: read_committed
    #kafka.max.poll.records: 1000
    # rocketMQ consumer
    #rocketmq.namespace:
    #rocketmq.namesrv.addr: 127.0.0.1:9876
    #rocketmq.batch.size: 1000
    #rocketmq.enable.message.trace: false
    #rocketmq.customized.trace.topic:
    #rocketmq.access.channel:
    #rocketmq.subscribe.filter:
    # rabbitMQ consumer
    #rabbitmq.host:
    #rabbitmq.virtual.host:
    #rabbitmq.username:
    #rabbitmq.password:
    #rabbitmq.resource.ownerId:

  # Source database connection(s). The entry name (defaultDS) is the value
  # that dataSourceKey refers to in the conf/rdb mapping files.
  srcDataSources:
    defaultDS:
      # Exactly one '?' separates the database name from the URL parameters;
      # a doubled '??' makes the first parameter name '?useUnicode'.
      url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&useSSL=false&serverTimezone=UTC
      username: root
      password: password
  # Target side: which adapters receive the changes of each canal instance.
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    # groupId is the value the conf/rdb mapping files use as groupId.
    - groupId: g1
      outerAdapters:
      - name: logger
      # Relational-database adapter; its key (mysql1) is the value the
      # conf/rdb mapping files use as outerAdapterKey.
      - name: rdb
        key: mysql1
        properties:
          jdbc.driverClassName: com.mysql.jdbc.Driver
          # Connection to the target database.
          jdbc.url: jdbc:mysql://33.33.33.33:3306/test?useUnicode=true&useSSL=false&serverTimezone=UTC
          jdbc.username: root
          jdbc.password: password
          druid.stat.enable: false
          druid.stat.slowSqlMillis: 1000
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
#      - name: es
#        hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode
#        properties:
#          mode: transport # or rest
#          # security.auth: test:123456 #  only used for rest mode
#          cluster.name: elasticsearch
#      - name: kudu
#        key: kudu
#        properties:
#          kudu.master.address: 127.0.0.1 # ',' split multi address
#      - name: phoenix
#        key: phoenix
#        properties:
#          jdbc.driverClassName: org.apache.phoenix.jdbc.PhoenixDriver
#          jdbc.url: jdbc:phoenix:127.0.0.1:2181:/hbase/db
#          jdbc.username:
#          jdbc.password:

修改配置文件bootstrap.yml,如果不使用canal-admin需注释掉,不然会报错

#canal:
#  manager:
#    jdbc:
#      url: jdbc:mysql://127.0.0.1:3306/canal_manager?useUnicode=true&characterEncoding=UTF-8
#      username: root
#      password: 121212

cd conf/rdb
使用mirrorDb:true 需要源数据库和目标数据库库名一样,该模式表结构会同步
使用单表同步,不需要库名一样,该模式表结构不会同步
异库 单表(可以库名,表名不一样),多表可以给每张表写一个yml文件,database为主数据库库名

# Single-table mapping: source table test.user -> target table user.
# dataSourceKey, groupId and outerAdapterKey carry the same names as the
# srcDataSources entry, the groupId and the rdb adapter key in the adapter
# configuration shown in this guide.
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
  database: test
  table: user
  targetTable: user
  targetPk:
    id: id
  mapAll: true
  # Optional settings, left disabled:
  # targetColumns:
  #   id:
  #   name:
  #   role_id:
  #   c_time:
  #   test1:
  # etlCondition: "where c_time>={}"
  # commitBatch: 3000  # batch commit size

使用全库同步:

## Mirror schema synchronize config
# Whole-database mirroring. Per this guide, the source and target databases
# must share the same name, and table-structure changes are synchronized too.
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
  mirrorDb: true
  database: test