java多线程爬虫实现

时间:2022-10-31 09:05:12

    先上做的结果吧:

   

  1. 开始爬虫.........................................  
  2. 当前有1个线程在等待  
  3. 当前有2个线程在等待  
  4. 当前有3个线程在等待  
  5. 当前有4个线程在等待  
  6. 当前有5个线程在等待  
  7. .....................  
  1. 爬网页http://dev.yesky.com成功,深度为2 是由线程thread-9来爬  
  2. 当前有7个线程在等待  
  3. 爬网页http://www.cnblogs.com/rexyoung/archive/2012/05/01/2477960.html成功,深度为2 是由线程thread-2来爬  
  4. 当前有8个线程在等待  
  5. 爬网页http://www.hjenglish.com 成功,深度为2 是由线程thread-0来爬  
  6. 当前有9个线程在等待  
  7. 爬网页http://www.cnblogs.com/snandy/archive/2012/05/01/2476675.html成功,深度为2 是由线程thread-5来爬  
  8. 当前有10个线程在等待  
  9. 总共爬了159个网页  
  10. 总共耗时53秒  

上面是爬博客园的主页,只爬了两级深度,10个线程,总共耗时53秒,应该速度还算不错的,下面是所有的代码:

  1. public class WebCrawler {  
  2.     ArrayList<String> allurlSet = new ArrayList<String>();//所有的网页url,需要更高效的去重可以考虑HashSet  
  3.     ArrayList<String> notCrawlurlSet = new ArrayList<String>();//未爬过的网页url  
  4.     HashMap<String, Integer> depth = new HashMap<String, Integer>();//所有网页的url深度  
  5.     int crawDepth  = 2//爬虫深度  
  6.     int threadCount = 10//线程数量  
  7.     int count = 0//表示有多少个线程处于wait状态  
  8.     public static final Object signal = new Object();   //线程间通信变量  
  9.       
  10.     public static void main(String[] args) {  
  11.         final WebCrawler wc = new WebCrawler();  
  12. //      wc.addUrl("http://www.126.com", 1);  
  13.         wc.addUrl("http://www.cnblogs.com"1);  
  14.         long start= System.currentTimeMillis();  
  15.         System.out.println("开始爬虫.........................................");  
  16.         wc.begin();  
  17.           
  18.         while(true){  
  19.             if(wc.notCrawlurlSet.isEmpty()&& Thread.activeCount() == 1||wc.count==wc.threadCount){  
  20.                 long end = System.currentTimeMillis();  
  21.                 System.out.println("总共爬了"+wc.allurlSet.size()+"个网页");  
  22.                 System.out.println("总共耗时"+(end-start)/1000+"秒");  
  23.                 System.exit(1);  
  24. //              break;  
  25.             }  
  26.               
  27.         }  
  28.     }  
  29.     private void begin() {  
  30.         for(int i=0;i<threadCount;i++){  
  31.             new Thread(new Runnable(){  
  32.                 public void run() {  
  33. //                  System.out.println("当前进入"+Thread.currentThread().getName());  
  34. //                  while(!notCrawlurlSet.isEmpty()){ ----------------------------------(1)  
  35. //                      String tmp = getAUrl();  
  36. //                      crawler(tmp);  
  37. //                  }  
  38.                     while (true) {   
  39. //                      System.out.println("当前进入"+Thread.currentThread().getName());  
  40.                         String tmp = getAUrl();  
  41.                         if(tmp!=null){  
  42.                             crawler(tmp);  
  43.                         }else{  
  44.                             synchronized(signal) {  //------------------(2)  
  45.                                 try {  
  46.                                     count++;  
  47.                                     System.out.println("当前有"+count+"个线程在等待");  
  48.                                     signal.wait();  
  49.                                 } catch (InterruptedException e) {  
  50.                                     // TODO Auto-generated catch block  
  51.                                     e.printStackTrace();  
  52.                                 }  
  53.                             }  
  54.                               
  55.                               
  56.                         }  
  57.                     }  
  58.                 }  
  59.             },"thread-"+i).start();  
  60.         }  
  61.     }  
  62.     public synchronized  String getAUrl() {  
  63.         if(notCrawlurlSet.isEmpty())  
  64.             return null;  
  65.         String tmpAUrl;  
  66. //      synchronized(notCrawlurlSet){  
  67.             tmpAUrl= notCrawlurlSet.get(0);  
  68.             notCrawlurlSet.remove(0);  
  69. //      }  
  70.         return tmpAUrl;  
  71.     }  
  72. //  public synchronized  boolean isEmpty() {  
  73. //      boolean f = notCrawlurlSet.isEmpty();  
  74. //      return f;  
  75. //  }  
  76.       
  77.     public synchronized void  addUrl(String url,int d){  
  78.             notCrawlurlSet.add(url);  
  79.             allurlSet.add(url);  
  80.             depth.put(url, d);  
  81.     }  
  82.       
  83.     //爬网页sUrl  
  84.     public  void crawler(String sUrl){  
  85.         URL url;  
  86.         try {  
  87.                 url = new URL(sUrl);  
  88. //              HttpURLConnection urlconnection = (HttpURLConnection)url.openConnection();   
  89.                 URLConnection urlconnection = url.openConnection();  
  90.                 urlconnection.addRequestProperty("User-Agent""Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.0)");  
  91.                 InputStream is = url.openStream();  
  92.                 BufferedReader bReader = new BufferedReader(new InputStreamReader(is));  
  93.                 StringBuffer sb = new StringBuffer();//sb为爬到的网页内容  
  94.                 String rLine = null;  
  95.                 while((rLine=bReader.readLine())!=null){  
  96.                     sb.append(rLine);  
  97.                     sb.append("/r/n");  
  98.                 }  
  99.                   
  100.                 int d = depth.get(sUrl);  
  101.                 System.out.println("爬网页"+sUrl+"成功,深度为"+d+" 是由线程"+Thread.currentThread().getName()+"来爬");  
  102.                 if(d<crawDepth){  
  103.                     //解析网页内容,从中提取链接  
  104.                     parseContext(sb.toString(),d+1);  
  105.                 }  
  106. //              System.out.println(sb.toString());  
  107.   
  108.               
  109.         } catch (IOException e) {  
  110. //          crawlurlSet.add(sUrl);  
  111. //          notCrawlurlSet.remove(sUrl);  
  112.             e.printStackTrace();  
  113.         }  
  114.     }  
  115.   
  116.     //从context提取url地址  
  117.     public  void parseContext(String context,int dep) {  
  118.         String regex = "<a href.*?/a>";  
  119. //      String regex = "<title>.*?</title>";  
  120.         String s = "fdfd<title>我 是</title><a href=\"http://www.iteye.com/blogs/tag/Google\">Google</a>fdfd<>";  
  121.         // String regex ="http://.*?>";  
  122.         Pattern pt = Pattern.compile(regex);  
  123.         Matcher mt = pt.matcher(context);  
  124.         while (mt.find()) {  
  125. //          System.out.println(mt.group());  
  126.             Matcher myurl = Pattern.compile("href=\".*?\"").matcher(  
  127.                     mt.group());  
  128.             while(myurl.find()){  
  129.                 String str = myurl.group().replaceAll("href=\"|\"""");  
  130. //              System.out.println("网址是:"+ str);  
  131.                 if(str.contains("http:")){ //取出一些不是url的地址  
  132.                     if(!allurlSet.contains(str)){  
  133.                         addUrl(str, dep);//加入一个新的url  
  134.                         if(count>0){ //如果有等待的线程,则唤醒  
  135.                             synchronized(signal) {  //---------------------(2)  
  136.                                 count--;  
  137.                                 signal.notify();  
  138.                             }  
  139.                         }  
  140.                           
  141.                     }  
  142.                 }  
  143.             }  
  144.         }  
  145.     }  
  146. }     

