监听mysql binlog

发布时间 2023-06-12 17:02:20作者: hsql

1,有一款开源的工具 maxwell,可以直接用,它将变更信息推送到kafka或者redis等,看了一下源码,主要是用到了mysql-binlog-connector-java,那么由此也可以自己做拓展

2,添加maven

        <dependency>
            <groupId>com.zendesk</groupId>
            <artifactId>mysql-binlog-connector-java</artifactId>
            <version>0.23.3</version>
        </dependency>

3,使用:创建BinaryLogClient 监听binlog

配置文件:

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Component
@Data
@ConfigurationProperties(prefix = "binlog")
public class BinLogConfig {

    /**
     * 是否监听
     */
    private String status;

    private String host;

    private Integer port;

    private String username;

    private String password;

}

关键代码:

package com.cfam.quotaplatform.service.binlog;

import com.cfam.quotaplatform.config.BinLogConfig;
import com.cfam.quotaplatform.entity.TriTable;
import com.cfam.quotaplatform.entity.TriTableChangeRecord;
import com.cfam.quotaplatform.entity.dto.BinLogListenTable;
import com.cfam.quotaplatform.entity.dto.BinLogListenTablePk;
import com.cfam.quotaplatform.mq.publish.BinLogTaskPublisher;
import com.cfam.quotaplatform.service.DbQueryService;
import com.cfam.quotaplatform.service.IRedisService;
import com.cfam.quotaplatform.service.TriTableService;
import com.cfam.quotaplatform.util.ObjectHelper;
import com.cfam.quotaplatform.util.RedisUtil;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;

import javax.annotation.Resource;
import java.io.IOException;
import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;

/**
 * @author makejava
 * @module
 * @since 2023/6/2 13:37
 */
@Slf4j
public class BinLogListener{

    @Autowired
    private BinLogConfig binLogConfig;

    private final Map<String, BinLogListenTable> tableMapCache = new HashMap<>();

    public final String LOCK_KEY = "BIN_LOG_LOCK_KEY";

    @Resource
    private TriTableService triTableService;

    @Resource
    private DbQueryService dbQueryService;

    @Resource
    private BinLogTaskPublisher binLogTaskPublisher;

    @Resource
    private RedisUtil redisUtil;
    
    @Resource
    private IRedisService redisService;

