01 背景
随着欧加集团大数据业务的发展,现阶段公司大数据平台20+个组件,1EB+级别数据量,平台1000人均日活,服务已经有相当大的规模。在这样的业务背景下,越来越多的用户在使用大数据平台时,发现难以定位问题。基于此,我们设计大数据诊断平台,旨在提升用户解决问题效率,降低用户异常成本。代号“罗盘”,意为用户定位问题,给出优化方案。
此前业务存在问题现状总结如下:
1、问题定位效率相对低。平台组件多,从上层调度器、Livy客户端到中层计算引擎Spark,最后底层Hadoop系统;用户作业日志量大,没法串联一起,问题上下文关联难;用户人员角色非单一研发角色人员,自行分析能力有限,需平台方提供协助解决,沟通与定位让双方工作量只增不减;缺乏自动化工具定位问题等等。各种因素说明,海量作业调度,多种类型运行环境,TB级别日志量,依靠人力盘查作业问题是非常耗的。
2、异常问题类型多,缺乏有效知识库,高效重复利用已有的解决方案。从作业调度任务系统到计算引擎层,常见的业务问题常见如:晚点溯源、高频失败、运行耗时长、数据倾斜、暴力扫描、shuffle失败、CPU浪费、内存浪费、内存溢出等,需将问题数量降低收敛。
3、异常任务、不合理任务成本多。用户任务在执行周期内发生异常或者配置不合理,将导致任务浪费资源,产生许多额外的成本,需将此类问题成本损失降至最低。
总体上希望,从问题出发、经过快速定位、优化方案、问题收敛环节,最后达到降本增效目的。
02 业界产品
基于以上问题,我们调研了业界有关大数据诊断系统,目前比较类似的是Dr. Elephant开源系统,Dr. Elephant一个Hadoop和Spark的性能监控调优工具。它能自动采集Airflow、Azkaban、Oozie等调度系统作业流及计算引擎Spark和Hadoop MR的运行指标,分析作业的异常和性能结果,指导开发者进行作业调优,从而提升开发者工作效率和集群资源利用率。
工作原理:
Dr. Elephant定期从Yarn资源管理中心拉取近期成功和失败的作业列表。每个作业会实时从历史服务器中获取到元数据、配置及调度器作业信息以及监控数据。一旦获取到所有的元数据信息,Dr. Elephant就基于这些元数据运行启发式算法,并生成一份该作业的诊断报告。对该作业报告,进行标记和评级,分为五个级别来评定作业存在新能问题严重程度。
核心功能:
集成多个调度器框架如Azkaban、Airflow、Oozie等;
统计历史作业和工作流的性能指标;
Job级别工作流对比;
支持多个计算引擎框架性能诊断(Spark、Tez、MapReduce、TonY);
基于自定义规则的可配置启发式插件,用户诊断作业;
提供REST API, 用户能通过API获取所有信息;
欠缺功能:
支持Spark, Hadoop系统版本比较低,对于新版本Spark, Hadoop兼容性不友好;
不支持Spark, Hadoop新版本的特性的诊断;
诊断指标比较少,其中Spark相关指标仅4个,对于高度依赖Spark引擎是非常欠缺的;
不支持日志级别问题诊断,不能够诊断调度器运行任务或者App应用程序的出现的异常;
调度器和作业App元数据的关联在一些场景下不支持;
不支持异常资源的管理,达到降本增效指引目的;
对Spark History服务接口频繁调用影响History服务的稳定性;
缺乏有效的降本增效流程辅组工具;
综上所述,结合我们有大规模Spark集群调度特点,业界产品对我们解决业务痛点效果不佳, 我们决定自研诊断系统来解决业务带来的挑战。
03 技术方案
由上述可知,系统在业务层面既能快速定位解决用户问题,又能帮助用户管理异常资源;架构层面支持Spark, Hadoop多指标诊断又不影响第三方系统性能问题,我们采用非入侵的方式设计诊断系统。
架构层主要由同步工作流层任务元数据模块、同步Yarn/Spark App元数据模块、关联工作流层/引擎层App元数据模块、工作流任务异常检测模块,引擎层异常检测模块,Portal展示模块组成。同时调度器(Scheduler Server)可以适配多个开源调度器项目, 如内部系统Oflow、Airflow、DolphinScheduler等。
整体架构图
整体架构分3层:
第一层为对接外部系统,调度器、Yarn、HistoryServer、HDFS等系统,同步元数据、集群状态、运行环境状态、日志等到诊断系统分析;
第二层为架构层, 包括数据采集、元数据关联&模型标准化、异常检测、诊断Portal模块;
第三层为基础组件层,包括MySQL、 Elasticsearch、Kafka、Redis等组件。
具体模块流程阶段:
(1)数据采集阶段:从调度系统将用户、DAG、作业、执行记录等工作流元数据同步至诊断系统;定时同步Yarn ResourceManager、Spark HistoryServer App元数据至诊断系统,标志作业运行指标存储路径,为后续数据处理阶段作基础;
(2)数据关联&模型标准化阶段:将分步采集的工作流执行记录、Spark App、Yarn App、集群运行环境配置等数据通过ApplicationID介质进行关联,此时,工作流层与引擎层元数据已关联完毕,得到数据标准模型 (user, dag, task, application, clusterConfig, time) ;
(3)工作流层&引擎层异常检测阶段:至此已经获得数据标准模型,针对标准模型进一步Workflow异常检测流程,同时平台维护着一套沉淀多年的数据治理知识库,加载知识库到标准模型,通过启发式规则,对标准模型的指标数据、日志同时进行异常挖掘,结合集群状态及运行是环境状态,分析得出工作流层、引擎层异常结果;
(4)业务视图:存储、分析数据,提供给用户任务概览、工作流层任务诊断、引擎层作业Application诊断,工作流层展示调度器执行任务引发的异常,如任务失败、回环任务、基线偏离任务等问题,计算引擎层展示Spark作业执行引发的耗时、资源使用、运行时问题;
04 实践效果
我们从四个方面简述诊断平台带来的效果:诊断平台UI、效率分析、成本分析、稳定性分析、降本增效分析。
(1)诊断平台UI
引擎层分析主要展示Spark计算过程中异常、不合理的作业,并给作业记录异常标签,如CPU浪费、数据倾斜、Task长尾、大表扫描等异常类型标签,这些标签是数据标准模型经过工作流层、引擎层异常检测得出,同时可以让用户清楚作业的问题原因。
(2)效率分析
长尾Task分析
原因:长尾任务是由于作业运行过程中,一个Task或多个Task单元执行时间过长,拖延整个任务运行时间。
危害:作业执行时间过长,资源浪费
诊断:从时间角度计算,执行时间过长原因在于Task读取数据量多或者数据读取慢。如果读取数据过多,那么将出现数据倾斜,按数据倾斜方式处理;如果读取数据过慢,那么Hadoop集群的节点负载高或者有网络丢包问题等,导致数据读取慢,可以联系运维处理。
HDFS卡顿分析
原因:HDFS卡顿是Spark作业中Task最小执行单元读取数据速率比其他Task慢,低于阈值;
危害:作业执行时间过长,浪费资源;
诊断:作业数据所在机器网络IO问题或者集群配置不一致问题,导致Task从Hadoop读取数据速率低下。这种情况一般伴随着长尾Task出现,同时表现Task执行时间过长、读取数据量少,导致整个数据处理Task无法高效利用回收。这种情况需排查数据在节点配置及机器硬件配置;
推测执行过多分析
原因:推测执行(speculative)是指作业执行单元Task在同一个Stage中的执行时间相比其他Task执行时间长,在其他Executor发起相同Task执行,先完成的Task将Kill另个Task, 并取得结果。这样情况下如果作业大部分Task都发起推测执行,超过一定比例,就是推测执行过多的表现;
危害:任务执行时间长,资源浪费恶化;
诊断:机器配置不同、网络波动、集群负载高、作业数据倾斜等都会引起推测执行,过多的慢任务执行推测将会导致资源恶化,推测执行其实是对资源的压榨、用空间换取时间的做法。解决执行推测要从多方面入手,结合集群状态环境。
全局排序异常分析
原因:Spark Stage中的Task只有一个时,而且处理的数量级别大,Stage中的所有数据都集中在一个Task中,这种情况即发生全局排序异常。
危害:任务处理时间长、消耗资源大
诊断:全局排序异常并没有发挥Spark并发计算特性,Task处理数据漫长,非常消耗资源,解决这个问题需要对作业进行重新分区,并发计算数据。
(3)成本分析
CPU浪费分析
原因:Spark Driver/Executor cores参数配置不合理导致CPU空闲浪费
危害:没用高效利用资源
诊断:通过Spark Application采集指标,分析Spark Driver、Spark Executor执行过程中的CPU的运行时间(单位: vcore·second)占比,如果空闲时间超过一定的比例,判定为浪费,用户根据比例降低启用CPU数量。
计算Application CPU浪费过程中,采集到Executor执行开始和结束时间、Executor执行所有Job开始和结束时间、Job内部真正执行Task CPU时间, 最终获得以下指标:
所有Executor的并发个数Count,每个Executor固定核数ExecutorCores
所有Executor内Job真正执行时间和JobTime(计算Job开始结束时间交叉和)
所有Executor内Task个数 TaskCount及每个Task执行CPU时间
总CPU计算时间估算为:
实际使用CPU计算时间为:
CPU浪费百分比:
如果空闲比很大,可以适当降低参数spark.executor.cores的值,降低并发度,或者减少RDD分区数和Shuffle参数spark.sql.shuffle.partitions。
内存浪费分析
原因:分析Driver/Executor内存使用峰值占总内存比例,当空闲比例值超过阈值,为内存浪费
危害:没用高效利用资源
诊断:采集Spark Application Driver/Executor的相关内存指标,与CPU浪费计算同理,获得Executor指标如下:
所有Executor个数Count, 每个Executor内存ExecutorMemory
每个Executor执行时间
每个Executor执行过程内存峰值
总的内存时间估算为:
实际内存时间为:
浪费内存百分比:
如果空闲比很大,可以适当降低参数spark.executor.memory的值;
(4)稳定性分析
全表扫描问题
原因:SparkSQL查询大表数据时,没有进行分区条件筛选,或者SQL比较复杂时,发生了全表扫描;
危害:作业执行时间长,集群负载高,影响其他作业执行
诊断:Spark SQL扫描数据表时,尽管现在Spark对优化器已经有不少的优化,如谓词下推、列裁剪、常量合并等,但都相对简单,在没分区的大表或者用户Join大表和小表时,会出现全表扫描或者分区不合理暴力扫描情况。一旦执行了这种作业,一方面用户长时间才能得到数据结果,另一方面平台方承载作业扫描全表的压力,作业会占用集群主要资源,拖慢其他作业。因此用户需要根据具体业务做条件限制,调整Spark SQL以及对表分区等。
数据倾斜分析
原因:数据倾斜是Task计算过程中Key分布不均造成的,个别Key的数据特别多,超出计算节点的计算能力;
危害:会导致任务内存溢出、计算资源利用率低、作业执行时间超出预期;
诊断:数据倾斜发生时,大量的Map Stage数据发送到Reduce Stage,Reduce Stage节点需要处理大量数据,其他依赖该节点将处于长时间等待状态。比如Stage1依赖Stage0的执行执行结果,如果Stage0发生数据倾斜,导致执行过长或者直接挂起,Stage1将处于等待状态,整个作业也一直挂起,这是资源将被这个作业占有,但只有极少数Task在执行,造成计算资源浪费,利用率低;大量数据将集中在少数计算节点上,当数据量超出单个节点的内存范围,最终内存溢出,导致任务失败。一般出现在SQL字段:join on, group by, partition by, count distinct等,解决数据倾斜常用方式有:
增大并行度spark.sql.shuffle.partitions,使得数据再次分配到不同Task;
过滤异常值的数据,过多冗余值也会导致数据倾斜;
SQL中group by或者RDD的reduceByKey添加key的随机数打散Map, Reduce两个阶段数据,最后在Reduce阶段将随机数去掉;
表Join关联时,可以使用Broadcast方式广播小表数据,避免shuffle, 就不会发生数据倾斜;
Shuffle失败分析
原因:由于作业配置、网络、操作系统、硬件多个因素,Shuffle在节点之间传输数据会失败
危害:作业异常退出,资源浪费
诊断:作业计算过程中,Shuffle作为Spark MapReduce框架中的数据纽带,经常出现失败问题,问题可以分Shuffle Read和Shuffle Write两部分。
由图看出,Shuffle Write的分区(partition)数量跟MapTask(RDD)的数量一致,文件被分割后,经算子计算的中间排序结果临时存放在各个Executor所在的本地磁盘,可以理解为Shuffle Write做了本地磁盘保存文件操作。Shuffle Read的分区数有Spark提供的一些参数控制,参数不合理将会导致Reduce Task异常,如数据倾斜,甚至OOM造成Executor退出,下游网络连接不上。由诊断抓取异常了解到原因后,从Shuffle的数据量和处理Shuffle数据的分区数两个角度给出方案:
减少shuffle数据量,使用Broadcast Join或者去掉不必要字段等;
有group by、Join、 reduce by、partition by等算子操作可以通过shuffle的partitions参数,根据数据量或计算复杂度提高参数值,另外控制好并行度以及运行任务的总核数,官方推荐运行Task为核数的2-3倍;
提高Executor的内存,防止内存溢出或者JVM Crash;
提高Spark网络RPC通信时间配置,可以让数据处理完成等;
内存溢出
原因:Spark内存使用超出了容量造成内存溢出
危害:作业异常退出,资源浪费
诊断:按照Spark内存模型,用户实际使用内存如下
用户作业内存溢出分堆内和堆外两种方式:
堆外内存溢出:表现为作业被Yarn节点Kill, 主要原因是MonitorMemory超出申请内存限制
堆内内存溢出:表现为JVM内存空间不足或者GC超出限制,任务内的数据量过多导致
定位到原因后,可以有多种处理方式:
提高executorMemory, 堆内内存增大;
降低executorCores, 减少并行度,处理数据量变少;
重新分配分区(repartition), 对每个Task产生的RDD、Dataframe数据量减少等;
提高executorMemoryOverhead参数,堆外内存增大;
处理数据倾斜,如group by、reduce by等热点key打散;
SQL其他常见问题分析
原因:SQL执行过程中没权限、表不存在、语法错误等;
危害:任务执行异常退出,浪费资源
诊断:具有SQL失败特征从指标数据或者日志提取,用户根据问题去申请相应权限、创建表或者修正语法问题,能快速解决问题。
(5)降本增效
以上讲述了常见的问题案例场景,这里不再多介绍,接下来我们分析下降本增效。
通过作业层和引擎层分析识别异常、不合理任务,累计识别任务的内存、CPU资源,转化为相应的成本,通过任务元数据关联,按个人、业务、部门三个维度汇总给用户,并设置排名等机制,推进数据治理。
以下通过长期推进治理,可以看成本趋势,用户聚焦的任务问题得以改善。
05 总结与规划
OPPO大数据任务诊断平台主要围绕离线调度任务、计算引擎两个方面对问题进行定位分析,使用丰富的知识库,提供给用户解决优化方案,同时达到降本增效的目的。
技术方面采用非入侵方案对接其他系统,保证了其他系统的安全性。系统架构基于启发式规则定位、分析问题方式,但知识库比较依赖人员经验的积累,更深层次问题需要数据挖掘算法扩大检测范围,智能化诊断。
另外,除了对Spark任务问题诊断,OPPO大数据诊断平台还针对Flink任务进行异常、资源问题诊断,整体平台包含Spark、Flink两种计算引擎诊断,届时将会对平台(罗盘)进行开源。
#作者简介
BobZhuang OPPO高级数据平台工程师
专注大数据分布式系统研发,曾就职于Kingsoft公司。
Xiaoyou Wang OPPO数据平台工程师
2019年加入OPPO,负责大数据系统相关设计和开发工作,拥有丰富的后端研发经验。