- flink sql参数配置
//关闭详细算子链(默认为true),true后job性能会略微有提升。false则可以展示更详细的DAG图方便地位性能结点 ###有用的参数
pipeline.operator-chaining: 'true'
//指定时区 ###实用的参数
table.local-time-zone: Asia/Shanghai
//对flink sql是否要敏感大小(建议false,不区分大小写。默认为true)
table.identifier-case-sensitive: 'false'
//开启 miniBatch
table.exec.mini-batch.enabled: 'true'
//批量输出的间隔时间
table.exec.mini-batch.allow-latency: 5s
//防止 OOM 设置每个批次最多缓存数据的条数
table.exec.mini-batch.size: '500'
//提交批次数据大小
batchSize: '127108864'
//刷数据间隔
flushIntervalMs: '60000'
//几个flush线程
numFlushThreads: '5'
// 写odps时压缩 :https://help.aliyun.com/zh/flink/developer-reference/maxcompute-connector
compressAlgorithm: snappy
//开启异步状态后端
state.backend.async: 'true'
//状态后端开启增量(默认就是true 增量)
state.backend.incremental: 'true'
//作业链与处理槽共享组(默认为false),开启后在针对某个操作算子增加并行度和cu等资源时,不与其他槽位共享资源,单独增加额外资源 ###有用的参数
table.exec.split-slot-sharing-group-per-vertex: 'true'
//Checkpoint间隔时间,单位为毫秒 默认180秒 ###如果作业量大,可以适当调大间隔时间。性能方便略有提升
execution.checkpointing.interval: 180s
//State数据的生命周期,单位为毫秒。默认36小时
table.exec.state.ttl: 129600000
//Checkpoint生成超时时间(默认值10分钟),当Checkpoint生成时间超过10分钟,flink会把创建生成的Checkpoint杀掉,重新再创建生成Checkpoint。如果观察自己的job生成时间过长减少被杀死Checkpoint可以调大下面时间 ###有用的参数
execution.checkpointing.timeout :10min
- datastream代码配置
// 初始化 table environment
TableEnvironment tEnv = ...
// 获取 tableEnv 的配置对象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 设置参数:
// 开启 miniBatch
configuration.setString("table.exec.mini-batch.enabled", "true");
// 批量输出的间隔时间
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
// 防止 OOM 设置每个批次最多缓存数据的条数,可以设为 2 万条
configuration.setString("table.exec.mini-batch.size", "20000");
// 开启 LocalGlobal(job有聚合函数使用)
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
// 开启 Split Distinct (job有聚合函数使用)
configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");
// 第一层打散的 bucket 数目 (job有聚合函数使用)
configuration.setString("table.optimizer.distinct-agg.split.bucket-num", "1024");
// TopN 的缓存条数 (job有分组top使用)
configuration.setString("table.exec.topn.cache-size", "200000");
// 指定时区
configuration.setString("table.local-time-zone", "Asia/Shanghai");
- flink sql 简单作业优化实验截图
1).调大checkpoint生成时间
2).去掉参数:pipeline.operator-chaining: ‘false’
3).加攒批参数
4).由于full GC导致job性能过差(排查)
查看gc日志:
解决方案:对taskmanager增加内存(jobmanager略,因为它很少会出现频繁full gc)。
5).全量Checkpoint与增量Checkpoint的大小一致,是否正常?
如果您在使用Flink的情况下,观察到全量Checkpoint与增量Checkpoint的大小一致:
- 检查增量快照是否正常配置并生效。
- 是否为特定情况。在特定情况下,这种现象是正常的,例如:
a.在数据注入前(18:29之前),作业没有处理任何数据,此时Checkpoint只包含了初始化的源(Source)状态信息。由于没有其他状态数据,此时的Checkpoint实际上是一个全量Checkpoint。
b.在18:29时注入了100万条数据。假设数据在接下来的Checkpoint间隔时间(3分钟)内被完全处理,并且期间没有其他数据注入,此时发生的第一个增量Checkpoint将会包含这100万条数据产生的所有状态信息。
在这种情况下,全量Checkpoint和增量Checkpoint的大小一致是符合预期的。因为第一个增量Checkpoint需要包含全量数据状态,以确保能够从该点恢复整个状态,导致它实际上也是一个全量Checkpoint。
增量Checkpoint通常是从第二个Checkpoint开始体现出来的,在数据稳定输入且没有大规模的状态变更时,后续的增量Checkpoint应该显示出大小上的差异,表明系统正常地只对状态的增量部分进行快照。如果仍然一致,则需要进一步审查系统状态和行为,确认是否存在问题。