图片来源:pexels
背景
Shuffle是分布式计算框架用来衔接上下游任务的数据重分布过程,在分布式计算中所有涉及到数据上下游衔接的过程都可以理解为shuffle。针对不同的分布式框架,shuffle有几种实现形态:
- 基于文件的pull based shuffle,如MapReduce、Spark。这种shuffle方式多用于类MR的框架,比如MapReduce、Spark,它的特点是具有较高的容错性,适合较大规模的批处理作业。由于实现的是基于文件的shuffle方案,因此失败重跑时只须重跑失败的task、stage,而无须重跑整个job。
- 基于管道的push based shuffle,比如Flink、Storm等。基于管道的push based shuffle的实现方式多用于Flink、Storm等流式框架,或是一些MPP框架,如Presto、Greenplum等,它的特点是具有较低的延迟和较高的性能,但是比较大的问题是由于没有将shuffle数据持久化下来,因此任务的失败会导致整个作业的重跑。
Shuffle是分布式框架里面最为重要的一个环节,shuffle的性能和稳定性直接影响到了整个框架的性能和稳定性,因此改进shuffle框架是非常有必要的。
业务痛点
Spark在云原生场景下的挑战
基于本地磁盘的shuffle方式,使得Spark在云原生、存储计算分离、在离线环境中有极大的使用限制:
- 在云原生环境中,serverless化是服务部署的一个目标,但是由于弹性或是抢占情况的发生,节点或是容器被抢占导致executor被kill是一种常态,现有的shuffle无法使计算做到serverless,在节点/容器被抢占时往往需要重新计算shuffle数据,有很高的代价。
- 在线集群通常只有少量的本地磁盘和大量的CPU core,因此其计算和IO是不平衡的,在这样的集群中根据算力去调度作业时非常容易将磁盘写满。
- 现在越来越多的数据中心架构采用了存储计算分离的部署方式,在这样的部署方式下基于本地磁盘的shuffle方式首先会遇到的问题是由于本地磁盘的不足导致无法存放shuffle数据;其次,虽然可以通过块存储(RBD)的方式来解决本地存储,但是对于shuffle这样的IO使用模式,使用块存储会带来极大的网络开销和性能问题。
Spark在生产环境的挑战
当前分布式计算平台上大多数的批处理作业是Spark作业,少量是MR作业,相比于MR作业,Spark作业的稳定性较差,而稳定性的问题中至少有一半是由于shuffle的失败造成的。
Shuffle失败导致的任务陷入重试,严重拖慢作业。shuffle fetch失败会导致map任务重跑重新生成shuffle数据,然后再重跑reduce任务,如果reduce任务反复失败会导致map任务需要反复重跑,在集群压力较高的情况下重跑的代价很高,会严重影响作业。
邵铮在SPARK-1529中就有相应的评论,地址如下:
https://issues.apache.org/jira/browse/SPARK-1529
对于超大规模的shuffle数据(T级别以上的shuffle量)的作业,非常难以顺利跑过,这里面的问题有:
- shuffle数据非常容易将磁盘写满。只有通过反复调整和重试使executor尽量分布到多的节点(anti-affinity)上避免这个问题。
- 海量的shuffle partition导致非常多的shuffle连接,使得shuffle框架极容易发生超时问题,以及非常高的随机访问IO所导致的问题。
基于本地磁盘的shuffle方式有较为严重的写放大问题和随机IO问题,当任务数量达到10K乃至100K以上时,随机IO的问题非常严重,严重影响了集群的性能和稳定性。
因此实现一个更好的、能解决上述业务痛点的shuffle框架显得尤为重要。
业界趋势
业界在shuffle[1]上也有了多年的探索,围绕各自的业务场景构建了相应的能力,这里罗列一下主流公司在shuffle上所做的工作。
百度DCE shuffle
百度DCE shuffle是较早在业内实践并大规模使用的remote shuffle service方案,它设计的初衷是为了解决几个问题,一是在离线混部,二是提高MR作业的稳定性和处理规模。百度内部的MR作业已经改造接入DCE shuffle并使用多年,现在Spark批处理作业也已经改造使用DCE shuffle做为其shuffle引擎。
Facebook Cosco Shuffle[2]
Facebook Cosco Shuffle的设计初衷和百度非常接近,Facebook数据中心的构建是存储计算分离,因此传统的基于本地文件的shuffle方式有较大的开销,同时在Facebook中最大的作业规模shuffle量达100T,这对shuffle有极大的挑战,因此Facebook实现了基于HDFS的remote shuffle service - Cosco Shuffle。
Google Dataflow Shuffle[3]
Google Dataflow Shuffle是Google在Google Cloud上的Shuffle服务,针对云上的弹性易失环境,Google开发了一套Dataflow Shuffle服务供Google Cloud的大数据服务使用。Dataflow Shuffle也是一套remote shuffle service,将shuffle存储移到了VM之外,提供了计算作业更大的弹性。
Uber Zeus[4]
Uber 为了解决上述提到的Shuffle痛点,也实现了Zeus这个Remote Shuffle Service,该项目已经开源。从设计文档及实现看,他们部署了多台Shuffle Server用来接收并聚合Shuffle数据,采用SSD作为存储介质来提升Shuffle性能。
阿里ESS[5]
阿里的ESS(EMR Remote Shuffle Service)主要是为了解决Spark on Kubernetes面临的计算存储分离问题,使得Spark能够适配云原生环境。
业务价值
实现Remote Shuffle Service,能带来几点业务价值:
- 云原生架构的支持:现有的分布式计算框架(如Spark需要依赖本地磁盘存储Shuffle数据)极大地限制了云原生的部署模式。使用Remote Shuffle Service可以有效减少对本地磁盘的部分依赖,支持集群的多种部署模式,提升资源利用率,助力云原生架构。
- 提升Spark作业shuffle稳定性:对于shuffle数据量达到TB,甚至10TB级别,这类任务会对磁盘空间造成很大压力,同时,Task数据较多还造成了网络访问的压力,最终导致失败率较高,而Remote Shuffle Service能更好的解决这些问题,使得业务方能平稳运行这类任务。
Firestorm介绍
目标
在腾讯内部每天有上百万的Spark任务在运行,上述各类Shuffle问题也经常遇到。同时,为了更好的利用硬件资源,计算存储分离的部署模式也在逐步推进。因此,我们进行了Firestorm的研发,该项目的目标如下:
- 支持大Shuffle量的任务(如,TeraSort 40T+)
- 支持云原生的部署模式(如,计算存储分离部署模式)
- 支持多种存储系统(LocalFile, HDFS, COS等)
- 支持数据完整性校验
- 性能与计算引擎的原生方案接近
架构设计方案
Remote Shuffle Service的架构如下:
其中,各个组件的功能如下:
- Coordinator,基于心跳机制管理Shuffle Server,存储Shuffle Server的资源使用等元数据信息,还承担任务分配职责,根据Shuffle Server的负载,分配合适的Shuffle Server给Spark应用处理不同的Partition数据。
- Shuffle Server,主要负责接收Shuffle数据,聚合后再写入存储中,基于不同的存储方式,还能用来读取Shuffle数据(如LocalFile存储模式)。
- Shuffle Client,主要负责和Coordinator和Shuffle Server通讯,发送Shuffle数据的读写请求,保持应用和Coordinator的心跳等。
- 在Shuffle Server和Storage交互过程中,解耦了Storage Handler组件,基于这个组件可以灵活接入不同的存储,满足各种存储需求。
架构设计差异点
相较于业界的其它方案,Firestorm有其独特的地方:
- 架构方面,引入Coordinator组件,更好的管理Shuffle Server并基于Shuffle Server的节点状态合理分配Shuffle任务,集群本身也支持灵活的横向扩展以满足生产需要
- 技术方面,解耦了存储模块,支持新的Shuffle数据存储方式仅需要实现相关接口。作为整个系统最重要的数据校验部分,除了CRC,数据去重等实现外,还增加了读写数据一致性校验,使得数据在传输过程中更安全可靠。
- 运营方面,Firestorm提供了各类运行统计数据并接入内部监控平台,便于观察集群整体状况,了解性能瓶颈,并能在异常状况下及时收到告警信息。
整体流程
基于Firestorm的整体Shuffle流程如下:
- Driver从Coordinator获取分配信息
- Driver向Shuffle Server注册Shuffle信息
- 基于分配信息,Executor将Shuffle数据以Block的形式发送到Shuffle Server
- Shuffle Server将数据写入存储
- 写任务结束后,Executor向Drive更新结果
- 读任务从Driver侧获取成功的写Task信息
- 读任务从Shuffle Server获得Shuffle元数据(如,所有blockId)
- 基于存储模式,读任务从存储侧读取Shuffle数据
写流程
在写Shuffle数据的时候,需要考虑内存合理使用,文件的异步写入,Shuffle数据的合并等,具体流程如下:
- Task基于PartitionId,将数据发送到对应Buffer中
- 当Buffer到达阀值时,将Buffer 的数据发送到数据队列里
- 不断的从数据队列里获取数据,并提交给发送线程
- 发送线程先向Shuffle Server请求内存空间,再将数据发送至Shuffle Server的缓冲区。
- Shuffle Server缓冲区到达阀值后,将Shuffle数据发送至写队列
- 不断从写队列中获取数据,并提交给写入线程
- 基于Shuffle数据信息(ApplicationId, ShuffleId, PartitionId)获取存储路径,将Shuffle数据写入Index文件和Data文件中
- Task写入完成后,告知Shuffle Server任务已完成并获取当前所有任务完成数,假如任务完成数小于预期值,则进入下一步,假如任务完成数大于预期值,则发送信息给Shuffle Server将缓冲区相关信息写入存储,并等待写入结果,成功后进入下一步
- Task完成后,将TaskId记录在MapStatus中,并发送到Driver,该步骤用来支持Spark推测执行功能
读流程
在读Shuffle数据的时候,主要考虑数据的完整性,具体流程如下:
- 从Driver侧获取Write阶段所有成功TaskId
- 读取shuffle数据,先读取Index文件,校验BlockId是否都存在,基于Index文件Offset信息,再读取Data文件,获取shuffle数据
· 如果Storage是HDFS,则直接从HDFS读取
· 如果Storage是Local File,则需要通过Shuffle Server读取文件
Shuffle文件
对于Shuffle数据,存储为Index文件和Data文件,其中实际的Shuffle数据以Block形式存储在Data文件中,而Index文件则存储每个Block的元数据,具体存储信息如下:
- BlockId: 每个Block的唯一标识,long型,前19位为自增Int,中间20位为PartitionId,最后24位为TaskId
- Offset: Block在Data文件里的偏移量
- Crc: Block的Crc校验值,该值在Block生成时计算并最终存储在Index文件,读取Block时用来验证数据完整性
- CompressLength: Block压缩后的数据长度
- UnCompressLength: Block未压缩的数据长度,用来提高读取时解压效率
- TaskId: 用来过滤无效的block数据
数据校验
数据的正确性对于Shuffle过程来说是最为关键的,下面介绍了Firestorm如何保障数据的正确性:
- 写任务为每个Block数据计算CRC校验值,读任务会对每个Block基于CRC进行校验,避免数据不一致
- 每个BlockId存储在Shuffle Server侧,读取数据的时候,将验证所有BlockId都被处理,避免数据丢失
- 成功的Task信息将记录在Driver侧,读取时过滤冗余的Block,避免推测执行导致的数据不一致问题
支持多存储
由于存储的选择较多,LocalFile,HDFS,OZONE,COS等,为了能方便接入各类存储,在设计上对存储做了解耦,抽象了读写接口。对于不同的存储只需要实现相关接口,即可作为Shuffle数据的后端存储使用。
Firestorm 收益
支撑云原生的部署模式
Firestorm目前在腾讯内部已经落地于近万个节点的在离线混布集群,每天支撑近5W的分布式计算作业,每天的Shuffle数据量接近2PB,任务失败率从原先的14%降低到了9%,已经达到了初期制定的第一阶段目标,助力分布式计算上云。
改善Shuffle阶段的稳定性及性能
基于TPC-DS 1TB数据量,我们对使用原生Spark Shuffle 和 使用Firestorm进行了性能对比测试,测试环境如下:
- 3台服务器作为计算节点,80 core + 256G + HDD
- 3台服务器作为ShuffleServer,112core + 128G + HDD存储Shuffle数据
TPC-DS的SQL复杂程度不一,对于简单的SQL,由于Shuffle数据量较少,原生Spark Shuffle表现更好,但是性能优势并不明显,而对于复杂的SQL,涉及到大量的partition的Shuffle过程,则Firestorm表现更稳定,且性能有大幅提升,下面将分别描述这2种场景:
场景1,简单SQL,以query43为例,下图是query43的Stage图,由2个stage组成,shuffle数据量非常少,使用原生Spark Shuffle整个query运行耗时12秒左右,而使用Remote Shuffle Service则需要15秒左右。
那这个时间是损耗在哪里了呢?下图展示了第一个stage的相关耗时,可以看到在write time这一列的统计上,原生Spark Shuffle是有性能优势的,耗时都在毫秒级别,而使用Firestorm,由于在shuffle write阶段增加了RPC通信,导致耗时增加,再加上任务数量需要分多批次跑完,每批次都会产生几百毫秒的差值,最终造成了原生Spark Shuffle在这个query上有了3秒左右的性能优势。
随着SQL的执行时间增加,这类性能优势会逐步下降,几乎可以忽略不计,这一类的SQL有query1, query3等等,这里就不一一列举了。
场景2,复杂SQL,以query17为例,下图展示了分别使用不同shuffle模式的Stage图,从图中可以看到这个SQL的stage数量多,且shuffle数据量大,执行的耗时上使用原生Spark Shuffle为8分钟左右,而使用Remote Shuffle Service仅为3分钟左右。
将耗时最长的Stage展开,进一步看下具体的耗时比对,先看下Shuffle Read的耗时,由于原生Spark Shuffle需要从各个Executor上拉取数据,涉及到大量的网络开销以及磁盘的随机IO,耗时非常长,甚至达到了2分钟,而Remote Shuffle Service由于读取时降低了网络开销,且读取的是整块Shuffle数据,所以耗时短且较为稳定。
再来看下Shuffle Write的耗时,原生Spark Shuffle依然耗时长,且不稳定,这个主要是由于这个时间点,计算节点同时处理Shuffle Read和Shuffle Write,都需要对本地磁盘频繁访问,且数据量较大,最终导致了耗时大幅增长,而Remote Shuffle Service在读写机制上很好的规避了这类问题,所以整体性能有了大幅提升且更稳定。
这类SQL还有query25,query29等待,这里就不一一举例了。
除了上述2个场景外,还有部分query由于shuffle数据量更大,使用原生Spark Shuffle不能正常跑出结果,而使用Remote Shuffle Service则可以顺利运行,如,query64,query67。
总的来说,在Shuffle数据量较小的场景下,相比原生Spark Shuffle,Remote Shuffle Service并无优势,性能有5%-10%的小幅下降或基本持平,而在Shuffle数据量大的场景下,Remote Shuffle Service则优势明显,基于TPC-DS的部分SQL测试结果显示,性能有50% - 100%的提升。
总结
本文介绍了现有Spark Shuffle实现的各类问题及业界的应对方式,结合腾讯公司内部的Spark任务实际运行状况,介绍了我们自研的Firestorm的架构,设计,性能,应用等。希望在云原生的场景中,Firestorm能更好的协助分布式计算引擎上云。
开源版本地址:
https://github.com/Tencent/Firestorm
欢迎大家关注和star,同时也欢迎各位优秀的开发者加入腾讯大数据研发团队。
附录
[1]https://issues.apache.org/jira/browse/SPARK-25299
[2]https://www.slideshare.net/databricks/cosco-an-efficient-facebookscale-shuffle-service
[3]https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#cloud-dataflow-shuffle
[4]https://github.com/uber/RemoteShuffleService