使用canal同步mysql数据到elasticsearch

发布时间 2023-05-24 18:14:30作者: 月习

官方去下载canal包

https://github.com/alibaba/canal/releases/tag/canal-1.1.6

分为deployer、admin、adapter三个模块。deployer是数据库数据同步服务端。adapter是适配同步到不同终端,可以是es,hbase,redis其它数据库等。admin是一个配置管理中心,但是吧又没有配置adapter的界面,adapter还要连admin对应的配置库。每个模块包下都有

bin目录存放启停脚本,conf配置文件,logs日志文件。

deployer

conf/canal.properties文件,这里只吧canal_local.properties里的配置项对应更新进来。实例就使用默认的example,

修改example实例配置文件conf/example/instance.properties

主要修改主库连接信息,根据实际来。

canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8

然后执行 bin/startup.sh 启动deployer。

其它数据库准备活动参考写的这篇文章吧:使用canal同步mysql数据

admin

conf下有一个canal_manager.sql数据库初始化文件,执行创建对应的canal_manager库。

是一个springboot工程,application.yml修改数据库连接配置

spring.datasource:
  address: 127.0.0.1:3306
  #上面刚创建的数据库
  database: canal_manager
  username: canal
  password: canal

浏览器访问http://localhost:8089/ 默认账号 admin/123456.

界面上有server管理和instance管理。server对应的deployer服务,instance对应的example这种配置。可以在线配置,会自动刷新到deployer配置中。如果这里看不到server,可能需要重启下deployer。

adapter

主要是deployer的消费端适配,这里以同步到es为例

conf/bootstrap.yml配置文件,修改为连接admin对应配置库canal_manager的连接信息。

conf/application.yml配置,只列修改的配置,其它的都用默认值

canal.conf:
  consumerProperties:
    # deployer 服务地址
    canal.tcp.server.host: 127.0.0.1:11111

  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/db_test?useUnicode=true
      username: root
      password: root
  canalAdapters:
  - instance: test # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      #es adapter 连接信息
      #name 改成es7,会读取 conf/es7/下的es配置文件
      - name: es7
        #es rest地址
        hosts: http://127.0.0.1:9200 # 127.0.0.1:9200 for rest mode
        properties:
          mode: rest
          #账号密码
          security.auth: elastic:888888 
          #集群名称
          cluster.name: my-application

然后修改conf/es7/下的配置文件。默认有biz_order.yml、customer.yml、mytest_user.yml三个样例配置文件。只保留一个mytest_user.yml文件。

然后数据库创建一个用户表用来作为同步数据验证。

创建表

CREATE TABLE `sys_user` (
	`id` INT(10) NOT NULL AUTO_INCREMENT,
	`code` VARCHAR(50) NOT NULL,
	`name` VARCHAR(50) NOT NULL,
	PRIMARY KEY (`id`) USING BTREE
)

mytest_user.yml对应配置

#application.yml中配置的srcDataSources
dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  #es中index名称
  _index: mytest_user
  #索引id使用的列
  _id: _id
#  upsert: true
#  pk: id
 #索引对应的数据
  sql: "select a.id as _id, a.name, a.code from sys_user a"

这里也支持比较复杂的关联查询取数。对应的mytest_user索引要预先在es中创建好。

索引创建,使用kibana的dev tool界面创建

PUT mytest_user
{
  "mappings": {
      "properties": {
          "name": {
              "type": "text"
          },
          "code": {
              "type": "text"
          }
      }
  }
}

bin/start.sh 启动adapter。

sys_user表插入数据验证是否同步成功。

可能遇到的问题

1、jdk版本问题。

官方说的1.1.6版本可以使用jdk1.8。然而在启动admin和deployer的时候会遇到一些问题。例如NoSuchMethodError: java.nio.ByteBuffer.clear()Ljava/nio/ByteBuffer异常,所以最好jdk升级到11.

2、启动参数

默认配置:-Xms2g -Xmx3g Xss256k。堆内存根据自己配置实际来。Xss栈内存我这里是起不来的,

会报The stack size specified is too small, Specify at least 328k异常。调大。

3、mysql8连接问题

发生Caused by: java.io.IOException: caching_sha2_password Auth failed异常,连接不上数据库。

修改slave用户canal密码认证方式。mysql8默认密码认证方式是caching_sha2_password ,修改成mysql_native_password。

ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
FLUSH PRIVILEGES;

root用户执行该语句可能会报1227错误需要SYSTEM_USER 权限。root用户没有,需要先新增root权限

grant system_user on *.* to 'root';
FLUSH PRIVILEGES;

查看用户认证方式

SELECT user,authentication_string,plugin,host FROM mysql.user;

坑还是挺多的,算是跑起来了。