使用maxwell实现数据库主从同步

发布时间 2023-07-26 12:56:59作者: 夏雪冬蝉

前置条件

Maxwell使用1.29.2版本,更高的版本不再支持JDK 1.8。

使用Maxwell之前需要准备搭建以下环境

 在https://www.cnblogs.com/szhNJUPT/p/17574193.html有详细搭建过程

MySQL采用5.7.43。尝试过MySQL 8.0版本,但由于utf8mb3字符集在MySQL 8.0中已被标记为弃用,导致Maxwell 1.29.2连接失败。

数据库的创建

由于虚拟机磁盘不够,我在Windows10上进行了数据库的搭建。主库的搭建如下图

在master目录下创建一个名为my.ini的文件

CMD以管理员身份进入master的bin目录下,并执行初始化命令

mysqld --initialize --user=mysql --console

初始化成功

初始化完成后,执行安装服务的命令,安装成功!

启动服务

进入mysql

设置主库的新密码

Navicat测试连接

 

Maxwell的使用

 将安装包解压到/opt/install中

初始化Maxwell元数据库

  • 在MySQL中建立一个maxwell库用于存储Maxwell元数据
    CREATE DATABASE maxwell;
  • 设置密码安全级别
    set global validate_password_policy=0;
    set global validate_password_length=4;
  • 分配一个账号用于操作该数据库
    CREATE USER 'maxwell'@'%' IDENTIFIED WITH mysql_native_password BY 'maxwell';
    GRANT ALL PRIVILEGES ON maxwell.* TO 'maxwell'@'%';
  • 分配这个账号监控其他数据库的权限
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO maxwell@'%';
    FLUSH PRIVILEGES;

启动zookeeper以及kafka

./zkServer.sh start
bin/kafka-server-start.sh config/server.properties &

启动Maxwell

进入/opt/install/maxwell-1.29.2/目录

  • 使用命令行参数启动maxwell进程(控制台&&kafka)
    bin/maxwell --user='maxwell' --password='maxwell' --host='192.168.75.1' --port=3310 --producer=stdout

      bin/maxwell --user='maxwell' --password='maxwell' --host='192.168.75.1' --port=3310 --producer=kafka    --kafka.bootstrap.servers=192.168.75.137:9092 --kafka_topic=maxwell

    
    # 注意:host和port需填写MySQL主库所在主机的IP地址和端口
  • 配置文件启动
    bin/maxwell --config ./config.properties

     

 此时能从主库读入数据到kafka

 

接下来写代码实现主从同步

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.json.JSONObject;

import java.sql.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

public class KafkaToMySQLSync {
    public static void main(String[] args) {
        String kafkaBrokers = "192.168.75.137:9092"; // 设置Kafka broker地址
        String kafkaTopic = "maxwell"; // 设置Kafka topic名称
        String mysqlUrl = "jdbc:mysql://localhost:3311"; // 设置从库MySQL连接地址
        String mysqlUsername = "root"; // 设置MySQL用户名
        String mysqlPassword = "slave"; // 设置MySQL密码

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-to-mysql-sync-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singleton(kafkaTopic));

