checkpoint
- Barriers
当接收到jobmanager要进行checkpoint的请求时,会在当前source的数据插入一个barrier,随之该barrier往下游走,期间需要进行快照的操作只要碰到barrier,就会触发自身的快照操作,当所有sink确认快照后,就会向checkpoint协调器发送确认该快照完成,当失败重启时,会从最近一次成功保存的快照恢复。
barrier的作用就是为了把数据区分开,CheckPoint过程中有一个同步做快照的环节不能处理barrier之后的数据,为什么呢?
如果做快照的同时,也在处理数据,那么处理的数据可能会修改快照内容,所以先暂停处理数据,把内存中快照保存好后,再处理数据
- align
接收多个输入流的运算符需要继续快照barrier对齐输入流,比如当flink并行度小于kafka的分区时,必定会有一个task从多个partition读取kafka数据,此时就需要进行对齐。/xianpanjia4616/article/details/86375224
还没对齐时,从这些流入的数据会被放进缓冲区暂时不会处置,在该对齐并且快照之后,会优先处理缓冲区的数据
所以,对齐会增加流式的等待时间,某些系统对失效性要求较高,精确到毫秒级的,这时可以考虑不对齐。可以不对齐吗?可以的,但是不对齐可能会出现数据重复处理,比如前一个快照处理了后面的数据,当从这个快照失败恢复时,会造成后面的数据部分重复处理
- Exactly Once、At Least Once
Exactly Once时必须barrier对齐,如果barrier不对齐就变成了At Least Once
- flink内部的exactly once,即上面说的
-
End-to-End Exactly-Once语义
flink和外部系统如何做到一次交付呢?比如kafka,已经输入到kafka了,但是没有进行快照,从最近一次快照恢复,必定会重复数据,只输入到kafka一次,这时需要利用kafka producter的事务机制,在0.11才有的
Solution ---- Two phase commit
Flink采用Two phase commit来解决这个问题.
Phase 1: Pre-commit
Flink的JobManager向source注入checkpoint barrier以开启这次snapshot.
barrier从source流向sink.
每个进行snapshot的算子成功snapshot后,都会向JobManager发送ACK.
当sink完成snapshot后, 向JobManager发送ACK的同时向kafka进行pre-commit.
Phase 2:Commit
当JobManager接收到所有算子的ACK后,就会通知所有的算子这次checkpoint已经完成.
Sink接收到这个通知后, 就向kafka进行commit,正式把数据写入到kafka
不同阶段fail over的recovery举措:
(1) 在pre-commit前fail over, 系统恢复到最近的checkponit
(2) 在pre-commit后,commit前fail over,系统恢复到刚完成pre-commit时的状态
Flink的two phase commit实现 ---- 抽象类TwoPhaseCommitSinkFunction
TwoPhaseCommitSinkFunction有4个方法:
1. beginTransaction()
开启事务.创建一个临时文件.后续把原要写入到外部系统的数据写入到这个临时文件
2. preCommit()
flush并close这个文件,之后便不再往其中写数据.同时开启一个新的事务供下个checkponit使用
3. commit()
把pre-committed的临时文件移动到指定目录
4. abort()
删除掉pre-committed的临时文件
jobmanager:保存在jobmanager的堆上
filesystem:将in-flight数据(即每个算子操作的state的数据)存放在TaskManager的内存中。当进行checkpoint时,它将状态快照写入到配置的文件系统和目录。最小的元数据存储在JobManager的内存中
rocksdb:将in-flight数据存储在RocksDB数据库中,它(默认)存储在TaskManager的data目录下。当checkpoint时,整个RocksDB数据库将被checkpoint到配置的文件系统和目录下。最小的元数据存储在JobManager的内存中
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# <class-name-of-factory>.
#
#: filesystem
: rocksdb
: /data/flink/rocksdb_data
# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
: file:///cfs/flink_pro/statedir/checkpoints
# Default target directory for savepoints, optional.
#
: file:///cfs/flink_pro/statedir/savepoints
- savapoint和checkpoint
Savepoint 是一种特殊的 Checkpoint,实际上它们的存储格式也是一致的,它们主要的不同在于定位。Checkpoint 机制的目标在于保证 Flink 作业意外崩溃重启不影响 exactly once 准确性,通常是配合作业重启策略使用的。而 Savepoint 的目的在于在 Flink 作业维护(比如更新作业代码)时将作业状态写到外部系统,以便维护结束后重新提交作业可以到恢复原本的状态。换句话讲,Checkpoint 是为 Flink runtime 准备的,Savepoint 是为 Flink 用户准备的。因此 Checkpoint 是由 Flink runtime 定时触发并根据运行配置自动清理的,一般不需要用户介入,而 Savepoint 的触发和清理都由用户掌控。
./flink list
./flink savepoint 6e5fbb2854c6177f4163e4f169e5f737 /cfs/flink_pro/tmp_savapoint/testsavapiont/billing_cpt_nolmalcpt_rt/
./flink run -s file:/cfs/flink_pro/tmp_savapoint/testsavapiont/billing_cpt_nolmalcpt_rt/savepoint-6e5fbb-04ba06a04128 -d /data/app/rt_billing.jar
由于savapoint提交时会涉及到代码的修改,Flink提供了API来为程序中每个Operator设置ID,这样可以在后续更新/升级程序的时候,可以在Savepoint数据中基于Operator ID来与对应的状态信息进行匹配,从而实现恢复。当然,如果我们不指定Operator ID,Flink也会我们自动生成对应的Operator状态ID。当然,如果程序修改很大,为了尽可能识别,要在每个操作上设置id
window
- 种类:
window
Tumbling Time Window 翻滚时间窗口
Sliding Time Window 滑动时间窗口
window
Tumbling Count Window
Sliding Count Window
3.Session Window
- 时间
处理时间
摄取时间
事件时间
- 水位线:使用事件时间时需要有水位线
- 驱逐器
- 触发器
例子:每隔5分钟触发一次
处理结果分为://TriggerResult.FIRE_AND_PURGE/
-
package
-
-
import
-
import
-
import
-
import
-
import .{IntSerializer, LongSerializer}
-
import
-
import
-
import .{Trigger, TriggerResult}
-
import
-
-
-
-
@SerialVersionUID(1L)
-
class ContinuousProcessingTimeTrigger[W <: Window] (val interval: Long) extends Trigger[AnyRef, W] {
-
/** When merging we take the lowest of all fire timestamps as the new fire timestamp. */
-
-
final private val stateDesc = new ReducingStateDescriptor[Long]("fire-time", new Min,classOf[Long])
-
-
@throws[Exception]
-
override def onElement(element: Object, timestamp: Long, window: W, ctx: ): TriggerResult = {
-
val fireTimestamp = (stateDesc)
-
val timestamp =
-
if ( == null) {
-
val start = timestamp - (timestamp % interval)
-
val nextFireTimestamp = start + interval
-
(nextFireTimestamp)
-
(nextFireTimestamp)
-
return
-
}
-
-
}
-
-
@throws[Exception]
-
override def onEventTime(time: Long, window: W, ctx: ) =
-
-
@throws[Exception]
-
override def onProcessingTime(time: Long, window: W, ctx: ): TriggerResult = {
-
val fireTimestamp = (stateDesc)
-
if ( == time) {
-
()
-
(time + interval)
-
(time + interval)
-
return
-
}
-
-
}
-
-
@throws[Exception]
-
override def clear(window: W, ctx: ): Unit = {
-
val fireTimestamp = (stateDesc)
-
val timestamp =
-
(timestamp)
-
()
-
}
-
-
override def canMerge = true
-
-
override def onMerge(window: W, ctx: ): Unit = {
-
(stateDesc)
-
}
-
-
@VisibleForTesting def getInterval: Long = interval
-
-
override def toString: String = "ContinuousProcessingTimeTrigger(" + interval + ")"
-
}
-
-
import
-
-
private class Min extends ReduceFunction[Long] {
-
override def reduce(value1: Long, value2: Long): Long = (value1, value2)
-
}
keyed
定义keys:
tuples(元组)
字段表达式Field Expressions
1、在pojo中直接选中某个字段
// some ordinary POJO (Plain old Java Object)
public class WC {
public String word;
public int count;
}
DataStream<WC> words = // [...]
DataStream<WC> wordCounts = ("word").window(/*window specification*/);
2、Select Tuple fields by their field name or 0-offset field index. For example "f0" and "5" refer to the first and sixth field of a Java Tuple type, respectively.
3、You can select nested fields in POJOs and Tuples. For example ""
4、You can select the full type using the "*" wildcard expressions. This does also work for types which are not Tuple or POJO types.
Key Selector Functions
// some ordinary POJO
public class WC {public String word; public int count;}
DataStream<WC> words = // [...]
KeyedStream<WC> keyed = words
.keyBy(new KeySelector<WC, String>() {
public String getKey(WC wc) { return ; }
});
Specifying Keys
Some transformations (join, coGroup, keyBy, groupBy) require that a key be defined on a collection of elements. Other transformations (Reduce, GroupReduce, Aggregate, Windows) allow data being grouped on a key before they are applied.