分布式计算框架体设计

时间:2021-09-23 14:27:58

1概述

本文是描述的是关于“分布式计算框架”(英文全称Distribute Compute Framework,简称DCF)的设计相关内容。为方便起见,以下将系统名称简称为DCF。

DCF内部采用C++语言实现了类MapReduce原理的一套分布式计算机制,力求为用户提供一套高效、稳定、可扩容的分布式计算框架。让用户在有分布式计算需求时,只需要实现计算相关的代码,很大程度提高用户的开发效率。

将来的版本,系统还将提供统一输入界面,能让用户输入类似SQL的语句来对已存储的数据进行分析,无需用户再另外实现计算的代码。

2系统总体结构

2.1应用架构

DCF包括2个master,若干个woker。若要整个框架能进行分布式计算还需要一个分布式存储系统(简称DFS),和一个用来发起计算的客户端。master和worker是独立的进程,可以部署在同一台机器或者不同的机器上,但原则上两个master需要部署在不同的机器上。

主master负责验证client是否合法,以及为client分配计算资源;

从master作为主master的备份,不断与主master通过同步消息保持同步,在主master无法服务时,可立即替代主master提供服务。

Worker是计算的主体,也可以称作计算资源,为客户提供CPU和内存等计算资源,执行客户预定义的计算。

应用/Client是分布式计算任务的发起者,并以动态库形式提供计算方法在worker上进行计算。

DFS是分布式文件系统的简称,它为本框架提供存储支持。

              分布式计算框架体设计

2.2系统逻辑结构

2.2.1功能模块简介

DCF共划分了7大功能模块,其中核心模块为:master、worker、client、dfsclient,辅助模块为common、message、test。下面将分别介绍各模块的功能特性。

1) master:

负责应用client的身份认证,处理应用client的计算请求,包括向应用client分配计算资源,管理worker状态等。

 

2) worker:

负责实际执行应用client请求的计算。主要操作包括:接受master的指派为某个应用client提供计算服务;接收应用client上传的计算动态库,以及从dfs获取由应用client切分好的原始数据,进行计算;向应用client汇报计算进度;向master汇报计算完成消息等。

 

3) client:

实现了大部分发起计算时需要与master和worker进行的通信及相关代码。用户程序(应用client)只需引用client库文件并调用相关函数即可发起计算任务。

 

4) dfsclient:

定义了分布式文件系统操作接口,以及该接口对应于各个不同的分布式文件系统的实现。

前期先支持sector(一款用C++开发的分布式文件系统),后期还会支持HDFS和TFS等开源分布式文件系统,也可能支持自行开发一款内存型的分布式文件系统。

 

5) common:

包括各模块通用的代码。包括网络通信模块,公共函数,操作系统相关操作代码,通用结构和基类的定义。

 

6) message:

定义各模块之间进行通信的各种消息格式和解析方法。

 

7) test:

包括部分单元测试和集成测试代码。

2.2.2系统逻辑结构图

         分布式计算框架DCF,向上通过client库向应用client提供接口函数,应用client通过调用这些接口函数来配置和发起计算任务,并获得计算结果。向下通过对DFS访问操作进行抽象封装,使其能够支持访问多种分布式文件系统,具有良好的扩展性。

         DCF系统模块逻辑结构图:

         分布式计算框架体设计

3系统特性

3.1平滑扩容

         DCF集群可以通过增加worker进程来提升集群的计算能力。

DCF集群的计算能力与DCF集群内的worker进程数量、机器数量以及机器配置相关。如果在执行某计算任务时耗时较长,首先检查worker是否配置合理,单台机器内worker进程数量=CPU内核数 * 常数K;一般情况下,常数K=1.0或者1.5。其次,如果worker配置合理,那么只能通过增加机器来解决。

在新增的服务器上配置并启动n(n=CPU内核数 * 常数K)个worker进程, worker进程启动后会主动向master发送心跳包,master会记录worker相关的信息,如果有新的计算任务,master就会将新进的worker作为计算资源使用。

3.2并行计算机制

         通常两种情况下可以用到并行计算,第一种就是待计算的数据量很小,但要进行一系列复杂的运算,耗时很长,如果这一系列复杂的运算能够拆分成一些离散的部分,则就可以用并行计算来提高运算速度;第二种情况是待计算的数据量相当大,凭借单台机器的CPU、内存和存储资源有可能需要很长时间来完成计算任务,这种情况也可以利用并行计算来解决问题。

         DCF针对的就是第二种情况,将海量的数据切分成一块块的小数据,然后交给不同的计算单元进行计算,最后再合并结果。实际上,现实中对于并行计算的需求大部分是来源于数据量巨大,单台机器无法处理。

         DCF采用的是MapReduce原理,首先在splitting将超大数据随机切分成小的数据块,然后调用每个可用的计算资源进行Mapping过程,随后partition和shuffle阶段将数据按自定义的规则(可理解为按group字段)来进行汇集和分发,接着再利用reduce阶段对数据进一步处理,最后再合并经过reduce阶段的处理结果。原理图如下:

        分布式计算框架体设计