        try {
            Connection connection = DriverManager.getConnection(mysqlUrl, mysqlUsername, mysqlPassword);
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    String key = record.key();
                    String value = record.value();

                    // 解析JSON格式的Kafka消息
                    JSONObject valueJson = new JSONObject(value);
                    String database = valueJson.getString("database");
                    String table = valueJson.getString("table");
                    String type = valueJson.getString("type");
                    JSONObject data = valueJson.getJSONObject("data");

                    // 根据type和表的结构构建相应的PreparedStatement
                    PreparedStatement stmt = null;
                    switch (type) {
                        case "insert":
                            stmt = buildInsertStatement(connection, database, table, data);
                            break;
                        case "update":
                            stmt = buildUpdateStatement(connection, database, table, data);
                            break;
                        case "delete":
                            stmt = buildDeleteStatement(connection, database, table, data);
                            break;
                        default:
                            // 不支持的操作类型,忽略
                            continue;
                    }

                    if (stmt != null) {
                        try {
                            stmt.executeUpdate();
                        } catch (SQLException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }

    // 根据表的结构构建INSERT语句
    private static PreparedStatement buildInsertStatement(Connection connection, String database, String table, JSONObject data) {
        StringBuilder columns = new StringBuilder();
        StringBuilder values = new StringBuilder();
        List<Object> fieldValues = new ArrayList<>();

        try {
            // 获取表的字段信息
            PreparedStatement metadataStmt = connection.prepareStatement("SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?");
            metadataStmt.setString(1, database);
            metadataStmt.setString(2, table);
            ResultSet resultSet = metadataStmt.executeQuery();

            while (resultSet.next()) {
                String columnName = resultSet.getString("COLUMN_NAME");
                Object fieldValue = data.opt(columnName);

                if (fieldValue != null) {
                    // 构造SQL语句的字段部分和值部分
                    columns.append("`").append(columnName).append("`, ");
                    values.append("?, ");
                    fieldValues.add(fieldValue);
                }
            }

            columns.delete(columns.length() - 2, columns.length());
            values.delete(values.length() - 2, values.length());

            String sql = "INSERT INTO `" + database + "`.`" + table + "` (" + columns.toString() + ") VALUES (" + values.toString() + ")";
            PreparedStatement stmt = connection.prepareStatement(sql);

            // 为占位符设置值
            for (int i = 0; i < fieldValues.size(); i++) {
                stmt.setObject(i + 1, fieldValues.get(i));
            }

            return stmt;
        } catch (SQLException e) {
            e.printStackTrace();
        }

        return null;
    }

    // 根据表的结构构建UPDATE语句
    private static PreparedStatement buildUpdateStatement(Connection connection, String database, String table, JSONObject data) {
        StringBuilder updateSet = new StringBuilder();
        List<Object> fieldValues = new ArrayList<>();

        try {
            // 获取表的字段信息
            PreparedStatement metadataStmt = connection.prepareStatement("SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?");
            metadataStmt.setString(1, database);
            metadataStmt.setString(2, table);
            ResultSet resultSet = metadataStmt.executeQuery();

            while (resultSet.next()) {
                String columnName = resultSet.getString("COLUMN_NAME");
                Object fieldValue = data.opt(columnName);

                if (fieldValue != null && !columnName.equalsIgnoreCase("id")) {
                    // 构造UPDATE语句的SET部分,排除id字段
                    updateSet.append("`").append(columnName).append("` = ?, ");
                    fieldValues.add(fieldValue);
                }
            }

            updateSet.delete(updateSet.length() - 2, updateSet.length());

            // 获取id字段的值
            Object idValue = data.opt("id");

            String sql = "UPDATE `" + database + "`.`" + table + "` SET " + updateSet.toString() + " WHERE `id` = ?";
            PreparedStatement stmt = connection.prepareStatement(sql);

            // 为占位符设置更新值
            for (int i = 0; i < fieldValues.size(); i++) {
                stmt.setObject(i + 1, fieldValues.get(i));
            }

            // 设置id字段的值
            stmt.setObject(fieldValues.size() + 1, idValue);

            return stmt;
        } catch (SQLException e) {
            e.printStackTrace();
        }

        return null;
    }

    // 根据表的结构构建DELETE语句
    private static PreparedStatement buildDeleteStatement(Connection connection, String database, String table, JSONObject data) {
        try {
            // 获取id字段的值
            Object idValue = data.opt("id");

            String sql = "DELETE FROM `" + database + "`.`" + table + "` WHERE `id` = ?";
            PreparedStatement stmt = connection.prepareStatement(sql);

            // 设置id字段的值
            stmt.setObject(1, idValue);

            return stmt;
        } catch (SQLException e) {
            e.printStackTrace();
        }

        return null;
    }
}