Distributed web page crawling in Python

Posted: 2023-12-15 10:54:26

Heh, the previous two installments didn't have much to do with Python. This one is all code.

This is my first time writing Python, so a lot of it is messy; focus mainly on the logic flow.

Character encodings really gave me a headache. A fetched page's encoding is unknown, so I first look for the charset declaration, then convert to Unicode and do all processing in Unicode. But the database is utf8, the Windows console has to be GBK, and my IDE console has to be utf8, which is the only reason the DEBUG variable exists: it controls the output encoding.
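The gist of it, as a minimal sketch (Python 2; the variable names and sample bytes here are made up for illustration, they are not from the crawler below):

# decode once at the boundary, work in unicode, encode per output target
raw = '<p>\xe9\xa2\x91\xe9\x81\x93</p>'  # bytes as fetched from a page (UTF-8 here)
charset = 'utf-8'                        # whatever the charset= pattern turned up
text = unicode(raw, charset)             # unify on unicode internally
db_bytes = text.encode('utf-8')          # the MySQL connection is utf8
console_bytes = text.encode('gbk')       # the Windows console wants GBK
print console_bytes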

The program ran continuously for 24 hours, and was then deployed across 10 machines; long-running stability has been essentially problem-free.

Going forward it will crawl 100,000 pages per day.

Source code follows:

Content crawling and utilities

'''
Created on 2010-9-15
@author: chenggong
'''
import urllib2
import re
import socket

DEBUG = 0  # 0: re-encode log output to GBK (Windows console); 1: leave it as UTF-8 (IDE console)

'''
Utility class
'''
class Tools():

    # logging helper
    @staticmethod
    def writelog(level, info, notify=False):
        if DEBUG == 0:
            try:
                print "[" + level + "]" + info.decode('UTF-8').encode('GBK')
            except:
                print "[" + level + "]" + info.encode('GBK')
        else:
            print "[" + level + "]" + info
        #if notify:
        #    print "[notify] report to the administrator!!"

    # convert a byte string to unicode
    @staticmethod
    def toUnicode(s, charset):
        if charset == "":
            return s
        else:
            try:
                u = unicode(s, charset)
            except:
                u = ""
            return u

    # regex extraction
    # @param single whether to return only the first match
    @staticmethod
    def getFromPatten(patten, src, single=False):
        rst = ""
        p = re.compile(patten, re.S)
        all = p.findall(src)
        for matcher in all:
            rst += matcher + " "
            if single:
                break
        return rst.strip()

'''
Page fetcher
'''
class PageGripper():
    URL_OPEN_TIMEOUT = 10  # page timeout (seconds)
    MAX_RETRY = 3          # maximum number of retries

    def __init__(self):
        socket.setdefaulttimeout(self.URL_OPEN_TIMEOUT)

    # extract the charset declaration
    def getCharset(self, s):
        rst = Tools.getFromPatten(u'charset=(.*?)"', s, True)
        if rst != "":
            if rst == "utf8":
                rst = "utf-8"
        return rst

    # try to fetch a page
    def downloadUrl(self, url):
        charset = ""
        page = ""
        retry = 0
        while True:
            try:
                fp = urllib2.urlopen(url)
                break
            except urllib2.HTTPError, e:  # bad HTTP status
                Tools.writelog('error', 'HTTP status error, code=%d' % e.code)
                raise
            except urllib2.URLError, e:  # network error / timeout
                Tools.writelog('warn', 'page access timed out, retrying..')
                retry += 1
                if retry > self.MAX_RETRY:
                    Tools.writelog('warn', 'max retries exceeded, giving up')
                    raise
        while True:
            line = fp.readline()
            if charset == "":
                charset = self.getCharset(line)
            if not line:
                break
            page += Tools.toUnicode(line, charset)
        fp.close()
        return page

    # fetch a page
    def getPageInfo(self, url):
        Tools.writelog("info", "start fetching page, url= " + url)
        info = ""
        try:
            info = self.downloadUrl(url)
        except:
            raise
        Tools.writelog("debug", "page fetched successfully")
        return info

'''
Content extractor
'''
class InfoGripper():
    pageGripper = PageGripper()

    def __init__(self):
        Tools.writelog('debug', "crawler started")

    # extract the title
    def griptitle(self, data):
        title = Tools.getFromPatten(u'box2t sp"><h3>(.*?)</h3>', data, True)
        if title == "":
            title = Tools.getFromPatten(u'<title>(.*?)[-<]', data, True)
        return title.strip()

    # extract the channel; the pattern matches the Chinese label "频道:" ("channel:") on the page
    def gripchannel(self, data):
        zone = Tools.getFromPatten(u'频道:(.*?)</span>', data, True)
        channel = Tools.getFromPatten(u'<a.*?>(.*?)</a>', zone, True)
        return channel

    # extract the tags; the pattern matches the label "标签:" ("tags:")
    def griptag(self, data):
        zone = Tools.getFromPatten(u'标签:(.*?)</[^a].*>', data, True)
        rst = Tools.getFromPatten(u'>(.*?)</a>', zone, False)
        return rst

    # extract the view count; the pattern matches "已经有...次观看" ("viewed ... times")
    def gripviews(self, data):
        rst = Tools.getFromPatten(u'已经有<em class="hot" id="viewcount">(.*?)</em>次观看', data)
        return rst

    # extract the publish time; the pattern matches "在...发布" ("published at ...")
    def griptime(self, data):
        rst = Tools.getFromPatten(u'在<em>(.*?)</em>发布', data, True)
        return rst

    # extract the uploader; the pattern matches the user-space link title
    def gripuser(self, data):
        rst = Tools.getFromPatten(u'title="点击进入(.*?)的用户空间"', data, True)
        return rst

    # get the page charset
    def getPageCharset(self, data):
        charset = Tools.getFromPatten(u'charset=(.*?)"', data, True)
        if charset == "utf8":
            charset = "utf-8"
        return charset

    # get the bokecc (CC) player data
    def getCCData(self, data):
        zone = Tools.getFromPatten(u'SWFObject(.*?)</script>', data, True)
        # check whether the page uses the bokecc player
        isFromBokeCC = re.match('.*bokecc.com.*', zone)
        if not isFromBokeCC:
            return "", ""
        ccSiteId = Tools.getFromPatten(u'siteid=(.*?)[&,"]', zone, True)
        ccVid = Tools.getFromPatten(u'vid=(.*?)[&,"]', zone, True)
        return ccSiteId, ccVid

    # get the site-local video id
    def gripVideoId(self, data):
        vid = Tools.getFromPatten(u'var vid = "(.*?)"', data, True)
        return vid

    # get the view count from the statistics ajax interface
    def gripViewsAjax(self, vid, url, basedir):
        host = Tools.getFromPatten(u'http://(.*?)/', url, True)
        ajaxAddr = "http://" + host + basedir + "/index.php/ajax/video_statistic/" + vid
        '''
        try:
            content = self.pageGripper.getPageInfo(ajaxAddr)
        except Exception, e:
            print e
            Tools.writelog("error", ajaxAddr + u" fetch failed")
            return "error"
        '''
        Tools.writelog('debug', u"start fetching view count, url=" + ajaxAddr)
        retry = 0
        while True:
            try:
                fp = urllib2.urlopen(ajaxAddr)
                break
            except urllib2.HTTPError, e:  # bad HTTP status
                Tools.writelog('error', 'HTTP status error, code=' + "%d" % e.code)
                return ""
            except urllib2.URLError, e:  # network error / timeout
                Tools.writelog('warn', 'page access timed out, retrying..')
                retry += 1
                if retry > PageGripper.MAX_RETRY:
                    Tools.writelog('warn', 'max retries exceeded, giving up')
                    return ""
        content = fp.read()
        fp.close()
        views = Tools.getFromPatten(u'"viewcount":(.*?),', content, True)
        views = views.replace('"', '')
        return views

    # extract the view count from the page body
    def gripViewsFromData(self, data):
        views = Tools.getFromPatten(u'已经有<.*?>(.*?)<.*?>次观看', data, True)
        return views

    # extract the site base directory
    def gripBaseDir(self, data):
        dir = Tools.getFromPatten(u"base_dir = '(.*?)'", data, True)
        return dir

    # crawl all fields of one page
    def gripinfo(self, url):
        try:
            data = self.pageGripper.getPageInfo(url)
        except:
            Tools.writelog("error", url + " fetch failed")
            raise
        Tools.writelog('info', 'start content matching')
        rst = {}
        rst['title'] = self.griptitle(data)
        rst['channel'] = self.gripchannel(data)
        rst['tag'] = self.griptag(data)
        rst['release'] = self.griptime(data)
        rst['user'] = self.gripuser(data)
        ccdata = self.getCCData(data)
        rst['ccsiteId'] = ccdata[0]
        rst['ccVid'] = ccdata[1]
        views = self.gripViewsFromData(data)
        if views == "" or not views:
            vid = self.gripVideoId(data)
            basedir = self.gripBaseDir(data)
            views = self.gripViewsAjax(vid, url, basedir)
            if views == "":
                views = "error"
        if views == "error":
            Tools.writelog("error", "failed to get the view count")
        Tools.writelog("debug", "view count: " + views)
        rst['views'] = views
        Tools.writelog('debug', 'title=%s,channel=%s,tag=%s' % (rst['title'], rst['channel'], rst['tag']))
        return rst

'''
Unit test
'''
if __name__ == '__main__':
    list = [
        'http://008yx.com/xbsp/index.php/video/index/3138',
        'http://vblog.xwhb.com/index.php/video/index/4067',
        'http://demo.ccvms.bokecc.com/index.php/video/index/3968',
        'http://vlog.cnhubei.com/wuhan/20100912_56145.html',
        'http://vlog.cnhubei.com/html/js/30271.html',
        'http://www.ddvtv.com/index.php/video/index/15',
        'http://boke.2500sz.com/index.php/video/index/60605',
        'http://video.zgkqw.com/index.php/video/index/334',
        'http://yule.hitmv.com/html/joke/27041.html',
        'http://www.ddvtv.com/index.php/video/index/11',
        'http://www.zgnyyy.com/index.php/video/index/700',
        'http://www.kdianshi.com/index.php/video/index/5330',
        'http://www.aoyatv.com/index.php/video/index/127',
        'http://v.ourracing.com/html/channel2/64.html',
        'http://v.zheye.net/index.php/video/index/93',
        'http://vblog.thmz.com/index.php/video/index/7616',
        'http://kdianshi.com/index.php/video/index/5330',
        'http://tv.seeyoueveryday.com/index.php/video/index/95146',
        'http://sp.zgyangzhi.com/html/ji/2.html',
        'http://www.xjapan.cc/index.php/video/index/146',
        'http://www.jojy.cn/vod/index.php/video/index/399',
        'http://v.cyzone.cn/index.php/video/index/99',
    ]
    list1 = ['http://192.168.25.7:8079/vinfoant/versionasdfdf']
    infoGripper = InfoGripper()
    for url in list:
        infoGripper.gripinfo(url)
    del infoGripper

Web service and task scheduling

# -*- coding: utf-8 -*-
'''
Created on 2010-9-15
@author: chenggong
'''
import string, cgi, time
from os import curdir, sep
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
from InfoGripper import *
import re
import MySQLdb
import time
import threading
import urllib
import urllib2

PORT = 8079
VERSION = 0.1
DBCHARSET = "utf8"
PARAMS = [
    'callback',
    'sessionId',
    'retry',
    'retryInterval',
    'dbhost',
    'dbport',
    'db',
    'dbuser',
    'dbpass',
    'videoId'
]
DBMAP = ['video_id',
         'ccsiteid',
         'ccvid',
         'desc_url',
         'site_id',
         'title',
         'post_time',
         'author',
         'elapse',
         'channel',
         'tags',
         'create_time',
         'check_time',
         'status']

'''
ERROR CODE definitions
'''
ERR_OK = 0
ERR_PARAM = 1
ERR_HTTP_TIMEOUT = 5
ERR_HTTP_STATUS = 6
ERR_DB_CONNECT_FAIL = 8
ERR_DB_SQL_FAIL = 9
ERR_GRIPVIEW = 11
ERR_UNKNOW = 12

'''
Database adapter
'''
class DBAdapter(object):
    def __init__(self):
        self.param = {'ip': '',
                      'port': 0,
                      'user': '',
                      'pw': '',
                      'db': ''}
        self.connect_once = False  # whether a connection has been made before

    '''
    Create/refresh the database connection
    '''
    def connect(self, ip, port, user, pw, db):
        if (ip != self.param['ip'] or
                port != self.param['port'] or
                user != self.param['user'] or
                pw != self.param['pw'] or
                db != self.param['db']):
            Tools.writelog('info', 'switching database connection, ip=' + ip + ',port=' + port + ',user=' + user + ',pw=' + pw + ',db=' + db)
            try:
                if self.connect_once == True:  # release the previous connection
                    self.cur.close()
                    self.conn.close()
                self.conn = MySQLdb.connect(user=user, passwd=pw, db=db, host=ip, port=int(port))
                self.conn.set_character_set(DBCHARSET)
                self.connect_once = True
                self.cur = self.conn.cursor(MySQLdb.cursors.Cursor)
                self.param['ip'] = ip
                self.param['port'] = port
                self.param['user'] = user
                self.param['pw'] = pw
                self.param['db'] = db
            except:
                Tools.writelog('error', u'database connection failed', True)
                raise
            else:
                Tools.writelog('info', u'database connected')

    '''
    Execute a SQL statement
    '''
    def execute(self, sql):
        Tools.writelog('debug', u'executing SQL: ' + sql)
        try:
            self.cur.execute(sql)
        except:
            Tools.writelog('error', u'SQL execution error: ' + sql)
            raise

    '''
    Query the database
    '''
    def query(self, sql):
        row = {}
        self.execute(sql)
        row = self.cur.fetchall()
        return row

    '''
    Mark a video as failed
    '''
    def updateErr(self, videoId):
        nowtime = time.strftime('%Y-%m-%d-%H-%M-%S', time.localtime(time.time()))
        sql = "UPDATE videos SET "
        sql += "check_time='" + nowtime + "',"
        sql += "status=-1 "
        sql += "WHERE video_id=" + videoId
        self.execute(sql)
        self.conn.commit()

    '''
    Write back the crawl results
    '''
    def update(self, obj, videoId, isUpdateTitle=True):
        Tools.writelog('debug', 'start updating database')
        try:
            # update the videos table
            sql = "UPDATE videos SET "
            if obj['ccsiteId'] != "":
                sql += "ccsiteid='" + obj['ccsiteId'] + "',"
            if obj['ccVid'] != "":
                sql += "ccvid='" + obj['ccVid'] + "',"
            if isUpdateTitle:
                sql += "title='" + obj['title'] + "',"
            sql += "post_time='" + obj['release'] + "',"
            sql += "author='" + obj['user'] + "',"
            sql += "channel='" + obj['channel'] + "',"
            sql += "tags='" + obj['tag'] + "',"
            nowtime = time.strftime('%Y-%m-%d-%H-%M-%S', time.localtime(time.time()))
            sql += "check_time='" + nowtime + "',"
            sql += "status=0 "
            sql += "WHERE video_id=" + videoId
            self.execute(sql)
            # update the counts table
            if obj['views'] != 'error':
                nowdate = time.strftime('%Y-%m-%d', time.localtime(time.time()))
                sql = "SELECT * FROM counts WHERE "
                sql += "date = '" + nowdate + "' and video_id=" + videoId
                rst = self.query(sql)
                if len(rst) > 0:  # a record already exists for today: update it
                    sql = "UPDATE counts SET count=" + obj['views']
                    sql += " WHERE video_id=" + videoId + " AND date='" + nowdate + "'"
                else:  # otherwise insert
                    sql = "INSERT INTO counts VALUES"
                    sql += "(null," + videoId + ",'" + nowdate + "'," + obj['views'] + ")"
                self.execute(sql)
            self.conn.commit()
            Tools.writelog('debug', "db commit ok")
            return ERR_OK
        except Exception, e:
            print e
            return ERR_DB_SQL_FAIL

'''
Task thread class
'''
class TaskThread(threading.Thread):
    def setTaskTool(self, dbAdapter, gripper):
        self.dbAdapter = dbAdapter
        self.gripper = gripper

    def setParam(self, param):
        self.param = param
        self.videoId = param['videoId']
        assert self.videoId != ""

    def init(self):
        self.views = "0"
        self.errcode = ERR_OK

    def run(self):
        Tools.writelog('debug', 'crawl task started, sessionId=' + self.param['sessionId'])
        self.init()
        try:
            # refresh the database connection
            self.dbAdapter.connect(self.param['dbhost'], self.param['dbport'], self.param['dbuser'], self.param['dbpass'], self.param['db'])
        except:
            self.errcode = ERR_DB_CONNECT_FAIL  # database connection failed
            self.callback(self.errcode)
            return
        # look up the video with this vid
        sql = "SELECT "
        for column in DBMAP:
            sql += column
            if column != DBMAP[len(DBMAP) - 1]:
                sql += ","
        sql += " FROM videos"
        sql += " WHERE video_id=" + self.videoId
        video = self.dbAdapter.query(sql)
        assert not (len(video) > 1 or len(video) == 0)  # exactly one record
        url = video[0][3]
        assert url != ""
        try:
            rst = self.gripper.gripinfo(url)
        except urllib2.HTTPError, e:
            self.errcode = ERR_HTTP_STATUS  # HTTP status error
            self.dbAdapter.updateErr(self.videoId)
        except urllib2.URLError, e:
            self.errcode = ERR_HTTP_TIMEOUT  # HTTP timeout
            self.dbAdapter.updateErr(self.videoId)
        except:
            self.errcode = ERR_UNKNOW  # unknown error
            self.dbAdapter.updateErr(self.videoId)
        else:
            self.views = rst['views']
            if self.views == "error":
                self.views = "-1"
                self.errcode = ERR_GRIPVIEW  # content crawled, but view count failed
            # update the database (special case: if the existing title contains "-",
            # do not overwrite the title field)
            title = video[0][5]
            assert title != ""
            if re.match('.*-.*', title):
                self.errcode = self.dbAdapter.update(rst, self.videoId, False)
            else:
                self.errcode = self.dbAdapter.update(rst, self.videoId)
        self.callback(self.errcode)
        Tools.writelog('info', 'task finished, sessionId=' + self.param['sessionId'])
        return

    def callback(self, errcode):
        results = {'errorcode': errcode, 'count': int(self.views)}
        results = urllib.urlencode(results)
        results = results.replace('&', '%26')
        url = self.param['callback']
        url += "?"
        url += "sessionId=" + self.param['sessionId']
        url += "&results=" + results
        retry = 0
        while True:
            try:
                Tools.writelog('debug', "calling back the master, url=" + url)
                urllib2.urlopen(url)
                Tools.writelog('debug', 'callback succeeded')
                break
            except urllib2.URLError, e:  # timeout / error
                Tools.writelog('debug', 'callback timed out, retrying in %s seconds' % self.param['retryInterval'])
                retry += 1
                time.sleep(int(self.param['retryInterval']))
                if retry > int(self.param['retry']):
                    Tools.writelog('error', 'callback to the master failed')
                    return

'''
Web service class
'''
class MyHandler(BaseHTTPRequestHandler):
    dbAdapter = DBAdapter()
    gripper = InfoGripper()

    def pageSuccess(self):
        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()

    def pageFail(self):
        self.send_error(404, "not found")

    def getValue(self, param):
        src = self.path + '&'
        reg = param + '=' + '(.*?)&'
        value = Tools.getFromPatten(reg, src, True)
        return value

    def do_GET(self):
        isGetVersion = re.match('.*vinfoant/version.*', self.path)
        isTask = re.match('.*vinfoant/run.*', self.path)
        if isGetVersion:
            self.pageSuccess()
            self.wfile.write(str(VERSION))
        elif isTask:
            self.pageSuccess()
            param = {}
            for p in PARAMS:
                param[p] = self.getValue(p)  # collect each parameter
            taskThread = TaskThread()
            taskThread.setTaskTool(self.dbAdapter, self.gripper)
            taskThread.setParam(param)
            taskThread.start()  # start the task thread
            self.wfile.write("ok")
        else:
            self.pageFail()
        return

'''
Start the web service; global entry point
'''
def startHttpd():
    try:
        Tools.writelog('debug', 'httpd start..listen on ' + str(PORT))
        httpd = HTTPServer(('', PORT), MyHandler)
        Tools.writelog('debug', 'success')
        httpd.serve_forever()
    except KeyboardInterrupt:
        Tools.writelog('debug', 'httpd close..')
        httpd.socket.close()

if __name__ == '__main__':
    startHttpd()
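For completeness, here is roughly how a master node would drive one of these workers. This is a hedged sketch (Python 2): the worker host, callback URL, database credentials and ids are made up for illustration; only the port, the paths and the parameter names come from PORT, MyHandler and PARAMS above.

import urllib
import urllib2

base = 'http://127.0.0.1:8079/vinfoant'   # worker address; host is made up

# version probe
print urllib2.urlopen(base + '/version').read()   # -> "0.1"

# dispatch one crawl task; the worker replies "ok" immediately and, once the
# task thread finishes, GETs callback?sessionId=...&results=... on the master
query = urllib.urlencode({
    'callback': 'http://192.168.25.1/notify',  # made-up master callback URL
    'sessionId': '42',
    'retry': '3',
    'retryInterval': '5',
    'dbhost': '127.0.0.1',
    'dbport': '3306',
    'db': 'videodb',
    'dbuser': 'root',
    'dbpass': 'secret',
    'videoId': '3138',
})
print urllib2.urlopen(base + '/run?' + query).read()   # -> "ok"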