1概述
本文是描述的是关于“分布式计算框架”(英文全称Distribute Compute Framework,简称DCF)的设计相关内容。为方便起见,以下将系统名称简称为DCF。
DCF内部采用C++语言实现了类MapReduce原理的一套分布式计算机制,力求为用户提供一套高效、稳定、可扩容的分布式计算框架。让用户在有分布式计算需求时,只需要实现计算相关的代码,很大程度提高用户的开发效率。
将来的版本,系统还将提供统一输入界面,能让用户输入类似SQL的语句来对已存储的数据进行分析,无需用户再另外实现计算的代码。
2系统总体结构
2.1应用架构
DCF包括2个master,若干个woker。若要整个框架能进行分布式计算还需要一个分布式存储系统(简称DFS),和一个用来发起计算的客户端。master和worker是独立的进程,可以部署在同一台机器或者不同的机器上,但原则上两个master需要部署在不同的机器上。
主master负责验证client是否合法,以及为client分配计算资源;
从master作为主master的备份,不断与主master通过同步消息保持同步,在主master无法服务时,可立即替代主master提供服务。
Worker是计算的主体,也可以称作计算资源,为客户提供CPU和内存等计算资源,执行客户预定义的计算。
应用/Client是分布式计算任务的发起者,并以动态库形式提供计算方法在worker上进行计算。
DFS是分布式文件系统的简称,它为本框架提供存储支持。
2.2系统逻辑结构
2.2.1功能模块简介
DCF共划分了7大功能模块,其中核心模块为:master、worker、client、dfsclient,辅助模块为common、message、test。下面将分别介绍各模块的功能特性。
1) master:
负责应用client的身份认证,处理应用client的计算请求,包括向应用client分配计算资源,管理worker状态等。
2) worker:
负责实际执行应用client请求的计算。主要操作包括:接受master的指派为某个应用client提供计算服务;接收应用client上传的计算动态库,以及从dfs获取由应用client切分好的原始数据,进行计算;向应用client汇报计算进度;向master汇报计算完成消息等。
3) client:
实现了大部分发起计算时需要与master和worker进行的通信及相关代码。用户程序(应用client)只需引用client库文件并调用相关函数即可发起计算任务。
4) dfsclient:
定义了分布式文件系统操作接口,以及该接口对应于各个不同的分布式文件系统的实现。
前期先支持sector(一款用C++开发的分布式文件系统),后期还会支持HDFS和TFS等开源分布式文件系统,也可能支持自行开发一款内存型的分布式文件系统。
5) common:
包括各模块通用的代码。包括网络通信模块,公共函数,操作系统相关操作代码,通用结构和基类的定义。
6) message:
定义各模块之间进行通信的各种消息格式和解析方法。
7) test:
包括部分单元测试和集成测试代码。
2.2.2系统逻辑结构图
分布式计算框架DCF,向上通过client库向应用client提供接口函数,应用client通过调用这些接口函数来配置和发起计算任务,并获得计算结果。向下通过对DFS访问操作进行抽象封装,使其能够支持访问多种分布式文件系统,具有良好的扩展性。
DCF系统模块逻辑结构图:
3系统特性
3.1平滑扩容
DCF集群可以通过增加worker进程来提升集群的计算能力。
DCF集群的计算能力与DCF集群内的worker进程数量、机器数量以及机器配置相关。如果在执行某计算任务时耗时较长,首先检查worker是否配置合理,单台机器内worker进程数量=CPU内核数 * 常数K;一般情况下,常数K=1.0或者1.5。其次,如果worker配置合理,那么只能通过增加机器来解决。
在新增的服务器上配置并启动n(n=CPU内核数 * 常数K)个worker进程, worker进程启动后会主动向master发送心跳包,master会记录worker相关的信息,如果有新的计算任务,master就会将新进的worker作为计算资源使用。
3.2并行计算机制
通常两种情况下可以用到并行计算,第一种就是待计算的数据量很小,但要进行一系列复杂的运算,耗时很长,如果这一系列复杂的运算能够拆分成一些离散的部分,则就可以用并行计算来提高运算速度;第二种情况是待计算的数据量相当大,凭借单台机器的CPU、内存和存储资源有可能需要很长时间来完成计算任务,这种情况也可以利用并行计算来解决问题。
DCF针对的就是第二种情况,将海量的数据切分成一块块的小数据,然后交给不同的计算单元进行计算,最后再合并结果。实际上,现实中对于并行计算的需求大部分是来源于数据量巨大,单台机器无法处理。
DCF采用的是MapReduce原理,首先在splitting将超大数据随机切分成小的数据块,然后调用每个可用的计算资源进行Mapping过程,随后partition和shuffle阶段将数据按自定义的规则(可理解为按group字段)来进行汇集和分发,接着再利用reduce阶段对数据进一步处理,最后再合并经过reduce阶段的处理结果。原理图如下:
3.3容错机制
3.3.1 Master容错
master采用HA结构,两台master互为热备,一台为主,一台为备。当主节点出现问题无法服务时,备用节点就会监测,自动代替主节点提供服务。
3.3.2 Worker容错
如果worker在正在进行计算时出现问题,client会检测到错误发生,并将worker正在计算的那部分数据重新分配给其他可用worker来计算。同时Master也会检测到错误发送,会将worker的状态从被占用改为失效,并更新它的worker列表。
如果worker在已经被master分配给client,但还没有执行计算时出现问题,client会检测到错误,然后将该worker从资源列表中删除;master也会检测到错误,同时也会更新它的worker列表。
3.3.3 局限性
在下列情况时,无法保证系统能正常工作:
第一种情况,如果DCF集群正在执行某个计算任务时,任务的发起者应用client发生错误,那么这次计算任务将无法完成,master会停止这次计算并记录错误原因。
第二种情况,某个时间段内集群仅有一台master能够正常服务,而随后这台master也发生了问题无法正常服务,那么此时集群将停止服务,直到至少一台master正常启动。
4关键业务处理流程描述
4.1常规流程
4.1.1client发起计算流程
1、client每隔n秒向master请求计算资源,直到master回复请求成功或者超过最大请求次数。
2、master返回给client可用的计算资源列表,已经分配的本次任务的标识task_id。
3、master将这些计算资源标记为占用状态。
4、client将计算库文件上传至dfs:/dcf/compute/lib/task_id/目录下。
5、client根据可用资源数切割输入数据。
6、client将切割好的数据块根据就近原则与资源(worker)打包成计算桶(bucket),并建立bucket与worker的对应关系。
7、client向每个计算资源(worker)发送计算开始消息。
4.1.2 计算任务执行流程(含reduce)
本流程描述的是客户端需要进行reduce操作,即实现了reduce函数。
1、 client向已分配的每个worker提交计算消息(请求包)。
2、 worker调用计算库的map函数。
3、 worker调用partition函数,并以map函数的输出结果作为输入参数,算出结果将要分配到哪个新的worker,并将结果存放到DFS:/dcf/compute/result/tmp/task_id/bucket_id.result下 。(这里需要注意的问题是,可能存在对bucket_id.result并发写的情况,对于不支持并发写的DFS需要注意避免写冲突。)
4、 等待所有worker完成了步骤3以后,worker检查在/dcf/compute/result/tmp/task_id目录下有没有与worker_id对应的*.result文件(worker_id与bucket_id在发起计算时已经建立对应关系),如果有,则打开该文件,读取内容并调用计算库的reduce函数算出结果,并将结果存放至DFS:/dcf/compute/result/tmp/task_id/worker_id.r.result文件下。
5、 Client收集/dcf/compute/result/tmp/task_id/*.r.result文件,合并文件内容,输出结果。
4.1.3 计算任务执行流程(不含reduce)
本流程描述的是客户端不需要进行reduce阶段的计算,即只实现了process函数。
1、client向已分配的每个worker提交计算消息(请求包)。
2、worker调用计算库的process函数,并将结果写入DFS:/dcf/compute/result/tmp/task_id/buket_id.result。(这里需要注意的问题是,可能存在对bucket_id.result并发写的情况,对于不支持并发写的DFS需要注意避免写冲突。)
3、等待所有worker完成了步骤2后,client收集/dcf/compute/result/tmp/task_id目录下的*.result文件,合并文件的内容,输出结果。
4.2错误处理流程
4.2.1 worker正在计算时发生错误时处理流程
1、client检测到worker发生错误。
2、client将分配给该worker的桶(包括worker出错时正在执行的桶)进行重新分配。
3、重新分配桶的流程如下:
3.1、client检查剩余的可用资源(worker),如果可用资源数少于指定值或者剩余桶数/可用资源数的值大于指定值,client就向master申请更多计算资源。
3.2、master返回申请结果。
3.3、如果dcf集群此时有空闲的计算资源,那么master会返回最多不多于与client上报的出错的worker数量相等的资源数。Client将剩余的桶分配到新申请的资源。
如果dcf集群此时没有空闲的计算资源,则返回申请失败。Client将剩余的桶平均分配给client剩余的可用资源(worker)。