本文整理自小米大数据部高级软件工程师张蛟在 Flink Forward Asia 2021 生产实践专场的演讲。主要内容包括:
1.发展现状和规模;
2.稳定性优化及实践
3.运维优化及实践
4.未来规划与展望
01
发展现状及规模
现阶段,我们的整体架构可以分成5层,数据从下往上流动,如上图。
数据采集层主要负责收集各类数据,数据的来源分为两类,一类是埋点和业务日志以及服务日志,经由 LCS Agent 进行采集,另一类是数据库数据经由 Binlog 或 Checkpoint 数据集成等方式收集到消息队列中。以 Flink、Spark 为主的计算层对其进行处理,并最终存储到各类存储和查询服务中,供业务使用。Flink 是计算层实时和准实时处理的主要框架,在其中正发挥着越来越重要的作用,尤其是 Flink+Iceberg 数据湖技术,正在让流批一体成为现实。
目前我们的集群上运行着 3000 多个作业,主力版本是 1.12,1.14 版本也已经合并上线,日均处理 10 万亿+ 条消息,PB 级的数据量,峰值数据 2 亿条/秒,运行在国内外 10 多个集群,使用超过 45000 个 CPU core,内存使用超过 200tb。
在这样规模的数据处理过程中,我们遇到了许多问题。
作业内存占用不可控,on Yarn 模式非常容易出现 Yarn container OOM kill,导致 container lost,引发作业频繁重启,包括框架内重启。
on Yarn 模式无法支持作业自动平滑重启,在机器过保、下线、机房迁移等过程中,只能触发 failover。
实时作业对负载较为敏感,启动和运行的过程中需要保证机器性能,避免因离线和在线混部造成影响。
Checkpoint 作为 Flink 有状态计算数据一致性的保障,存在稳定性问题。
historyserver 默认的清理策略不好设置,导致占用的磁盘空间比较大,访问慢。
作业异常时难以确定异常原因和节点,需要查看大量的作业日志,导致故障排查困难。
02
稳定性优化及实践
首先是 Yarn container lost 的优化。Flink JobManager 首先会向 Yarn reCheckpointmanager 申请资源,Yarn reCheckpointmanager 为该申请分配资源后将分配信息返回给 JobManager,然后 JobManager 会根据分配信息去启动 taskmanager,并使之与 JobManager 进行心跳。
JobManager 包括 JobMaster 和 reCheckpointmanager,它会主动发送心跳请求,探测 taskmanager 是否存活。如果 taskexecutor 因为某些原因意外被 kill,JobManager 的日志中就会提示 container lost。
上图是 container lost 现象的提示之一,一般老版本的 Flink 中出现比较多。
上图是 container lost 现象的另一种提示。
在出现 container lost 时,如果去查看 Yarn的nodemanager 或 JobManager 中异常前后的日志,一般都可以看到类似 beyond the physical memory limit 的日志,这表明它是因为物理内存使用超限被 Yarn kill。
这里需要先介绍一下 Yarn 控制内存超用的方式,Yarn Nodemanager 会启动一个 containersmonitor 的线程,这个线程会定期扫描 Nodemanager 上的 container 内存占用,从而实现内存资源的隔离。
简单来说,如果某个 container 对应进程树中所有年龄大于 0 的进程,总内存使用量超过申请量的两倍,或所有年龄大于 1 的进程,总内存使用量超过上限,就表明其内存超用,需要被 kill。
但实际上这种方式存在一定的问题:
一是定期扫描对于内存突增的隔离性比较差,可能还没有开始扫描就已经达到系统总内存上限,导致被系统 kill;
二是 Yarn 通常会开启节点资源的超卖,此时如果所有资源都被使用,会导致节点不稳定;
三是如果作业只是临时的内存需求,即使此时节点仍有富余内存,也会触发 kill。
针对这些问题,我们采用 Cgroup + JDK升级 + Jemalloc 的方式进行了优化。可能有人会问为什么需要进行 JDK 升级?这是因为老版本的 JDK 使用 Jemalloc 存在线程死锁的问题,另外升级最新的 JDK 也能避免其他的 JDK bug,通常这类 bug 都不容易被找到和复现。
Cgroup 的方式主要是开启内存软限制,它对 container 的内存限制不再是基于单个 container 的内存申请量,而是整个 Nodemanager 的内存量。这个时候如果 NodeManager 上仍有富余内存,内存超用的 container 就可以接着使用这些富余的内存。一个节点上同时存在多个 Container 内存超用导致整个节点内存达到上限,才会触发 oom event。Oom listener 对该事件进行监听并判断,如果达到节点总内存就会选取内存实际占用量超过申请量且启动时间最短、优先级最低的作业触发 oom kill。
然而,Cgroup 只是在一定程度上解决了 container 频繁被 Yarn oom kill 导致 lost 的问题,并没有完全彻底地解决。在使用的过程中,依然存在某些 container 的内存使用持续上涨,最终被 cgroup oom kill 的情况,然后我们发现该问题可能与 glibc 的内存分配 bug 有关,长期运行的进程会存在连续多块大小为 65536 的 anon 块,所以我们最终的解决方案如下:
使用 Cgroup 解决内存临时超用的问题,比如 RocksDB 对内存的限制不严格、小白用户对内存的设置和使用不正确等造成的问题,然后升级 JDK 版本,解决 Jemalloc 分配时的线程死锁 bug,最后切换 Jemalloc,解决 Linux 系统下的 64M anon 分配 bug。
经过一系列的优化,从上图可以看出,container lost 的频率由每月的近 5000 次减少到不到 100 次,因 Yarn oom kill 造成的作业异常重启减少了 90% 以上,效果显著。
第二个优化实践是节点的平滑重启功能,流式作业是长时间运行的作业,由于大部分都运行在廉价的机器上,因此机器出现过保、硬件故障、维修下线、机房迁移等都比较常见。为了提前预防可能出现的隐患,避免框架重启造成的影响,提升云环境下作业的稳定性,解决 Yarn 模式下恢复时间过长带来的问题,我们开发了作业的平滑重启功能。
将节点加入到 exclude 后,Flink recheckpoint manager 会获取到 decommission 的信息,通过解析该信息得到对应的节点,并判断当前运行任务的 container 是否运行在被 decommission 的节点上。如果是,就通过调用任务的 JobManager 的 stop with savepoint 接口去停止。平台会自动检测任务的运行状态,如果某个作业不是通过平台停止,则平台会自动将该任务重新拉起,作业从 savepoint 恢复。这个过程会进行周期性的触发并批量合并后再处理,避免消息频繁触发造成瞬时负载压力。此外,节点和 container 都会进行去重,避免对同一任务多次触发影响稳定性。另外它的触发周期远小于 sre 在下线节点时设置的下线周期,也缓解了运维压力。
JobManager 会启动指标收集监控线程,并周期性地采集节点的 CPU、内存、磁盘 io 和网络 io 等指标,然后汇聚成指标集合,通过动态指标规则对指标进行判定,如果满足条件就会将其加入到节点黑名单,这样该 Application 的 container 便不会再运行在这个节点上。如果某个节点被多个 application 加入黑名单,则表明该节点可能存在问题,会自动触发作业平滑重启,并进行监控报警,以此来自动发现可能的异常节点。
上图是 Flink Checkpoint 的大致流程,Checkpoint coordinator 会触发 Checkpoint Operator 进行 Checkpoint,Checkpoint Operator 生成并向下游广播 Checkpoint Barrier,然后 Snapshot State。Checkpoint Operator 完成 Checkpoint 后进行 ack,下游节点收到 Checkpoint Barrier 后,根据是否要进行对齐做对应的处理,然后进入 Checkpoint 逻辑。所有的节点都向 Checkpoint Coordinateor ack 之后,表示该次 Checkpoint 已经完成,接着向所有参与 Checkpoint 的 Operator 发送完成通知,最后 Operator 做最后的提交操作等。
Checkpoint 过程中遇到的问题包括以下这些:
磁盘满或其他 io 异常,会导致 Checkpoint 长期无法触发,但异常信息只存在于 JobManager 的日志中,并不影响作业的正常执行,导致潜在的隐患不容易被感知。
作业因逻辑变更、调整并发、重新调度等原因,重启时默认不会从 Checkpoint 恢复,导致状态丢失或者消息积压。
大并发度时 Checkpoint 小文件过多,引发大量的 HDFS RPC 负载压力。
用户错误配置 Checkpoint 目录引发的恢复冲突非常不容易控制,也不易排查。
针对以上问题,我们也进行了一些优化。
针对磁盘满、io 异常、Kerberos 文件损坏的问题,我们会捕获异常栈,根据异常栈进行判断和重试,并在失败时增加 Checkpoint 的失败计数,超过一定次数则进行框架内的重启,或向用户发送告警,保证作业不会出现长时间的 Checkpoint 失败而从一个非常老的 Checkpoint 恢复。
针对作业重启时无法从 Checkpoint 恢复的问题,优化方式是对每个作业设置默认的保留数量,并在进行 Checkpoint 时先生成一个临时的 Checkpoint Metadata 文件,只有在 Finalize 时才会被 rename 成正式的文件。接着将所有 Checkpoint 文件按最后修改时间降序排序,在其中寻找正式的 Checkpoint Metadata 文件。如果成功则表明其是一个完备的、可用于恢复的 Checkpoint 文件。
在这样的设定下,必须确保文件最后修改时间的正确性。为此我们设置了任务 finish 默认不删除 Checkpoint 文件,任务在做 Savepoint 时默认不 discard 最新的 Checkpoint 文件,以确保这两类文件最后修改时间的正确性。通过以上方式保证了任务能自动从最新的、完备的状态进行恢复,需要重新处理的数据和状态尽量少。另外,如果任务已经找到最新的、完备的 Checkpoint 并可以用来恢复,这表明前面的 Savepoint 和 Checkpoint 已经可以清理,由此减少空间的占用。
于是我们通过为 Savepoint 设置生命周期来清理全量 Savepoint;对于增量的 Checkpoint,为了避免清除掉正在使用的状态,会先去读取其 Metadata 文件的内容,将其中用到的状态文件对应的父文件夹保留,其余的进行清理,从而确保在不影响状态恢复的前提下,尽量减少文件数和空间占用。
针对用户随意配置 Checkpoint 目录导致状态恢复冲突和引发负载压力的问题,通过在 Metadata 文件中增加作业名和时间戳,当前的作业名与存储的作业名不同则会提示告警信息,恢复的 Checkpoint 的时间戳与当前时间存在较大的差异,也会有告警信息。
小文件是使用 HDFS 经常遇到的问题,由于 HDFS 适合于存储大块文件,所以必须对小文件进行优化来提升性能和稳定性。方法是在进行 Checkpoint 时对小文件写入进行合并,比如将多个小文件写入到 sequence file 中,形成一个大的文件,这可能会造成空间浪费,但是对降低 HDFS Namenode 负载压力效果比较明显。
此外通过联邦集群的方式,使用多个 Namenode 均衡 RPC 请求负载,每一个 Namenode 都是一个相对独立的服务,然后对用户作业规范其 Checkpoint 目录,使其访问能够被均衡到多个 Namenode 上,再对旧的 HDFS 文件通过挂载表的形式读旧写新,逐步实现自动迁移到新的统一的规范目录下。
接下来介绍一个案例,该案例来自小米数据采集服务,图示是他们非常简单的架构图,主要是将多个源端 SDK 的埋点和数据收集到消息队列中,然后使用 Flink 进行 ETL,最终存储到 Doris 中并在看板上进行展示。
目前该业务已经接入 750+ 国内外业务,日均处理 1600亿+ 条消息。通过采用 Checkpoint 相关的优化手段,将 RPC 延迟降低了约 40%,减少了小文件。同时在作业通过 stop with savepoint 启停时,保证了恢复的正确性,确保了 exactly once 的语义。
03
运维优化实践
Flink Historyserver 对作业运维非常有效,尤其是它能在作业停止后,查看作业的统计信息,如果作业异常退出或处理结果有问题,我们又因为一些原因无法及时查看相关日志,就可以在将来通过 Historyserver 查看。
Flink Historyserver 会在每一次定时清理时获取上一次清理已经被缓存的作业 ID,再获取本次已经打包的历史日志信息,然后判断历史日志是否已经超过了配置的最大值,若是,就会将后面的历史日志直接执行清理,否则就会判断上一次缓存的作业在当次历史日志中是否存在,如果不存在也会执行清理。
但上述流程存在一系列的问题,一个是服务重启会造成当前缓存的已下载的作业信息丢失,如果在重启之间该作业的历史日志也丢失,就会形成悬浮的缓存作业,本地缓存的作业将会长期存在,无法清理。当前已打包的历史日志信息不支持过期,导致大量的日志存留于 HDFS 和本地磁盘,且会长期存在,不仅影响访问的速度,也会造成磁盘空间的较大浪费。缓存下来的作业历史日志最大值难以确定,基础服务如 HDFS 等如果出现异常,会导致同时出现大量失败,冲走有效日志。另外当前默认并没有记录 Taskmanager 上的日志,非常不利于异常排查,
针对上述问题我们也做了相应优化。
一个是读取当前已经缓存到本地磁盘的作业历史日志信息,并将其与历史日志记录进行对比,从而避免出现悬浮的缓存作业;支持历史日志的最长保留时间,超过其生命周期就会进行清理,相比于当前支持的历史日志最大保留数量,更加科学合理;另外我们也支持了 Taskmanager 和 Container 历史数据的打包和清理,更全面地记录作业在异常退出时的各项信息,方便排查问题。
作业的全链路心跳监控功能主要是对作业的链路延时进行监控,实现方式是通过在 Stream Checkpoint 中插入特殊标记,标记信息包括作业的名称、当前的时间,名称的生成方式是 op+operator 在整个链路的 index 以及 subtask 在 operator 的 index,非 Checkpoint 节点会在收到标记后更新名称,并用当前的时间减去 Checkpoint 插入的时间生成从 Checkpoint 到该 subtask 的耗时,并上报到 Metrics Reporter 中,最终对这些 metrics 进行计算,通过这种方式可以发现链路中的异常节点,监测作业的数据异常丢失,还能够通过心跳信息的插入频率预估其影响。
心跳标记在遇到多个下游链路时并不是随机选择链路,而是同时广播到多条链路中,因此可能会出现心跳监控标记信息过多的情况,影响正常作业的处理性能。
这时就出现了一个矛盾点,全链路心跳监控采样越频繁,对各节点处理性能的监控就越及时准确,但同时也会造成信息过多、影响正常数据的处理。针对这个问题,我们进行了以下三个方面的改进和处理:
一是将 chain operator metrics 信息进行合并上报,因为它的监控信息基本相同,这样可以减少上报的数据量。
二是通过 restful 接口动态启停监控,这样只有在有异常时才会进行采样和监控,正常情况下不影响作业的运行。
三是通过对采样进行周期性的合并和处理,实现了对任务 pipeline 数据量和延迟的预估以及监控功能。
restful 接口动态启停监控功能不仅能动态启停心跳监控,我们发现还有其他场景也能从这个功能中受益,因此我们对其进行了扩展。简单的代码修改就能让它支持其他配置的动态调整,包括 Checkpoint 配置,如 Checkpoint 周期和超时时间,动态日志的级别等。
当作业出现性能或 Checkpoint 问题时,可以通过 restful 接口动态开启、问题确定后动态停止,这样就能解决心跳信息过多的问题。在负载突增、短时数据倾斜导致 Checkpoint 超时,动态调整 Checkpoint 超时时间能避免作业因 Checkpoint 超时而失败,它也能避免由于 Checkpoint 长时间不成功导致数据积压更多、数据倾斜问题更严重而陷入的死循环。同时它还能用于确定超时时间,用户可以通过动态调整的方式,不断测试最适合作业的超时时间,减少了压测过程中的作业启停次数。它也支持其他配置的调整,比如动态调整日志级别,但是需要注意的是调整后的配置并没有持久化,会因为框架重启或作业的重启而失效。
04
未来规划
未来,我们将在以下方面继续探索:
持续开发并优化自动弹性伸缩容的功能。Flink1.13 开始提供了自动弹性伸缩容的功能,但是目前并不完善,要在生产环境上用起来还需要做不少的工作。
版本收敛是很多Flink开发人员都会遇到的一个问题。Flink 社区的发展比较快,版本的发布和迭代也是非常快。为了降低运维压力,紧跟社区,这也是势在必行的。
对 state 读写性能进行优化,提升大状态作业的性能。
Heartbeat timeout 也是目前线上对稳定性影响比较大的问题,我们也会进行跟进和优化。
对作业启动和恢复性能进行优化,减少作业因各种原因造成的断流是 Flink 社区和许多业务非常关注的问题,我们同样也有面临着这样的压力。
继续打磨批流融合能力,完善对 batch 模式和数据湖等的支持,也是现在的热点,我们希望能在这上面进行更多探索,从而更好地支撑业务,也让 Flink 的应用更加广泛。