Flink-Checkpoint机制详解:(第41天)

时间:2024-07-20 09:31:17

系列文章目录

1、为什么要学Checkpoint机制
2、Flink怎么实现容错
3、Checkpoint机制的执行流程
4、重启策略Restart Strategy
5、状态后端State Backend
6、开源Flink案例

文章目录

  • 系列文章目录
  • 前言
      • 1、为什么要学Checkpoint机制
      • 2、Flink怎么实现容错
      • 3、Checkpoint机制的执行流程
      • 4、重启策略Restart Strategy
        • 4.1 不重启策略
        • 4.2 固定延迟重启策略
        • 4.3 失败率重启策略
        • 4.4 指数延迟重启策略
        • 4.5 小结
      • 5、状态后端State Backend
        • 5.1 内存状态后端MemoryStateBackend
        • 5.2 文件系统状态后端FsStateBackend
        • 5.3 RocksDB数据库状态后端RocksDBStateBackend
      • 6、开源Flink案例


前言

本文通过案例方式详解-Flink-Checkpoint机制

1、为什么要学Checkpoint机制

因为Flink是流式(实时)计算程序,我们工作中希望Flink程序能够7x24小时运行,同时遇到一些问题/bug以后,能够自动恢复程序的运行。

2、Flink怎么实现容错

Flink由于是实时运行的程序,因此不仅要对中间计算的数据进行容错,还需要对程序进行容错。也就是Flink中的容错分为如下两类:

  • 状态后端:对中间计算的数据进行容错
  • 重启策略:对程序进行容错,让程序能够自动恢复

3、Checkpoint机制的执行流程

在这里插入图片描述

步骤如下:

Flink中Checkpoint执行流程:
1- JobManager中的检查点协调器会将barrier栅栏发送给到source算子
2- source算子接收到栅栏以后,先暂停对数据的处理工作,将算子运行的状态数据先保存到TaskManager上形成State状态数据;同时会向检查点协调器上报数据,在检查点协调器中获得到的数据称之为Checkpoint数据。数据上报完以后,才会恢复对数据的处理。
3- 栅栏会随着数据从source算子一直流动到最后的sink算子
4- 每个算子拿到栅栏以后的处理过程与source算子一样。也就是先暂停对数据的处理,在TaskManager上保存State状态数据,以及向检查点协调器汇报Checkpoint数据。然后才会继续处理数据
5- 直到所有的算子将数据汇报完成,那么这个过程才算结束。

4、重启策略Restart Strategy

重启策略,能够让Flink程序在挂了之后进行自动重启。保证任务容错。既可以在代码中设置,也能够在配置文件中设置,一般推荐使用代码进行设置。

官网链接如下:

https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/ops/state/task_failure_recovery/

Flink有如下的几种重启策略。

  • 不重启策略:一般不用
  • 固定延迟重启策略:频繁使用
  • 失败率重启策略:频繁使用
  • 指数延迟重启策略:较少使用

官网文档如下:

在这里插入图片描述

4.1 不重启策略

Flink程序不重启,如果遇到异常就挂了。

代码中配置:

env.set_restart_strategy(RestartStrategies.no_restart())

配置文件flink-conf.yaml中的配置:

restart-strategy: none
4.2 固定延迟重启策略

允许Flink程序固定可以重启几次。每次重启的时间间隔是多少。这些参数是自己指定的。

代码中配置:

env = StreamExecutionEnvironment.get_execution_environment()
env.set_restart_strategy(RestartStrategies.fixed_delay_restart(
    3,  #重启的次数
    10000  #延迟时间,这里配置的是10000毫秒
))

如果重启的次数超过了3次,那么不会再给你重启

配置文件flink-conf.yaml中的配置:

restart-strategy: fixed-delay   #配置固定延迟重启
restart-strategy.fixed-delay.attempts: 3    #重启的次数
restart-strategy.fixed-delay.delay: 10 s    #重启的间隔时间
4.3 失败率重启策略

在一定的时间范围内,重启的次数在允许范围内,那么会一直给你重启。

代码中配置:

env.set_restart_strategy(RestartStrategies.failure_rate_restart(
    3,  #间隔时间内重启的次数
    300000,  #时间间隔
    10000  #延迟时间,这里配置的是10000毫秒
))
如果在300000毫秒统计时间以内,重启次数小于等于3次,那么会持续的给你进行重启;如果超过,不会再重启。

配置文件flink-conf.yaml中配置:

restart-strategy: failure-rate  #配置失败率重启
restart-strategy.failure-rate.max-failures-per-interval: 3   #最大重启的次数
restart-strategy.failure-rate.failure-rate-interval: 5 min   #失败率的时间间隔
restart-strategy.failure-rate.delay: 10 s					 #每次重启的时间间隔
4.4 指数延迟重启策略

Flink程序的重启时间随着指数的增加而呈指数级别递增。注意:阿里云Flink中没有这种重启策略

在这里插入图片描述

代码中配置:

Python暂不支持。

配置文件flink-conf.yaml中配置:

restart-strategy: exponential-delay			#配置指数延迟重启
restart-strategy.exponential-delay.initial-backoff: 10 s    #重启的初始值
restart-strategy.exponential-delay.max-backoff: 2 min		#最大从重启时间间隔
restart-strategy.exponential-delay.backoff-multiplier: 2.0  #指数
restart-strategy.exponential-delay.reset-backoff-threshold: 10 min    #重置重启时间
restart-strategy.exponential-delay.jitter-factor: 0.1		#重启因子,抖动因子
4.5 小结

工作中常用的重启策略:固定延迟重启策略,推荐使用失败率重启策略

