Mysql数据同步至Elasticsearch

发布时间 2024-01-10 10:10:11作者: 粒子先生

方案对比

鉴于Canal是一个基于MySQL二进制日志的高性能数据同步系统, go-mysql-elasticsearch 是一个第三方插件的,虽然都可以同步mysql数据到es,由于es官方文档推荐使用canal工具来同步数据,因此主要将Canal的使用方式提供给用户。

核心概念

名词

解释

mysql binlog

MySQL 的 binlog 日志主要用于数据库的主从复制和数据恢复。binlog 中记录了数据的增删改查操作,主从复制过程中,主库向从库同步 binlog 日志,从库对 binlog 日志中的事件进行重放,从而实现主从同步。

mysqldump

MySQL 数据库中的数据进行全量导出的一个工具

Canal

基于MySQL二进制日志的高性能数据同步系统,进行数据库日志解析,获取增量变更进行同步,是Github开源ETL软件,详情请参见canal,ES官方文档推荐的mysql至es的数据同步工具。更详细说明请参见:https://yq.aliyun.com/articles/706226?spm=5176.11065265.1996646101.searchclickresult.1921233dWCtcRL&aly_as=HMMyF_kx

 go-mysql-elasticsearch

mysql至es的数据同步工具,属于第三方插件。

go-mysql-elasticsearch的基本原理:如果是第一次启动该程序,首先使用 mysqldump 工具对源 MySQL 数据库进行一次全量同步,通过 elasticsearch client 执行操作写入数据到 ES;然后实现了一个 mysql client,作为 slave 连接到源 MySQL,源 MySQL 作为 master 会将所有数据的更新操作通过 binlog event 同步给 slave,通过解析 binlog event 就可以获取到数据的更新内容,写入到 ES。

mypipe

mysql binlog 同步工具,能够将 binlog event 发送到 kafka

ROW

mysql binlog 日志的三种模式之一。记录每一行数据被修改的情况,但是日志量太大。

STATEMENT

mysql binlog 日志的三种模式之一。记录每一条修改数据的 SQL 语句,减少了日志量,但是 SQL 语句使用函数或触发器时容易出现主从不一致。

MIXED

mysql binlog 日志的三种模式之一。结合了 ROW 和 STATEMENT 的优点,根据具体执行数据操作的 SQL 语句选择使用 ROW 或者 STATEMENT 记录日志。

方案验证

方案一:使用Canal同步数据到ES

使用前提:

准备和MySQL、ES在相同区域、可用区、VPC、子网下的云主机,并已安装MySql和JDK。

ES中创建索引和mapping需要与MySQL中创建的表名称和字段(名称和类型)保持一致。

操作步骤:

1.连接MySQL数据库测试,命令如下:
mysql -h<MySQL实例的内网地址> -P<MySQL实例端口,一般为3306> -u<MySQL数据库的账号> -p<登录数据库的密码> -D< MySQL数据库的名称>

命令示例:

mysql -hmysql-cn-north-1-9dae15cd77e84bb8.rds.jdcloud.com -P3306 -utestml -pMl215517330# -Dtestml
2.安装并启动Canal-server。
1)修改conf/example/instance.properties文件。 

vi conf/example/instance.properties

配置项

说明

canal.instance.master.address

<MySQL数据库的内网地址>:<内网端口>,相关信息可在RDS for MySQL实例的基本信息页面获取。例如mysql-cn-north-1-9dae15cd77e84bb8.rds.jdcloud.com:3306

canal.instance.dbUsername

MySQL数据库的账号名称,可在实例的账号管理页面获取。

canal.instance.dbPassword

MySQL数据库的密码。

2)启动Canal-server。

./bin/startup.sh

3.安装并启动Canal-adapter。
1)修改conf/application.yml文件:

vi conf/application.yml

配置项

参数说明

canal.conf.canalServerHost

canalDeployer访问地址。保持默认(127.0.0.1:11111)即可。

canal.conf.srcDataSources.defaultDS.url

jdbc:mysql://<MySQL内网地址>:<内网端口>/<数据库名称>?useUnicode=true,相关信息可在 MySQL实例的基本信息页面获取。例如jdbc:mysql-cn-north-1-9dae15cd77e84bb8.rds.jdcloud.com:3306/testml?useUnicode=true