3.3容错机制

3.3.1 Master容错

master采用HA结构,两台master互为热备,一台为主,一台为备。当主节点出现问题无法服务时,备用节点就会监测,自动代替主节点提供服务。

3.3.2 Worker容错

如果worker在正在进行计算时出现问题,client会检测到错误发生,并将worker正在计算的那部分数据重新分配给其他可用worker来计算。同时Master也会检测到错误发送,会将worker的状态从被占用改为失效,并更新它的worker列表。

如果worker在已经被master分配给client,但还没有执行计算时出现问题,client会检测到错误,然后将该worker从资源列表中删除;master也会检测到错误,同时也会更新它的worker列表。

3.3.3 局限性

在下列情况时,无法保证系统能正常工作:

第一种情况,如果DCF集群正在执行某个计算任务时,任务的发起者应用client发生错误,那么这次计算任务将无法完成,master会停止这次计算并记录错误原因。

第二种情况,某个时间段内集群仅有一台master能够正常服务,而随后这台master也发生了问题无法正常服务,那么此时集群将停止服务,直到至少一台master正常启动。

4关键业务处理流程描述

4.1常规流程

4.1.1client发起计算流程

1、client每隔n秒向master请求计算资源,直到master回复请求成功或者超过最大请求次数。

2、master返回给client可用的计算资源列表,已经分配的本次任务的标识task_id。

3、master将这些计算资源标记为占用状态。

4、client将计算库文件上传至dfs:/dcf/compute/lib/task_id/目录下。

5、client根据可用资源数切割输入数据。

6、client将切割好的数据块根据就近原则与资源(worker)打包成计算桶(bucket),并建立bucket与worker的对应关系。

7、client向每个计算资源(worker)发送计算开始消息。

4.1.2 计算任务执行流程(含reduce)

本流程描述的是客户端需要进行reduce操作,即实现了reduce函数。

1、  client向已分配的每个worker提交计算消息(请求包)。

2、  worker调用计算库的map函数。

3、  worker调用partition函数,并以map函数的输出结果作为输入参数,算出结果将要分配到哪个新的worker,并将结果存放到DFS:/dcf/compute/result/tmp/task_id/bucket_id.result下 。(这里需要注意的问题是,可能存在对bucket_id.result并发写的情况,对于不支持并发写的DFS需要注意避免写冲突。)

4、  等待所有worker完成了步骤3以后,worker检查在/dcf/compute/result/tmp/task_id目录下有没有与worker_id对应的*.result文件(worker_id与bucket_id在发起计算时已经建立对应关系),如果有,则打开该文件,读取内容并调用计算库的reduce函数算出结果,并将结果存放至DFS:/dcf/compute/result/tmp/task_id/worker_id.r.result文件下。

5、  Client收集/dcf/compute/result/tmp/task_id/*.r.result文件,合并文件内容,输出结果。

4.1.3 计算任务执行流程(不含reduce)

本流程描述的是客户端不需要进行reduce阶段的计算,即只实现了process函数。

1、client向已分配的每个worker提交计算消息(请求包)。

2、worker调用计算库的process函数,并将结果写入DFS:/dcf/compute/result/tmp/task_id/buket_id.result。(这里需要注意的问题是,可能存在对bucket_id.result并发写的情况,对于不支持并发写的DFS需要注意避免写冲突。)

3、等待所有worker完成了步骤2后,client收集/dcf/compute/result/tmp/task_id目录下的*.result文件,合并文件的内容,输出结果。

4.2错误处理流程

4.2.1 worker正在计算时发生错误时处理流程

1、client检测到worker发生错误。

2、client将分配给该worker的桶(包括worker出错时正在执行的桶)进行重新分配。

3、重新分配桶的流程如下:

3.1、client检查剩余的可用资源(worker),如果可用资源数少于指定值或者剩余桶数/可用资源数的值大于指定值,client就向master申请更多计算资源。

         3.2、master返回申请结果。

3.3、如果dcf集群此时有空闲的计算资源,那么master会返回最多不多于与client上报的出错的worker数量相等的资源数。Client将剩余的桶分配到新申请的资源。

如果dcf集群此时没有空闲的计算资源,则返回申请失败。Client将剩余的桶平均分配给client剩余的可用资源(worker)。