NameNode内存优化---基于缓存相同文件名的方法

时间:2024-09-10 19:34:20
NameNode内存优化---重用相同的文件名     
原创文章,转载请注明:博客园aprogramer
     众所周知,Hadoop集群的NameNode的内存会随着文件(目录)数和block数的增长而增长。当集群的规模比较大时,NameNode的内存使用量就会越来越大,发生FullGC时需要的时间也会越来越长,严重影响集群的可用性。当文件数很多时,文件名会有很多相同的,如果重用这些相同的文件名,就会减少NameNode内存的使用量。

1. Hadoop中有多少文件的文件名是重复的?

     首先简单回顾一下NameNode是如何在内存中保存目录树的。在NameNode中文件用INodeFile类表示,而INodeFile继承自抽象类INode。而INode类有一个byte[] 类型的成员name,表示文件名。也就是说每个文件都有一个byte数组的成员保存文件名,当有两个文件的文件名完全一样时,就会new两个完全一样的byte数组,显然这是比较浪费的,而我们的优化思路就是把文件名相同的文件的公用一个byte数组,不要小看这个优化,当集群中有很多文件时,文件同名的可能性越大,优化的效果也就越明显。下表是Yahoo生产集群的统计数据:
File names used > 100000 times 24
File names used between 10001 to 100000 times 467
File names used between 1001 to 10000 times 4335
File names used between 101 to 1000 times 40031
File names used between 10 to 100 times 403975
File names used between 2 to 9 times 606579
File names used between 1 times 4114531
Total file names 5169942

从上表能够看出有516万的文件中有大约100万的文件重复了,在这重复的100万中又有40多万的文件名被重复用了10次以上,所以说文件名相同的情况还是很频繁的。

2. 如果缓存文件名大概能节省多少内存?

    hadoop的oiv工具可以分析fsimage文件,计算出能够大约节省多少空间,算法如下:
    1. 创建一个HashMap,key是String类型表示文件名,value是Integer类型表示这个文件名被几个文件使用,用来统计每个文件名被使用的次数。
    2. 通过读fsimage文件遍历目录数中的每个Inode,如果是文件,并且hashmap中没有这个文件名,则用文件名作为key,1作为value存到hashmap中;如果    hashmap中没有则把hashmap中的对应项的value值加1;
    3. 计算节省的空间:假设文件名filename被使用了N次(N>1),那么节省的空间数为:
        (N-1)*(filename的长度+24),24是byte[]类型对象头部的overhead。 
    下面的是分析淘宝云梯NameNode image文件的结果:
bin/hadoop oiv -i  fsimage_0000000015589116819 -o ovils -p NameDistribution 

Total unique file names 241275868
21 names are used by 2620832 files between 100000-214114 times. Heap savings ~89107574 bytes.
634 names are used by 20434805 files between 10000-99999 times. Heap savings ~703887328 bytes.
7593 names are used by 19447534 files between 1000-9999 times. Heap savings ~734908710 bytes.
49057 names are used by 16697480 files between 100-999 times. Heap savings ~729947272 bytes.
399464 names are used by 7812965 files between 10-99 times. Heap savings ~381085471 bytes.
638085 names are used by 4261035 files between 5-9 times. Heap savings ~223499710 bytes.
628335 names are used by 2513340 files 4 times. Heap savings ~118476621 bytes.
2550503 names are used by 7651509 files 3 times. Heap savings ~319960524 bytes.
3698086 names are used by 7396172 files 2 times. Heap savings ~230703725 bytes. Total saved heap ~3531576935bytes.

通过分析fsimage,可以减少3531576935bytes=3.289g的内存空间,还是很可观的 

3. 代码实现

    下面的实现参考了Hadoop2.0版本,社区jira链接是HDFS-1110