在上面(1)(2)两个地方卡了很久,两个地方其实是一个知识点,都是多线程的知识:

一开始用了

  1. //                  while(!notCrawlurlSet.isEmpty()){ ----------------------------------(1)  
  2. //                      String tmp = getAUrl();  
  3. //                      crawler(tmp);  
  4. //                  }  

一进入线程就判断notCrawlurlSet为不为空,但是是多线程的,一开始notCrawlurlSet不为空,所以所有的线程都进入了循环,尽管getAul()方法我设置了synchronized,但是一旦一个线程从getAurl()方法出来,另外一个线程就会进去,看一开始的getAurl方法的代码:

  1.     public synchronized  String getAUrl() {  
  2.         String tmpAUrl;  
  3. //      synchronized(notCrawlurlSet){  
  4.             tmpAUrl= notCrawlurlSet.get(0);  
  5.             notCrawlurlSet.remove(0);  
  6. //      }  
  7.         return tmpAUrl;  
  8.     }  
每一次都会删除一个notCrawlurlSet数组里面的元素,导致第一个线程执行完getAUrl方法时,且notCrawlurlSet恰好为空的时候,另外一个线程进入就会报错,因为notCrawlUrlSet没有元素,get(0)会报错。后来把getAUrl函数改成:

  1.     public synchronized  String getAUrl() {  
  2.         if(notCrawlurlSet.isEmpty())  
  3.             return null;  
  4.         String tmpAUrl;  
  5. //      synchronized(notCrawlurlSet){  
  6.             tmpAUrl= notCrawlurlSet.get(0);  
  7.             notCrawlurlSet.remove(0);  
  8. //      }  
  9.         return tmpAUrl;  
  10.     }  

