flink

时间:2024-11-16 08:08:23

checkpoint

  • Barriers

当接收到jobmanager要进行checkpoint的请求时,会在当前source的数据插入一个barrier,随之该barrier往下游走,期间需要进行快照的操作只要碰到barrier,就会触发自身的快照操作,当所有sink确认快照后,就会向checkpoint协调器发送确认该快照完成,当失败重启时,会从最近一次成功保存的快照恢复。

barrier的作用就是为了把数据区分开,CheckPoint过程中有一个同步做快照的环节不能处理barrier之后的数据,为什么呢?
如果做快照的同时,也在处理数据,那么处理的数据可能会修改快照内容,所以先暂停处理数据,把内存中快照保存好后,再处理数据

  • align

接收多个输入流的运算符需要继续快照barrier对齐输入流,比如当flink并行度小于kafka的分区时,必定会有一个task从多个partition读取kafka数据,此时就需要进行对齐。/xianpanjia4616/article/details/86375224

还没对齐时,从这些流入的数据会被放进缓冲区暂时不会处置,在该对齐并且快照之后,会优先处理缓冲区的数据

所以,对齐会增加流式的等待时间,某些系统对失效性要求较高,精确到毫秒级的,这时可以考虑不对齐。可以不对齐吗?可以的,但是不对齐可能会出现数据重复处理,比如前一个快照处理了后面的数据,当从这个快照失败恢复时,会造成后面的数据部分重复处理

  • Exactly Once、At Least Once

Exactly Once时必须barrier对齐,如果barrier不对齐就变成了At Least Once

  1. flink内部的exactly once,即上面说的
  2. End-to-End Exactly-Once语义

flink和外部系统如何做到一次交付呢?比如kafka,已经输入到kafka了,但是没有进行快照,从最近一次快照恢复,必定会重复数据,只输入到kafka一次,这时需要利用kafka producter的事务机制,在0.11才有的

Solution ---- Two phase commit
Flink采用Two phase commit来解决这个问题.
Phase 1: Pre-commit
Flink的JobManager向source注入checkpoint barrier以开启这次snapshot.
barrier从source流向sink.
每个进行snapshot的算子成功snapshot后,都会向JobManager发送ACK.
当sink完成snapshot后, 向JobManager发送ACK的同时向kafka进行pre-commit.
Phase 2:Commit
当JobManager接收到所有算子的ACK后,就会通知所有的算子这次checkpoint已经完成.
Sink接收到这个通知后, 就向kafka进行commit,正式把数据写入到kafka

不同阶段fail over的recovery举措:
(1)     在pre-commit前fail over, 系统恢复到最近的checkponit
(2)     在pre-commit后,commit前fail over,系统恢复到刚完成pre-commit时的状态
Flink的two phase commit实现 ---- 抽象类TwoPhaseCommitSinkFunction
TwoPhaseCommitSinkFunction有4个方法:
1. beginTransaction()
开启事务.创建一个临时文件.后续把原要写入到外部系统的数据写入到这个临时文件
2. preCommit()
flush并close这个文件,之后便不再往其中写数据.同时开启一个新的事务供下个checkponit使用
3. commit()
把pre-committed的临时文件移动到指定目录
4. abort()
删除掉pre-committed的临时文件

jobmanager:保存在jobmanager的堆上

filesystem:将in-flight数据(即每个算子操作的state的数据)存放在TaskManager的内存中。当进行checkpoint时,它将状态快照写入到配置的文件系统和目录。最小的元数据存储在JobManager的内存中

rocksdb:将in-flight数据存储在RocksDB数据库中,它(默认)存储在TaskManager的data目录下。当checkpoint时,整个RocksDB数据库将被checkpoint到配置的文件系统和目录下。最小的元数据存储在JobManager的内存中

# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# <class-name-of-factory>.
#
#: filesystem
: rocksdb
: /data/flink/rocksdb_data

# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
: file:///cfs/flink_pro/statedir/checkpoints

# Default target directory for savepoints, optional.
#
: file:///cfs/flink_pro/statedir/savepoints

  • savapoint和checkpoint

Savepoint 是一种特殊的 Checkpoint,实际上它们的存储格式也是一致的,它们主要的不同在于定位。Checkpoint 机制的目标在于保证 Flink 作业意外崩溃重启不影响 exactly once 准确性,通常是配合作业重启策略使用的。而 Savepoint 的目的在于在 Flink 作业维护(比如更新作业代码)时将作业状态写到外部系统,以便维护结束后重新提交作业可以到恢复原本的状态。换句话讲,Checkpoint 是为 Flink runtime 准备的,Savepoint 是为 Flink 用户准备的。因此 Checkpoint 是由 Flink runtime 定时触发并根据运行配置自动清理的,一般不需要用户介入,而 Savepoint 的触发和清理都由用户掌控。

     ./flink list
     ./flink savepoint 6e5fbb2854c6177f4163e4f169e5f737  /cfs/flink_pro/tmp_savapoint/testsavapiont/billing_cpt_nolmalcpt_rt/
     ./flink run -s file:/cfs/flink_pro/tmp_savapoint/testsavapiont/billing_cpt_nolmalcpt_rt/savepoint-6e5fbb-04ba06a04128 -d /data/app/rt_billing.jar