因为要共用相同的文件名,而在Hadoop中文件名是用byte[]表示的,所以我们最终要缓存的是byte数组,又因为byte数组不适合作为HashMap的key,所以我们先对byte数组封装一下,重写了equals和hashCode方法。代码如下:
 public class ByteArray {
private int hash = 0; // 字节数组的hashcode
private final byte[] bytes; public ByteArray(byte[] bytes) {
this.bytes = bytes;
} public byte[] getBytes() {
return bytes;
} @Override
public int hashCode() {
if (hash == 0) {
hash = Arrays.hashCode(bytes);
}
return hash;
} @Override
public boolean equals(Object o) {
if (!(o instanceof ByteArray)) {
return false;
}
return Arrays.equals(bytes, ((ByteArray)o).bytes);
}
}

有了缓存项,还要有管理缓存项的缓存类NameCache,NameCache有两个重要的成员cache和transientMap,transientMap用来记录每个文件名被使用的次数,每次addFile时,先把文件名放入transientMap,当文件被重用的次数大于一个阀值时(dfs.namenode.name.cache.threshold配置项),文件名从transientMap 迁移到cache对象中。NameCache中最重要的方法是 K put(final K name),在该方法中,首先检查cache,如果cache中已经缓存了文件名,那么直接返回cache中的文件名;如果cache中没有那么先把文件名放入transientMap,文件的重用次数加1,如果加1后大于阀值,则执行void promote(final K name)方法,该方法中将transientMap中的对应项remove掉,然后放到cache中。该过程的时序图如下所示:

NameNode内存优化---基于缓存相同文件名的方法

查看大图:大图

需要注意的是HashMap相关方法的调用,并不是每次都调用所有的HashMap的方法。

1. 当cache中已经缓存了文件名,则直接返回,这个过程只调用了一次get方法;

2. 当cache中没有缓存文件名(判断过程调用一次get),并且transientMap中也没有这个文件名(判断过程再一次调用get),则向transientMap中put文件名和使用次数(调用一次put),共计2次get,1次put;

3. 当cache中没有缓存文件名(判断过程调用一次get),并且transientMap中也有这个文件名(判断过程再一次调用get)时,把transientMap中的对应文件名的的重用次数加1,如果文件名重用次数小于阀值不大于阀值则返回,共计调用2次get;

4. 当cache中没有缓存文件名(判断过程调用一次get),并且transientMap中也有这个文件名(判断过程再一次调用get)时,把transientMap中的对应文件名的的重用次数加1,如果文件名重用次数大于等于阀值,则调用promotion,把transientMap中的对应项remove掉,并把该文件名put到cache中,此过程共计调用2次get,1次remove,1次put。

还需要注意的是boolean型成员initialized,代表NameNode是否加载完fsimage了,初值为false,当fsimage加载完并回放了editlog,会调用NameCache的initialized()方法,将其设置为true,代码如下:

 // <K> name to be added to the cache
class NameCache<K> {
/**
* Class for tracking use count of a name
*/
private class UseCount {
int count;
final K value; // Internal value for the name UseCount(final K value) {
count = 1;
this.value = value;
} void increment() {
count++;
} int get() {
return count;
}
} static final Log LOG = LogFactory.getLog(NameCache.class.getName()); /** indicates initialization is in progress */
private boolean initialized = false; /** names used more than {@code useThreshold} is added to the cache */
private final int useThreshold; /** of times a cache look up was successful */
private int lookups = 0; /** Cached names */
final HashMap<K, K> cache = new HashMap<K, K>(); /** Names and with number of occurrences tracked during initialization */
Map<K, UseCount> transientMap = new HashMap<K, UseCount>(); /**
* Constructor
* @param useThreshold names occurring more than this is promoted to the
* cache
*/
NameCache(int useThreshold) {
this.useThreshold = useThreshold;
} /**
* Add a given name to the cache or track use count.
* exist. If the name already exists, then the internal value is returned.
*
* @param name name to be looked up
* @return internal value for the name if found; otherwise null
*/
K put(final K name) {
K internal = cache.get(name);
if (internal != null) {
lookups++;
return internal;
} // Track the usage count only during initialization
if (!initialized) {
UseCount useCount = transientMap.get(name);
if (useCount != null) {
useCount.increment();
if (useCount.get() >= useThreshold) {
promote(name);
}
return useCount.value;
}
useCount = new UseCount(name);
transientMap.put(name, useCount);
}
return null;
} /**
* Lookup count when a lookup for a name returned cached object
* @return number of successful lookups
*/
int getLookupCount() {
return lookups;
} /**
* Size of the cache
* @return Number of names stored in the cache
*/
int size() {
return cache.size();
} /**
* Mark the name cache as initialized. The use count is no longer tracked
* and the transient map used for initializing the cache is discarded to
* save heap space.
*/
void initialized() {
LOG.info("initialized with " + size() + " entries " + lookups + " lookups");
this.initialized = true;
transientMap.clear();
transientMap = null;
} /** Promote a frequently used name to the cache */
private void promote(final K name) {
transientMap.remove(name);
cache.put(name, name);
lookups += useThreshold;
} public void reset() {
initialized = false;
cache.clear();
if (transientMap == null) {
transientMap = new HashMap<K, UseCount>();
} else {
transientMap.clear();
}
}
}