    public void start() {
        Boolean notOverdueLock = redisUtil.getNotOverdueLock(LOCK_KEY, "1");
        if(notOverdueLock) {
            log.info("初始化Binlog监听器...");
            BinaryLogClient client = new BinaryLogClient(binLogConfig.getHost(), binLogConfig.getPort(), binLogConfig.getUsername(), binLogConfig.getPassword());
            client.setServerId(3);
            client.registerEventListener(event -> {
                EventData data = event.getData();
                if (data instanceof TableMapEventData) {
                    TableMapEventData tableMapEventData = (TableMapEventData) data;
                    String database = tableMapEventData.getDatabase();
                    String table = tableMapEventData.getTable();
                    triTableService.refreshNeedListenerTableList(TriTableService.exampleRefreshTime);
                    List<TriTable> collect = TriTableService.needListenTableList.stream().filter(e -> e.getDataSourceKey().equals(database) && e.getTableName().equals(table)).collect(Collectors.toList());
                    if(ObjectHelper.isNull(collect)){
                        return;
                    }
                    long tableId = tableMapEventData.getTableId();
                    if(ObjectHelper.isNull(tableMapCache.get(String.valueOf(tableId)))){
                        BinLogListenTable binLogListenTable = new BinLogListenTable();
                        binLogListenTable.setTriTable(collect.get(0));
                        List<String> pkNameList = dbQueryService.getPkName(database, table);
                        List<BinLogListenTablePk> binLogListenTablePkList= new ArrayList<>();
                        for (String pkName : pkNameList) {
                            String positionSql = "select ORDINAL_POSITION from information_schema.`COLUMNS` t WHERE t.TABLE_NAME = '"+table+"' and t.COLUMN_NAME = '"+pkName+"'";
                            Object position = dbQueryService.getOneBySql(database, positionSql, null);
                            BinLogListenTablePk binLogListenTablePk = new BinLogListenTablePk();
                            binLogListenTablePk.setPkName(pkName);
                            binLogListenTablePk.setPkPosition((Integer) position);
                            binLogListenTablePkList.add(binLogListenTablePk);
                        }
                        binLogListenTable.setBinLogListenTablePkList(binLogListenTablePkList);
                        tableMapCache.put(String.valueOf(tableId), binLogListenTable);
                    }
                }
                if (data instanceof UpdateRowsEventData) {
                    UpdateRowsEventData updateRowsEventData = (UpdateRowsEventData) data;
                    long updataTableId = updateRowsEventData.getTableId();
                    BinLogListenTable binLogListenTable = tableMapCache.get(String.valueOf(updataTableId));
                    if(ObjectHelper.isNull(binLogListenTable)){
                        return ;
                    }
                    TriTable triTable = binLogListenTable.getTriTable();
                    List<BinLogListenTablePk> binLogListenTablePkList = binLogListenTable.getBinLogListenTablePkList();
                    for (Map.Entry<Serializable[], Serializable[]> row : updateRowsEventData.getRows()) {
                        List<Serializable> entries = Arrays.asList(row.getValue());
                        TriTableChangeRecord triTableChangeRecord = new TriTableChangeRecord();
                        triTableChangeRecord.setSubjectId(triTable.getSubjectId());
                        triTableChangeRecord.setDataSourceKey(triTable.getDataSourceKey());
                        triTableChangeRecord.setTableName(triTable.getTableName());
                        triTableChangeRecord.setPkName(StringUtils.join(binLogListenTablePkList.stream().map(BinLogListenTablePk::getPkName).collect(Collectors.toList()) , ","));
                        List<Integer> pkPositionList = binLogListenTablePkList.stream().map(BinLogListenTablePk::getPkPosition).collect(Collectors.toList());
                        List<Object> pkValueList = new ArrayList<>();
                        for (Integer pkPosition : pkPositionList) {
                            pkValueList.add(entries.get(pkPosition - 1));
                        }
                        triTableChangeRecord.setPkValue(StringUtils.join(pkValueList, ","));
                        triTableChangeRecord.setType("UPDATE");
                        binLogTaskPublisher.publish(triTableChangeRecord);
                    }
                }
                else if (data instanceof WriteRowsEventData) {
                    WriteRowsEventData writeRowsEventData = (WriteRowsEventData) data;
                    long updataTableId = writeRowsEventData.getTableId();
                    BinLogListenTable binLogListenTable = tableMapCache.get(String.valueOf(updataTableId));
                    if(ObjectHelper.isNull(binLogListenTable)){
                        return ;
                    }
                    TriTable triTable = binLogListenTable.getTriTable();
                    List<BinLogListenTablePk> binLogListenTablePkList = binLogListenTable.getBinLogListenTablePkList();
                    for (Serializable[] row : writeRowsEventData.getRows()) {
                        TriTableChangeRecord triTableChangeRecord = new TriTableChangeRecord();
                        triTableChangeRecord.setSubjectId(triTable.getSubjectId());
                        triTableChangeRecord.setDataSourceKey(triTable.getDataSourceKey());
                        triTableChangeRecord.setTableName(triTable.getTableName());
                        triTableChangeRecord.setPkName(StringUtils.join(binLogListenTablePkList.stream().map(BinLogListenTablePk::getPkName).collect(Collectors.toList()) , ","));
                        List<Integer> pkPositionList = binLogListenTablePkList.stream().map(BinLogListenTablePk::getPkPosition).collect(Collectors.toList());
                        List<Object> pkValueList = new ArrayList<>();
                        for (Integer pkPosition : pkPositionList) {
                            pkValueList.add(row[pkPosition - 1]);
                        }
                        triTableChangeRecord.setPkValue(StringUtils.join(pkValueList, ","));
                        triTableChangeRecord.setType("INSERT");
                        binLogTaskPublisher.publish(triTableChangeRecord);
                    }
                }
            });
            try {
                client.connect();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

}

其中:1)LOCK_KEY 是项目多实例,加锁防止多个实例都启动监听,最终保证只有一个实例监听

2)triTableService是一张记录了哪些表的变化需要记录的表

此方案思路为,监听全部表变化(也没办法监听部分的)然后根据自己的配置做筛选,然后组装json放到redis队列,由队列消费者来消费。

也可以使用maxwell工具发送到kafka,创建kafka消费器,但是这样有一个问题是kafka做不到平均分配任务到监听同一个toptic的消费者,如果业务方法太耗时间还要转一层到redis