当前项目在更新数据的时候,是单纯的先更数据库再删除缓存,没有加入删除失败重试的机制,通过选型,选择了Canal进行监听Mysql的操作,并且接管删除缓存和重试操作。

首先就是得知道Canal的原理,为什么能实现以上流程。

Canal的原理

要了解Canal的原理,首先得知道Mysql主从复制的原理

Mysql主从复制

第一步: 数据库发生变化,Master binlog

第二步: Slave 进程连接 Master,把 binlog 里的内容 复制 到自己的 relay log 中。

第三步:Slave 另一个进程 读取 relay log,并把它里面的操作 执行 在自己的数据库上。

通俗易懂来说就是 「抄作业」

Canal实现监听的机制

第一步:canal模拟slave的交互协议,伪装自己是slave,向master发送dump协议(其实就是不断传输的一个动作)

第二步:master收到dump请求,建立连接,开始推送binlog给canal

第三步:canal解析binlog(字节流)

Canal实现同步的机制

还在学习......

Canal实现监听

安装Canal

安装完成

修改配置

# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=binlog.000001
canal.instance.master.position=157

配置以上信息及账号密码

启动Canal

这里发生报错了,一顿排查之后发现是因为我的数据库的binlog不叫binlog,叫DESKTOP-xxxx-bin.000001,修改配置文件之后启动成功

SHOW MASTER STATUS;

image-20251019165338280
image-20251019165338280

Spring配置

 <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.4</version>
        </dependency>
canal:
  # 部署到208上,port默认为11111
  server: 127.0.0.1:11111
  # 实例名,与之前复制example之后那个文件夹名称一致
  destination: redis
# 设置canal消息日志打印级别
logging:
  level:
    top.javatool.canal.client: warn

创建账号

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

创建监听类

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.net.InetSocketAddress;
import java.util.List;

@Component
public class CanalListener {
    /**
     * 解析数据
     *
     * @param beforeColumns 修改、删除后的数据
     * @param afterColumns  新增、修改、删除前的数据
     * @param dbName        数据库名字
     * @param tableName     表大的名字
     * @param eventType     操作类型(INSERT,UPDATE,DELETE)
     * @param timestamp     消耗时间
     */
    private static void dataDetails(List<CanalEntry.Column> beforeColumns, List<CanalEntry.Column> afterColumns, String dbName, String tableName, CanalEntry.EventType eventType, long timestamp) {

        System.out.println("数据库:" + dbName);
        System.out.println("表名:" + tableName);
        System.out.println("操作类型:" + eventType);
        if (CanalEntry.EventType.INSERT.equals(eventType)) {
            System.out.println("这是一条新增的数据");
        } else if (CanalEntry.EventType.DELETE.equals(eventType)) {
            System.out.println("删除数据:" + afterColumns);
        } else {
            System.out.println("更新数据:更新前数据--" + afterColumns);
            System.out.println("更新数据:更新后数据--" + beforeColumns);

        }
        System.out.println("操作时间:" + timestamp);
    }

    @PostConstruct
    public void run() throws Exception {
        CanalConnector conn = CanalConnectors.newSingleConnector(new InetSocketAddress("124.111.11.111", 11111), "example", null, null);
        while (true) {
            conn.connect();
            conn.subscribe(".*\\..*");
            // 回滚到未进行ack的地方
            conn.rollback();
            // 获取数据 每次获取一百条改变数据
            Message message = conn.getWithoutAck(100);
            //获取这条消息的id
            long id = message.getId();
            int size = message.getEntries().size();
            if (id != -1 && size > 0) {
                // 数据解析
                analysis(message.getEntries());
            } else {
                //暂停1秒防止重复链接数据库
                Thread.sleep(1000);
            }
            // 确认消费完成这条消息
            conn.ack(message.getId());
            // 关闭连接
            conn.disconnect();
        }
    }

    /**
     * 数据解析
     */
    private void analysis(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            // 解析binlog
            CanalEntry.RowChange rowChange = null;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("解析出现异常 data:" + entry.toString(), e);
            }
            if (rowChange != null) {
                // 获取操作类型
                CanalEntry.EventType eventType = rowChange.getEventType();
                // 获取当前操作所属的数据库
                String dbName = entry.getHeader().getSchemaName();
                // 获取当前操作所属的表
                String tableName = entry.getHeader().getTableName();
                // 事务提交时间
                long timestamp = entry.getHeader().getExecuteTime();
                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    dataDetails(rowData.getBeforeColumnsList(), rowData.getAfterColumnsList(), dbName, tableName, eventType, timestamp);

                }
            }
        }
    }

}

