hadoop-NameNode篇(基于0.20版本)

时间:2022-10-17 04:30:20

 本篇文章中重点介绍hdfs中NameNode的实现原理,由于内容非常多,要在一条线上将所有的东西都说清楚很难,所以采用的是先从宏观角度说明整体流程,其中某个环节如果复杂,为了不影响把控大的流程,所以这种点上可能就稍稍带过,如果要更详细的了解该环节的具体情况,就需要跳到其他章节去看。所以文字组织上比较乱,个人文字功底薄弱,还望大家见谅。另外,hadoop是一个非常复杂的系统,后面的内容是结合先前看源代码加上最近时间的温习,业余时间整理出来的,有问题的地方,请大家指正,我积极修改,望不以误导人为目的。

 

本篇文章是第一篇,后面我会继续整理map/reduce,hive,hbase,storm以及本人之前做过的一些大数据量方面的计算框架的开发,希望不会误导赏脸的朋友。

 

整体流程:

首先从整体上理解一下NameNode的执行流程

首先通过NameNode.java的main触发启动流程,加载配置信息,根据命令参数判定当前的启动模式(常规/导入/升级/格式化等),触发相应的启动流程,这里我们先介绍常规模式:

  1. 初始化安全模块(hadoop.security.authorization控制是否启动,默认关闭,具体实现逻辑见文章后面章节)
  2. 初始化统计模块(通过该模块将统计数据上报到JMX和所配置的监控服务器)
  3. 启动InfoServer(就是http://hadoopMasterServer:50070这个服务,通过该服务,用户可以使用浏览器查看文件系统状态,SecondaryNameNode可以通过该接口下载快照和EditLog等,服务启动在dfs.http.address,内部使用jetty实现)
  4. 启动NameSystem模块,加载快照和EditLog到内存(该模块是NameNode最核心的模块,元数据的管理逻辑都封装在该模块中,本文章中大部分都是在解释他的逻辑)
  5. 启动RPC服务器(接收ClientDataNoderpc请求,端口在fs.default.name)(RPC实现原理见后面章节)

6.    启动Trash线程(定时清空回收站,是否启动使用fs.trash.interval配置,默认关闭)

7.    Main主线程循环等待RPC服务器stop,否则一直循环(要关闭nameNode,只能是kill进程,nameNode没有提供一个类似tomcatshutdown port。采用kill是否安全,后面章节也会分析)

启动完成之后,RPC服务器和Info服务器就开始接收请求,以及相关后台线程循环执行其对应逻辑:

  1. RPC服务器接收客户端的请求,如createdeleteaddBlock等元数据修改接口,这些请求会转到由FSNamesystem处理,RPC服务器也接收DataNode的请求,如register(注册DataNode),reportBlock(上报Block信息)等,这些命令内部的执行逻辑后面我们会说到。
  2. InfoServer接受用户用的管控请求(如/listPaths,/fsck等),也接受SecondaryNameNode的请求,如getimage下载快照和EditLog
  3. 后台一些Monitor线程处理如回收站内容删除,检查DFSClient异常关闭,检查Block备份数是否正确等

 

FSNamesystem

该模块维护HDFS文件系统的元数据,如系统中的有哪些文件,每个文件中有哪些Block,各个Block存放在哪些DataNode中,并且提供对这些元数据进行操作的接口。

首先,我们可以将元数据是否会持久化到磁盘分为两类:一类是需要持久化的,如文件目录树,文件的block列表,权限等数据,当改变这些数据时,会生成操作日志被记录到磁盘里面;另一类是不需要持久化的,如Block在哪些DataNode上存储,或者反过来DataNode上存储有哪些Block,这类数据当DataNode启动时,由DataNode主动上报到NameNode,有NameNode完成收集汇总。

接下来我们介绍需要持久化的元数据如何保存。首先这里面有两个术语:快照(Image,内存中的数据结构持久化到磁盘之后的文件)和操作日志(EditLog)。当对元数据进行修改时(如create/delete),除了修改元数据在内存中的结构而外,同时会生成这些操作的操作日志,也就是EditLog,这些日志会被即时的存储到硬盘中,元数据在内存中的结构会在某些时候生成快照被持久化到磁盘。当启动NameNode时,从磁盘中得到快照,和快照之后才产生的操作日志,根据操作日志在快照的基础上重新执行这些操作,就可以恢复内存中的数据结构。不需要持久化的Block存储的DataNode列表数据,被存储在BlockMap结构中,通过该结构,可以根据block信息得到其存储的DataNode。

 

元数据在内存中的对象模型:

hadoop-NameNode篇(基于0.20版本)

说明:从图中我们可以看出针对是否需要持久化到硬盘的数据,NameNode分别使用BlockMap和FSDirectory维护。在FSDirectory对象中保存需要持久化的元数据信息,数据分为两种形式存储,一种是内存结构(INodeDirectoryWithQuota,该类继承于INodeDirectory,增加了限额的功能),一种是磁盘中的FSImage结构。

l  INodeDirectoryWithQuota继承至INodeDirectory,可以保存子节点,当HDFS中新增一个文件时,从FSDirectory的rootDir开始逐层找到父节点的INodeDirectory实例,在该实例下新增一个子节点(这里我们顺便提一下INodeFile和其子类INodeFileUnderConstruction的区别,当一个文件没有被create/append打开时,文件在内存中由INodeFile维护,一旦被这两个操作打开,说明要修改文件内容,为了防止多用户同时操作同一个文件,以及防止client异常退出,于是进入租约模型,此时该文件替换为使用INodeFileUnderConstruction来维护,该类相比INodeFile多了一个非常重要的一个属性clientName,用于记录该文件目前正在被哪个client操作,当完成文件写操作之后,撤销租约,再换回成INodeFile来维护,对租约模型感兴趣的朋友,可以参见下面的章节)。

l  FsImage维护快照和操作日志(EditLog),快照和操作日志可以在多个目录下同时存储多份,用以防止某个磁盘坏掉的情况(如果担心整个机子挂掉,可以存储一份到NFS磁盘上,多路径是在配置文件中用逗号隔开,分别使用dfs.name.dir,dfs.name.edits.dir维护快照和操作日志的存储路径),所以我们看到Storage(FSImage继承至Storage)中storageDirs(保存数据的跟目录)是一个数组,FSEditLog中editStreams也是一个数组,当对元数据进行修改时,会向editStream数组中保存的所有输出流都写入操作日志,当然同时也会在修改FSDirectory维护的内存数据。

 

了解了元数据的对象结构之后,我们再看看这些数据被持久化到磁盘之后是什么样的结构:

在参数dfs.name.dir配置的路径下,我们一般可以看到有如下的目录结构,我们先说明一个每个文件的作用,然后再讲解一下每个文件的格式。存储在磁盘中的元数据有多个状态,不同的状态存储在不同的文件夹下,如current,lastcheckpoint.tmp,previous.checkpoint等。

当从内存保存元数据到磁盘时:

1.      将current  rename为lastcheckpoint.tmp

2.      存储内存数据到current

3.      如果previous.checkpoint 存在,先删除,然后将lastcheckpoint.tmp rename为 previous.checkpoint

        current
                edits   存储操作日志

                fsimage  存储当前快照,使用fsimage+edits即是当前hdfs的文件元数据

                fstime

                VERSION

        image
                fsimage

        in_use.lock  使用文件锁的形式标识该文件夹被其他进程再使用,以防止在同一个文件下启动多个nameNode的情况。

        previous.checkpoint

                edits

                fsimage

                fstime

                VERSION

        previous

        removed.tmp

        previous.tmp

        finalized.tmp

        lastcheckpoint.tmp

 

文件中格式如下:

VERSION文件中保存: layoutVersion(格式版本号,和hadoop版本相关),namespaceID(集群标识,formate时生成),cTime,checkpointTime

 

OK,说明了元数据在内存中的结构和在磁盘中的结构之后,我们再了解一下在各个环节中,系统是如何在这两种结构中切换的:

  1. 在NameNode的启动环节,会从磁盘恢复此元数据环节:启动时调用FSDirectory的loadFSImage(Collection<File> dataDirs,Collection<File> editsDirs, StartupOption startOpt)函数加载元数据,其中参数分别表示存储Image的路径和存储EditLog的路径(dfs.name.dir,dfs.name.edits.dir配置),在该函数中,快照的路径和EditLog的路径被实例话成StorageDirectory对象被加Image的storageDirs属性中,后面可以根据类型(存储的是快照还是Log)得到相应的StorageDirectory。首先校验这些StorageDirectory的状态(不存在/需要格式化等状态),根据不同的状态和NameNode的启动模式,执行对应的逻辑,其判定逻辑如下:
    1. 如果启动模式是FORMAT:重新生成namespaceID,清空所有StorageDirectory目录下current子文件夹中的内容,如果该StorageDirectory会存储Image,则生成新的fsimage文件(写入格式版本号,新生成的namespaceID等),如果会存储EditLog,则生成edits文件,再生成VERSION文件,继续执行下面序号2的逻辑
    2. 首先检查是否支持升级(得到image/fsimage中的layoutVersion版本号,看当前hadoop版本能否支持从该版本号升级),如果不支持,就会报出异常。然后根据StorageDirectory文件夹下是否存在诸如previous,removed.tmp等文件夹存在,进行一个非常复杂的判定,最终得到StorageDirectory的状态。继续下面的步骤。
      1. 如果是NON_EXISTENT,则说明文件夹不存在或不可写,抛异常;
      2. 如果不是NON_EXISTENT,NOT_FORMATTED和NORMAL,说明需要执行recover流程
      3.  
    3. 如果是升级模式:未完待续
    4. 如果是导入模式:未完待续
    5. 如果是回退模式:未完待续
    6. 上述逻辑处理完之后,说明StorageDirectory中存储的是正常的数据:于是首先从存储快照和EditLog的所有StorageDirectory中,得到最后checkpoint的快照和以及此checkpoint之后生成的EditLog,首先加载快照到内存生成FSDirectory的rootDir属性(元数据在内存中的目录树结构),然后在此基础再执行EditLog中的操作,从而恢复NameNode到shutdown之前的最新状态。为了避免下次启动时,这些EditLog再次重复执行,所以如果本次启动中有执行过快照中没有的EditLog,则将内存中的结构再持久化到磁盘,形成一个新的快照和空的EditLog文件
  2. 在NameNode运行过程中时:对元数据进行修改,只会修改rootDir熟悉中的内存结构,同时生成EditLog日志,不会定时将生成快照持久化到硬盘(所以运行很长时间之后,就会产生大量没有持久化到快照上的EditLog,如果在NameNode启动时才执行这些Log,将会花费大量的时间,于是产生了SecondaryNameNode(SND),由SND来定时生成快照,再将快照同步回NameNode目录下,于是NameNode启动时就只需由SND生成的快照之后的EditLog就可以了,具体细节请参见SND功能逻辑部分)。在下面我们专门有章节来介绍每个针对文件系统命令,对内存结构和EditLog所发生的操作
  3. NameNode停止时:NameNode没有专门设计一个关闭的流程,stop时直接发送一个kill命令,而shutdown钩子里面也仅仅只有一个描述关闭的日志输出而已。所以停止服务时,内存结构和快照都不会做任何修改(每次操作生成的EditLog在操作结束时就被sync到了磁盘,因此不会造成数据丢失)

 到这儿为止,我们介绍了NameNode中存储元数据的内存结构,快照和EditLog的结构,元数据在内存和磁盘中如何镜像,接下来我们介绍各个针对元数据的操作接口是如何实现的,但是再次之前,为了更清晰的了解函数具体实现,我们回头再看看上面的UML图,关于INode,INodeFile,INodeFileUnderConstruction,INodeDirectory和INodeDirectoryWithQuota的关系和区别:集成关系UML图中很清楚,每个类的区别,UML下也有简单说明,希望朋友们没有错过。

首先我们从RPC的Protocol接口上看看NameNode给Client,DataNode,SecondaryNameNode提供哪些功能,这里我们就捡重点的几个举例即可,有兴趣的朋友可以继续研究,欢迎讨论。

给Client调用的几个函数:

create,append,addBlock,setReplication,rename,delete,renewLease,getDatanodeReport,getFileInfo,fsync

给DataNode调用的几个函数:

Register,sendHeartbeat,blockReport,errorReport,reportBadBlocks,commitBlockSynchronization

给SecondaryNameNode调用的几个函数:

getEditLogSize,rollEditLog,rollFsImage,getBlocks

 

从函数名称上来看,大家应该都很容易能够明白是什么功能,后面我们挨个介绍一下其内部实现,NameNode实际只是一个接受请求的壳,任何对元数据的修改经过他做一些简单的验证,日志输出,统计调用次数等操作之后都最终交由FSNamesystem处理。

l  Create:在NameNode中新增一个文件节点

1.      调用startFileInternal函数,首先判定NameNode目前是否处于安全模式(关于安全模式的说明参见其他章节),在处于安全模式的情况下,不能对文件系统做任何操作,因此直接会抛出异常,交由Client去处理。

2.      判定文件是否存在,或是文件夹,如果是文件夹,抛出异常

3.      判定是否拥有可写权限(权限的说明参见其他章节)

4.      从FSNamesystem的dir属性得到要创建的文件的INode(逐层查找Children,上面的UML图中我们有说明INodeDirectory中有children属性维护其子节点)

5.      如果该文件存在,且处于被写入的状态,则需要根据租约模型(Lease)判定当前操作是否合法,如何判定,请参加其他章节。

6.      备份数量是否在系统设置的限制范围之内:conf.getInt("dfs.replication.max", 512);conf.getInt("dfs.replication.min", 1)

7.      如果文件存在且是overwrite模式,则先执行delete(如何delete请参见后面的章节)

8.      如果是append模式,使用INodeFileUnderConstruction替换掉先前的INodeFile,将其加到租约管理器(leaseManager,防止client异常down掉,文件被该client一直占着的情况)中

9.      如果不是append,检查是否超过hdfs所限制的object数量(dfs.max.objects),EditLog写入一个timestamp操作,实例化INodeFileUnderConstruction对象,加到INode树中,向EditLog中写入OP_ADD操作(包括:Replication,ModificationTime,AccessTime,PreferredBlockSize,Blocks,PermissionStatus,ClientName,ClientMachine),加到租约管理器(leaseManager)

10.   调用logSync函数将EditLog持久化到磁盘。如何持久化的,FSEditLog有一个策略,这里面我们可以简单介绍一下,朋友们设计类似系统时可以作为参考:

hadoop-NameNode篇(基于0.20版本)

EditLog中有个事务编号txId,每新增一个Log,就会加1,FSEditLog中维护的是整个元数据库中最后的事务编号,每个线程中,维护的是线程中最后写入的Log事务编号,写入的Log数据,最开始都存储在bufCurrent buffer中,当执行logSync时,先锁住FSEditLog.this,判定是不是是不是其他线程正在持久化日志(isSyncRunning),并且正在持久化的日志最后事务编号(synctxid)比自己的txId小,说明自己的日志需要写入磁盘,那么等上一次持久化完了之后再继续执行,接下来如果发现synctxid比自己的txId大,说明自己生成的log信息已经被其他线程持久化了,不需要再处理,反之,说明本线程需要执行持久化的逻辑,首先将FSEditLog的txId赋值给synctxid,设置isSyncRunning=true,切换bufReady和bufCurrent,释放锁住的FSEditLog.this,这样其他线程可以继续写入Log,当前线程继续执行将bufReady中的数据写入磁盘,互补影响,并行处理。

l  Append:逻辑和create类似,也是调用startFileInternal函数,只是调用logSync之后,因为append要返回client用于存储数据的Block(create是当client需要时,再向NameNode调用addBlock获取存储数据的Block,但是append需要判定是新增加一个Block,还是返回先前还未填充满的Block给Client,让Client继续往这个Block中写入数据),首先得到该file的所有block,取出最后一个block,确认这个block是否还未填充满,如果没有,就返回这个Block,否则返回null

l  addBlock:首先确认是否超过hdfs允许的Objects数量大小,然后检查当前操作的文件是否被打开(是否是INodeFileUnderConstruction),并且租约是被当前请求的Client拿着,判断当前文件其他Block备份数是否大于系统允许最小备份数,调用ReplicationTargetChooser.chooseTarget选择一组合适存储数据的DataNode,如果DataNode数量小于系统所允许的备份数,则失败;新生成一个Block对象,加入到BlocksMap中,返回Client,完成请求。此函数需要注意的是,该函数中没有生成EditLog日志,虽然有生成Block,但是Block的信息只存储在BlockMap中,该结构只存储在内存中,只有client调用fync时,block的信息才会写入EditLog(具体如何操作,请参加后面介绍Client的章节)。

l  setReplication:检验备份数,可读权限等,根据变化备份数所引起的磁盘消耗,验证容量是否允许,写入修改备份数的EditLog。得到文件的所有Block,将Block存放到neededReplications队列中,该队列中的数据会被ReplicationMonitor线程定时计算应该由哪些DataNode去做备份,详细逻辑见文章后面的章节。如果新的备份数比先前的备份数小,说明要删除一些备份。

l  delete:首先从文件目录树中删除对应的节点,得到这个文件的block列表,从BlockMap中删除每个block,然后把block加入到recentInvalidateSets队列中,通过DataNode的心跳包发回DataNode,删除DataNode上对应的内容,写入删除命令到EditLog日志

l  renewLease:当Client要针对文件进行写入操作时,需要向NameNode拿到这个文件的租约,如果这个租约被其他client拿着,则不能操作。为了防止因为client崩溃导致租约不被释放,所以租约有超时时间,如果client要继续保持租约,则调用renewLease函数,该函数会修改Lease对象中的最后更新时间为当前时间。租约模型的详细介绍,请参见其他章节。

l  Fsync:将某个文件的信息持久化到EditLog中,数据内容和create时一致。此处需要注意的是关于block的信息,只持久化了blockId,字节数和生成时间,至于存储在哪些DataNode中,因为hdfs会自动在dataNode之间拷贝block,所以该信息有dataNode上报上来。

l  registerDatanode:DataNode使用该接口向NameNode注册。

l  sendHeartbeat:DataNode定时给NameNode发送心跳包,NameNode通过该函数返回xxx。NameNode接受到请求时,首先验证NameNode和DataNode文件格式的版本号(LAYOUT_VERSION),然后再验证是否是属于同一个集群的(比较NameNode和DataNode上storage.getNamespaceID()+storage.getLayoutVersion()+storage.getCTime()是否相同)。DataNode是否被设置为DECOMMISSIONED状态,得到需要Recovery(create,append时租约到期时维护,请参见租约环节的详细说明)的信息,Replication和InvalidateBlock (由后台ReplicationMonitor线程维护,具体逻辑请参见其他章节)等信息在DataNode中处理。

l  blockReport:dataNode使用该接口上报该DataNode上的Block信息,将信息存储到BlockMap中。根据BlockMap中当前的block信息和上报上来的block信息做对比,识别出哪些在DataNode上的Block是无效的(blockId没有在BlockMap中存在,说明该block不归属于任何文件,为失效),哪些是新增的(blockMap中的dataNodeList中没有当前node的,为新增),哪些是已经被删除的Block(blockMap中有,但是DataNode没有上报上来的,说明dataNode上已经没有存储该Block,为删除)。其中被判定为失效的block,会被加入到recentInvalidateSets中,ReplicationMonitor线程会定时从这个集合中取出数据,加入到InvalidateBlock,从而被DataNode发送心跳包时将该信息带回到DataNode,完成DataNode上的删除工作。

l  commitBlockSynchronization:未完待续

写了这么多,终于写到这儿了。内容太多,都不知道怎么组织了,能力有限,大家勿要怪罪。

前面将NameNode的核心逻辑都已经说的差不多了,还剩下些旁支的逻辑,或者是前面为了不影响整体流程,所以细节内容一笔带过的,下面我们也会挨个解释:

 

4、SecondaryNameNode的作用:我们前面的分析中,NameNode在运行过程中不会合并快照和EditLog,只会在启动时合并依次,运行过程中的所有操作都被记录在EditLog中,如果运行很长时间之后再重启,必然导致启动时间很长,因为有大量的EditLog需要执行。所以提出了一个SecondaryNameNode的概念,合并的操作有他去处理,合并完之后再同步给NameNode就可以了。下面我们接说明一下SecondaryNameNode的运行逻辑。当在NameNode上启动集群时,回去conf/masters(这个文件名起的实在是瘪)文件中得到SecondaryNameNode启动的服务器,连接到对应的服务器上执行SecondaryNameNode,从配置文件中得到NameNode的RPC地址和NameNode的InfoServer地址,得到用于存储快照和EditLog的目录,recover这些目录(逻辑和NameNode启动时判定StorageDirectory逻辑类似,这个逻辑很是纠结。。。),启动SecondaryNameNode的Infoserver(dfs.secondary.http.address),启动定时check的线程(间隔时间fs.checkpoint.period),每次执行过程中,首先调用NameNode的getEditLogSize函数得到edits文件大小加上内存中还未持久化到硬盘的buffer大小,如果超过需要checkpointsize的大小或时间超过间隔,就可以做合并,调用doCheckpoint函数,先通知NameNode关闭EditLog日志文件(会生成一个新的edits.new文件存储这段时间内的EditLog写入操作),从NameNode的InfoServer上使用Get /getimage?getimage=1请求下载快照到本地,/getimage?Getedit=1下载EditLog到本地,加载快照到内存,使用EditLog更新内存结构,然后将内存结构再持久化到磁盘,然后再通过/getimage?putimage=1&port =x&machine=告知NameNode来SecondaryNameNode下载已经合并好的内容(并告知SecondaryNameNode InfoServer的地址)(NameNode上接受到该请求之后,访问SecondaryNameNode InfoServer接口,下载快照数据存储成fsimage.ckpt)。在SecondaryNameNode再调用NameNode上的rollFsImage函数(NameNode上删除edits文件,将edits.new修改为edits,将fsimage.ckpt修改为fsimage)。

 

l  负载均衡流程:启Balancer对象,首先确认系统当前是否有一个Balancer在执行(文件/system/balancer.id是否存在),未完待续

l  删除回收站流程:在启动NameNode时,会启动Emptier线程,如果fs.trash.interval配置的是0,标识没有回收站功能,线程立即就会结束,否则就会循环。从/user/.Trash/目录下得到所有文件夹,文件夹是以创建这个文件夹时的时间以yyMMddHHmm格式命名的,如果这个文件夹创建时间已经超过了fs.trash.interval时间,则删除文件。

l  选择DataNode策略

 hadoop-NameNode篇(基于0.20版本)

为了实现该功能,主要有三个类:ReplicationTargetChooser封装选择策略,NetworkTopology存储网络拓扑图,而DNSToSwitchMapping则是负责解析网络位置。当NodeNode调用FSNamesystem.registerDatanode注册到NameNode上时,首先使用DNSToSwitchMapping解析出Node的网络位置,然后调用NetworkTopology.addNode将该节点加入到网络拓扑图中,当需要选择Data(addBlock函数中需要选择一组节点用于存储block数据),调用ReplicationTargetChooserchooseTarget函数,该函数中可以根据当前的网络拓扑图返回合适的Node列表。

 

0.20版本的hadoopReplicationTargetChooser当前的实现是:

1.   首先尝试在本地节点存储一份(如果是在DataNode上上传文件,即当前Clienthost能够在NameNodedataNode列表中找到,则有localNode,否则localNode为空,此处则是随机选择一个isGoodTarget),如果本机不是一个合适的节点(通过函数isGoodTarget判断),则在LocalRack中随机选择一个isGoodTarget节点,如果LocalRack中没有合适的,则尝试在已经确认要存储该数据的节点列表(某些场景下数据已经在某些节点中有保存,比如需要多做一个备份时,如果是新的Block,则这个列表为空)中选择第一个节点的LocalRack选择一个,如果还是不行,那就在全网里面选择一个isGoodTarget的节点。

2.      然后在远端的Rack中存储一份,如果远端没有合适的,则在LocalRack中随机选择一个

3.      然后在LocalRack中存储一份

4.      剩下的,在全网按照isGoodTarget随机选择

5.   isGoodTarget的判断逻辑

a)      首先已经Decommission或正在DecommissionDataNode肯定是不合适的

