HDFS Replica Placement Policy

Posted: 2023-03-08 18:53:51

  Before the client writes a block to the DataNodes, it first contacts the NameNode, which selects the required number of DataNodes to hold the replicas. The selection policy is declared by the BlockPlacementPolicy interface and implemented by its subclass BlockPlacementPolicyDefault. That class has several chooseTarget() overloads, but they all end up calling the method below:

/**
 * This is not part of the public API but is used by the unit tests.
 */
DatanodeDescriptor[] chooseTarget(int numOfReplicas,
                                  DatanodeDescriptor writer,
                                  List<DatanodeDescriptor> chosenNodes,
                                  HashMap<Node, Node> excludedNodes,
                                  long blocksize) {
  // numOfReplicas: the number of replicas still to be chosen
  // clusterMap.getNumOfLeaves(): the number of DataNodes in the whole cluster
  if (numOfReplicas == 0 || clusterMap.getNumOfLeaves() == 0) {
    return new DatanodeDescriptor[0];
  }
  // excludedNodes: DataNodes to skip (some have already been chosen,
  // so they must not be chosen again)
  if (excludedNodes == null) {
    excludedNodes = new HashMap<Node, Node>();
  }
  int clusterSize = clusterMap.getNumOfLeaves();
  // total replicas = already-chosen nodes + requested replicas
  int totalNumOfReplicas = chosenNodes.size() + numOfReplicas;
  // cap the total at the number of DataNodes in the cluster
  if (totalNumOfReplicas > clusterSize) {
    numOfReplicas -= (totalNumOfReplicas - clusterSize);
    totalNumOfReplicas = clusterSize;
  }
  // upper bound on how many of the chosen DataNodes may sit on one rack
  int maxNodesPerRack =
      (totalNumOfReplicas - 1) / clusterMap.getNumOfRacks() + 2;
  List<DatanodeDescriptor> results =
      new ArrayList<DatanodeDescriptor>(chosenNodes);
  for (DatanodeDescriptor node : chosenNodes) {
    // add localMachine and related nodes to excludedNodes
    addToExcludedNodes(node, excludedNodes);
    adjustExcludedNodes(excludedNodes, node);
  }
  // the writer (client) is not a DataNode in this cluster
  if (!clusterMap.contains(writer)) {
    writer = null;
  }
  boolean avoidStaleNodes = (stats != null && stats
      .shouldAvoidStaleDataNodesForWrite());
  // choose numOfReplicas DataNodes and return the local DataNode
  DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer,
      excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
  results.removeAll(chosenNodes);
  // sort the chosen DataNodes (the elements of results) into a pipeline
  return getPipeline((writer == null) ? localNode : writer,
      results.toArray(new DatanodeDescriptor[results.size()]));
}

  The method works roughly as its comments describe; just be careful about what each variable means. Near the end, it calls another chooseTarget() overload, which selects the requested number of DataNodes (collected in results) and returns one DataNode to act as the local node. That overload is analyzed below.
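  One detail worth a worked example is the maxNodesPerRack bound. With numOfReplicas = 3, an empty chosenNodes list, and a cluster of 2 racks, maxNodesPerRack = (3 - 1) / 2 + 2 = 3 (integer division); with 10 racks it is (3 - 1) / 10 + 2 = 2, so no single rack may receive more than 2 of the 3 replicas.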

/* choose <i>numOfReplicas</i> from all data nodes */
private DatanodeDescriptor chooseTarget(int numOfReplicas,
    DatanodeDescriptor writer, HashMap<Node, Node> excludedNodes,
    long blocksize, int maxNodesPerRack, List<DatanodeDescriptor> results,
    boolean avoidStaleNodes) {
  if (numOfReplicas == 0 || clusterMap.getNumOfLeaves() == 0) {
    return writer;
  }
  int totalReplicasExpected = numOfReplicas + results.size();
  int numOfResults = results.size();
  boolean newBlock = (numOfResults == 0);
  if (writer == null && !newBlock) {
    writer = results.get(0);
  }
  // Keep a copy of original excludedNodes
  final HashMap<Node, Node> oldExcludedNodes = avoidStaleNodes ?
      new HashMap<Node, Node>(excludedNodes) : null;
  try {
    if (numOfResults == 0) { // choose the local DataNode
      writer = chooseLocalNode(writer, excludedNodes, blocksize,
          maxNodesPerRack, results, avoidStaleNodes);
      if (--numOfReplicas == 0) {
        return writer;
      }
    }
    if (numOfResults <= 1) { // choose a DataNode on a remote rack
      chooseRemoteRack(1, results.get(0), excludedNodes, blocksize,
          maxNodesPerRack, results, avoidStaleNodes);
      if (--numOfReplicas == 0) {
        return writer;
      }
    }
    if (numOfResults <= 2) {
      if (clusterMap.isOnSameRack(results.get(0), results.get(1))) {
        // the first two DataNodes share a rack, so choose a DataNode
        // that is NOT on the same rack as the first one
        chooseRemoteRack(1, results.get(0), excludedNodes, blocksize,
            maxNodesPerRack, results, avoidStaleNodes);
      } else if (newBlock) {
        // choose a DataNode on the same rack as the second one
        chooseLocalRack(results.get(1), excludedNodes, blocksize,
            maxNodesPerRack, results, avoidStaleNodes);
      } else {
        // choose a DataNode on the same rack as the writer
        chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
            results, avoidStaleNodes);
      }
      if (--numOfReplicas == 0) {
        return writer;
      }
    }
    // choose the remaining DataNodes at random from the whole cluster
    chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
        maxNodesPerRack, results, avoidStaleNodes);
  } catch (NotEnoughReplicasException e) {
    FSNamesystem.LOG.warn("Not able to place enough replicas, still in need of "
        + (totalReplicasExpected - results.size()) + " to reach "
        + totalReplicasExpected + "\n"
        + e.getMessage());
    if (avoidStaleNodes) {
      // Retry chooseTarget again, this time not avoiding stale nodes.
      // excludedNodes contains the initial excludedNodes and nodes that were
      // not chosen because they were stale, decommissioned, etc.
      // We need to additionally exclude the nodes that were added to the
      // result list in the successful calls to choose*() above.
      for (Node node : results) {
        oldExcludedNodes.put(node, node);
      }
      // Set numOfReplicas, since it can get out of sync with the result list
      // if the NotEnoughReplicasException was thrown in chooseRandom().
      numOfReplicas = totalReplicasExpected - results.size();
      return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
          maxNodesPerRack, results, false);
    }
  }
  return writer;
}

  The selection of these three DataNodes is analyzed in turn below.

1. Choosing the local DataNode: chooseLocalNode()

/* choose <i>localMachine</i> as the target.
 * if <i>localMachine</i> is not available,
 * choose a node on the same rack
 * @return the chosen node
 */
protected DatanodeDescriptor chooseLocalNode(DatanodeDescriptor localMachine,
    HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
    List<DatanodeDescriptor> results, boolean avoidStaleNodes)
    throws NotEnoughReplicasException {
  // if no local machine, randomly choose one node
  if (localMachine == null) // the client is not running on a DataNode
    // pick a random DataNode from the whole cluster to act as the local node
    return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
        maxNodesPerRack, results, avoidStaleNodes);
  // otherwise try local machine first
  Node oldNode = excludedNodes.put(localMachine, localMachine);
  if (oldNode == null) { // was not in the excluded list
    // the client's DataNode has not been chosen yet;
    // check whether it is overloaded
    if (isGoodTarget(localMachine, blocksize, maxNodesPerRack, false,
        results, avoidStaleNodes)) {
      results.add(localMachine);
      // add localMachine and related nodes to excludedNode
      addToExcludedNodes(localMachine, excludedNodes);
      return localMachine;
    }
  }
  // try a node on local rack
  // i.e. choose a DataNode on the same rack as the client
  return chooseLocalRack(localMachine, excludedNodes, blocksize,
      maxNodesPerRack, results, avoidStaleNodes);
}

  The local DataNode is chosen in three steps:

  1.1) If the client is not running on a DataNode, randomly pick one DataNode from the whole cluster (chooseRandom()) and check whether it is overloaded (as in step 1.2); if it is, randomly pick another, and so on.

  1.2) If the client is running on a DataNode, check whether that node is overloaded (isGoodTarget(); see the sketch after this list). The checks are: whether the node is usable, whether it is in the "stale" state, whether it has enough remaining capacity, how much traffic it is carrying, and whether its rack already holds too many of the chosen DataNodes;

  1.3) If the first two steps do not yield a node, choose a DataNode on the client's rack (chooseLocalRack()) as the local node, as follows:

  a) randomly pick a DataNode on the same rack as the client (same procedure as 1.1);

  b) failing that, randomly pick a DataNode from the whole cluster (same procedure as 1.1).
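
  As promised in step 1.2, here is a simplified sketch of the checks behind isGoodTarget(). This is a paraphrase of what the method does, written as if it were a helper inside BlockPlacementPolicyDefault; it is NOT the verbatim Hadoop 1.2.1 source, and the threshold constant and the average-load parameter are placeholders for values the real method obtains from FSConstants and FSClusterStats:

// Paraphrased sketch of the isGoodTarget() checks (not the verbatim source).
static boolean isGoodTargetSketch(DatanodeDescriptor node, long blockSize,
    int maxTargetPerRack, double avgXceiverCount,
    List<DatanodeDescriptor> results) {
  final int MIN_BLOCKS_FOR_WRITE = 5; // placeholder threshold
  // 1. skip nodes that are decommissioned or being decommissioned
  if (node.isDecommissionInProgress() || node.isDecommissioned()) {
    return false;
  }
  // 2. (when avoidStaleNodes is set, the real method also rejects nodes
  //    whose heartbeat is stale; omitted here for brevity)
  // 3. the node needs enough free space for several blocks
  if (node.getRemaining() < blockSize * MIN_BLOCKS_FOR_WRITE) {
    return false;
  }
  // 4. skip nodes carrying too much traffic: roughly more than twice
  //    the cluster's average xceiver (transfer-thread) count
  if (node.getXceiverCount() > 2.0 * avgXceiverCount) {
    return false;
  }
  // 5. the node's rack must not already hold too many chosen targets
  int nodesOnRack = 1;
  for (DatanodeDescriptor chosen : results) {
    if (node.getNetworkLocation().equals(chosen.getNetworkLocation())) {
      nodesOnRack++;
    }
  }
  return nodesOnRack <= maxTargetPerRack;
}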

  Steps a) and b) deserve some explanation: both reuse the same procedure as 1.1, so how can they produce different results? The difference lies in the first argument passed to chooseRandom(). If it is NodeBase.ROOT, the root of the topology tree, the selection scope is the whole cluster; if it is localMachine.getNetworkLocation(), the scope is localMachine's rack. The first argument thus defines the scope from which the random choice is made. The mapping between DataNodes and racks is maintained by the NetworkTopology class, which is also what rack awareness is built on.
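
  To see the scope argument in action, here is a small standalone sketch against the NetworkTopology API; the host and rack names are invented purely for illustration:

import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;

public class ScopeDemo {
  public static void main(String[] args) {
    NetworkTopology cluster = new NetworkTopology();
    // invented hosts and racks, for illustration only
    cluster.add(new NodeBase("h1", "/rack1"));
    cluster.add(new NodeBase("h2", "/rack1"));
    cluster.add(new NodeBase("h3", "/rack2"));

    // NodeBase.ROOT: the scope is the whole cluster
    Node any = cluster.chooseRandom(NodeBase.ROOT);
    // a rack path: the scope is that rack only
    Node inRack1 = cluster.chooseRandom("/rack1");
    // a leading "~" excludes the scope: any node NOT in /rack1
    Node outsideRack1 = cluster.chooseRandom("~/rack1");

    System.out.println(any + ", " + inRack1 + ", " + outsideRack1);
  }
}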

2. Choosing a DataNode on a remote rack: chooseRemoteRack()

/* choose <i>numOfReplicas</i> nodes from the racks
 * that <i>localMachine</i> is NOT on.
 * if not enough nodes are available, choose the remaining ones
 * from the local rack
 */
protected void chooseRemoteRack(int numOfReplicas,
                                DatanodeDescriptor localMachine,
                                HashMap<Node, Node> excludedNodes,
                                long blocksize,
                                int maxReplicasPerRack,
                                List<DatanodeDescriptor> results,
                                boolean avoidStaleNodes)
    throws NotEnoughReplicasException {
  int oldNumOfReplicas = results.size();
  // randomly choose one node from remote racks
  try {
    // choose DataNodes that are NOT on the same rack as localMachine
    chooseRandom(numOfReplicas, "~" + localMachine.getNetworkLocation(),
        excludedNodes, blocksize, maxReplicasPerRack, results,
        avoidStaleNodes);
  } catch (NotEnoughReplicasException e) {
    // fall back to DataNodes on the same rack as localMachine
    chooseRandom(numOfReplicas - (results.size() - oldNumOfReplicas),
        localMachine.getNetworkLocation(), excludedNodes, blocksize,
        maxReplicasPerRack, results, avoidStaleNodes);
  }
}

  The remote DataNode is chosen in two steps:

  2.1) pick a DataNode from a rack other than the local one (same procedure as 1.1);

  2.2) failing that, pick a DataNode from the local rack (same procedure as 1.1).

  Again, both steps reuse chooseRandom(). In 2.1 the scope argument is "~" + localMachine.getNetworkLocation(), i.e. choose from the cluster excluding localMachine's rack (the "~" denotes exclusion, as in the demo above); in 2.2 it is localMachine.getNetworkLocation(), i.e. choose from within localMachine's rack. This point matters: as you can see, the second chosen DataNode is not necessarily on a different rack from the first.

3. Choosing the third DataNode

  The code is the numOfResults <= 2 branch of the second chooseTarget() method above. The steps are:

  3.1) if the first two DataNodes are on the same rack, choose a DataNode that is not on that rack, same procedure as step 2;

  3.2) otherwise, if newBlock is true, choose a DataNode on the same rack as the second one, same procedure as 1.3;

  3.3) otherwise, choose a DataNode on the same rack as the writer (which, when the writer is not a DataNode, is the first chosen DataNode), same procedure as 1.3.

4. Choose the DataNodes for any remaining replicas at random from the whole cluster, same procedure as 1.1 (the chooseRandom() call after the numOfResults <= 2 branch in the second method above).

  Finally, control returns to the end of the first method, where the chosen DataNodes are organized into a pipeline.
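
  The pipeline ordering itself is a simple greedy scheme: getPipeline() arranges the targets so that each hop goes to the nearest remaining node. Here is a paraphrased sketch of that idea, written as if it were a helper inside BlockPlacementPolicyDefault; it is NOT the verbatim Hadoop source:

// Greedy nearest-first ordering, paraphrasing what getPipeline() does.
// Starting from the writer, repeatedly swap the closest remaining target
// (by NetworkTopology distance) to the front of the unsorted tail.
static DatanodeDescriptor[] sortIntoPipeline(Node writer,
    DatanodeDescriptor[] nodes, NetworkTopology clusterMap) {
  Node current = writer;
  for (int i = 0; i < nodes.length; i++) {
    int best = i;
    int bestDistance = Integer.MAX_VALUE;
    for (int j = i; j < nodes.length; j++) {
      int distance = clusterMap.getDistance(current, nodes[j]);
      if (distance < bestDistance) {
        bestDistance = distance;
        best = j;
      }
    }
    // swap the closest node into position i and walk on from it
    DatanodeDescriptor tmp = nodes[i];
    nodes[i] = nodes[best];
    nodes[best] = tmp;
    current = nodes[i];
  }
  return nodes;
}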

  This analysis also settles a common question. Online you will see some people claim the third DataNode shares a rack with the second, and others claim it shares a rack with the first. Which is right? It hinges on how the second DataNode was chosen. As noted above, the second DataNode is usually on a different rack from the first, but it can end up on the same rack; it depends on the state of the cluster at the time. So neither blanket statement is always true.
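
  A concrete trace makes this clear. Suppose a 3-replica write comes from a client running on DataNode d0 in rack /r1. chooseLocalNode() picks d0; chooseRemoteRack() then tries a node outside /r1 and, say, lands on d5 in /r2, so the third replica goes to another node in /r2, the same rack as the second. But if every node outside /r1 is excluded or overloaded, chooseRemoteRack() falls back to /r1; the first two replicas then share a rack, and the isOnSameRack branch tries to place the third on a different rack instead.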

  This article is based on Hadoop 1.2.1.

  If you spot any errors, corrections are welcome.

  Reference: http://blog.****.net/xhh198781/article/details/7109764

  When reposting, please cite the source: http://www.cnblogs.com/gwgyk/p/4137060.html