net6+canal 实现数据实时同步数据

发布时间 2023-03-31 17:11:24作者: 冼润伟

前言:

  前几天在做公司的一个项目,需求:把A项目数据,同步到B项目,A\B项目数据结构不一样(我们用的是mysql),B项目可以接受几秒内的数据同步;我当时第一时间就想到了flink和canal,然后发现flink并不支持net,所以转用canal;那么接下来我们看看canal到底是什么;

canal简述

  canal是阿里巴巴 MySQL binlog 增量订阅&消费组件;canal是通过读取binlog文件进行解析后发送给canal客户端的一个组件;简单来说就是通过binlog文件变化而通知下游的一个组件;优缺点在这里简单的说一下,优点非常明细:1.支持业务解耦;缺点也非常明显:因为mysql修改完数据后,才会进行写入binlog,所以会有一定的延迟;下面是官网的流程图:

 

 

 在使用canal前我们需要准备什么?

  1.docker(由于本人是docker安装的,因为安装方便)

  2.canal-admin(canal可视化管理工具,可不装)

  3.canal-server (canal的服务端)

一、使用docker安装canal-admin和canal-server;(这里需要注意的是canal.admin.passwd,这个参数是加密后的参数,初始值都是默认下面这个,详情可以去官网查看https://github.com/alibaba/canal)

#拉取canal adm包
#然后安装canal adm
docker run -d -it -p 8089:8089  -e server.port=8089 -e canal.adminUser=admin -e canal.adminPasswd=admin --name=canal-admin -m 1024m canal/canal-admin:v1.1.6

#拉取canal server包
#然后安装 canal server
docker run -d -it -p 11111:11111 -e canal.admin.manager=10.8.0.5:8089 -e canal.admin.port=11110 -e canal.admin.user=admin -e canal.admin.passwd=4ACFE3202A5FF5CF467898FC58AAB1D615029441 --name=canal-server -m 2048m canal/canal-server

  安装完后即可进行预览,下面成功安装后,server才会出现canal server的记录,否则是空的

二、需要配置mysql的binlog(我们公司用的是阿里云rds mysql,默认是开启;如果是自己配置binlog,也很简单,网上资料很多可以找找)

三、配置instance.propertios

  我们只需要在instance里添加主机,canal会自带模板的,直接点击模板,然后配置自己需要的参数,参数说明需要在官网查看,这里就不做过多的参数说明了,简单说一下这里只需要我们添加数据库的链接,账号,密码即可;

 

 

 四、在net 6变形相应的代码

  首先我们需要引入依赖:CanalSharp;接着下面可以参考下我的demo代码,值得注意的是SubscribeAsync方法可以设置自己想要的库或表

class Program
{
    static ILogger<SimpleCanalConnection> logger;
    static ILogger<SimpleCanalConnection> _logger;

    static async Task Main(string[] args)
    {
        var loggerFactory = LoggerFactory.Create(builder =>
        {
            builder
                .AddFilter("Microsoft", LogLevel.Debug)
                .AddFilter("System", LogLevel.Information);
            //builder.AddConsole();

        });
        logger = loggerFactory.CreateLogger<SimpleCanalConnection>();
        
        var conn = new SimpleCanalConnection(new SimpleCanalOptions("175.18.81.27", 11111, "1"), logger);//第三个是客户端参数
        ////连接到 Canal Server
        //await conn.ConnectAsync();
        ////订阅
        //await conn.SubscribeAsync("xuehua_grouppurchase\\..*,xuehua_goods\\..*");



        do
        {
            try
            {
                if(conn.State==ConnectionState.Closed)
                {
                    //连接到 Canal Server
                    await conn.ConnectAsync();
                    //订阅
                    await conn.SubscribeAsync("xuehua_grouppurchase\\..*,xuehua_goods\\..*");
                }
                var msg = await conn.GetAsync(1024);
                PrintEntry(msg.Entries);
                await Task.Delay(300);
            }
            catch (Exception ex)
            {
                Thread.Sleep(5000);
                Console.WriteLine(ex.Message);
                //logger.LogError(ex, "异常");
            }
        } while (true);
    }



    private static void PrintEntry(List<Entry> entries)
    {
        foreach (var entry in entries)
        {
            if (entry.EntryType == EntryType.Transactionbegin || entry.EntryType == EntryType.Transactionend)
            {
                continue;
            }

            RowChange rowChange = null;

            try
            {
                rowChange = RowChange.Parser.ParseFrom(entry.StoreValue);
            }
            catch (Exception e)
            {
                logger.LogError(e.ToString());
            }

            if (rowChange != null)
            {
                EventType eventType = rowChange.EventType;
                Console.WriteLine($"[名称:]{entry.Header.SchemaName},{entry.Header.TableName}");
                logger.LogInformation(
                    $"================> binlog[{entry.Header.LogfileName}:{entry.Header.LogfileOffset}] , name[{entry.Header.SchemaName},{entry.Header.TableName}] , eventType :{eventType}");

                foreach (var rowData in rowChange.RowDatas)
                {
                    if (eventType == EventType.Delete)
                    {
                        PrintColumn(rowData.BeforeColumns.ToList());
                    }
                    else if (eventType == EventType.Insert)
                    {
                        PrintColumn(rowData.AfterColumns.ToList());
                    }
                    else
                    {
                        logger.LogInformation("-------> before");
                        PrintColumn(rowData.BeforeColumns.ToList());
                        logger.LogInformation("-------> after");
                        PrintColumn(rowData.AfterColumns.ToList());
                    }
                }
            }
        }
        static void PrintColumn(List<Column> columns)
        {
            Console.WriteLine($"[整个表参数:]{Newtonsoft.Json.JsonConvert.SerializeObject(columns)}");
            foreach (var column in columns)
            {
                Console.WriteLine($"[表参数:]{column.Name} : {column.Value}  update=  {column.Updated}");
            }
        }
    }
}

  最后来看看我的运行成果,在正常情况下基本1秒内能进行数据同步完成

 

 

遇到的问题:

  错误信息:CanalSharp.CanalConnectionException:“Received an error returned by the server: something goes wrong with channel:[id: 0x47ae2285, /172.17.0.1:56572 => /172.17.0.3:11111], exception=com.alibaba.otter.canal.server.exception.CanalServerException: destination:example should start first

第一次配置后,运行代码会出现以下的代码报错,翻阅了资料看到下面的instance名称,如果不存在example的话就会出现上面的错,估计是CanalSharp里面的问题;

 

 总结:

  本人最后是实现了canal+kafka进行数据同步方案;但很遗憾的是我们生产环境用的是polardb mysql,所以最后没有把方案上到生产环境;原因是polardb mysql开启binlog,会下降10%的写性能(官网上有明确说明),我们行业是读多写多的,所以就放弃了该方案