在线程的run函数改成:

  1. while (true) {   
  2.             System.out.println("当前进入"+Thread.currentThread().getName());  
  3.     String tmp = getAUrl();  
  4.     if(tmp!=null){  
  5.         crawler(tmp);  
  6.     }else{  
  7.         synchronized(signal) {  
  8.             try {  
  9.                 count++;  
  10.                 System.out.println("当前有"+count+"个线程在等待");  
  11.                 signal.wait();  
  12.             } catch (InterruptedException e) {  
  13.                 // TODO Auto-generated catch block  
  14.                 e.printStackTrace();  
  15.             }  
  16.         }  
  17.           
  18.           
  19.     }  
  20. }  

即线程进入后就调用getAUrl函数,从notCrawlurlSet数组取url,如果没有取到,则用signal来让此线程等待,但是在哪里唤醒呢?肯定在notCrawlurlSet有元素的时候唤醒,即notCrawlurlSet不能空的时候,这其中有个很重要的变量count,它表示正在等待的线程个数,只有count大于0才会唤醒线程,即只有有线程在等待的时候才会调用signal.notify(); 此段实现在parseContext函数里面:
  1. if(str.contains("http:")){ //取出一些不是url的地址  
  2.     if(!allurlSet.contains(str)){  
  3.         addUrl(str, dep);//加入一个新的url  
  4.         if(count>0){ //如果有等待的线程,则唤醒  
  5.             synchronized(signal) {  
  6.                 count--;  
  7.                 signal.notify();  
  8.             }  
  9.         }  
  10.     }  
  11. }  

这个count变量还解决了我一个问题,当所有的线程启动后,也正确的爬取网页了,但是不知道怎么结束这些线程,因为线程都是永久循环的,有了count变量,就知道有多少线程在等待,当等待的线程等于threadCount的时候,就表示已经爬完了,因为所有线程都在等待了,不会往notCrawlurlSet添加新的url了,此时已经爬完了指定深度的所有网页。

写下自己的一点感悟,明白原理是一回事,有时候实现起来也挺费神的。

代码几度修改,还有待完善的地方及我的思路:

1:爬取的网页要存起来,该怎么存放也是一个问题,目录怎么生成?网页自动分类?等等,分类可以用考虑贝叶斯分类器,分好类之后安装类别来存储。

2:网页去重问题,如果url太多,内存装不下去怎么办?考虑先压缩,比如MD5压缩,同时MD5又能得到hash值,最简单的是hash去重,或者可以考虑用bloom filter去重,还有一种方法是考虑用key-value数据库来实现去重,不过我对key-value数据库不是很了解,应该类似hash,但是效率的问题数据库已经帮你解决了。

3:url不同的网页也可能内容一样,怎么判断网页相似度问题。网页相似度可以先提取网页正文,方法有行块函数法,提取正文后再可以用向量余弦法来计算相似度。

