canal+RabbitMQ实现Redis与Mysql解决双写一致性

发布时间 2023-06-06 11:46:39作者: stepForward-

canal+RabbitMQ实现Redis与Mysql的数据最终一致性问题

配置mysql(windows版本)

image-20230603221946517

​ 打开my.ini文件:

image-20230603222107538

​ 然后net stop mysql关闭mysql,再net start mysql打开mysql即可。

CREATE USER canal IDENTIFIED BY 'canal';
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';       
FLUSH PRIVILEGES;

​ 创建canal用户并且赋予所有权限。

select host,user,plugin from mysql.user;

​ 查看是否成功创建canal用户。

配置canal(windows版本)

​ 我下载的版本是canal-deployer-1.1.7。

​ 目录结构:

image-20230603222728139

​ 打开conf/example/instance.properties文件:

​ 修改下面配置:

​ 通过ctrl+F进行快速定位修改即可,如果你和我下载是一样的版本那么就都不用修改了

# position info mysql的ip+端口号
canal.instance.master.address=127.0.0.1:3306


# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal


# mq config  与RabbitMQ的路由key
canal.mq.topic=example

​ 打开conf/canal.properties文件:

​ 修改下面配置:

​ 通过ctrl+F进行快速定位修改

image-20230603223706414

​ 修改成自己的配置,到时在java代码中要跟你配置的保持一致。

image-20230603223807825

​ 这个修改成rabbitMQ,默认这里是tcp的,一开始我没改发现队列中没有消息无法生效。

代码

​ 我这里是使用某某点评进行修改的,初始是先更新数据再删除缓存的操作,操作的数据是对数据一致性要求没那么高的商品缓存,代码如下:

RabbitMqConfig.java

//使用canal和RabbitMQ实现异步更新操作
    public static final String CANAL_QUEUE = "canal_queue";//队列
    public static final String DIRECT_EXCHANGE = "mysql";//交换机,要与canal中配置的相同
    public static final String ROUTING_KEY = "example";//routing-key,要与/conf/instance.properties中canal.mq.topic=example中配置的相同

    /**
     * 定义队列
     **/
    @Bean
    public Queue canalQueue(){
        return new Queue(CANAL_QUEUE,true);
    }

    /**
     * 定义直连交换机
     **/
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(DIRECT_EXCHANGE);
    }

    /**
     * 队列和交换机绑定
     **/
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(canalQueue()).to(directExchange()).with(ROUTING_KEY);
    }

shopService层代码

@Override
@Transactional
public Result update(Shop shop) {
    Long id = shop.getId();
    if (id == null) {
        return Result.fail("商铺id不能为空");
    }
    //先更新数据库
    this.updateById(shop);
    //再删除缓存
    //stringRedisTemplate.delete(CACHE_SHOP_KEY + id);
    //这里使用RabbitMQ+canal实现异步删除缓存
    return Result.ok();
}

​ 创建一个从canal监听过来被RabbitMQ消费的的一个返回结果类:

@Data
public class CanalMessage<T> {
    /**
     * 更新后的数据
     */
    private List<T> data;
    /**
     * 数据库名
     */
    private String database;
    /**
     * binlog executeTime, 执行耗时
     */
    private long es;
    /**
     * id
     */
    private int id;
    /**
     * 标识是否是ddl语句,比如create table/drop table
     */
    private boolean isDdl;

    /**
     * 更新前的有变更的列的数据
     */
    private List<Map<String, Object>> old;
    /**
     * 主键字段名
     */
    private List<String> pkNames;
    /**
     * ddl/query的sql语句
     */
    private String sql;

    /**
     * 表名
     */
    private String table;
    /**
     * dml build timeStamp
     */
    private long ts;
    /**
     * 事件类型:INSERT/UPDATE/DELETE
     */
    private String type;
}

​ 我当时以为监听过来的直接是源数据,所以直接去将他json序列化,发现不起作用,原来他返回的是不仅仅是更新后的数据本身,然后其他数据,如sql语句,哪张表被更新了等等。

​ 监听canal_queue队列:

    @RabbitListener(queues = "canal_queue")
    public void deleteShopCache(Message message, Channel channel) {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        String realMessage = new String(message.getBody(), StandardCharsets.UTF_8);
        CanalMessage<Shop> canalMessage = JSON.parseObject(realMessage, CanalMessage.class);
        if (canalMessage != null && canalMessage.getData() != null) {
            //List<Shop> data = canalMessage.getData(); 这样的写法会报错,jsonObject无法转换为shop类型
            List<Shop> data = JSON.parseArray(canalMessage.getData().toString(), Shop.class);
            Shop shop = data.get(0);
            if (shop!=null){
                Long shopId = shop.getId();
                //消费数据:将数据从缓存中删除
                stringRedisTemplate.delete("cache:shop:" + shopId);
                try {
                    channel.basicAck(deliveryTag, false);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

​ 踩坑点:

  1. 正如前面所说,message保存了不仅仅是更新后的数据,还保存了其他信息,所以我要创建一个类去接收。
  2. message包含了对象+list类型,所以无法直接转换为json对象后直接获取里面的list数据,要使用parseArray()方法获取list数据。

测试

​ 首先先将数据缓存到redis中:

image-20230604112858848

​ 然后使用apiFox修改数据:

image-20230604112942825

​ 将RabbitMQ的数据拿出来后进行json格式化后:

image-20230604113132406

​ 其中data里面就保存了更新后的数据。

​ 可以看到缓存中的数据没有了。

image-20230604113252944