重启策略中重启次数并不是设置的越多越好,一般推荐3-5次。

5、状态后端State Backend

用来保存Flink中State和Checkpoint的数据。

分类:

  • 内存状态后端:默认,一般在开发或者测试中使用
  • 文件系统状态后端:经常在生产环境使用,是用来存储Checkpoint数据
  • RocksDB状态后端:经常在生产环境使用,是用来存储State数据

同时,生产中一般都是文件系统状态后端和RocksDB状态后端一起配合使用。

5.1 内存状态后端MemoryStateBackend

这是Flink中默认的状态后端。

在这里插入图片描述

代码中配置:

env.set_state_backend(HashMapStateBackend())
env.get_checkpoint_config().set_checkpoint_storage(JobManagerCheckpointStorage())

配置文件flink-conf.yaml中配置:

state.backend: hashmap
state.checkpoint-storage: jobmanager

内存状态后端,由于数据不安全,一般不用。

5.2 文件系统状态后端FsStateBackend

在这里插入图片描述

代码中配置:

env.set_state_backend(HashMapStateBackend())
env.get_checkpoint_config().set_checkpoint_storage_dir("file:///checkpoint-dir")

配置文件flink-conf.yaml中的配置:

state.backend: hashmap
state.checkpoints.dir: file:///checkpoint-dir/
state.checkpoint-storage: filesystem

文件系统状态后端一般工作中常用。

5.3 RocksDB数据库状态后端RocksDBStateBackend

在这里插入图片描述

代码中配置:

env.set_state_backend(EmbeddedRocksDBStateBackend())
env.get_checkpoint_config().set_checkpoint_storage_dir("file:///checkpoint-dir")

配置文件flink-conf.yaml中的配置:

state.backend: rocksdb
state.checkpoints.dir: file:///checkpoint-dir/

state.checkpoint-storage: filesystem

6、开源Flink案例

Checkpoint的配置一般都是固定不变的,可以配置在flink-conf.yaml文件中,这样配置完后,对所有任务都生效,如下:

建议操作前先保存一个node1的虚拟机快照,下面的操作,全部都在node1执行:

# 1.创建HDFS路径
hdfs dfs -mkdir /checkpoints


# 2.修改Flink配置文件
cd /export/server/flink/conf
vim flink-conf.yaml

# 3.要添加的内容如下
注意: 配置文件中的配置项前面不要有#
execution.checkpointing.interval: 5000
#设置有且仅有一次模式 目前支持 EXACTLY_ONCE、AT_LEAST_ONCE        
execution.checkpointing.mode: EXACTLY_ONCE
state.backend: hashmap
#设置checkpoint的存储方式
state.checkpoint-storage: filesystem
#设置checkpoint的存储位置
state.checkpoints.dir: hdfs://node1:8020/checkpoints
#设置savepoint的存储位置
state.savepoints.dir: hdfs://node1:8020/checkpoints
#设置checkpoint的超时时间 即一次checkpoint必须在该时间内完成 不然就丢弃
execution.checkpointing.timeout: 600000
#设置两次checkpoint之间的最小时间间隔
execution.checkpointing.min-pause: 500
#设置并发checkpoint的数目
execution.checkpointing.max-concurrent-checkpoints: 1
#开启checkpoints的外部持久化这里设置了清除job时保留checkpoint,默认值时保留一个 假如要保留3个
state.checkpoints.num-retained: 3
#默认情况下,checkpoint不是持久化的,只用于从故障中恢复作业。当程序被取消时,它们会被删除。但是你可以配置checkpoint被周期性持久化到外部,类似于savepoints。这些外部的checkpoints将它们的元数据输出到外#部持久化存储并且当作业失败时不会自动
#清除。这样,如果你的工作失败了,你就会有一个checkpoint来恢复。
#ExternalizedCheckpointCleanup模式配置当你取消作业时外部checkpoint会产生什么行为:
#RETAIN_ON_CANCELLATION: 当作业被取消时,保留外部的checkpoint。注意,在此情况下,您必须手动清理checkpoint状态。
#DELETE_ON_CANCELLATION: 当作业被取消时,删除外部化的checkpoint。只有当作业失败时,检查点状态才可用。
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION

#第二种:固定延迟重启策略
#设置重启策略
restart-strategy: fixed-delay
#尝试重启次数
restart-strategy.fixed-delay.attempts: 3
#两次连续重启的间隔时间
restart-strategy.fixed-delay.delay: 10 s
# 4.改完配置以后,需要重启Flink集群。另外需要注意我们使用了HDFS。
cd /export/server/flink/bin
./stop-cluster.sh
./start-cluster.sh

Python代码:

#1.构建流式执行环境
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, DataStream
from pyflink.table import DataTypes

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
# env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
#2.数据source
input_ds = DataStream(env._j_stream_execution_environment.socketTextStream("node1",9999))
#3.数据处理
def map_word(word):
    if word == "error":
        raise ValueError("出异常了,程序挂了...")
    else:
        return (word,1)

result_ds = input_ds.flat_map(lambda x:x.split(" "))\
    .map(lambda word:map_word(word),output_type=Types.TUPLE([Types.STRING(),Types.INT()])).\
    key_by(lambda x:x[0])\
    .reduce(lambda x,y:(x[0],x[1] + y[1]))
#4.数据Sink
result_ds.print()
#5.启动流式任务
env.execute()

在这里插入图片描述

部署python代码的命令:
/export/server/flink/bin/flink run -py /export/software/checkpoint_demo.py
需要在node1上启动nc
nc -lk 9999

运行结果截图:

在这里插入图片描述