4:增量抓取的问题,一个网页抓取之后,什么时候再重新来抓?可以针对具体的网页的更新频率来解决这个问题,如新浪首页的新闻可能更新快一些,重新来爬的频率会更快一点。

 很早就知道爬虫的原理,但是一直没有去实现过,今天写起来还真遇到很多困难,尤其是多线程同步的问题。还是自己对多线程不熟,没有大量实践过的原因。

    先上我做的结果吧:

   

  1. 开始爬虫.........................................  
  2. 当前有1个线程在等待  
  3. 当前有2个线程在等待  
  4. 当前有3个线程在等待  
  5. 当前有4个线程在等待  
  6. 当前有5个线程在等待  
  7. .....................  
  1. 爬网页http://dev.yesky.com成功,深度为2 是由线程thread-9来爬  
  2. 当前有7个线程在等待  
  3. 爬网页http://www.cnblogs.com/rexyoung/archive/2012/05/01/2477960.html成功,深度为2 是由线程thread-2来爬  
  4. 当前有8个线程在等待  
  5. 爬网页http://www.hjenglish.com 成功,深度为2 是由线程thread-0来爬  
  6. 当前有9个线程在等待  
  7. 爬网页http://www.cnblogs.com/snandy/archive/2012/05/01/2476675.html成功,深度为2 是由线程thread-5来爬  
  8. 当前有10个线程在等待  
  9. 总共爬了159个网页  
  10. 总共耗时53秒  