最后在FSDrectory中添加一个NameCache类型成员,在addNode和addToParent方法中调用cacheName方法,在cacheName方法中用byte[]类型的文件名构造ByteArray实例,并调用nameCache的put方法,代码如下:

 private <T extends INode> T addNode(String src, T child,
long childDiskspace)
throws QuotaExceededException, UnresolvedLinkException {
byte[][] components = INode.getPathComponents(src);
byte[] path = components[components.length-1];
child.setLocalName(path);
cacheName(child);
INode[] inodes = new INode[components.length];
writeLock();
try {
rootDir.getExistingPathINodes(components, inodes, false);
return addChild(inodes, inodes.length-1, child, childDiskspace);
} finally {
writeUnlock();
}
} INodeDirectory addToParent(byte[] src, INodeDirectory parentINode,
INode newNode, boolean propagateModTime) throws UnresolvedLinkException {
// NOTE: This does not update space counts for parents
INodeDirectory newParent = null;
writeLock();
try {
try {
newParent = rootDir.addToParent(src, newNode, parentINode,
propagateModTime);
cacheName(newNode);
} catch (FileNotFoundException e) {
return null;
}
if(newParent == null)
return null;
if(!newNode.isDirectory() && !newNode.isLink()) {
// Add file->block mapping
INodeFile newF = (INodeFile)newNode;
BlockInfo[] blocks = newF.getBlocks();
for (int i = 0; i < blocks.length; i++) {
newF.setBlock(i, getBlockManager().addBlockCollection(blocks[i], newF));
}
}
} finally {
writeUnlock();
}
return newParent;
} void cacheName(INode inode) {
// Name is cached only for files
if (inode.isDirectory() || inode.isLink()) {
return;
}
ByteArray name = new ByteArray(inode.getLocalNameBytes());
name = nameCache.put(name);
if (name != null) {
inode.setLocalName(name.getBytes());
}
}
}

4. 优化结果及分析

  优化前 优化后
fsimage加载时间 898秒  1102秒
Heap Used
115.252G
112.148G
     优化后能够减少3.1G的内存,但是Namenode的fsimage时间却延长了204s。虽然减少了内存使用量,但fsimage加载时间却增加了3分钟多,这也是很令人难以接受的。
    通过对比优化前后的代码,发现fsimage慢的原因就是因为在addNode和addToParent中调用了cacheName,而cacheName调用了NameCache的put方法,在put中调用了可能会调用HashMap的get和put或者remove方法,因为文件数量较多,所以HashMap的size比较大,再加上调用次数多,导致fsimage的加载时间变长。如果把调用NameCache.put()方法异步化,调用cacheName时把要缓存的文件的Inode放入一个List中,就立即返回,启动一个线程执行文件的缓存,这样使loadImage和缓存文件名同时进行。