由于savapoint提交时会涉及到代码的修改,Flink提供了API来为程序中每个Operator设置ID,这样可以在后续更新/升级程序的时候,可以在Savepoint数据中基于Operator ID来与对应的状态信息进行匹配,从而实现恢复。当然,如果我们不指定Operator ID,Flink也会我们自动生成对应的Operator状态ID。当然,如果程序修改很大,为了尽可能识别,要在每个操作上设置id

window

  • 种类:

        window
            Tumbling Time Window 翻滚时间窗口
            Sliding Time Window 滑动时间窗口
        window
            Tumbling Count Window
            Sliding Count Window
        3.Session Window

  • 时间

处理时间

摄取时间

事件时间

  • 水位线:使用事件时间时需要有水位线
  • 驱逐器
  • 触发器

例子:每隔5分钟触发一次

处理结果分为://TriggerResult.FIRE_AND_PURGE/

  1. package
  2. import
  3. import
  4. import
  5. import
  6. import .{IntSerializer, LongSerializer}
  7. import
  8. import
  9. import .{Trigger, TriggerResult}
  10. import
  11. @SerialVersionUID(1L)
  12. class ContinuousProcessingTimeTrigger[W <: Window] (val interval: Long) extends Trigger[AnyRef, W] {
  13. /** When merging we take the lowest of all fire timestamps as the new fire timestamp. */
  14. final private val stateDesc = new ReducingStateDescriptor[Long]("fire-time", new Min,classOf[Long])
  15. @throws[Exception]
  16. override def onElement(element: Object, timestamp: Long, window: W, ctx: ): TriggerResult = {
  17. val fireTimestamp = (stateDesc)
  18. val timestamp =
  19. if ( == null) {
  20. val start = timestamp - (timestamp % interval)
  21. val nextFireTimestamp = start + interval
  22. (nextFireTimestamp)
  23. (nextFireTimestamp)
  24. return
  25. }
  26. }
  27. @throws[Exception]
  28. override def onEventTime(time: Long, window: W, ctx: ) =
  29. @throws[Exception]
  30. override def onProcessingTime(time: Long, window: W, ctx: ): TriggerResult = {
  31. val fireTimestamp = (stateDesc)
  32. if ( == time) {
  33. ()
  34. (time + interval)
  35. (time + interval)
  36. return
  37. }
  38. }
  39. @throws[Exception]
  40. override def clear(window: W, ctx: ): Unit = {
  41. val fireTimestamp = (stateDesc)
  42. val timestamp =
  43. (timestamp)
  44. ()
  45. }
  46. override def canMerge = true
  47. override def onMerge(window: W, ctx: ): Unit = {
  48. (stateDesc)
  49. }
  50. @VisibleForTesting def getInterval: Long = interval
  51. override def toString: String = "ContinuousProcessingTimeTrigger(" + interval + ")"
  52. }
  53. import
  54. private class Min extends ReduceFunction[Long] {
  55. override def reduce(value1: Long, value2: Long): Long = (value1, value2)
  56. }

keyed

定义keys:
        tuples(元组)
        字段表达式Field Expressions
            1、在pojo中直接选中某个字段
            // some ordinary POJO (Plain old Java Object)
            public class WC {
              public String word;
              public int count;
            }
            DataStream<WC> words = // [...]
            DataStream<WC> wordCounts = ("word").window(/*window specification*/);
            2、Select Tuple fields by their field name or 0-offset field index. For example "f0" and "5" refer to the first and sixth field of a Java Tuple type, respectively.
            3、You can select nested fields in POJOs and Tuples. For example ""
            4、You can select the full type using the "*" wildcard expressions. This does also work for types which are not Tuple or POJO types.
        Key Selector Functions
            // some ordinary POJO
            public class WC {public String word; public int count;}
            DataStream<WC> words = // [...]
            KeyedStream<WC> keyed = words
              .keyBy(new KeySelector<WC, String>() {
                 public String getKey(WC wc) { return ; }
               });
        Specifying Keys
            Some transformations (join, coGroup, keyBy, groupBy) require that a key be defined on a collection of elements. Other transformations (Reduce, GroupReduce, Aggregate, Windows) allow data being grouped on a key before they are applied.