Flink 容错机制 保存点和检查点

发布时间 2023-07-26 19:07:48作者: RICH-ATONE

Flink检查点常用配置:

//配置检查点
env.enableCheckpointing(180000); // 开启checkpoint 每180000ms 一次
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(50000);// 确认 checkpoints 之间的时间会进行 50000 ms
env.getCheckpointConfig().setCheckpointTimeout(600000); //设置checkpoint的超时时间 即一次checkpoint必须在该时间内完成 不然就丢弃
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//设置有且仅有一次模式 目前支持EXACTLY_ONCE/AT_LEAST_ONCE
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 设置并发checkpoint的数目
env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink-checkpoints/oracle/AC_SUB_REGIST_INFO"); // 这个是存放到hdfs目录下
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 开启在 job 中止后仍然保留的 externalizedcheckpoints
env.getCheckpointConfig().enableUnalignedCheckpoints();// 开启checkpoints

Checkpoint与State的关系

State 是 Checkpoint 所做的主要持久化备份的主要数据,而 Checkpoint 是从 source 触发到下游所有节点完成的一次全局操作。

 Flink任务恢复,可以从Checkpoint或者savepoint进行实时任务数据恢复;

 Checkpoint 的实现算法

  • 基于 Chandy-Lamport 算法的分布式快照

  • 将检查点的保存和数据处理分开,不暂停整个应用

  • 检查点分界线(Checkpoint Barrier)

    • Flink 的检查点算法用到了一种为分界线(barrier)的特殊数据形式,用来把一条流上的数据按照不同的检查点分开
    • Flink 会定时在任务的 Source Task 触发 barrier,barrier是一种特殊的消息事件,会随着消息通道流入到下游的算子中
    • barrier 之前到来的数据导致的状态更改,都会被包含在当前 barrier 所属的检查点中
    • barrier 之后的数据导致的所有更改,就会被包含在之后的检查点中
    • 在某些算子的 Task 有多个输入时,会存在 Barrier 对齐时间,我们可以在Web UI上面看到各个 Task 的Barrier 对齐时间
    • 只有当最后 Sink 端的算子接收到 Barrier 并确认该次 Checkpoint 完成时,该次 Checkpoint 才算完成
详解参考:Flink系列 15. 介绍Flink中Checkpoint与Savepoint

从检查点启动示例

从checkpoint启动示例:
./bin/flink run -s hdfs://ip:8020/user/xx/chk-35  -c xxx_demo ./xxx.jar 

从savepoint启动参考:Flink保留savepoint,并从savepoint启动示例

 
其他参考:
Flink 容错机制 保存点和检查点
Flink Checkpoint 原理流程以及常见失败原因分析