1. 状态类型
Flink支持两种主要类型的状态:
1.1 算子状态(Operator State)
- **定义:**算子状态是与特定算子实例绑定的状态,即一个算子的状态不能被其他算子访问。
- **特性:**与并发的算子实例绑定,假设算子并行度为N,则存在N个对应的算子状态。
1.2 键控状态(Keyed State)
- 定义:键控状态是基于键(Key)的状态,用于存储与每个键相关的数据信息。
- 使用场景:键控状态只能在KeyedStream上使用,通过stream.keyBy(…)获得KeyedStream。
- 分类:
- ValueState:存储单个值的状态。
- ListState:存储列表类型的状态。
- MapState:存储键值对的状态。
- ReducingState:存储经过ReduceFunction计算后的结果。
- AggregatingState:存储经过AggregatingState计算后的结果。
RichFunction 中 RuntimeContext 提供如下方法:
ValueState<T> getState(ValueStateDescriptor<T>)
ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
ListState<T> getListState(ListStateDescriptor<T>)
AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
官网例子:
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
/**
* The ValueState handle. The first field is the count, the second field a running sum.
*/
private transient ValueState<Tuple2<Long, Long>> sum;
@Override
public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
// access the state value
Tuple2<Long, Long> currentSum = sum.value();
// update the count
currentSum.f0 += 1;
// add the second field of the input value
currentSum.f1 += input.f1;
// update the state
sum.update(currentSum);
// if the count reaches 2, emit the average and clear the state
if (currentSum.f0 >= 2) {
out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
sum.clear();
}
}
@Override
public void open(Configuration config) {
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
"average", // the state name
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
sum = getRuntimeContext().getState(descriptor);
}
}
// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
.keyBy(value -> value.f0)
.flatMap(new CountWindowAverage())
.print();
// the printed output will be (1,4) and (1,5)
这个例子实现了一个简单的计数窗口。 把元组的第一个元素当作 key(在示例中都 key 都是 “1”)。 该函数将出现的次数以及总和存储在 “ValueState” 中。 一旦出现次数达到 2,则将平均值发送到下游,并清除状态重新开始。 请注意,会为每个不同的 key(元组中第一个元素)保存一个单独的值。
2. 使用状态
2.1 定义和访问状态
- 使用Flink的Stateful Functions API(如KeyedProcessFunction、ProcessFunction等)来定义和访问状态。
- 通过getRuntimeContext().getState(…)或特定的状态描述符(如ValueStateDescriptor)来获取状态。
2.2 更新状态
- 在处理事件时,根据业务逻辑更新状态。
- 使用状态提供的更新方法(如ValueState.update(T value))来更新状态值。
2.3 使用状态的注意事项
- 确保在适当的时机(如处理完事件后)更新状态。
- 注意状态的清理和释放,以避免内存泄漏。
3. 状态管理
3.1 状态后端(State Backend)
- Flink使用状态后端来存储和管理状态数据。
- 可用的状态后端包括MemoryStateBackend(内存存储,适用于测试和轻量级应用)、FsStateBackend(文件系统存储,适用于生产环境)和RocksDBStateBackend(基于RocksDB的存储,适用于大规模应用)。
3.2 状态持久化
- Flink支持将状态数据持久化到外部存储系统(如HDFS、S3等),以确保在发生故障时能够恢复状态。
- 配置状态后端时,可以指定持久化选项和参数。
3.3 状态清理
- Flink允许为状态设置时间戳限制(TTL),以便在达到指定时间后自动清理过期状态。
- 使用StateTtlConfig类配置状态TTL。
状态有效期 (TTL) 官网例子:
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Duration.ofSeconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
TTL 配置有以下几个选项: newBuilder 的第一个参数表示数据的有效期,是必选项。
TTL 的更新策略(默认是 OnCreateAndWrite):
- StateTtlConfig.UpdateType.OnCreateAndWrite - 仅在创建和写入时更新
- StateTtlConfig.UpdateType.OnReadAndWrite - 读取时也更新
注意: 如果你同时将状态的可见性配置为 StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp, 那么在PyFlink作业中,状态的读缓存将会失效,这将导致一部分的性能损失
数据在过期但还未被清理时的可见性配置如下(默认为 NeverReturnExpired):
- StateTtlConfig.StateVisibility.NeverReturnExpired - 不返回过期数据
注意: 在PyFlink作业中,状态的读写缓存都将失效,这将导致一部分的性能损失
- StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp - 会返回过期但未清理的数据
注意:
- 状态上次的修改时间会和数据一起保存在 state backend 中,因此开启该特性会增加状态数据的存储。 Heap state backend 会额外存储一个包括用户状态以及时间戳的 Java 对象,RocksDB state backend 会在每个状态值(list 或者 map 的每个元素)序列化后增加 8 个字节。
- 暂时只支持基于 processing time 的 TTL。
- 尝试从 checkpoint/savepoint 进行恢复时,TTL 的状态(是否开启)必须和之前保持一致,否则会遇到 “StateMigrationException”。
- TTL 的配置并不会保存在 checkpoint/savepoint 中,仅对当前 Job 有效。
- 不建议checkpoint恢复前后将state TTL从短调长,这可能会产生潜在的数据错误。
- 当前开启 TTL 的 map state 仅在用户值序列化器支持 null 的情况下,才支持用户值为 null。如果用户值序列化器不支持 null, 可以用 NullableSerializer 包装一层。
- 启用 TTL 配置后,StateDescriptor 中的 defaultValue(已被标记 deprecated)将会失效。这个设计的目的是为了确保语义更加清晰,在此基础上,用户需要手动管理那些实际值为 null 或已过期的状态默认值。
过期数据的清理
- 全量快照时进行清理
启用全量快照时进行清理的策略,这可以减少整个快照的大小。当前实现中不会清理本地的状态,但从上次快照恢复时,不会恢复那些已经删除的过期数据。 该策略可以通过 StateTtlConfig 配置进行配置:
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Duration.ofSeconds(1))
.cleanupFullSnapshot()
.build();
注意:
- 这种策略在 RocksDBStateBackend 的增量 checkpoint 模式下无效。
- 这种清理方式可以在任何时候通过 StateTtlConfig 启用或者关闭,比如在从 savepoint 恢复时。
- 增量数据清理
在状态访问或/和处理时进行。如果某个状态开启了该清理策略,则会在存储后端保留一个所有状态的惰性全局迭代器。 每次触发增量清理时,从迭代器中选择已经过期的数进行清理。
import org.apache.flink.api.common.state.StateTtlConfig;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Duration.ofSeconds(1))
.cleanupIncrementally(10, true)
.build();
该策略有两个参数。 第一个是每次清理时检查状态的条目数,在每个状态访问时触发。第二个参数表示是否在处理每条记录时触发清理。 Heap backend 默认会检查 5 条状态,并且关闭在每条记录时触发清理。
注意:
- 如果没有 state 访问,也没有处理数据,则不会清理过期数据。
- 增量清理会增加数据处理的耗时。
- 现在仅 Heap state backend 支持增量清除机制。在 RocksDB state backend 上启用该特性无效。
- 如果 Heap state backend 使用同步快照方式,则会保存一份所有 key 的拷贝,从而防止并发修改问题,因此会增加内存的使用。但异步快照则没有这个问题。
- 对已有的作业,这个清理方式可以在任何时候通过 StateTtlConfig 启用或禁用该特性,比如从 savepoint 重启后。
- RocksDB 压缩时清理
使用 RocksDB state backend,则会启用 Flink 为 RocksDB 定制的压缩过滤器。RocksDB 会周期性的对数据进行合并压缩从而减少存储空间。 Flink 提供的 RocksDB 压缩过滤器会在压缩时过滤掉已经过期的状态数据。
import org.apache.flink.api.common.state.StateTtlConfig;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Duration.ofSeconds(1))
.cleanupInRocksdbCompactFilter(1000, Duration.ofHours(1))
.build();
Flink 处理一定条数的状态数据后,会使用当前时间戳来检测 RocksDB 中的状态是否已经过期, 你可以通过 StateTtlConfig.newBuilder(…).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries) 方法指定处理状态的条数。 时间戳更新的越频繁,状态的清理越及时,但由于压缩会有调用 JNI 的开销,因此会影响整体的压缩性能。 RocksDB backend 的默认后台清理策略会每处理 1000 条数据进行一次。
定期压缩可以加速过期状态条目的清理,特别是对于很少访问的状态条目。 比这个值早的文件将被选取进行压缩,并重新写入与之前相同的 Level 中。 该功能可以确保文件定期通过压缩过滤器压缩。 您可以通过StateTtlConfig.newBuilder(…).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries, Duration periodicCompactionTime) 方法设定定期压缩的时间。 定期压缩的时间的默认值是 30 天。 您可以将其设置为 0 以关闭定期压缩或设置一个较小的值以加速过期状态条目的清理,但它将会触发更多压缩。
注意:
- 压缩时调用 TTL 过滤器会降低速度。TTL 过滤器需要解析上次访问的时间戳,并对每个将参与压缩的状态进行是否过期检查。 对于集合型状态类型(比如 list 和 map),会对集合中每个元素进行检查。
- 对于元素序列化后长度不固定的列表状态,TTL 过滤器需要在每次 JNI 调用过程中,额外调用 Flink 的 java 序列化器, 从而确定下一个未过期数据的位置。
- 对已有的作业,这个清理方式可以在任何时候通过 StateTtlConfig 启用或禁用该特性,比如从 savepoint 重启后。
- 定期压缩功能只在 TTL 启用时生效。
3.4 故障恢复
- 当Flink任务发生故障时,可以从最近的检查点(Checkpoint)恢复状态和数据。
- 检查点机制是Flink实现容错性的关键组成部分。