b)     其次剩下空间已经小于5blockSizedataNode是不合适的

c)      如果考虑压力的话(参数dfs.replication.considerLoad控制,默认考虑),当前DataNode活跃连接数大于集群平均连接数两倍的,说明该Node太慢,也是不合适的

d)     然后看当前Node所在rack中,已经被选择存储数据的node数量是否超过了maxNodesPerRack(计算规则:(totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2),如果超过了,说明该rack中存储的份数太多,也是不合适的

 

DNSToSwitchMapping的实现:

0.20版本中,采用参数topology.node.switch.mapping.impl配置,默认默认采用的是org.apache.hadoop.net.ScriptBasedMapping,该类采取的策略是It invokes a script specified in topology.script.file.name to resolve node names. If the value for topology.script.file.name is not set, the default value of DEFAULT_RACK is returned for all node names调用配置的脚本去解析。

StaticMapping实现是从hadoop.configured.node.mapping配置中读取。

 

l  增加或删除DataNode时:DataNode的修改都是通过修改dfs.hosts和dfs.hosts.exclude配置的文件内容来达到目的。将想要删除的dataNode加到dfs.hosts.exclude配置的文件中去,想要增加DataNode时,如果dfs.hosts为空,则直接启动DataNode即可,如果不为空,则需要增加dataNode到dfs.hosts所配置的文件中。

n  删除DataNode:将要删除的节点加到dfs.hosts.exclude中,然后执行FSNamesystem.refreshNodes函数(hadoop dfsadmin –refreshNodes命令可以调用该函数),重新加载dfs.hosts和dfs.hosts.exclude文件,得到NameNode上注册的所有DataNode,如果某个DataNode不在dfs.hosts中(如果dfs.hosts文件为空则默认都在),则设置该dataNode状态为DECOMMISSIONED,如果在dfs.hosts.exclude文件中,设置状态为DECOMMISSION_INPROGRESS,将这个Node上的所有Block提交到neededReplications队列告知需要增加备份数。ReplicationMonitor线程会从这个队列中取出任务,看这些增加的备份应该由哪个DataNode去做(具体逻辑可以参见后面解释ReplicationMonitor线程的章节)

l  租约模块:防止client崩溃和阻止同时操作一个文件

hadoop-NameNode篇(基于0.20版本)

先解释一下租约里面的一个术语:Holder,表示租约的拥有者,一般是一个DFSClient实例,这个值在客户端初始化DFSClient时可以随机生成,调用NameNode的某些接口时,将该值传上来(如下图的create接口中的clientName),NameNode就可以识别当前操作是谁在操作。

当某个Client打算对一个文件进行内容修改操作时(包括create/append),会针对该文件申请一个租约(addLease(String holder, String src))。加上租约之后,如果没有调用complete表示文件操作完毕,当前Client再次调用create/append,则是违法的,从而控制同一个文件不能被同一个Client同时操作;如果调用create/append的client不同,则会首先拿到这个文件的租约(lease = leaseManager.getLease(pendingFile.clientName)),查看该租约是否到期,如果到期,说明该租约的holder异常退出,触发recovery of the last block的流程,如果没有过期,当然继续让holder操作,但是都会抛出异常说明该文件已经被创建。

租约模型除了防止一个文件同时被多个client操作而外,还负责解决client异常时遗留的问题,也就是上面我们说的recovery of the last block。除了上面提到的场景会触发recovery of the last block而外,leaseManager中有个Monitor线程,也会定期检测所有的租约是否到期,如果到期,也会触发该流程。实现该流程的是FSNamesystem.internalReleaseLease(Lease lease, String src)函数,主要解决当client异常退出是最后一个Block如何处理的问题,当然如果这个文件根本就没有任何Block,那就直接使用INodeFile替换掉INodeFileUnderConstruction,在释放租约即可。如果有,找到最后一个Block,得到存储这个Block的所有DataNode,找到一个还活着的DataNode,加到这个DataNode的recoverBlocks队列中,这个recover的命令将在DataNode的心跳包回执中拿回到dataNode做相应处理(如何处理,我们在dataNode的环节再说明)。

l  NameNode停止流程/kill -9可能导致的问题:NameNode停止是使用kill命令关闭,针对元数据的修改因为EditLog在命令结果返回给用户之前就已经被存储到硬盘,而其他操作都是无状态的,因此不会出现不一致的问题。

l  格式化/如何升级/如何导入/如何回滚等

当NameNode启动时,通过启动参数将启动模式传入到NameNode,启动参数有:

hadoop-NameNode篇(基于0.20版本)

n  解释每个命令具体执行逻辑之前,先说明一下因为这些每个操作对元数据的处理都会有好几步,很有可能执行到中途的时候发生异常,所以需要每次启动时都判断元数据目录的状态,从而将先前没有执行的步骤执行完成,其实现逻辑大致如下:

对元数据的操作分为这个几类:UPGRADE,FINALIZE,ROLLBACK和CHECKPOINT,基本思路大多是先生成变化后的元数据到临时目录,然后移动current到previous,然后临时目录移动为current,然后再删除previous。根据比较所存在的文件夹,就能得到当前上一次集群做了什么操作,做到哪一步的时候挂了,如可能有COMPLETE_UPGRADE表示升级已经完成了,需要previous.tmp -> previous,RECOVER_UPGRADE表示current已经移动了,需要previous.tmp -> current。

完成这些半吊子的动作之后,再更加启动模式执行相应的逻辑。当然,例外的是formate,formate时是不管当前目录是啥状态,统统清空重新初始化。

             

n  IMPORT:可能从其他地方得到了元数据,比如以前的服务器坏了,换了新的服务器,需要将元数据导入到新的服务器上,则需要执行Import指令。执行导入时,需要将需要导入的元数据存储到fs.checkpoint.dir和fs.checkpoint.edits.dir目录下,在FSNameSystem.loadImage中,会从这个目录下面加载元数据,加载完成之后存储到current下面。如果导入的版本不兼容,需要先升级到能够支持的版本

n  UPGRADE:升级由UpgradeManagerNamenode(extends UpgradeManager):未完待续

n  ROLLBACK:回退相对简单,判定有没有previous内容,如果有,就可以使用previous的替换掉current就可以了,否则不能回退。

l  安全模式:安全模式的设置,是为了保证集群中的文件的可用性,只有当集群中文件可用性达到一定比例之后,才会退出安全模式,当然手动进入安全模式的除外。可用性的比例是使用备份数大于等于系统允许最小备份数的block数量占比整个集群的block数量,系统默认值使用参数dfs.safemode.threshold.pct配置,值是0.999.系统启动时,在FSNamesystem.initialize中,Image元数据加载完之后,得到集群block总数,于是设置SafeModeInfo的blockTotal值,调用checkMode确认是否进入安全模式,因为此时NameNode还未完全启动,所以DataNodeblock信息还没有上报上来,所以blockSafe0,肯定进入安全模式。然后当NameNode启动之后,dataNodeblock信息上报到NameNodenameNode检验block正确之后(逻辑参见上面NameNode. blockReport函数的实现说明),于是调用incrementSafeBlockCount函数,增加blockSafe的值,于是安全的block占比逐渐增大,当达到系统配置的dfs.safemode.threshold.pct值时,于是便自动退出安全模式。当然安全模式也可以手动退出,NameNode上有函数setSafeMode可以手动进入和退出安全模式。

l  RPC模型:很简单,就是使用Select模式启动一个server,客户端使用代理模式得到一个接口的实例,调用该实例上的方法时,将接口名称,method和方法参数编码发送到服务端,服务器解码,交给Handler线程处理逻辑,使用反射调用method,拿到执行结果,再返回给调用端

l  统计模块:该模块用于在统计NameNode中的各操作的统计指标,如numFilesCreated,numDeleteFileOps,numFilesAppended等。

hadoop-NameNode篇(基于0.20版本)

NameNode中维护有NameNodeMetrics对象实例,其是统计的工具类,在NameNode.Initialize函数中被初始化,NameNodeMetrics对象中有多个用于计数的MetricsTimeVaryingInt实例,并且这些实例被增加到MetricsRegistry对象的metricsList属性中,然后再将MetricsRegistry对象存入到NameNodeActivtyMBean中,NameNodeActivtyMBean对象是动态MBean,继承于MetricsDynamicMBeanBase对象,MetricsDynamicMBeanBase对象由createMBeanInfo函数对JMXServer暴露MBean信息。

FSNamesystem中也有一个统计类FSNamesystemMetrics,用于暴露如capacityRemainingGBblocksTotalfilesTotal等文件系统的相关指标。在FSNamesysteminitialize函数中,使用StandardMBeanFSNamesystem暴露给JMX(因为FSNamesystem实现了FSNamesystemMBean接口,所以可以被以Standard的方式注册到JMX环境中),可以修改hadoop的启动配置加上JMX相关参数,就可以使用jconsole或其他jmx客户端查看这些指标。

Hadoop系统中的metrics是一个挺复杂的模块,除了和JMX集成而外,还能和诸如Ganglia这类监控系统集成,在org.apache.hadoop.metrics及其子包中实现向这类监控系统上报数据的逻辑,在hadoop-metrics.properties文件可以配置dfsmapredjvm向哪个系统上报,我们不在此详细讨论,有兴趣的朋友可以再研究,欢迎分享。

l  解决NameNode单点故障:经过我们上面详细的分析hdfs内部实现,能够发现hdfs的文件元数据只在NameNode服务中存储为快照和EditLog,如果NameNode坏掉,丢失元数据的风险非常大,一旦元数据丢失,那么整个HDFS中存储的数据都不在可用。如果防止这类问题呢,最简单的办法是存储一份快照和EditLog数据到NFS磁盘,该磁盘是其他服务器通过mount过来的,当然如果你愿意,也可以多mount几个服务器过来也可以,其次如果NameNode上存储的数据丢失之后,SecondaryNameNode上还有数据,虽然最新的数据可能还没有同步到SecondaryNameNode上来(取决于系统配置的同步间隔时间fs.checkpoint.period),但是有总比没有好。当然其他版本的hadoop有更好的方式,大家可以上网搜索一下,我这里就抛砖引玉了。。。

 InfoServer

infoserver使用http接口对外提供对集群的管控功能,如fsck,listPaths,data等,启动在dfs.http.address端口上,可以通过配置dfs.https.enable决定是否启动https服务(http启动在端口dfs.datanode.https.address),如果启动https的话,通过dfs.https.need.client.auth配置是否需要验证客户端证书(默认关闭),如果需要验证客户端,则存储CA的truststore位置在ssl.server.truststore.location,truststore的密码通过ssl.server.truststore.password配置,类别为ssl.server.truststore.type,默认为jks。服务器公钥证书在ssl.server.keystore.location。

如果需要通过infoserver访问文件系统,需要提供访问的帐号,帐号信息通过URL参数ugi传递,ugi参数可以包含一个userName和多个group,中间使用逗号分隔,第一个为user,后面的为group。

权限模型:

权限这块可以分为两部分:

一部分是控制哪些用户能够连接上NameNode执行RPC接口服务(默认从hadoop-policy.xml读取,文件名可以通过系统参数传进去String policyFile = System.getProperty("hadoop.policy.file", HADOOP_POLICY_FILE);),如以下的配置表示允许任何用户都能够连接上NameNode执行ClientProtocol接口的RPC调用,如果只允许某一个用户能够连接,则在value中使用用户名替换掉"*"即可,除了配置用户名而外,还可以配置group,允许某些用户或某个group中的所有用户连接,多个值的时候中间使用逗号隔开

  <property>
    <name>security.client.protocol.acl</name>
    <value>*</value>
    <description>ACL for ClientProtocol, which is used by user code
    via the DistributedFileSystem.
    The ACL is a comma-separated list of user and group names. The user and
    group list is separated by a blank. For e.g. "alice,bob users,wheel".
    A special value of "*" means all users are allowed.</description>
  </property>

每个property name对应的RPC接口可以在PolicyProvider的子类中找到,如在HDFSPolicyProvider中:

    new Service("security.client.protocol.acl", ClientProtocol.class),
    new Service("security.client.datanode.protocol.acl",
                ClientDatanodeProtocol.class),
    new Service("security.datanode.protocol.acl", DatanodeProtocol.class),
    new Service("security.inter.datanode.protocol.acl",
                InterDatanodeProtocol.class),
    new Service("security.namenode.protocol.acl", NamenodeProtocol.class),
    new Service("security.refresh.policy.protocol.acl",
                RefreshAuthorizationPolicyProtocol.class),

 

另外一部分是控制文件系统的权限,如某个路径只能是root组才可写。后面我们依次介绍。

首先详细介绍一下控制连接NameNode(JobTracker也类似),先看一下UML图

hadoop-NameNode篇(基于0.20版本)

再解释一下,Java里面Permission,Policy这一套东西还真是麻烦,网上介绍实现流程的非常少,基本都是说policy文件怎么配置,花了不少时间才有点眉目,如有说错,请熟悉这块的朋友耐心指出哈。。。

hadoop这块的权限文件不是采用的java默认配置格式,而是在hadoop自身的配置中来配置的,所以定义了ConfiguredPolicy类(通过调用Policy.setPolicy函数可以替换掉系统默认的Policy实现),该类中由refresh函数从hadoop-policy.xml中加载权限的信息到内存,形成Map<Principal, Set<Permission>>结构,该结构中维护着每个身份(身份可以是user,可以是group)拥有哪些权限(如调用ClientProtocol接口)。

当Client要连接RPC服务器时,通过Server.getProxy方法,此时可以传一个UserGroupInformation进去,也可以不用传。如果不传的话,就从配置文件的hadoop.job.ugi参数中读取(中间使用逗号隔开,第一个是用户名,后面的是group名),如果没有配置,则读取操作系统的帐号和group(whoami和groups的执行结果),使用这些信息可以生成UserGroupInformation对象,后面和服务器通信时,这个信息会被作为ConnectionHeader中的一部分,传递给RPC服务器。服务器接收到请求之后,从ConnectionHeader中解出UserGroupInformation,根据UserGroupInformation中的user和group信息相应生成继承至Principal的User和Group对象,使用得到的user和group构建成Subject(表示当前用户用户哪些身份),生成ConnectionPermission对象,调用checkPermission函数验证当前Subject是否拥有对应接口上的ConnectionPermission的权限(ConnectionPermission中的protocol表示要连接什么接口),使用Subject.doAs函数,传入Subject对象,在JDK底层,将subject包装成ProtectionDomain,调用PermissionCollection ConfiguredPolicy.getPermissions(ProtectionDomain domain)函数,从domain中得到所有的用户身份,从ConfiguredPolicy得到这些用户身份所拥有的所有权限(通过PermissionCollection返回),然后jdk中再迭代每个Permission,确认该Permission和需要验证的ConnectionPermission是否匹配,如果匹配则说明拥有权限:

  public boolean implies(Permission permission) {
    if (permission instanceof ConnectionPermission) {
      ConnectionPermission that = (ConnectionPermission)permission;
      if (that.protocol.equals(VersionedProtocol.class)) {
        return true;
      }
      return this.protocol.equals(that.protocol);
    }
    return false;
  }

通过上面的解释,没有明白hadoop在这块控制的意义在哪儿,期望能够限制访问RPC服务的帐号,但是帐号既没有向NameNode认证(取的是本地uninx帐号),又没有NameNode颁发的密码,又没有限制客户端服务器(比如可以类似mysql,帐号后面增加了host,如root@192.68.2.5),非常容易被人绕过这个权限限制。 

 接下来再说一下文件系统权限控制的实现:在通过上面说的ConnectionPermission验证之后,在RPC Server的Handler中(RPC的实现可以参见本文章中介绍RPC的章节)使用Subject.doAs函数将调用业务逻辑的代码封装起来,使得在RPC接口实现中能够通过Subject.getSubject(AccessController.getContext())得到当前的Subject对象,知道执行当前逻辑是哪个User,属于哪些Group,结合需要操作文件或文件夹上的权限属性(类似linux中文件的权限属性),就可以知道当前用户能否执行操作。在需要验证权限的地方,如create,delete等,调用FSNamesystem.checkPermission函数,生成PermissionChecker对象,在PermissionChecker中,首先判断当前用户(subject)是否和超级用户(当NameNode启动的初始化过程中,会读取当前系统的UnixUserGroupInformation,该用户则为HDFS的超级用户)帐号相同,或者当前用户所处组中包含超级用户所在的组(从配置dfs.permissions.supergroup中读取,默认supergroup),如果是超级用户,则所有权限都通过,否则根据需要判断(详细过程不再说,和linux类似)。

从上面的分析来看,hadoop的权限控制非常弱,基本不能应用于实际控制。如果知道了启动hadoo的帐号,任何客户端使用同样的帐号连接上NameNode,则为超级用户。但是hadoop0.20版本中也并未对这块提供扩展点用于扩展安全授权这块功能。

 

l  主要线程

hadoop-NameNode篇(基于0.20版本)

我们通过debug或是使用jstack能够发现NameNode启动之后有如上线程,我们结合前面讨论的,解释一下每个线程功能:

PendingReplicationBlocks$PendingReplicationMonitor

LeaseManager$Monitor:循环检测过期的租约,其功能介绍可以参见本文章中的租约模块部分

FSNamesystem$HeartbeatMonitor:检测DataNode是否还存活(node.getLastUpdate() <(now() - heartbeatExpireInterval)),如果已经dead,从FSNamesystem的blockMap中移除该dataNode上的所有block信息,从网络拓扑(clusterMap)中移除该dataNode信息。

FSNamesystem$ReplicationMonitor定时计算每个DataNode的任务,比如dataNode上需要增加拷贝或删除失效的block等,这些任务会放到对应dataNode的队列中,等dataNode心跳包上来的时候,会将这些任务领回去。

DecommissionManager$Monitor检测每个dataNode当前是否处于Decommission

IPC Server Handler:RPC 服务器处理请求的Handler请求,在NameNode中Handler的数量由dfs.namenode.handler.count控制,默认是10.

IPC Server listener:该线程用于读取socket的可读事件,读取数据解码成Call对象,放到队列中,等待Handler来处理。

IPC Server responder:该线程读取socket的可写事件,将需要发送给socket的数据发送出去。

 

 

到这里,NameNode部分的主要内容就基本介绍完了,通过阅读这块源代码,除了了解到hadoop的实现原理,有利于在实际使用过程中快速定位解决问题而外,对后面自己的系统设计也有了很多参照的模板,当然也发现了其局限性:

1.      比如大家都知道的NameNode单点故障问题和集群扩展性短板;所有元数据放在内存,内存消耗过大;此处可以采用类似于缓存+Hbase的方案存储,考虑到即时整个集群中文件非常多,但是热点数据一般只是一小部分而已,我想应该影响不是太大。如果解决了小文件存储之后,hadoop用俩存储图片,在前面再加上CDN,应该也OK

2.      集群启动时,DataNode将Block信息向NameNode上报,导致网络风暴(按照集群存储10T,每个block120M,保存三个备份来计算,看看启动时需要有多少block信息上报。。。);是否能够在DataNode本地存储本次上报和上次上报的差异做增量上报,在NameNode端,对block数据也做持久化。

3.      不适合小文件存储,因为小文件一样产生INode和Block,对NameNode消耗和大文件相似,所以集群资源利用率上不去;目前我们项目中<500K的文件就采用的是bigTable存储方案,而非GFS方案