5. 异步缓存

    创建FSDirectoryNameCache,该类有最重要的方法是cacheName(INode inode),如果fsimage还没加载完,则把Inode放入队列中,队列的size大于一个阀值时,创建一个CacheWorker,让cachingExecutor调度执行,cachingExecutor是size为1的FixedThreadPool,该过程的序列图如下图所示:
NameNode内存优化---基于缓存相同文件名的方法
查看大图:大图
   详细代码如下:
 public class FSDirectoryNameCache {
// buffer this many elements in temporary queue
private static final int MAX_QUEUE_SIZE = 10000; // actual cache
private final NameCache<ByteArray> nameCache;
private volatile boolean imageLoaded; // initial caching utils
private ExecutorService cachingExecutor;
private List<Future<Void>> cachingTasks;
private List<INode> cachingTempQueue; public FSDirectoryNameCache(int threshold) {
nameCache = new NameCache<ByteArray>(threshold);
imageLoaded = false; // executor for processing temporary queue (only 1 thread!!)
cachingExecutor = Executors.newFixedThreadPool(1);
cachingTempQueue = new ArrayList<INode>(MAX_QUEUE_SIZE);
cachingTasks = new ArrayList<Future<Void>>();
} /**
* Adds cached entry to the map and updates INode
*/
private void cacheNameInternal(INode inode) {
// Name is cached only for files
if (inode.isDirectory()) {
return;
}
ByteArray name = new ByteArray(inode.getLocalNameBytes());
name = nameCache.put(name);
if (name != null) {
inode.setLocalName(name.getBytes());
}
} void cacheName(INode inode) {
if (inode.isDirectory()) {
return;
}
if (this.imageLoaded) {
// direct caching
cacheNameInternal(inode);
return;
} // otherwise add it to temporary queue
cachingTempQueue.add(inode); // if queue is too large, submit a task
if (cachingTempQueue.size() >= MAX_QUEUE_SIZE) {
cachingTasks.add(cachingExecutor
.submit(new CacheWorker(cachingTempQueue)));
cachingTempQueue = new ArrayList<INode>(MAX_QUEUE_SIZE);
}
} /**
* Worker for processing a list of inodes.
*/
class CacheWorker implements Callable<Void> {
private final List<INode> inodesToProcess; CacheWorker(List<INode> inodes) {
this.inodesToProcess = inodes;
} @Override
public Void call() throws Exception {
for (INode inode : inodesToProcess) {
cacheNameInternal(inode);
}
return null;
}
} /**
* Inform that from now on all caching is done synchronously.
* Cache remaining inodes from the queue.
* @throws IOException
*/
void imageLoaded() throws IOException {
if(cachingTasks == null) {
return;
}
for (Future<Void> task : cachingTasks) {
try {
task.get();
} catch (InterruptedException e) {
throw new IOException("FSDirectory cache received interruption");
} catch (ExecutionException e) {
throw new IOException(e);
}
} // will not be used after startup
this.cachingTasks = null;
this.cachingExecutor.shutdownNow();
this.cachingExecutor = null; // process remaining inodes
for(INode inode : cachingTempQueue) {
cacheNameInternal(inode);
}
this.cachingTempQueue = null; this.imageLoaded = true;
} void initialized() {
this.nameCache.initialized();
} int size() {
return nameCache.size();
} int getLookupCount() {
return nameCache.getLookupCount();
} public void reset() {
nameCache.reset();
}
}

6. 异步实验结果及分析

加载淘宝云梯fsimage,测试结果如下所以:

  优化前 同步cache 异步cache
fsimage加载时间 898秒 1102秒 956秒
Heap Used  115.252G   112.148G  112.147G
    从实验结果上看异步cache和同步cache在内存使用上一致,都比优化前少了3G,但fsimage比同步cache少了146秒,比优化前只多了58秒,不到1分钟,优化效果明显。当然异步cache也有一个弱点,因为Inode先放入队列中,然后再交给size为1的FixedThreadPool处理,这样可能会造成任务堆积,生成大量的临时队列,造成内存碎片,严重时可能促发FullGC。