canal.conf.srcDataSources.defaultDS.username

MySQL数据库的账号名称,可在MySQL实例的账号管理页面获取。

canal.conf.srcDataSources.defaultDS.password

MySQL数据库的密码。

canal.conf.canalAdapters.groups.outerAdapters.hosts

定位到name:es的位置,将hosts替换为<京东云ES实例的内网地址>:<内网端口>,相关信息可在ES实例的基本信息概览页面获取。例如es-nlb-es-5gi2ck2s6w.jvessel-open-hb.jdcloud.com:9200:

canal.conf.canalAdapters.groups.outerAdapters.mode

必须设置为rest

canal.conf.canalAdapters.groups.outerAdapters.properties.security.auth

<京东云ES实例的账号>:<密码>。例如elastic:es_password

canal.conf.canalAdapters.groups.outerAdapters.properties.cluster.name

京东云ES实例的ID,可在实例的基本信息概览页面获取。

2)修改conf/es/*.yml文件,定义MySQL数据到ES数据的映射字段
vi conf/es/*.yml

配置项

说明

esMapping._index

创建表和字段章节中,在ES实例中所创建的索引的名称。本文使用es_test。

esMapping._type

创建表和字段章节中,在ES实例中所创建的索引的类型。本文使用_doc。

esMapping._id

需要同步到ES实例的文档的id,可自定义。本文使用_id。

esMapping.sql

SQL语句,用来查询需要同步到ES中的字段。本文使用select t.id as _id,t.id,t.count,t.name,t.color from es_test t;

3)启动Canal-adapter服务

./bin/startup.sh

4.验证Canal自动导出增量数据
1)在MySQL数据库中,新增/修改/删除数据库中es_test表的数据。
insert `elasticsearch`.`es_test`(`count`,`id`,`name`,`color`) values('11',2,'canal_test2','red');
2)登录kibana控制台的DevTools页面,查询同步过来的数据。

GET /es_test/_search

方案二:使用go-mysql-elasticsearch 开源工具同步数据到 ES

使用前提:

1.通过mysql binlog 将数据同步到 ES 集群,只能使用 ROW 模式,因为只有 ROW 模式才能知道 mysql 中的数据的修改内容。ROW 模式完整地记录了要修改的某行数据更新前以及更改后所有字段的值,而 STATEMENT 模式只记录了 UPDATE 操作的 SQL 语句。我们要将 MySQL 的数据实时同步到 ES,只能选择 ROW 模式的 binlog,获取并解析 binlog 日志的数据内容,执行 ES document api,将数据同步到 ES 集群中。

2.在 ES 集群中创建 building index,因为该工具并没有使用 ES 的 auto create index 功能,如果 index 不存在会报错 

使用限制:

  1. mysql binlog 必须是 ROW 模式(腾讯云 TencentDB for MySQL 产品默认开启)。
  2. 要同步的 MySQL 数据表必须包含主键,否则直接忽略。如果数据表没有主键,UPDATE 和 DELETE 操作就会因为在 ES 中找不到对应的 document 而无法进行同步。
  3. 不支持程序运行过程中修改表结构。
  4. 要赋予用于连接 MySQL 的账户 RELOAD 权限、REPLICATION 权限。

操作步骤:

1.使用mysqldump 工具将 MySQL数据库中的数据进行全量导出,并生成一个新的binlog日志文件记录后续数据操作。

2.使用 go-mysql-elasticsearch 开源工具同步数据到 ES。工具安装后修改etc/river.toml配置文件,设置源端表地址和目的地址即可。

方案三:使用mypipe 同步数据到 ES

使用限制和使用前提同方案二

操作步骤:

1.使用mysqldump 工具将 MySQL数据库中的数据进行全量导出,并生成一个新的binlog日志文件记录后续数据操作。

2.利用mypipe 将 binlog 日志内容解析后推送到 kafka broker,从 kafka 消费数据后,再写入 ES。mypipe 只支持 binlog 同步,不支持存量数据同步,即 mypipe 程序启动后无法对 MySQL 中已经存在的数据进行同。mypipe 只同步 binlog,需要同步数据到 ES 需要另行开发。