上面是爬博客园的主页,只爬了两级深度,10个线程,总共耗时53秒,应该速度还算不错的,下面是所有的代码:

  1. public class WebCrawler {  
  2.     ArrayList<String> allurlSet = new ArrayList<String>();//所有的网页url,需要更高效的去重可以考虑HashSet  
  3.     ArrayList<String> notCrawlurlSet = new ArrayList<String>();//未爬过的网页url  
  4.     HashMap<String, Integer> depth = new HashMap<String, Integer>();//所有网页的url深度  
  5.     int crawDepth  = 2//爬虫深度  
  6.     int threadCount = 10//线程数量  
  7.     int count = 0//表示有多少个线程处于wait状态  
  8.     public static final Object signal = new Object();   //线程间通信变量  
  9.       
  10.     public static void main(String[] args) {  
  11.         final WebCrawler wc = new WebCrawler();  
  12. //      wc.addUrl("http://www.126.com", 1);  
  13.         wc.addUrl("http://www.cnblogs.com"1);  
  14.         long start= System.currentTimeMillis();  
  15.         System.out.println("开始爬虫.........................................");  
  16.         wc.begin();  
  17.           
  18.         while(true){  
  19.             if(wc.notCrawlurlSet.isEmpty()&& Thread.activeCount() == 1||wc.count==wc.threadCount){  
  20.                 long end = System.currentTimeMillis();  
  21.                 System.out.println("总共爬了"+wc.allurlSet.size()+"个网页");  
  22.                 System.out.println("总共耗时"+(end-start)/1000+"秒");  
  23.                 System.exit(1);  
  24. //              break;  
  25.             }  
  26.               
  27.         }  
  28.     }  
  29.     private void begin() {  
  30.         for(int i=0;i<threadCount;i++){  
  31.             new Thread(new Runnable(){  
  32.                 public void run() {  
  33. //                  System.out.println("当前进入"+Thread.currentThread().getName());  
  34. //                  while(!notCrawlurlSet.isEmpty()){ ----------------------------------(1)  
  35. //                      String tmp = getAUrl();  
  36. //                      crawler(tmp);  
  37. //                  }  
  38.                     while (true) {   
  39. //                      System.out.println("当前进入"+Thread.currentThread().getName());  
  40.                         String tmp = getAUrl();  
  41.                         if(tmp!=null){  
  42.                             crawler(tmp);  
  43.                         }else{  
  44.                             synchronized(signal) {  //------------------(2)  
  45.                                 try {  
  46.                                     count++;  
  47.                                     System.out.println("当前有"+count+"个线程在等待");  
  48.                                     signal.wait();  
  49.                                 } catch (InterruptedException e) {  
  50.                                     // TODO Auto-generated catch block  
  51.                                     e.printStackTrace();  
  52.                                 }  
  53.                             }  
  54.                               
  55.                               
  56.                         }  
  57.                     }  
  58.                 }  
  59.             },"thread-"+i).start();  
  60.         }  
  61.     }  
  62.     public synchronized  String getAUrl() {  
  63.         if(notCrawlurlSet.isEmpty())  
  64.             return null;  
  65.         String tmpAUrl;  
  66. //      synchronized(notCrawlurlSet){  
  67.             tmpAUrl= notCrawlurlSet.get(0);  
  68.             notCrawlurlSet.remove(0);  
  69. //      }  
  70.         return tmpAUrl;  
  71.     }  
  72. //  public synchronized  boolean isEmpty() {  
  73. //      boolean f = notCrawlurlSet.isEmpty();  
  74. //      return f;  
  75. //  }  
  76.       
  77.     public synchronized void  addUrl(String url,int d){  
  78.             notCrawlurlSet.add(url);  
  79.             allurlSet.add(url);  
  80.             depth.put(url, d);  
  81.     }  
  82.       
  83.     //爬网页sUrl  
  84.     public  void crawler(String sUrl){  
  85.         URL url;  
  86.         try {  
  87.                 url = new URL(sUrl);  
  88. //              HttpURLConnection urlconnection = (HttpURLConnection)url.openConnection();   
  89.                 URLConnection urlconnection = url.openConnection();  
  90.                 urlconnection.addRequestProperty("User-Agent""Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.0)");  
  91.                 InputStream is = url.openStream();  
  92.                 BufferedReader bReader = new BufferedReader(new InputStreamReader(is));  
  93.                 StringBuffer sb = new StringBuffer();//sb为爬到的网页内容  
  94.                 String rLine = null;  
  95.                 while((rLine=bReader.readLine())!=null){  
  96.                     sb.append(rLine);  
  97.                     sb.append("/r/n");  
  98.                 }  
  99.                   
  100.                 int d = depth.get(sUrl);  
  101.                 System.out.println("爬网页"+sUrl+"成功,深度为"+d+" 是由线程"+Thread.currentThread().getName()+"来爬");  
  102.                 if(d<crawDepth){  
  103.                     //解析网页内容,从中提取链接  
  104.                     parseContext(sb.toString(),d+1);  
  105.                 }  
  106. //              System.out.println(sb.toString());  
  107.   
  108.               
  109.         } catch (IOException e) {  
  110. //          crawlurlSet.add(sUrl);  
  111. //          notCrawlurlSet.remove(sUrl);  
  112.             e.printStackTrace();  
  113.         }  
  114.     }  
  115.   
  116.     //从context提取url地址  
  117.     public  void parseContext(String context,int dep) {  
  118.         String regex = "<a href.*?/a>";  
  119. //      String regex = "<title>.*?</title>";  
  120.         String s = "fdfd<title>我 是</title><a href=\"http://www.iteye.com/blogs/tag/Google\">Google</a>fdfd<>";  
  121.         // String regex ="http://.*?>";  
  122.         Pattern pt = Pattern.compile(regex);  
  123.         Matcher mt = pt.matcher(context);  
  124.         while (mt.find()) {  
  125. //          System.out.println(mt.group());  
  126.             Matcher myurl = Pattern.compile("href=\".*?\"").matcher(  
  127.                     mt.group());  
  128.             while(myurl.find()){  
  129.                 String str = myurl.group().replaceAll("href=\"|\"""");  
  130. //              System.out.println("网址是:"+ str);  
  131.                 if(str.contains("http:")){ //取出一些不是url的地址  
  132.                     if(!allurlSet.contains(str)){  
  133.                         addUrl(str, dep);//加入一个新的url  
  134.                         if(count>0){ //如果有等待的线程,则唤醒  
  135.                             synchronized(signal) {  //---------------------(2)  
  136.                                 count--;  
  137.                                 signal.notify();  
  138.                             }  
  139.                         }  
  140.                           
  141.                     }  
  142.                 }  
  143.             }  
  144.         }  
  145.     }  
  146. }     

在上面(1)(2)两个地方卡了很久,两个地方其实是一个知识点,都是多线程的知识:

一开始用了

  1. //                  while(!notCrawlurlSet.isEmpty()){ ----------------------------------(1)  
  2. //                      String tmp = getAUrl();  
  3. //                      crawler(tmp);  
  4. //                  }  

