系列文章目录
1、为什么要学Checkpoint机制
2、Flink怎么实现容错
3、Checkpoint机制的执行流程
4、重启策略Restart Strategy
5、状态后端State Backend
6、开源Flink案例
文章目录
- 系列文章目录
- 前言
- 1、为什么要学Checkpoint机制
- 2、Flink怎么实现容错
- 3、Checkpoint机制的执行流程
- 4、重启策略Restart Strategy
- 4.1 不重启策略
- 4.2 固定延迟重启策略
- 4.3 失败率重启策略
- 4.4 指数延迟重启策略
- 4.5 小结
- 5、状态后端State Backend
- 5.1 内存状态后端MemoryStateBackend
- 5.2 文件系统状态后端FsStateBackend
- 5.3 RocksDB数据库状态后端RocksDBStateBackend
- 6、开源Flink案例
前言
本文通过案例方式详解-Flink-Checkpoint机制
1、为什么要学Checkpoint机制
因为Flink是流式(实时)计算程序,我们工作中希望Flink程序能够7x24小时运行,同时遇到一些问题/bug以后,能够自动恢复程序的运行。
2、Flink怎么实现容错
Flink由于是实时运行的程序,因此不仅要对中间计算的数据进行容错,还需要对程序进行容错。也就是Flink中的容错分为如下两类:
- 状态后端:对中间计算的数据进行容错
- 重启策略:对程序进行容错,让程序能够自动恢复
3、Checkpoint机制的执行流程
步骤如下:
Flink中Checkpoint执行流程:
1- JobManager中的检查点协调器会将barrier栅栏发送给到source算子
2- source算子接收到栅栏以后,先暂停对数据的处理工作,将算子运行的状态数据先保存到TaskManager上形成State状态数据;同时会向检查点协调器上报数据,在检查点协调器中获得到的数据称之为Checkpoint数据。数据上报完以后,才会恢复对数据的处理。
3- 栅栏会随着数据从source算子一直流动到最后的sink算子
4- 每个算子拿到栅栏以后的处理过程与source算子一样。也就是先暂停对数据的处理,在TaskManager上保存State状态数据,以及向检查点协调器汇报Checkpoint数据。然后才会继续处理数据
5- 直到所有的算子将数据汇报完成,那么这个过程才算结束。
4、重启策略Restart Strategy
重启策略,能够让Flink程序在挂了之后进行自动重启。保证任务容错。既可以在代码中设置,也能够在配置文件中设置,一般推荐使用代码进行设置。
官网链接如下:
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/ops/state/task_failure_recovery/
Flink有如下的几种重启策略。
- 不重启策略:一般不用
- 固定延迟重启策略:频繁使用
- 失败率重启策略:频繁使用
- 指数延迟重启策略:较少使用
官网文档如下:
4.1 不重启策略
Flink程序不重启,如果遇到异常就挂了。
代码中配置:
env.set_restart_strategy(RestartStrategies.no_restart())
配置文件flink-conf.yaml中的配置:
restart-strategy: none
4.2 固定延迟重启策略
允许Flink程序固定可以重启几次。每次重启的时间间隔是多少。这些参数是自己指定的。
代码中配置:
env = StreamExecutionEnvironment.get_execution_environment()
env.set_restart_strategy(RestartStrategies.fixed_delay_restart(
3, #重启的次数
10000 #延迟时间,这里配置的是10000毫秒
))
如果重启的次数超过了3次,那么不会再给你重启
配置文件flink-conf.yaml中的配置:
restart-strategy: fixed-delay #配置固定延迟重启
restart-strategy.fixed-delay.attempts: 3 #重启的次数
restart-strategy.fixed-delay.delay: 10 s #重启的间隔时间
4.3 失败率重启策略
在一定的时间范围内,重启的次数在允许范围内,那么会一直给你重启。
代码中配置:
env.set_restart_strategy(RestartStrategies.failure_rate_restart(
3, #间隔时间内重启的次数
300000, #时间间隔
10000 #延迟时间,这里配置的是10000毫秒
))
如果在300000毫秒统计时间以内,重启次数小于等于3次,那么会持续的给你进行重启;如果超过,不会再重启。
配置文件flink-conf.yaml中配置:
restart-strategy: failure-rate #配置失败率重启
restart-strategy.failure-rate.max-failures-per-interval: 3 #最大重启的次数
restart-strategy.failure-rate.failure-rate-interval: 5 min #失败率的时间间隔
restart-strategy.failure-rate.delay: 10 s #每次重启的时间间隔
4.4 指数延迟重启策略
Flink程序的重启时间随着指数的增加而呈指数级别递增。注意:阿里云Flink中没有这种重启策略
代码中配置:
Python暂不支持。
配置文件flink-conf.yaml中配置:
restart-strategy: exponential-delay #配置指数延迟重启
restart-strategy.exponential-delay.initial-backoff: 10 s #重启的初始值
restart-strategy.exponential-delay.max-backoff: 2 min #最大从重启时间间隔
restart-strategy.exponential-delay.backoff-multiplier: 2.0 #指数
restart-strategy.exponential-delay.reset-backoff-threshold: 10 min #重置重启时间
restart-strategy.exponential-delay.jitter-factor: 0.1 #重启因子,抖动因子
4.5 小结
工作中常用的重启策略:固定延迟重启策略,推荐使用失败率重启策略
重启策略中重启次数并不是设置的越多越好,一般推荐3-5次。
5、状态后端State Backend
用来保存Flink中State和Checkpoint的数据。
分类:
- 内存状态后端:默认,一般在开发或者测试中使用
- 文件系统状态后端:经常在生产环境使用,是用来存储Checkpoint数据
- RocksDB状态后端:经常在生产环境使用,是用来存储State数据
同时,生产中一般都是文件系统状态后端和RocksDB状态后端一起配合使用。
5.1 内存状态后端MemoryStateBackend
这是Flink中默认的状态后端。
代码中配置:
env.set_state_backend(HashMapStateBackend())
env.get_checkpoint_config().set_checkpoint_storage(JobManagerCheckpointStorage())
配置文件flink-conf.yaml中配置:
state.backend: hashmap
state.checkpoint-storage: jobmanager
内存状态后端,由于数据不安全,一般不用。
5.2 文件系统状态后端FsStateBackend
代码中配置:
env.set_state_backend(HashMapStateBackend())
env.get_checkpoint_config().set_checkpoint_storage_dir("file:///checkpoint-dir")
配置文件flink-conf.yaml中的配置:
state.backend: hashmap
state.checkpoints.dir: file:///checkpoint-dir/
state.checkpoint-storage: filesystem
文件系统状态后端一般工作中常用。
5.3 RocksDB数据库状态后端RocksDBStateBackend
代码中配置:
env.set_state_backend(EmbeddedRocksDBStateBackend())
env.get_checkpoint_config().set_checkpoint_storage_dir("file:///checkpoint-dir")
配置文件flink-conf.yaml中的配置:
state.backend: rocksdb
state.checkpoints.dir: file:///checkpoint-dir/
state.checkpoint-storage: filesystem
6、开源Flink案例
Checkpoint的配置一般都是固定不变的,可以配置在flink-conf.yaml文件中,这样配置完后,对所有任务都生效,如下:
建议操作前先保存一个node1的虚拟机快照,下面的操作,全部都在node1执行:
# 1.创建HDFS路径
hdfs dfs -mkdir /checkpoints
# 2.修改Flink配置文件
cd /export/server/flink/conf
vim flink-conf.yaml
# 3.要添加的内容如下
注意: 配置文件中的配置项前面不要有#
execution.checkpointing.interval: 5000
#设置有且仅有一次模式 目前支持 EXACTLY_ONCE、AT_LEAST_ONCE
execution.checkpointing.mode: EXACTLY_ONCE
state.backend: hashmap
#设置checkpoint的存储方式
state.checkpoint-storage: filesystem
#设置checkpoint的存储位置
state.checkpoints.dir: hdfs://node1:8020/checkpoints
#设置savepoint的存储位置
state.savepoints.dir: hdfs://node1:8020/checkpoints
#设置checkpoint的超时时间 即一次checkpoint必须在该时间内完成 不然就丢弃
execution.checkpointing.timeout: 600000
#设置两次checkpoint之间的最小时间间隔
execution.checkpointing.min-pause: 500
#设置并发checkpoint的数目
execution.checkpointing.max-concurrent-checkpoints: 1
#开启checkpoints的外部持久化这里设置了清除job时保留checkpoint,默认值时保留一个 假如要保留3个
state.checkpoints.num-retained: 3
#默认情况下,checkpoint不是持久化的,只用于从故障中恢复作业。当程序被取消时,它们会被删除。但是你可以配置checkpoint被周期性持久化到外部,类似于savepoints。这些外部的checkpoints将它们的元数据输出到外#部持久化存储并且当作业失败时不会自动
#清除。这样,如果你的工作失败了,你就会有一个checkpoint来恢复。
#ExternalizedCheckpointCleanup模式配置当你取消作业时外部checkpoint会产生什么行为:
#RETAIN_ON_CANCELLATION: 当作业被取消时,保留外部的checkpoint。注意,在此情况下,您必须手动清理checkpoint状态。
#DELETE_ON_CANCELLATION: 当作业被取消时,删除外部化的checkpoint。只有当作业失败时,检查点状态才可用。
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
#第二种:固定延迟重启策略
#设置重启策略
restart-strategy: fixed-delay
#尝试重启次数
restart-strategy.fixed-delay.attempts: 3
#两次连续重启的间隔时间
restart-strategy.fixed-delay.delay: 10 s
# 4.改完配置以后,需要重启Flink集群。另外需要注意我们使用了HDFS。
cd /export/server/flink/bin
./stop-cluster.sh
./start-cluster.sh
Python代码:
#1.构建流式执行环境
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, DataStream
from pyflink.table import DataTypes
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
# env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
#2.数据source
input_ds = DataStream(env._j_stream_execution_environment.socketTextStream("node1",9999))
#3.数据处理
def map_word(word):
if word == "error":
raise ValueError("出异常了,程序挂了...")
else:
return (word,1)
result_ds = input_ds.flat_map(lambda x:x.split(" "))\
.map(lambda word:map_word(word),output_type=Types.TUPLE([Types.STRING(),Types.INT()])).\
key_by(lambda x:x[0])\
.reduce(lambda x,y:(x[0],x[1] + y[1]))
#4.数据Sink
result_ds.print()
#5.启动流式任务
env.execute()
部署python代码的命令:
/export/server/flink/bin/flink run -py /export/software/checkpoint_demo.py
需要在node1上启动nc
nc -lk 9999
运行结果截图: