Apache Flink PMC(项目管理委员)很高兴地宣布发布 Apache Flink 1.17.0。Apache Flink 是领先的流处理标准,流批统一的数据处理概念在越来越多的公司中得到认可。得益于我们出色的社区和优秀的贡献者,Apache Flink 在 Apache 社区中一直保持着快速增长,并且是最活跃的社区之一。Flink 1.17 有 172 位贡献者热情参与,完成了 7 个 FLIP 和 600 多个 issue,为社区带来了许多令人兴奋的新功能和改进。
为了在 流式数仓 [1] 领域实现更高效的处理,Flink 1.17 对批处理和流处理的性能和语义都进行了实质性的改进。这些增强措施代表了朝着创建一个更高效、更简化的数据仓库,能够实时处理大量数据的目标迈进了一大步。• Streaming Warehouse API: FLIP-282 [2] 在 Flink SQL 中引入了新的 Delete 和 Update API,它们可以在 Batch 模式下工作。在此基础上,外部存储系统比如 Flink Table Store 可以通过这些新的 API 实现行级删除和更新。同时对 ALTER TABLE 语法进行了增强,包括 ADD/MODIFY/DROP 列、主键和 watermark 的能力,这些增强使得用户更容易维护元数据。
• Batch 性能优化: 在 Flink 1.17 中,批处理作业的执行在性能、稳定性和可用性方面都得到了显着改进。就性能而言,通过策略优化和算子优化,如新的 join-reorder 算法和自适应的本地哈希聚合优化、Hive 聚合函数改进以及混合 shuffle 模式优化,这些改进带来了 26% 的 TPC-DS 性能提升。就稳定性而言,Flink 1.17 预测执行可以支持所有算子,自适应的批处理调度可以更好的应对数据倾斜场景。就可用性而言,批处理作业所需的调优工作已经大大减少。自适应的批处理调度已经默认开启,混合 shuffle 模式现在可以兼容预测执行和自适应批处理调度,同时所需的各种配置都进行了简化。
• SQL Client/Gateway: Apache Flink 1.17 支持了 SQL Client 的 gateway 模式,允许用户将 SQL 提交给远端的 SQL Gateway。同时,用户可以在 SQL Client 中使用 SQL 语句来管理作业,包括查询作业信息和停止正在运行的作业等。这表示 SQL Client/Gateway 已经演进为一个作业管理、提交工具。针对流处理,Flink 1.17 完成了以下功能和改进:• Streaming SQL 语义增强: 非确定性操作可能会导致不正确的结果或异常,这在 Streaming SQL 中是一个极具挑战性的话题。Flink 1.17 修复了不正确的优化计划和功能问题,并且引入了实验性功能 PLAN_ADVICE,PLAN_ADVICE 可以为 SQL 用户提供潜在的正确性风险提示和 SQL 优化建议。• Checkpoint 改进: 通用增量 Checkpoint(GIC)增强了 Checkpoint 的速度和稳定性,Unaligned Checkpoint (UC) 在作业反压时的稳定性也在 Flink 1.17 中提高至生产可用级别。此外,该版本新引入一个 REST API 使得用户可以触发自定义 Checkpoint 类型的 Checkpoint。• Watermark 对齐完善: 高效的 watermark 处理直接影响 event time 作业的执行效率,在 Flink 1.17 中, FLIP-217 [3] 通过对 Source 算子内部的 split 进行数据对齐发射,完善了 watermark 对齐功能。这一改进使得 Source 中 watermark 进度更加协调,从而减轻了下游算子的缓存过多数据,增强了流作业执行的整体效率。• StateBackend 升级: 此次发布将 FRocksDB [4] 的版本升级到了 6.20.3-ververica-2.0,对 RocksDBStateBackend 带来了许多改进。例如在 slot 之间共享内存,支持 Apple Silicon 芯片组,如 Mac M1。Flink 1.17 版本还提供了参数扩大 TaskManager 的 slot 之间共享内存的范围,提升了 TaskManager 中 slot 内存使用不均匀时的效率。作为的流批一体的计算引擎,Apache Flink 在流处理领域持续领先,为了进一步增强其批处理能力,Flink 社区贡献者在 Flink 1.17 版本的批处理的性能优化和生态完善方面付出了诸多努力。这让用户可以更轻松地基于 Flink 构建 Streaming Warehouse。
预测执行
在此次发布中,预测执行支持了 Sink 算子。在之前的版本中,为了避免不稳定性或不正确的结果,预测执行不会发生在 Sink 算子上。Flink 1.17 丰富了 Sink 的上下文信息,使得 新版 Sink [5] 和 OutputFormat Sink [6] 都能获取到当前执行实例的序号(attempt number),根据这个序号,Sink 算子可以将同一子任务的多个不同实例生成的数据进行隔离,即使这些实例在同时运行。FinalizeOnMaster 接口也进行了改进,以便 OutputFormat Sink 可以知道哪些序号的实例成功产出了数据,从而正确地提交结果数据。当 Sink 的开发者确定该 Sink 可以正确的支持多个并发实例同时运行,就可以使其实现装饰性接口 SupportsConcurrentExecutionAttempts,从而允许其进行预测执行。一些内置 Sink 已经支持了预测执行,包括 DiscardingSink、PrintSinkFunction、PrintSink、FileSink、FileSystemOutputFormat 和 HiveTableSink。此外,预测执行的慢任务的检测也获得了改进。在之前,在决定哪些任务是慢任务时只考虑了任务的执行时间。现在,慢任务检测器还会考虑了任务的输入数据量。执行时间较长的任务,如果消费了更多的数据,不一定会被视为慢任务。这一改进有助于消除数据倾斜对慢任务检测的负面影响。
自适应批处理调度器
在此次发布中,自适应批处理调度器成为了批作业的默认调度器。该调度器可以根据每个 job vertex 处理的数据量,自动为其设置合适的并行度。这也是唯一一个支持预测执行的调度器。自适应批调度器的配置得到了改进,以提高其易用性。用户不再需要显式将全局默认并行度设置为-1 来开启自动推导并行度功能。现在,如果设置了全局默认并行度,其会被用做自动推导并行度的上界。一些配置项的名称也进行了改进,以便于用户理解。此外,自适应批处理调度器的能力也得到了增强。现在它可以根据细粒度的数据分布信息,将数据更均匀的分配给下游任务。自动推导的并行度现在也不再被限制为 2 的幂。
混合 Shuffle 模式
此次发布中,混合 Shuffle 模式带来了多个重要改进:• 混合 Shuffle 模式现在支持自适应批调度器和预测执行。
• 混合 Shuffle 模式现在支持重用中间数据,这带来了显着的性能改进。
• 提高了稳定性,避免了在大规模生产环境中出现的稳定性问题。更多详细信息可以在 混合 Shuffle [7] 部分找到。
TPC-DS
从 Flink 1.16 开始,Flink 社区持续优化批处理引擎的性能。在 Flink 1.16 中,引入了动态分区裁剪优化,但并非所有的 TPC-DS 查询都可以被优化。Flink 1.17 对该优化的算法进行了改进,使得大部分 TPC-DS 查询都可以被优化。此外,Flink 1.17 中引入了动态规划 join-reorder 算法,与之前版本的算法相比,该算法效果更好,但搜索空间更大。优化器可以根据查询中 join 个数自动选择合适的 join-reorder 算法,用户无需关心 join-reorder 算法的细节(注意:join-reorder 默认未开启,在运行 TPC-DS 时需要显式启用)。在算子层面,Flink 1.17 引入了动态 local hash aggregation 策略,可以根据数据的分布动态确定是否需要在本地进行聚合操作以提高性能。在运行时层面上,此次发布移除了一些不必要的虚拟函数调用,以加快执行速度。从整体测试结果上看,相比 Flink 1.16,对于分区表在 10T 数据集下 Flink 1.17 有 26% 的性能提升。
SQL Client/Gateway
Apache Flink 1.17 引入了一个名为"gateway 模式" 的新功能,允许用户将 SQL 查询提交到远程的 SQL Gateway 从而像 embedded 模式一样来使用 Gateway 的各种功能。这种新模式为用户在使用 SQL Gateway 时提供了更多的便利。此外,SQL Client/SQL Gateway 现在支持通过 SQL 语句来管理作业生命周期。用户可以使用 SQL 语句显示存储在 JobManager 中的所有作业信息,可以使用作业的唯一作业 ID 来停止运行中的作业。借助这个新功能,SQL Client/SQL Gateway 现在几乎拥有了与 Flink CLI 相同的功能,成为管理 Flink 作业的另一个更强大的工具。
SQL API
在现代大数据工作流中,SQL 引擎的行级删除和更新能力变得越来越重要。应用场景包括为了符合监管要求而删除特定一组数据、为了进行数据订正而更新一行数据等。许多流行的计算引擎比如 Trino、Hive 等已经提供了这类支持。Flink 1.17 为 Batch 模式引入了新的 Delete 和 Update API,并将其暴露给连接器,这样外部存储系统便可以基于这个 API 实现行级更新和删除。此外,此次发布还扩展了 ALTER TABLE 语法,包括 ADD/MODIFY/DROP 列、主键和 Watermark 的能力。这些功能增强提升了用户按需维护元数据的灵活性。Apache Flink 1.17 支持了 SQL Client 的 gateway 模式,允许用户将 SQL 查询提交给 SQL Gateway 来使用 Gateway 的各种功能。用户可以使用 SQL 语句来管理作业的生命周期,包括显示作业信息和停止正在运行的作业,这为管理 Flink 作业提供了一个强大的工具。
Hive 兼容
Apache Flink 1.17 对 Hive connector 进行了一系列改进,使其更加生产可用。在之前的版本中,对于 Hive 的写入,只支持在流模式下自动地进行文件合并,而不支持批模式。从 Flink 1.17 开始,在批模式下也能自动地进行文件合并,这个特性可以大大减少小文件的数量。同时,对于通过加载 HiveModule [8] 来使用 Hive 内置函数的场景,此次发布引入了一些原生的 Hive 聚合函数如 SUM/COUNT/AVG/MIN/MAX 进 HiveModule 中,这些函数可以在基于哈希的聚合算子上执行,从而带来显著的性能提升。Flink 1.17 解决了一些棘手的 Streaming SQL 语义和正确性问题,优化了 Checkpoint 性能,完善了 watermark 对齐机制,扩展了 Streaming FileSink,升级了 Calcite 和 FRocksDB 到更新的版本。这些提升进一步巩固了 Flink 在流处理领域的领先地位。
Streaming SQL 语义完善
为了解决正确性问题并完善 Streaming SQL 语义,Flink 1.17 引入了一个实验性功能叫 PLAN_ADVICE [9] ,该功能可以检测用户 SQL 潜在的正确性风险,并提供优化建议。例如,如果用户通过 EXPLAIN PLAN_ADVICE 命令发现查询存在 NDU(非确定性更新) [10] 问题,优化器会在物理计划输出的末尾追加建议,建议会标记到对应操作节点上,并提示用户更新查询和配置。通过提供这些具体的建议,优化器可以帮助用户提高查询结果的准确性。
== Optimized Physical Plan With Advice ==...advice[1]: [WARNING] The column(s): day(generated by non-deterministic function: CURRENT_TIMESTAMP ) can not satisfy the determinism requirement for correctly processing update message('UB'/'UA'/'D' in changelogMode, not 'I' only), this usually happens when input node has no upsertKey(upsertKeys=[{}]) or current node outputs non-deterministic update messages. Please consider removing these non-deterministic columns or making them deterministic by using deterministic functions.
PLAN_ADVICE 功能还可以帮助用户提高查询的性能和效率。例如,如果检测到聚合操作可以优化为更高效的 local-global 聚合操作,优化器会提供相应的优化建议。通过应用这些具体的建议,优化器可以帮用户提高其查询的性能和效率。
== Optimized Physical Plan With Advice ==...advice[1]: [ADVICE] You might want to enable local-global two-phase optimization by configuring ('table.optimizer.agg-phase-strategy' to 'AUTO').
此外 Flink 1.17 还修复了多个可能影响数据正确性的 plan 优化问题,如:FLINK-29849 [11] , FLINK-30006 [12] , 和 FLINK-30841 [13] 等。
Watermark 对齐增强
在早期版本中, FLIP-182 [14] 提出了一种称为 watermark 对齐的解决方案,以解决 event time 作业中的源数据倾斜问题。但是,该方案存在一个限制,即 Source 并行度必须和分区数匹配。这是因为具有多个分区的 Source 算子中,如果一个分区比另一个分区更快地发出数据,此时需要缓存大量数据。为了解决这个限制,Flink 1.17 引入了 FLIP-217 [15] ,它增强了 watermark 对齐考虑 watermark 边界的情况下对 Source 算子内的多个分区进行数据发射对齐。这个增强功能确保了 Source 中的 Watermark 前进更加协调,避免了下游算子缓存过多的数据,从而提高了流作业的执行效率。
Streaming FileSink 扩展
在添加 ABFS 支持之后,Streaming FileSink [16] 现在可以支持五种不同的文件系统:HDFS、S3、OSS、ABFS 和 Local。这个扩展有效地覆盖了主流文件系统,为用户提供了更多的选择和更高的灵活性。
Checkpoint 改进
表格-1: 在 WordCount 中开启 GIC 后的收益
|
Checkpoint
耗时
|
增量大小
|
数据回放量
|
cpu 使用量
|
网络流量
|
最大值
|
平均值
|
最大值
|
平均值
|
开启 GIC vs
关闭 GIC
|
-79.5% |
-95%
|
-86.7%
|
-39.3%
|
-46.8% |
-53% |
-67.7% |
表格-2: 在 WordCount 中开启 GIC 后的开销
|
最大处理性能 |
全量 Checkpoint 大小 |
开启 GIC vs 关闭 GIC |
-2% |
+30% |
Unaligned Checkpoint (UC) 可以大大提高反压下 Checkpoint 的完成率。之前版本的 UC 会写入过多的小文件,进一步可能会导致 HDFS 的 namenode 负载过高。社区在 1.17 版本中解决了该问题,使 UC 在生产环境中更加可用。
Flink 1.17 版本提供了一个 REST API [18] ,用户基于该 API 可以在作业运行时手动触发具有自定义 Checkpoint 类型的 Checkpoint。例如,对于使用增量 Checkpoint 运行的作业,用户可以定期或手动触发全量 Checkpoint 来去除多个增量 Checkpoint 之间的关联关系,从而避免引用很久以前的文件。
RocksDBStateBackend 升级
Flink 1.17 版本将 FRocksDB [4] 的版本升级到 6.20.3-ververica-2.0,为 RocksDBStateBackend 带来了一些改进:1. 支持在 Apple 芯片上构建 FRocksDB Java
2. 通过避免昂贵的 ToString() 操作提高 Compaction Filter 的性能
3. 升级 FRocksDB 的 ZLIB 版本,避免 Memory Corruption
4. 为 RocksJava 添加 periodic_compaction_seconds 选项可以参考 FLINK-30836 [19] 了解更多详细信息。Flink 1.17 版本还提供了参数扩大 TaskManager 的 slot 之间共享内存的范围,这种方式可以在 TaskManager 中 slot 内存使用不均匀时提高内存效率。基于此在调整参数后可以以资源隔离为代价来降低整体内存消耗。请参考 state.backend.rocksdb.memory.fixed-per-tm [20]了解更多相关信息。
Calcite 升级
Flink 1.17 将 Calcite [21] 版本升级到 1.29.0 以提高 Flink SQL 系统的性能和效率。Flink 1.16 使用的是 Calcite 1.26.0 版本,该版本存在 SEARCH 操作符引发的 RexNode 简化等严重问题,这些问题会导致查询优化后产生错误的数据,如 CALCITE-4325 [22] 和 CALCITE-4352 [23] 所报告的案例。通过升级到该版本的 Calcite,Flink 可以在 Flink SQL 中利用其功能改进和新特性。这不仅修复了多个 bug,同时加快了查询处理速度。
PyFlink
在 Flink 1.17 中,PyFlink 也完成了若干功能,PyFlink 是 Apache Flink 的 Python 语言接口。PyFlink 中,一些比较重要的改进包括支持 Python 3.10、支持在 Mac M1 和 M2 电脑上运行 PyFlink 等。此外,在该版本中还完成了一些小的功能优化,比如改进了 Java 和 Python 进程之间的跨进程通信的稳定性、支持以字符串的方式声明 Python UDF 的结果类型、支持在 Python UDF 中访问作业参数等。总体来说,该版本主要专注于改进 PyFlink 的易用性,而不是引入一些新的功能,期望通过这些易用性改进,改善用户的使用体验,使得用户可以更高效地进行数据处理。
性能监控 Benchmark
这个版本周期中,我们也在 Slack 频道( #flink-dev-benchmarks [24] )中加入了性能日常监控汇报来帮助开发者快速发现性能回退问题,这对代码质量保证非常有意义。通过 Slack 频道或 Speed Center [25] 发现性能回退后,开发者可以按照 Benchmark's wiki [26] 中方式处理它。
Task 级别火焰图
从 Flink 1.17 版本开始,Flame Graph 功能提供了针对 task 级别的可视化支持,使得用户可以更详细地了解各个 task 的性能。该功能是相比于之前版本的 Flame Graph 的重大改进,因为它可以让用户选择感兴趣的 subtask 并查看相应的火焰图。通过这种方式,用户可以确定任务可能出现性能问题的具体区域,然后采取措施加以解决。这可以显著提高用户数据处理管道的整体效率。
通用的令牌机制
在 Flink 1.17 之前,Flink 只支持 Kerberos 认证和基于 Hadoop 的令牌。随着 FLIP-272 [27] 的实现,Flink 的委托令牌框架更加通用,使其认证协议不再局限于 Hadoop。这将允许贡献者在未来可以添加对非 Hadoop 框架的支持,这些框架的认证协议可以不用基于 Kerberos。此外, FLIP-211 [28] 改进了 Flink 与 Kerberos 的交互,减少了在 Flink 中交换委托令牌所需的请求数量。
升级说明
Apache Flink 社区努力确保升级过程尽可能平稳, 但是升级到 1.17 版本可能需要用户对现有应用程序做出一些调整。请参考 Release Notes [29] 获取更多的升级时需要的改动与可能的问题列表细节。
贡献者列表
Apache Flink 社区感谢对此版本做出贡献的每一位贡献者:Ahmed Hamdy, Aitozi, Aleksandr Pilipenko, Alexander Fedulov, Alexander Preuß, Anton Kalashnikov, Arvid Heise, Bo Cui, Brayno, Carlos Castro, ChangZhuo Chen (陳昌倬), Chen Qin, Chesnay Schepler, Clemens, ConradJam, Danny Cranmer, Dawid Wysakowicz, Dian Fu, Dong Lin, Dongjoon Hyun, Elphas Toringepi, Eric Xiao, F* Paul, Ferenc Csaky, Gabor Somogyi, Gen Luo, Gunnar Morling, Gyula Fora, Hangxiang Yu, Hong Liang Teoh, HuangXingBo, Jacky Lau, Jane Chan, Jark Wu, Jiale, Jin, Jing Ge, Jinzhong Li, Joao Boto, John Roesler, Jun He, JunRuiLee, Junrui Lee, Juntao Hu, Krzysztof Chmielewski, Leonard Xu, Licho, Lijie Wang, Mark Canlas, Martijn Visser, MartijnVisser, Martin Liu, Marton Balassi, Mason Chen, Matt, Matthias Pohl, Maximilian Michels, Mingliang Liu, Mulavar, Nico Kruber, Noah, Paul Lin, Peter Huang, Piotr Nowojski, Qing Lim, QingWei, Qingsheng Ren, Rakesh, Ran Tao, Robert Metzger, Roc Marshal, Roman Khachatryan, Ron, Rui Fan, Ryan Skraba, Salva Alcántara, Samrat, Samrat Deb, Samrat002, Sebastian Mattheis, Sergey Nuyanzin, Seth Saperstein, Shengkai, Shuiqiang Chen, Smirnov Alexander, Sriram Ganesh, Steven van Rossum, Tartarus0zm, Timo Walther, Venkata krishnan Sowrirajan, Wei Zhong, Weihua Hu, Weijie Guo, Xianxun Ye, Xintong Song, Yash Mayya, YasuoStudyJava, Yu Chen, Yubin Li, Yufan Sheng, Yun Gao, Yun Tang, Yuxin Tan, Zakelly, Zhanghao Chen, Zhenqiu Huang, Zhu Zhu, ZmmBigdata, bzhaoopenstack, chengshuo.cs, chenxujun, chenyuzhi, chenyuzhi459, chenzihao, dependabot[bot], fanrui, fengli, frankeshi, fredia, godfreyhe, gongzhongqiang, harker2015, hehuiyuan, hiscat, huangxingbo, hunter-cloud09, ifndef-SleePy, jeremyber-aws, jiangjiguang, jingge, kevin.cyj, kristoffSC, kurt, laughingman7743, libowen, lincoln lee, lincoln.lil, liujiangang, liujingmao, liuyongvs, liuzhuang2017, luoyuxia, mas-chen, moqimoqidea, muggleChen, noelo, ouyangwulin, ramkrish86, saikikun, sammieliu, shihong90, shuiqiangchen, snuyanzin, sunxia, sxnan, tison, todd5167, tonyzhu918, wangfeifan, wenbingshen, xuyang, yiksanchan, yunfengzhou-hub, yunhong, yuxia Luo, yuzelin, zhangjingcun, zhangmang, zhengyunhong.zyh, zhouli, zoucao, 沈嘉琦
https://developer.aliyun.com/article/851771?spm=a2c6h.12873639.article-detail.6.4ff859e2A1PFCU
[2] FLIP-282:
%3A+Support+watermark+alignment+of+source+splits[6] OutputFormat Sink
/flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormat.java
[7] 混合 Shuffle:
[8] HiveModule:
[9] PLAN_ADVICE:
https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/dev/table/sql/explain/#explaindetails
[10] NDU(非确定性更新):
https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/dev/table/concepts/determinism/#3-%E6%B5%81%E4%B8%8A%E7%9A%84%E7%A1%AE%E5%AE%9A%E6%80%A7
[11] FLINK-29849:
[12] FLINK-30006:
[13] FLINK-30841:
[14] FLIP-182:
%3A+Support+watermark+alignment+of+FLIP-27+Sources
[15] FLIP-217:
%3A+Support+watermark+alignment+of+source+splits
[16] FileSink:
[17] 性能测评文章:
https://mp.weixin.qq.com/s/8662I8knfYTUMQ-3plqUKQ
[18] REST API:
[19] FLINK-30836:
[20] state.backend.rocksdb.memory.fixed-per-tm:
[21] Calcite:
[22] CALCITE-4325:
[23] CALCITE-4352:
[24] #flink-dev-benchmarks:
[25] Speed Center:
[26] Benchmark's wiki:
[27] FLIP-272:
%3A+Generalized+delegation+token+support
[28] FLIP-211:
%3A+Kerberos+delegation+token+framework
[29] Release Notes: