Syncing MySQL data with Canal

Published 2023-04-24 18:03:26  Author: 月习

How it works

Quoting the official documentation:

  • canal emulates the MySQL slave interaction protocol: it poses as a MySQL slave and sends the dump protocol to the MySQL master
  • the MySQL master receives the dump request and starts pushing binary logs to the slave (i.e., canal)
  • canal parses the binary log objects (originally a raw byte stream)

So before using it, prepare the MySQL environment: enable binlog and create a slave user.

Enable binlog

[mysqld]
log-bin=mysql-bin # enable binlog
binlog-format=ROW # use ROW format
server_id=1 # required for MySQL replication; must not clash with canal's slaveId

Create the slave user canal

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
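
Before going further, it is worth verifying both steps: that binlog is on in ROW format, and that the canal user can actually see it. Below is a minimal JDBC sketch; CanalPrecheck is a hypothetical helper, the connection URL assumes a local server, and a MySQL JDBC driver (e.g. mysql-connector-j) must be on the classpath. SHOW MASTER STATUS needs the REPLICATION CLIENT privilege granted above, so it doubles as a grant check:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CanalPrecheck {
    public static void main(String[] args) throws Exception {
        // connect as the freshly created slave user; URL and port are placeholders
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://127.0.0.1:3306/", "canal", "canal");
             Statement stmt = conn.createStatement()) {

            // expect log_bin=ON and binlog_format=ROW
            try (ResultSet rs = stmt.executeQuery(
                    "SHOW VARIABLES WHERE Variable_name IN ('log_bin','binlog_format')")) {
                while (rs.next()) {
                    System.out.println(rs.getString(1) + " = " + rs.getString(2));
                }
            }

            // requires REPLICATION CLIENT; prints the current binlog file and offset
            try (ResultSet rs = stmt.executeQuery("SHOW MASTER STATUS")) {
                if (rs.next()) {
                    System.out.println("binlog file: " + rs.getString("File")
                            + ", position: " + rs.getLong("Position"));
                }
            }
        }
    }
}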

Server-side deployment

Download: https://github.com/alibaba/canal/releases

The latest release at the time of writing is canal.deployer-1.1.6.tar.gz.

After unpacking, the directory layout is:

the bin directory holds the start/stop scripts

the conf directory holds the configuration files

canal.properties

# port of the socket connection service; clients connect to it later
canal.port = 11111 
# configuration directory
canal.conf.dir = ../conf
# instance list; the default package ships with a built-in example instance, whose settings live in conf/example/instance.properties. Multiple instances can be listed. The name is also needed when connecting later.
canal.destinations=example
...the remaining parameters are not needed for now and are left unchanged

Instance settings in conf/example/instance.properties

# position info
# MySQL server address
canal.instance.master.address=127.0.0.1:3306
# binlog file name and offset; left empty, canal starts from the current position
canal.instance.master.journal.name= 
canal.instance.master.position=
# username/password of the slave user
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8

Java client example

Add the dependencies

<dependency>
	<groupId>com.alibaba.otter</groupId>
	<artifactId>canal.protocol</artifactId>
	<version>1.1.6</version>
</dependency>
<dependency>
	<groupId>com.alibaba.otter</groupId>
	<artifactId>canal.client</artifactId>
	<version>1.1.6</version>
</dependency>

Sample code, closely following the official Java ClientExample:

import java.net.InetSocketAddress;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;

public class CanalTest {
    private static final Logger logger = LoggerFactory.getLogger(CanalTest.class);

    public static void main(String[] args) {
        // the destination must match canal.destinations in canal.properties ("example" by default)
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111),
                "example", "canal", "canal");

        int batchSize = 100;
        connector.connect();
        // subscribe to every table in every schema
        connector.subscribe(".*\\..*");
        // roll back batches fetched but not acked by an earlier run
        connector.rollback();
        while (true) {
            // fetch a batch without auto-ack, so it can be acked once processed
            Message message = connector.getWithoutAck(batchSize);
            long batchId = message.getId();
            int size = message.getEntries().size();
            if (batchId == -1 || size == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            } else {
                printMsg(message);
                // confirm the batch so the server can advance its cursor
                connector.ack(batchId);
            }
        }
    }

    static void printMsg(Message message) {
        message.getEntries().forEach(entry -> {
            // skip transaction begin/end markers; only row changes matter here
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                return;
            }
            CanalEntry.Header header = entry.getHeader();

            CanalEntry.RowChange rowChange;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (InvalidProtocolBufferException e) {
                throw new RuntimeException("failed to parse row change event", e);
            }
            CanalEntry.EventType eventType = rowChange.getEventType();
            logger.info("schema:{},table:{},logFile:{},offset:{},event:{}",
                    header.getSchemaName(), header.getTableName(),
                    header.getLogfileName(), header.getLogfileOffset(), eventType.name());

            rowChange.getRowDatasList().forEach(rowData -> {
                if (CanalEntry.EventType.INSERT == eventType) {
                    // an INSERT only carries the "after" image
                    printColumn(rowData.getAfterColumnsList());
                } else if (CanalEntry.EventType.DELETE == eventType) {
                    // a DELETE only carries the "before" image
                    printColumn(rowData.getBeforeColumnsList());
                } else {
                    // an UPDATE carries both the before and the after image
                    printColumn(rowData.getBeforeColumnsList());
                    printColumn(rowData.getAfterColumnsList());
                }
            });
        });
    }

    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            logger.info("{}:{} update={}", column.getName(), column.getValue(), column.getUpdated());
        }
    }
}
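
The loop above acks every batch once it has been printed. In practice you would usually ack only after processing succeeds, and roll back on failure so the server redelivers the batch. Below is a minimal sketch of that variant; pollLoop is a hypothetical helper meant to replace the while loop inside CanalTest:

// polling loop with explicit ack/rollback handling
static void pollLoop(CanalConnector connector, int batchSize) throws InterruptedException {
    while (true) {
        Message message = connector.getWithoutAck(batchSize);
        long batchId = message.getId();
        if (batchId == -1 || message.getEntries().isEmpty()) {
            Thread.sleep(1000); // nothing new yet, back off before polling again
            continue;
        }
        try {
            printMsg(message);           // process the batch
            connector.ack(batchId);      // success: server advances its cursor past this batch
        } catch (Exception e) {
            connector.rollback(batchId); // failure: the same batch will be delivered again
        }
    }
}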

After starting the server and the client, run a few database operations:

CREATE TABLE users (
	uid INT(11) NOT NULL AUTO_INCREMENT,
	ucode VARCHAR(20) NOT NULL,
	uname VARCHAR(50) NOT NULL,
	PRIMARY KEY (uid) 
);
INSERT INTO users(ucode,uname) VALUES('001','曹操');
UPDATE users SET uname='曹操1' WHERE ucode='001';

Watching the logs, the change events arrive with essentially no lag. Each event tells you which schema and table was touched, what the operation was, and the row values before and after it.

#CREATE TABLE event
 - schema:db_test,table:users,logFile:binlog.000013,offset:278141,event:CREATE
#INSERT event
 - schema:db_test,table:users,logFile:binlog.000013,offset:278561,event:INSERT
 - uid:1 update=true
 - ucode:001 update=true
 - uname:曹操 update=true
#UPDATE event
 - schema:db_test,table:users,logFile:binlog.000013,offset:279122,event:UPDATE
#before the update
 - uid:1 update=false
 - ucode:001 update=false
 - uname:曹操 update=false
#after the update
 - uid:1 update=false
 - ucode:001 update=false
 - uname:曹操1 update=true

After receiving the messages, the usual next step is to publish them to a message queue for downstream systems to subscribe to and consume, ideally a broker that can guarantee ordered consumption. The Canal project also ships official examples for Kafka and RocketMQ integration.
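
As one possible shape for that hand-off, below is a minimal Kafka-forwarder sketch. It is an assumption-laden illustration, not one of the official examples: the class name CanalKafkaForwarder, the topic canal-events, and the broker address are all placeholders, and it needs the kafka-clients dependency on the classpath. Keying each record by schema.table sends all changes for one table to one partition, and Kafka preserves order within a partition, so per-table binlog order survives the hand-off.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CanalKafkaForwarder {

    private final KafkaProducer<String, String> producer;

    public CanalKafkaForwarder(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers); // e.g. "127.0.0.1:9092" (placeholder)
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all"); // wait for full replication so a row change is not lost
        this.producer = new KafkaProducer<>(props);
    }

    // key by schema.table: all changes to one table land in one partition,
    // so their binlog order is preserved for consumers
    public void send(String schema, String table, String payloadJson) {
        producer.send(new ProducerRecord<>("canal-events", schema + "." + table, payloadJson));
    }
}

If send(...) is called from printMsg before connector.ack(batchId), a batch whose forwarding fails stays unacked and is redelivered, giving at-least-once delivery into the queue.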

References:

https://github.com/alibaba/canal/wiki/ClientExample

https://github.com/alibaba/canal/wiki/