爬虫在工作过程中,会有大量的URL需要存储和分配,如何高效的管理这些URL,是一个爬虫系统的重中之重。
crawler4j默认运行最多每小时解析几千个URL,在修改过后可以达到每小时几十万个(后面的文章中介绍),这么多的URL,应该如何管理呢?
crawler4j使用嵌入式数据库Berkeley DB JE 进行URL的临时存储和分配管理,关于Berkeley DB JE ,我在另一篇文章里做了简单介绍:
海量简单数据不想用SQL?试试高效的嵌入式数据库Berkeley DB JE吧!
WebURL:
还是先从BasicCrawlController的main函数开始,看程序是如何添加入口URL的:
controller.addSeed("http://www.ics.uci.edu/");
controller.addSeed("http://www.ics.uci.edu/~lopes/");
controller.addSeed("http://www.ics.uci.edu/~welling/");
再看CrawlController的addSeed()方法:
public void addSeed(String pageUrl) {
addSeed(pageUrl, -1);
}
public void addSeed(String pageUrl, int docId) {
String canonicalUrl = URLCanonicalizer.getCanonicalURL(pageUrl);
if (canonicalUrl == null) {
logger.error("Invalid seed URL: " + pageUrl);
return;
}
if (docId < 0) {
docId = docIdServer.getDocId(canonicalUrl);
if (docId > 0) {
// This URL is already seen.
return;
}
docId = docIdServer.getNewDocID(canonicalUrl);
} else {
try {
docIdServer.addUrlAndDocId(canonicalUrl, docId);
} catch (Exception e) {
logger.error("Could not add seed: " + e.getMessage());
}
}
WebURL webUrl = new WebURL();
webUrl.setURL(canonicalUrl);
webUrl.setDocid(docId);
webUrl.setDepth((short) 0);
if (!robotstxtServer.allows(webUrl)) {
logger.info("Robots.txt does not allow this seed: " + pageUrl);
} else {
frontier.schedule(webUrl);
}
}
这里定义了一个WebURL作为URL的Model类,存储了一些 URL的属性:域、子域、路径、锚、URL地址,这些在调用setURL方法时就会被解析出来,setURL主要是字符串的截取,还用到了TLDList.getInstance().contains(domain),就是从域名列表文件tld-names.txt里查找判断URL里哪部分是域名,因为域名包括的部分可能不太一样,如.cn、.com.cn、.gov、.gov.cn;还有一些 爬虫属性:分配的ID、父URLID、父URL、深度、优先级,这些会在爬虫工作时指定,所谓父URL就是在哪个页面发现的该地址,深度是第几级被发现的,如入口URL是0,从入口URL页面发现的地址是1,从1发现的新的是2,依此类推,优先级高的(数字小的)会优先分配爬取。
DocIDServer:
addSeed里面setDocid是给URL分配一个惟一的ID,默认是从1开始自动增长:1 2 3 4 5... 虽然这里可以使用JAVA自带的集合类来管理和存储这些ID,但是为了确保惟一且保证在ID增长到了几十上百万时依然高效,crawler4j使用了前面说的BDB JE来存储,当然还有一个原因是为了可恢复,即系统挂了恢复后爬虫可以继续,但我并不打算讨论这种情况,因为在这种情况下,crawler4j的运行效率相当低!
用docIdServer.getDocId()来检查该URL是否已经存储,如果没有则docId = docIdServer.getNewDocID(canonicalUrl);获取新ID。看下docIdServer是怎么工作的,首先在CrawlController构造函数中初始化并传入Environment(关于Env,请参考文章开头BDB JE链接):
docIdServer = new DocIDServer(env, config);
DocIdServer类只负责管理URL的ID,构造函数:
public DocIDServer(Environment env, CrawlConfig config) throws DatabaseException {
super(config);
DatabaseConfig dbConfig = new DatabaseConfig();
dbConfig.setAllowCreate(true);
dbConfig.setTransactional(config.isResumableCrawling());
dbConfig.setDeferredWrite(!config.isResumableCrawling());
docIDsDB = env.openDatabase(null, "DocIDs", dbConfig);
if (config.isResumableCrawling()) {
int docCount = getDocCount();
if (docCount > 0) {
logger.info("Loaded " + docCount + " URLs that had been detected in previous crawl.");
lastDocID = docCount;
}
} else {
lastDocID = 0;
}
}
这里只是简单的创建了一个名叫DocIDs的DB(有关可恢复不做讨论,这里和下面涉及resumable都是false)。这个DB是以URL为key,以ID为value存储的,因为key的惟一性,可保证URL不重复,且更好的用URL来进行ID查询。
再看getDocId():
public int getDocId(String url) {
synchronized (mutex) {
if (docIDsDB == null) {
return -1;
}
OperationStatus result;
DatabaseEntry value = new DatabaseEntry();
try {
DatabaseEntry key = new DatabaseEntry(url.getBytes());
result = docIDsDB.get(null, key, value, null);
if (result == OperationStatus.SUCCESS && value.getData().length > 0) {
return Util.byteArray2Int(value.getData());
}
} catch (Exception e) {
e.printStackTrace();
}
return -1;
}
}
因为是多线程访问,所以这里用了synchronized (mutex) 保证线程安全。如果能从DB中查询出key是指定的URL的话,则返回相应的ID value,否则返回-1说明没有找到。
public int getNewDocID(String url) {
synchronized (mutex) {
try {
// Make sure that we have not already assigned a docid for this URL
int docid = getDocId(url);
if (docid > 0) {
return docid;
}
lastDocID++;
docIDsDB.put(null, new DatabaseEntry(url.getBytes()), new DatabaseEntry(Util.int2ByteArray(lastDocID)));
return lastDocID;
} catch (Exception e) {
e.printStackTrace();
}
return -1;
}
}
用getNewDocID生成新的ID并将它和URL存入DB。
addUrlAndDocId()是当你不想自动生成ID而想自己指定一个ID时使用,一般不建议用,除非是第二次使用并想用和之前一样的ID,但如果这样的话得先查出前一次的ID,效率不高,且真的没多大必要!
DocIDServer主要就这两个方法了,逻辑很简单,功能也很单一。
Frontier
回到addSeed方法,最后一句frontier.schedule(webUrl);将指定URL加入队列,只有加入队列之后爬虫线程才能对该URL进行解析。
Frontier有两个重要的新属性,一个是计数器Counters,另一个是URL队列WorkQueues:
protected WorkQueues workQueues = new WorkQueues(env, "PendingURLsDB", config.isResumableCrawling());
protected Counters counters = new Counters(env, config);
计数器Counters实现比较简单,用一个HashMap存储,目前只存储了两个值:已加入队列的URL数和已爬取完成的URL数。
URL队列WorkQueues保存当前已发现的但是又还没有分配给爬虫线程的WebURL,用BDB JE存储,创建了一个名为PendingURLsDB的数据库:
public WorkQueues(Environment env, String dbName, boolean resumable) throws DatabaseException {
this.env = env;
this.resumable = resumable;
DatabaseConfig dbConfig = new DatabaseConfig();
dbConfig.setAllowCreate(true);
dbConfig.setTransactional(resumable);
dbConfig.setDeferredWrite(!resumable);
urlsDB = env.openDatabase(null, dbName, dbConfig);
webURLBinding = new WebURLTupleBinding();
}
自定义了一个WebURLTupleBinding,可以在JE中保存WebURL的各个属性。如果你需要给WebURL添加一些属性,比如锚的标签名是a,img还是iframe,除了要在WebURL里面添加外,也需要修改WebURLTupleBinding,否则不会被存入DB,线程取出的时候该属性就会为空!
WorkQueues使用put, delete, get方法来实现增删查,以6位byte作为key,第一位是WebURL的priority属性,第二位是WebURL的深度属性,剩下4位是用WebURL的ID转换成byte;用WebURLTupleBinding中定义的内容作为value。因为数据库是以key为索引存储的,所以优先级高的即数字小的会排在前面,接着深度小的也会排在前面。
关于优先级,crawler4j有个小BUG,就是WebURL的priority属性默认就是最小0,这使得如果你想优先爬取某URL就不可能了,解决方法是在WebURL构造函数或setURL里为priority赋上默认值,至于赋什么值好,就看着办吧嘿嘿!
Frontier提供两个方法添加URL到队列:
public void scheduleAll(List<WebURL> urls) {
int maxPagesToFetch = config.getMaxPagesToFetch();
synchronized (mutex) {
int newScheduledPage = 0;
for (WebURL url : urls) {
if (maxPagesToFetch > 0 && (scheduledPages + newScheduledPage) >= maxPagesToFetch) {
break;
}
try {
workQueues.put(url);
newScheduledPage++;
} catch (DatabaseException e) {
logger.error("Error while puting the url in the work queue.");
}
}
if (newScheduledPage > 0) {
scheduledPages += newScheduledPage;
counters.increment(Counters.ReservedCounterNames.SCHEDULED_PAGES, newScheduledPage);
}
synchronized (waitingList) {
waitingList.notifyAll();
}
}
}
public void schedule(WebURL url) {
int maxPagesToFetch = config.getMaxPagesToFetch();
synchronized (mutex) {
try {
if (maxPagesToFetch < 0 || scheduledPages < maxPagesToFetch) {
workQueues.put(url);
scheduledPages++;
counters.increment(Counters.ReservedCounterNames.SCHEDULED_PAGES);
}
} catch (DatabaseException e) {
logger.error("Error while puting the url in the work queue.");
}
}
}
单个添加和批量添加,添加到队列的同时设置计数器,多个逻辑有各自的实现类,实现分离,Frontier负责组合这些逻辑,外部只需调用Fontier即可!Frontier还有一方法就是获取队列中的数据,一次可获取多条:
public void getNextURLs(int max, List<WebURL> result) {
while (true) {
synchronized (mutex) {
if (isFinished) {
return;
}
try {
List<WebURL> curResults = workQueues.get(max);
workQueues.delete(curResults.size());
if (inProcessPages != null) {
for (WebURL curPage : curResults) {
inProcessPages.put(curPage);
}
}
result.addAll(curResults);
} catch (DatabaseException e) {
logger.error("Error while getting next urls: " + e.getMessage());
e.printStackTrace();
}
if (result.size() > 0) {
return;
}
}
try {
synchronized (waitingList) {
waitingList.wait();
}
} catch (InterruptedException ignored) {
// Do nothing
}
if (isFinished) {
return;
}
}
}
爬虫线程每次调用这个方法领取50个URL,领取完就从队列删除,开始解析,解析完后重新调用领取。如果队列是空的,线程将会在这个方法里面 等待wait(),其它线程也会在synchronized处排队,直到scheduleAll方法被调用,线程才会重新被 激活notifyAll()。
以上就是crawler4j爬虫存储和分配URL的代码分析,涉及的类都被放在了edu.uci.ics.crawler4j.frontier包,该包还有一个类InProcessPagesDB是用来作可恢复爬取的,不做讨论。