问题

问题1:这里面遇到了一个问题,就是即使导入完坐标了,但是还是没有protocol这个类,导致很多报错

解决1:发现是由于版本比帖子的版本高,canal取消了这个,解决办法是再导入一个包

<dependency>
  <groupId>com.alibaba.otter</groupId>
  <artifactId>canal.protocol</artifactId>
  <version>1.1.6</version>
</dependency>

问题2:运行之后,Springboot类反倒是启动不起来了,一直卡住了。

解决2:发现原来是上面的代码有问题,详细状况:

    1、@PostConstruct 方法默认在主线程运行,run方法有个无限循环。

    2、Spring容器初始化之后,会调用带上面注解的方法,然后主线程被永久阻塞。

    3、用一个新线程异步启动run方法new Thread。

成果

数据库:cixingji
表名:user
操作类型:UPDATE
更新数据:更新前数据--[index: 0
等等一堆

Canal实现同步Redis

两种策略

经过了解和搜索,发现有两种同步方式:

1.Kafka手动处理Json消息解析

此方法的最大的坏处在于,只要存在多个表,那内部逻辑就是一大堆的If else的循环,非常耦合。后续想要同步到其他组件比如ElasticSearch,又需要写新的逻辑

 // 步骤 1: 【必须】手动添加 if/else 链来分发到不同的 ES 逻辑
    if ("commodity".equals(table)) {
        // 步骤 2: 【必须】实现转换逻辑,将 Map 转换为 CommodityDocument
        for (Map<String, String> row : message.getData()) {
            CommodityDocument doc = convertMapToCommodityDocument(row); // 转换方法
            esService.index("commodity_index", doc);
        }
        
    } else if ("order".equals(table)) {
        // 步骤 3: 【必须】实现转换逻辑,将 Map 转换为 OrderDocument
        for (Map<String, String> row : message.getData()) {
            OrderDocument doc = convertMapToOrderDocument(row); // 转换方法
            esService.index("order_index", doc);
        }

2.引入CanalStarter及EntryHandler实现类

这种方法的实现起来还算可以,并没有特别复杂,主要在于耦合度很低,扩展性也很强。

原理就是Handler来自动化字段反射和类型转换,以及Handler匹配字段和实体类通过注解

其他同上

Spring配置

<dependency>
  <groupId>top.javatool</groupId>
  <artifactId>canal-spring-boot-starter</artifactId>
  <version>1.2.1-RELEASE</version>
</dependency>

创建Handler类

@CanalTable("user")
@Component
@AllArgsConstructor
@Slf4j
public class NmsUserHandler  implements EntryHandler<User> {

    private final RedisTemplate<Object,Object> redisTemplate;

    @Override
    public void insert(User nmsUser) {
        log.info("[新增]"+nmsUser.toString());
        redisTemplate.opsForValue().set("user:"+nmsUser.getUsername(),nmsUser);
        Object o = redisTemplate.opsForValue().get("user:ns0");
        System.out.println(o);
    }

    @Override
    public void update(User before, User after) {
        log.info("[更新]"+after.toString());
        redisTemplate.opsForValue().set("user:"+after.getUsername(),after);
        Object o = redisTemplate.opsForValue().get("user:snn7");
        System.out.println(o);
    }

    @Override
    public void delete(User nmsUser) {
        log.info("[删除]"+nmsUser.getUsername());
        redisTemplate.delete("user:"+nmsUser.getUsername());
    }
}

这里遇到了问题:项目的RedisTemplate的泛型是<String,Object> ,不符合这个同步的代码

解决:在RedisConfig加入一个新的方法,叫别的名字,泛型返回值改成<Object,Object>,即可使用

配置同步的表

@AllArgsConstructor
@NoArgsConstructor
@Data
@Table(name = "user") // 标明对应的数据库表名
public class User implements Serializable {
    private static final long serialVersionUID = 1L;
    @Column(name="uid")
    @TableId
    private Integer uid;
    @Column(name="username")
    private String username;
    @Column(name="password")
    private String password;
    //及其他
}

也就是说在实体类的上面加上@Table(name = "表名"),在每个属性上面加入 @Column(name="属性")

效果

2025-10-20 21:17:05.316  INFO 20616 --- [xecute-thread-1] c.c.backend.im.handler.NmsUserHandler    : [更新]User{uid=1, username='cixingji', nickname='ccvxv', avatar='https://cube.elemecdn.com/9/c2/f0ee8a3c7c9638a54940382568c9dpng.png', background='https://tinypic.host/images/2023/11/15/69PB2Q5W9D2U7L.png', gender=2, description='这个人很懒,什么都没留下~', exp=0, coin=0.0, vip=0, state=0, role=1, auth=0, authMsg='', createDate=Thu Jul 17 21:00:34 CST 2025, deleteDate=null}
null

Canal实现删重

实现canal进行redis删除失败的时候,进行重试删除。

两种策略

策略1:即时重试,不需要添加新组件。

策略2:异步补偿-->死信队列。需要添加MQ。

策略1

// ... 注入 RedisTemplate ...
@Service
@Slf4j
public class RedisService {
  
@Retryable(
    value = {RedisConnectionFailureException.class, RedisBusyException.class}, // 指定触发重试的异常
    maxAttempts = 3, // 最大重试次数
    backoff = @Backoff(delay = 1000) // 延迟1秒后重试
)
public void delete(String key) {
    log.warn("Attempting to delete Redis key: {}", key);
    // 如果连接Redis失败,会抛出异常,触发重试
    redisTemplate.delete(key); 
}

// 增加一个 Recover 方法,用于最终失败后的处理(例如发送告警)
@Recover
public void recoverDelete(RedisException e, String key) {
    log.error("Final attempt to delete key {} failed! Error: {}", key, e.getMessage());
    // 【关键】:这里是即时重试失败后的最终处理点
    // 比如:发送邮件告警,或者写入一个本地的重试表
}

策略1只能实现redis偶尔的抖动,网络小故障。

准备使用策略2,可以应用于redis长时间宕机。

为什么叫做死信/异补

死信队列:消息在消费过程中,故障了或者失败了

异步 : 交给一个独立的服务实现。

补偿 : 独立的服务持续监听重试队列,等待Redis或ES恢复之后,再进行补偿的“重试”。

一、引入Kafka

1.配置

<dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>2.2.6.RELEASE</version>
</dependency>
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      auto-offset-reset: latest
      max-poll-records: 100
      max-partition-fetch-bytes: 1000000

2.

二、定义补偿消息体DTO

@Data
@NoArgsConstructor
@AllArgsConstructor
public class SyncFailDTO implements Serializable {
    private String tableName;        // 哪个表失败了
    private String operationType;    // INSERT, UPDATE, DELETE
    private String key;              // Redis key (用于删除操作)
    private Object data;             // 失败时要同步的完整数据对象(例如 User 对象)
    private Long timestamp = System.currentTimeMillis();
}

三、修改 Canal Handler(生产者)

  // 引入 Kafka 模板
    private final KafkaTemplate<String, Object> kafkaTemplate;
    // 注入泛型正确的 RedisTemplate<Object, Object>
    private final RedisTemplate<Object,Object> redisTemplate;

    // 定义补偿 topic 名称
    private static final String COMPENSATION_TOPIC = "canal_redis_compensation";

    private void handleRedisOperation(String opType, String key, User user) {
        try {
            log.info("[尝试 Redis 同步] Op:{} Key:{}", opType, key);

            // 核心逻辑:执行 Redis 操作
            if ("DELETE".equals(opType)) {
                redisTemplate.delete(key);
            } else { // INSERT 或 UPDATE
                redisTemplate.opsForValue().set(key, user);
            }

        } catch (Exception e) {
            // 如果 Redis 操作失败(例如连接异常、宕机等),则进入补偿流程
            log.error("[Redis 失败] 发送补偿消息到 Kafka. Key: {}", key, e);

            // 1. 构造失败消息体
            SyncFailDTO failMessage = new SyncFailDTO("user", opType, key, user);
                     // 发送完整的 User 对象,以便重试时可以正确 SET
            // 2. 发送到补偿队列 (使用 key 作为 Kafka key,确保同一 key 的消息有序)
            kafkaTemplate.send(COMPENSATION_TOPIC, key, failMessage);
        }
    }

四、创建补偿消费者服务(消费者)

public class RedisCompensationConsumer {
    // 注入 RedisTemplate (泛型需与 Canal Handler 中保持一致)
    private final RedisTemplate<Object,Object> redisTemplate;
    
    // 【可选】引入 Spring Retry 或自行实现重试机制
    
    // 监听步骤 3 中定义的 topic
    @KafkaListener(topics = "canal_redis_compensation", groupId = "redis-compensation-group")
    public void listen(SyncFailMessage message) {
        String key = message.getKey();
        String opType = message.getOperationType();
        
        log.warn("[补偿开始] 尝试重新同步 key: {},操作: {}", key, opType);

        try {
            // 核心逻辑:再次尝试 Redis 操作
            if ("DELETE".equals(opType)) {
                redisTemplate.delete(key);
            } else { // INSERT 或 UPDATE
                // 这里的 data 需要进行类型转换,Spring Kafka 默认是 Map,需要反序列化回 User
                // 简化:如果之前发送的就是 User 对象(依赖正确的 JsonSerializer/Deserializer 配置)
                redisTemplate.opsForValue().set(key, message.getData());
            }
            log.info("[补偿成功] key: {}", key);
        } catch (Exception e) {
            // 【关键】:如果补偿失败,则需要延迟或重复消费
            // 1. Kafka 默认重试:如果抛出异常,Kafka 可能会根据配置自动重试几次。
            // 2. 延迟重试:更高级的方案是,将此消息发送到一个**延迟队列**或**真正的死信队列(DLQ)**,等待更长时间再重试,防止对宕机的Redis造成持续压力。
            log.error("[补偿失败] key: {},将等待 Kafka 重试或进入下一步 DLQ 流程。", key, e);
            
            // 抛出异常,让 Kafka 消费者机制介入重试 (或配置手动 Ack/Nack)
            throw new RuntimeException("Redis compensation failed, waiting for Kafka retry.", e); 
        }
    }
}

Canal实现同步ES

引入ES通用工具类

已有

创建要同步的Document

要同步哪个表的哪些字段,这里面就只需要有这些字段

// UserDocument.java
@Data
@NoArgsConstructor
@AllArgsConstructor
public class UserDocument {
    @Id // 假设使用 userId 作为 ES 文档 ID
    private Long id; 
    private String username;
    private String nickname;
    private String phone;
    // ... 仅包含需要ES搜索的字段
}

在Handler类加入ES逻辑

@CanalTable("user")
@Component
@AllArgsConstructor
@Slf4j
public class NmsUserHandler implements EntryHandler<User> {

    private final RedisTemplate<Object, Object> redisTemplate;
    private final EsService esService; // 【新增】注入 ES Service

    // --- 辅助方法:将 DB 实体转换为 ES Document ---
    private UserDocument convertToDocument(User user) {
        // 【核心逻辑】:实现数据转换和字段筛选
        return new UserDocument(
            user.getUserId(),
            user.getUsername(),
            user.getNickname(),
            user.getPhone()
        );
    }

    @Override
    public void insert(User user) {
        // Redis 逻辑...
        redisTemplate.opsForValue().set("user:" + user.getUsername(), user);

        // 【新增 ES 逻辑】
        UserDocument doc = convertToDocument(user);
        esService.index("user_index", doc.getId().toString(), doc);
    }

    @Override
    public void update(User before, User after) {
        // Redis 逻辑...
        redisTemplate.opsForValue().set("user:" + after.getUsername(), after);

        // 【新增 ES 逻辑】
        UserDocument doc = convertToDocument(after);
        esService.index("user_index", doc.getId().toString(), doc); // ES 索引是幂等的,可以直接用 index/upsert
    }

    @Override
    public void delete(User user) {
        // Redis 逻辑...
        redisTemplate.delete("user:" + user.getUsername());

        // 【新增 ES 逻辑】
        esService.delete("user_index", user.getUserId().toString());
    }
}

效果

Kafka学习(Canal)

各部分组件如下

各部分

Producer

生产者,负责创建和发送消息

Consumer

消费者,负责阅读和处理消息

Broker

Kafka服务器,负责存储,接收,发送消息

Topic

是在Broker中的一类数据的分类

Partition

Partition是Topic的物理切分,也就是分区,一个Partition相当于一个独立的通道一个独立的线程

工作流程

每条消息的的结构:Key+Value

Key是分区路由,用来保证同一个Partition的顺序性。

在不指定Key的情况下,分区策略就会变成:早期版本是轮询(均匀的发送到Topic的所有Partition),新版本是粘性分区(生产者会尝试把连续的消息一直发给同一个Partition,直到满再换)

指定Key,一般都是拿主键id来保障顺序性。

1、生产者发送消息,把消息发送给Broker,并且指定Topic,指定Key。

2、Broker接收到消息之后,根据Topic和Key,把消息追加写到指定Partition的末尾;消息写入成功后,就会拿到一个Offset偏移量,也就是这个消息在Partition的位置编号。

3、消费者读取消息,向Broker请求订阅某个Topic,消费者是主动向Broker询问:有没有新消息。(订阅以Topic为单位)

4、消费者处理完这批消息之后,告诉Broker:我已经看完了编号为xx的消息了。这个xx就是新的Offset,Broker会记录这个记录,重复3。