一进入线程就判断notCrawlurlSet为不为空,但是是多线程的,一开始notCrawlurlSet不为空,所以所有的线程都进入了循环,尽管getAul()方法我设置了synchronized,但是一旦一个线程从getAurl()方法出来,另外一个线程就会进去,看一开始的getAurl方法的代码:

  1.     public synchronized  String getAUrl() {  
  2.         String tmpAUrl;  
  3. //      synchronized(notCrawlurlSet){  
  4.             tmpAUrl= notCrawlurlSet.get(0);  
  5.             notCrawlurlSet.remove(0);  
  6. //      }  
  7.         return tmpAUrl;  
  8.     }  
每一次都会删除一个notCrawlurlSet数组里面的元素,导致第一个线程执行完getAUrl方法时,且notCrawlurlSet恰好为空的时候,另外一个线程进入就会报错,因为notCrawlUrlSet没有元素,get(0)会报错。后来把getAUrl函数改成:

  1.     public synchronized  String getAUrl() {  
  2.         if(notCrawlurlSet.isEmpty())  
  3.             return null;  
  4.         String tmpAUrl;  
  5. //      synchronized(notCrawlurlSet){  
  6.             tmpAUrl= notCrawlurlSet.get(0);  
  7.             notCrawlurlSet.remove(0);  
  8. //      }  
  9.         return tmpAUrl;  
  10.     }  

在线程的run函数改成:

  1. while (true) {   
  2.             System.out.println("当前进入"+Thread.currentThread().getName());  
  3.     String tmp = getAUrl();  
  4.     if(tmp!=null){  
  5.         crawler(tmp);  
  6.     }else{  
  7.         synchronized(signal) {  
  8.             try {  
  9.                 count++;  
  10.                 System.out.println("当前有"+count+"个线程在等待");  
  11.                 signal.wait();  
  12.             } catch (InterruptedException e) {  
  13.                 // TODO Auto-generated catch block  
  14.                 e.printStackTrace();  
  15.             }  
  16.         }  
  17.           
  18.           
  19.     }  
  20. }  

即线程进入后就调用getAUrl函数,从notCrawlurlSet数组取url,如果没有取到,则用signal来让此线程等待,但是在哪里唤醒呢?肯定在notCrawlurlSet有元素的时候唤醒,即notCrawlurlSet不能空的时候,这其中有个很重要的变量count,它表示正在等待的线程个数,只有count大于0才会唤醒线程,即只有有线程在等待的时候才会调用signal.notify(); 此段实现在parseContext函数里面:
  1. if(str.contains("http:")){ //取出一些不是url的地址  
  2.     if(!allurlSet.contains(str)){  
  3.         addUrl(str, dep);//加入一个新的url  
  4.         if(count>0){ //如果有等待的线程,则唤醒  
  5.             synchronized(signal) {  
  6.                 count--;  
  7.                 signal.notify();  
  8.             }  
  9.         }  
  10.     }  
  11. }  

这个count变量还解决了我一个问题,当所有的线程启动后,也正确的爬取网页了,但是不知道怎么结束这些线程,因为线程都是永久循环的,有了count变量,就知道有多少线程在等待,当等待的线程等于threadCount的时候,就表示已经爬完了,因为所有线程都在等待了,不会往notCrawlurlSet添加新的url了,此时已经爬完了指定深度的所有网页。

写下自己的一点感悟,明白原理是一回事,有时候实现起来也挺费神的。

代码几度修改,还有待完善的地方及我的思路:

1:爬取的网页要存起来,该怎么存放也是一个问题,目录怎么生成?网页自动分类?等等,分类可以用考虑贝叶斯分类器,分好类之后安装类别来存储。

2:网页去重问题,如果url太多,内存装不下去怎么办?考虑先压缩,比如MD5压缩,同时MD5又能得到hash值,最简单的是hash去重,或者可以考虑用bloom filter去重,还有一种方法是考虑用key-value数据库来实现去重,不过我对key-value数据库不是很了解,应该类似hash,但是效率的问题数据库已经帮你解决了。

3:url不同的网页也可能内容一样,怎么判断网页相似度问题。网页相似度可以先提取网页正文,方法有行块函数法,提取正文后再可以用向量余弦法来计算相似度。

4:增量抓取的问题,一个网页抓取之后,什么时候再重新来抓?可以针对具体的网页的更新频率来解决这个问题,如新浪首页的新闻可能更新快一些,重新来爬的频率会更快一点。