创建DataNode的入口DataNode.main(String[] args),主要的处理逻辑在方法【DataNode.createDataNode(String[] args, Configuration conf, SecureResources resources)】中,分别初始化DataNode和启动DataNode的守护线程两大步。
一.【DataNode.instantiateDataNode(String[] args,Configuration conf, SecureResources resources) 】初始化DataNode:
1.读取"dfs.data.dir"配置参数,放入dataDirs:String[]中;
2.【DataNode.makeInstance(String[] dataDirs,Configuration conf, SecureResources resources)】
2.1)【FileSystem.getLocal(Configuration conf)】初始化一个LocalFileSystem对象,即本地文件系统对象;
2.2)遍历dataDirs:
2.2.1)调用【DiskChecker.checkDir(LocalFileSystem localFS, Path dir, FsPermission expected)】:
1)【DiskChecker.mkdirsWithExistsAndPermissionCheck(LocalFileSystem localFS,Path dir,FsPermission expected)】:创建${dfs.data.dir}目录并赋上权限;
2)检查此目录是否有读写权限以及是否为目录,否则抛出异常;
2.2.2)对每个路径初始化File对象,并存入dirs:ArrayList<File>队列中;
2.3)【DataNode.DataNode(Configuration conf, AbstractList<File> dataDirs, SecureResources resources)】,初始化DataNode对象;内部调用【DataNode.startDataNode(Configuration conf,AbstractList<File> dataDirs, SecureResources resources)】方法,具体实现步骤如下:
2.3.1)获取NameNode的访问地址,并初始化为InetSocketAddress对象:nameNodeAddr;
1)读取配置文件中的dfs.namenode.servicerpc-address参数的值;若有值则返回;否则进入第2步;
2)读取配置文件中的dfs.namenode.rpc-address参数的值;若有值则返回;否则进入第3步;
3)读取配置文件中的fs.default.name参数的值并返回;
2.3.2)读取配置文件中的参数“dfs.socket.timeout”,“dfs.datanode.socket.write.timeout”,“dfs.datanode.transferTo.allowed”,“dfs.write.packet.size”等配置信息;
2.3.3)获取DataNode的数据块流的读写的地址,配置文件中的参数dfs.datanode.address值或'dfs.datanode.bindAddress':'dfs.datanode.port'值
2.3.4)【new DataStorage()】初始化DataNode.storage变量,主要用于管理数据目录的类,完成格式化,升级,回滚等功能;
2.3.5)读取配置"slave.host.name“;若不为空,则赋值给DataNode.machineName:String;若为空,则读取"dfs.datanode.dns.interface" 和 "dfs.datanode.dns.nameserver",此两个配置缺省均为default,通过【DNS.getDefaultHost(String strInterface, String nameserver)】生成DataNode.machineName变量的值;
2.3.6)通过DataNode.machineName和第2.3.3步的port值 初始化DataNode.dnRegistration:DatanodeRegistration对象;
2.3.7)调用RPC.waitForProxy(...)方法,通过nameNodeAddr网络地址获取与NameNode通信的客户端DataNode.namenode;
2.3.8)【DataNode.handshake()】通过RPC方式调用【NameNode.versionRequest()】,从NameNode获取namespaceid,ctime信息然后封装成NamespaceInfo对象返回;在NameNode端调用【FSNamesystem.getNamespaceInfo()】
2.3.9)【DataStorage.recoverTransitionRead(NamespaceInfo nsInfo, Collection<File> dataDirs, StartupOption startOpt)】
2.3.9.1)遍历形参dataDirs(在第2.2.2步中生成):
1)根据集合中的File对象初始化StorageDirectory对象并存入Storage.storageDirs:List<StorageDirectory>中;
2)【StorageDirectory.analyzeStorage(StartupOption startOpt)】检查所有${dfs.data.dir}目录下文件的一致性,大致思路如下:
2.1)检测${dfs.data.dir}文件夹是否存在,是否有写权限;
2.2)根据VERSION,previous,previous.tmp,finalized.tmp,lastcheckpoint.tmp文件的存在与否来判断文件的状态;
2.3.9.2)遍历Storage.storageDirs集合,调用【DataStorage.doTransition(StorageDirectory sd,NamespaceInfo nsInfo, StartupOption startOpt)】:
1)如果启动参数为“-rollback”,则调用【DataStorage.doRollback(StorageDirectory sd,NamespaceInfo nsInfo)】进行回滚,大致思路:删掉current文件,并将previous文件名改为current;
2)读取VERSION文件的信息;
2.3.9.3)更新所有${dfs.data.dir}目录下写VERSION文件信息;
2.3.10)【DatanodeRegistration.setStorageInfo(DataStorage storage)】将DataNode.storage变量设置到DatanodeRegistration.storageInfo和DatanodeRegistration.storageID中,此时storageID为"",在向NameNode注册的时候分配;
2.3.11)【FSDataset.FSDataset(DataStorage storage, Configuration conf)】生成FSDatasetInterface接口的实现类FSDataset,并赋值给DataNode.data变量,FSDataset类是所有数据块读写的实际操作类
2.3.11.1)读取"dfs.datanode.numblocks"参数值,表示每个block块的最大值;
2.3.11.2)初始化FSVolume[]数组,其中数组大小为Storage.storageDirs集合中StorageDirectory的个数,即list.size()的值;
2.3.11.3)遍历Storage.storageDirs集合,调用【FSDataset.FSVolume.FSVolume(File currentDir, Configuration conf)】初始化FSVolume对象,并存入数组FSVolume[]中;其中currentDir='${dfs.data.dir}/current'的File对象;FSVolume的成员初始化如下:
1)【FSDataset.FSDir.FSDir(File dir)】初始化FSVolume.dataDir:FSDir,通过这个FSDir的初始化过程可以生成以${dfs.data.dir}/current为根目录的树形结构,结构中每个节点的数据成员如下:
FSDir.dir 为当前文件或文件夹的File对象;
FSDir.children表示当前目录下面存在的文件夹的File对象集合;
FSDir.numBlocks表示当前目录下面block文件的个数;
2)FSVolume.currentDir是current文件的File对象;
3)FSVolume.detachDir是对'${dfs.data.dir}/detach'文件生成的File对象;
4)若存在detach文件夹,则调用【FSVolume.recoverDetachedBlocks(File dataDir, File dir)】,若detach里面的文件在current中不存在,则将此文件恢复到current下面;若存在,则删除detach下面的此文件;
5)若存在'${dfs.data.dir}/tmp'文件,则删除;
6)若存在'${dfs.data.dir}/blocksBeingWritten'文件,并且参数"dfs.durable.sync"为true;则调用【FSVolume.recoverBlocksBeingWritten(File bbw)】, 将blocksBeingWritten文件下面的内容放入FSDataset.volumeMap和FSDataset.ongoingCreates中;若参数"dfs.durable.sync"为false,则直接删除blocksBeingWritten文件;
7)创建detach,tmp,blocksBeingWritten文件;
8)初始化FSVolume.usage:DF 和FSVolume.dfsUsage:DU;DF和DU是用于在linux上执行shell命令"df"和"du"检查磁盘空间等信息(df命令可以获取硬盘占用了多少空间,还剩下多少空间;du是面向文件的命令,查看目录大小),在NameNode.sendHeartbeat()的时候会用到;
2.3.11.4)【FSVolumeSet(FSVolume[] volumes)】根据上一步中生成的FSVolume[]数组初始化FSVolumeSet对象;并将此FSVolumeSet对象赋值给FSDataset.volumes变量;
2.3.11.5)【FSVolumeSet.getVolumeMap(HashMap<Block,DatanodeBlockInfo> volumeMap)】生成HashMap<Block,DatanodeBlockInfo> volumeMap,将所有dfs.data.dir/current目录下的所有block块存入FSDataset.volumeMap中;{##注释:current中block由blk_blockId和blk_blockId_GENSTAMP.meta两个文件组成##}
1)遍历FSVolume[]数组,调用【FSVolume.getVolumeMap(HashMap<Block,DatanodeBlockInfo> volumeMap)】;内部递归调用【FSVolume.dataDir.getVolumeMap(HashMap<Block,DatanodeBlockInfo> volumeMap, FSVolume volume)】,以${dfs.data.dir}/current根节点开始遍历整个目录树,将所有block块存入FSDataset.volumeMap中;生成Block的逻辑如下:
1.1)获取generationStamp,此值是文件名blk_blockId_GENSTAMP.meta中GENSTAMP的值;
1.2)根据blockId,blk_blockId文件的长度,第1.1步中的generationStamp生成Block对象;
1.3)根据FSVolume和blk_blockId文件的File对象生成DatanodeBlockInfo对象;
1.4)以Block对象为Key,DatanodeBlockInfo对象为Value,存入volumeMap中;
2.3.11.6)启动FSDataset.AsyncBlockReport线程,生成FSDataset.scan:遍历FSVolume[]数组,将所有的blk_blockId文件存入AsyncBlockReport.scan:HashMap<Block, File>中;将此scan赋值给FSDataset.scan;
2.3.11.7)以'${dfs.data.dir}/current'生成的File队列初始化FSDatasetAsyncDiskService对象,并赋值给FSDataset.asyncDiskService变量;
2.3.12)初始化线程DataNode.dataXceiverServer:DataXceiverServer;
2.3.13)初始化DataNode.ipcServer:Server,此Server注册的是BlockTokenSecretManager类;
2.3.13)初始化DataNode.blockScanner:DataBlockScanner对象;
2.3.14)初始化线程池DataNode.readaheadPool:ReadaheadPool;
2.3.15)初始化DataNode.infoServer:HttpServer服务,并开启此HTTP服务;注册Servlet包括:StreamFile,FileChecksumServlets.GetServlet,DataBlockScanner.Servlet;
至此DataNode的初始化工作结束。
----------------------------------------------------
二.启动DataNode的第二大步就是启动DataNode的守护线程【DataNode.runDatanodeDaemon(DataNode dn) 】,在此大步中要完成两个工作:
1.【DataNode.register()】向NameNode注册此DataNode:
1.1)若DataNode.dnRegistration(在一.2.3.6和2.3.10步中创建的)的storageID为"",则创建一个storageID,规则为["DS-" + rand + "-"+ ip + "-" +dnRegistration.getPort() + "- +System.currentTimeMillis()];
1.2)设置dnRegistration.name等于[machineName(一.2.3.5步生成的)+":"+dnRegistration.getPort()];
1.3)【NameNode.register(DatanodeRegistration nodeReg)】通过RPC调用NameNode的register方法,把DataNode.dnRegistration对象传递给NameNode;内部调用【FSNamesystem.registerDatanode(DatanodeRegistration nodeReg)】完成DataNode的注册工作:
1.3.1)获取此远程DataNode的IP地址,通过FSNamesystem.hostsReader中的访问或禁止列表检查是否运行此DataNode访问;
1.3.2)用传入的nodeReg对象更新DatanodeRegistration对象的name,infoPort,ipcPort字段值;(感觉有点多余了)
1.3.3)根据dnRegistration.getStorageID()在FSNamesystem.datanodeMap中查找nodeS:DatanodeDescriptor对象;
1.3.4)根据dnRegistration.getName()在FSNamesystem.host2DataNodeMap中查找nodeN:DatanodeDescriptor对象;
1.3.5)若nodeN != null && nodeN != nodeS,则:
1)从FSNamesystem.heartbeats中删除nodeN对象;
2)从FSNamesystem.blocksMap删除nodeN对象中的所有block块(即此DataNode上的所有block块信息);
3)从网络拓扑图FSNamesystem.clusterMap:NetworkTopology中删除nodeN节点;
4)从FSNamesystem.datanodeMap和FSNamesystem.host2DataNodeMap中删除nodeN节点;
1.3.6)若nodeS==null,则进入1.3.7至1.3.11步;若nodeS!=null则进入1.3.12至1.3.16步;
1.3.7)【DatanodeDescriptor(DatanodeID nodeID, NetworkTopology.DEFAULT_RACK, String hostName)】初始化DatanodeDescriptor对象;
1.3.8)【FSNamesystem.resolveNetworkLocation(DatanodeDescriptor node)】通过FSNamesystem.dnsToSwitchMapping对象(在创建NameNode时初始化)将DataNdoe的hostname转换成网络路径并设置给(DatanodeInfo)dnRegistration.location变量;此转换的目的是便于生成一个DataNode的网络拓扑图。
1.3.9)【FSNamesystem.unprotectedAddDatanode(DatanodeDescriptor nodeDescr)】,添加FSNamesystem.datanodeMap和FSNamesystem.host2DataNodeMap内容;
1)以StorageID为Key,dnRegistration对象为Value,存入FSNamesystem.datanodeMap中;
2)以hostname为Key,DatanodeDescriptor对象为Value,存入FSNamesystem.host2DataNodeMap中;Host2NodesMap实质的数据结构是HashMap<String, DatanodeDescriptor[]>;
1.3.10)【NetworkTopology.add(Node node)】将此DataNode节点添加到一个以DataNode的网络位置形成的网络拓扑树状图(用FSNamesystem.clusterMap表示)中;此树状结构的叶子节点为DatanodeDescriptor对象,非叶子节点为InnerNode对象,表示转换器Switch或机架Rack的路由器Router.具体逻辑参考博客《http://blog.csdn.net/zqhxuyuan/article/details/10198639?reload》;
1.3.11)将DatanodeDescriptor对象存入FSNamesystem.heartbeats中;作用待分析;
1.3.12)从网络拓扑图FSNamesystem.clusterMap:NetworkTopology中删除nodeS节点;
1.3.13)用传入的nodeReg对象更新nodeS对象的name,infoPort,ipcPort字段值;
1.3.14)【FSNamesystem.resolveNetworkLocation(DatanodeDescriptor node)】:hostname转换成网络路径,见1.3.7步;
1.3.15)【NetworkTopology.add(Node node)】,参考1.3.9步;
1.3.16)将DatanodeDescriptor对象存入FSNamesystem.heartbeats中;作用待分析;
1.4)若'dfs.durable.sync'为true,
1.4.1)【FSDataset.getBlocksBeingWrittenReport()】获取blocksBeingWritten目录下面的所有Block封装成Block[]数组;
1.4.2)【BlockListAsLongs.convertToArrayLongs(Block[] blockArray)】将Block[]数组转换成long[]数组,利于传输;;转换规则如下:
1)long[]数组的长度=Block[]数组长度*3;
2)long[index*3]=Block[index].BlockId
3)long[index*3+1]=Block[index].NumBytes
4)long[index*3+2]=Block[index].GenerationStamp
1.4.3)【NameNode.blocksBeingWrittenReport(DatanodeRegistration nodeReg, long[] blocks)】向NameNode报告正在写入的文件;
1.5)【FSDataset.requestAsyncBlockReport()】唤醒FSDataset.AsyncBlockReport线程;
2.启动DataNode线程,此线程的主要工作如下:
2.1)启动DataXceiverServer线程;此线程用于监听其他DataNode或DFSClient客户端的请求,用于发送和接受block数据;
2.2)启动ipcServer服务;用于处理DFSClient的RPC调用请求;
2.3)循环执行【DataNode.offerService()】;DataNode提供服务,定时发送心跳给NameNode,响应NameNode返回的命令并执行。具体工作如下:
2.3.1)间隔DataNode.heartBeatInterval时间向NameNode发送心跳消息;【NameNode.sendHeartbeat(DatanodeRegistration nodeReg, long capacity, long dfsUsed, long remaining, int xmitsInProgress, int xceiverCount) 】,其中capacity,dfsUsed,remaining是调用DF和DU中的方法得到的;
2.3.2)【DataNode.processCommand(DatanodeCommand[] cmds)】在DataNode上根据第2.3.1步中NameNode对心跳信息的返回结果执行相关操作;
2.3.3)检测是否有新的block到达,即DataNode.receivedBlockList队列的大小大于0;若新block达到,则调用【NameNode.blockReceived(DatanodeRegistration nodeReg, Block[] blocks, String[] delHints)】,在此方法中将Block对应的DataNode信息放入对应的BlockInfo.Object[]中,即完善存放此Block的DataNode信息(NameNode内存中目录树结构的叶子节点INode对象上BlockInfo的成员变量Object[];
2.3.4)间隔DataNode.blockReportInterval时间向NameNode发送block报告;
2.3.4.1)【FSDataset.retrieveAsyncBlockReport()】内部调用FSDataset.reconcileRoughBlockScan(HashMap<Block,File> seenOnDisk) 其中seenOnDisk= FSDataset.scan;找到需要报告的block块;
2.3.4.2)【BlockListAsLongs.convertToArrayLongs(Block[] blockArray)】将Block[]数组转换成long[]数组;转换规则如下:
1)long[]数组的长度=Block[]数组长度*3;
2)long[index*3]=Block[index].BlockId
3)long[index*3+1]=Block[index].NumBytes
4)long[index*3+2]=Block[index].GenerationStamp
2.3.4.2)【NameNode.blockReport(DatanodeRegistration nodeReg, long[] blocks)】,NameNode内部调用【FSNamesystem.processReport(DatanodeID nodeID,
BlockListAsLongs newReport)】,具体实现如下:
2.3.4.2.1)【FSNamesystem.getDatanode(DatanodeID nodeID) 】(DatanodeRegistration为DatanodeID的子类);根据DatanodeRegistration对象的StorageID值在FSNamesystem.datanodeMap中查找DatanodeDescriptor对象:node;
2.3.4.2.2)【node.reportDiff(BlocksMap blocksMap, BlockListAsLongs newReport, Collection<Block> toAdd, Collection<Block> toRemove,Collection<Block> toInvalidate)】
1)遍历newReport即Block[]数组转换成的long[]数组;
1.1)根据传来的long[]生成Block对象block,在FSNamesystem.blocksMap中根据block查找BlockInfo对象storedBlock;若storedBlock==null,则以long[index*3],long[index*3+1],generationStamp=1为参数初始化Block对象,然后在FSNamesystem.blocksMap中查找storedBlock对象;
1.2)若storedBlock==null,则将Block对象放入toInvalidate集合中;继续遍历下一组long[];
1.3)【BlockInfo.findDatanode(DatanodeDescriptor dn)】查找当前DatanodeDescriptor对象是否在blockinfo(第a.步查找到的)的Object[]数组的Object[index*3]中;若不在:
1.3.1)若storedBlock.getNumBytes()等于新生成的block.getNumBytes(),则将storedBlock放入toAdd集合中;
1.3.2)若不相等,则将新生成的block对象放入toAdd集合中;
1.3.3)继续遍历下一组long[];
1.4)【DatanodeDescriptor.moveBlockToHead(BlockInfo b)】:若当前的DatanodeDescriptor对象已经存入blockinfo的Object[]数组Object[index*3]中;则将该blockinfo调整为此DataNode上由BlockInfo构成的双向链表的头节点;并将DatanodeDescriptor.blockList指向此blockinfo,用于指向链表的头节点;继续遍历下一组long[];
2)遍历此DataNode上的双向链表,依次往下遍历读取BlockInfo对象storedBlock,若storedBlock.inode==null或storedBlock.inode.isUnderConstruction()=false,则将此storedBlock对象放入toRemove集合中;
2.3.4.2.3)遍历toRemove集合,调用【FSNamesystem.removeStoredBlock(Block block,DatanodeDescriptor node)】从FSNamesystem.blocksMap中删除此block
2.3.4.2.4)遍历toAdd集合,调用【FSNamesystem.addStoredBlock(Block block,DatanodeDescriptor node, DatanodeDescriptor delNodeHint)】
1)从BlocksMap.getStoredBlock(Block b)中获取BlockInfo类的对象storedBlock;
2)【node.addBlock(BlockInfo b)】将BlockInfo放入此DataNode的双向链表的头节点中,具体实现如下:
2.1)将DatanodeDescriptor对象存入BlockInfo对象的Object[lastindex*3]中;将Object[lastindex*3+1]和Object[lastindex*3+2]设置成null;
2.2)将此BlockInfo对象的Object[lastindex*3+2]设置为DatanodeDescriptor.blockList,在第一次加入此DataNode的双向链表时blockList=null;
2.3)将此BlockInfo对象赋值给DatanodeDescriptor.blockList;
3)余下的工作是对正在创建的文件的处理,未分析;
2.3.4.2.5)遍历toInvalidate集合,将toInvalidate集合内容放入FSNamesystem.recentInvalidateSets中;
2.3.4.2.6)设置DatanodeDescriptor.firstBlockReport=false;
2.3.5)【DataNode.processCommand(DatanodeCommand cmd)】在DataNode上根据第2.3.4步中NameNode对block报告的返回结果执行相关操作;