呵呵,前两节好像和python没多大关系。。这节完全是贴代码,
这是我第一次写python,很多地方比较乱,主要就看看逻辑流程吧。
对于编码格式确实搞得我头大。。取下来页面不知道是什么编码,所以先找charset,然后转unicode。统一在unicode下操作,但是数据库是utf8的,WINDOWS的控制台又必须是gbk的,但是我IDE控制台必须是utf8的。。所以才会有DEBUG这个变量存在。。。主要是为了控制输出编码。
本程序已连续运行24小时,并分布式部署在10台机器上,长时间运行基本没有问题。
之后每天将进行10万次网页的爬取。
源码如下:
内容爬取及工具
- '''''
- Created on 2010-9-15
- @author: chenggong
- '''
- import urllib2
- import re
- import socket
- DEBUG = 0
- '''''
- 工具类
- '''
class Tools():
    """Static utility helpers: console logging and regex extraction."""

    @staticmethod
    def writelog(level, info, notify=False):
        """Print a log line as "[level]info".

        When DEBUG == 0 the output console is GBK (Windows cmd), so the
        UTF-8 byte string is re-encoded; otherwise text is printed as-is.
        `notify` is accepted for interface compatibility but unused.
        """
        if DEBUG == 0:
            try:
                # info is normally a UTF-8 byte string: decode, then re-encode for GBK console.
                print("[" + level + "]" + info.decode('UTF-8').encode('GBK'))
            except Exception:
                # info was already unicode (decode failed) -> encode directly.
                print("[" + level + "]" + info.encode('GBK'))
        else:
            print("[" + level + "]" + info)

    @staticmethod
    def toUnicode(s, charset):
        """Decode byte string `s` with `charset` to unicode.

        Returns `s` unchanged when no charset is known yet, and "" when
        decoding fails (bad charset name or undecodable bytes).
        """
        if charset == "":
            return s
        try:
            return unicode(s, charset)
        except Exception:
            return ""

    @staticmethod
    def getFromPatten(patten, src, single=False):
        """Return the regex captures of `patten` found in `src`, space-joined.

        @param single: when True, only the first match is returned.
        Matching uses re.S so '.' spans newlines. Returns "" on no match.
        """
        matches = re.compile(patten, re.S).findall(src)
        if single:
            matches = matches[:1]
        # join instead of repeated += (avoids quadratic string building)
        return " ".join(matches).strip()
- '''''
- 网页内容爬虫
- '''
class PageGripper():
    """Downloads a web page, detecting its charset and decoding to unicode."""

    URL_OPEN_TIMEOUT = 10  # per-request socket timeout, seconds
    MAX_RETRY = 3          # retries on URLError (timeout) before giving up

    def __init__(self):
        # NOTE: sets the process-wide default socket timeout.
        socket.setdefaulttimeout(self.URL_OPEN_TIMEOUT)

    def getCharset(self, s):
        """Extract a charset="..." declaration from an HTML chunk; "" if absent.

        Normalizes the common "utf8" spelling to the codec name "utf-8".
        """
        rst = Tools.getFromPatten(u'charset=(.*?)"', s, True)
        if rst == "utf8":
            rst = "utf-8"
        return rst

    def downloadUrl(self, url):
        """Fetch `url` and return its content decoded to unicode.

        Lines read before the charset declaration is seen are kept as raw
        bytes (Tools.toUnicode passes them through).
        Raises urllib2.HTTPError on a bad HTTP status and urllib2.URLError
        after MAX_RETRY timeouts.
        """
        charset = ""
        retry = 0
        while True:
            try:
                fp = urllib2.urlopen(url)
                break
            except urllib2.HTTPError as e:  # HTTP status error
                # BUGFIX: e.code is an int; the old '+' concatenation raised TypeError.
                Tools.writelog('error', 'HTTP状态错误 code=%d' % e.code)
                # BUGFIX: re-raise the caught exception instead of raising the
                # bare class (HTTPError() with no args is itself a TypeError).
                raise
            except urllib2.URLError:  # network error / timeout
                Tools.writelog('warn', '页面访问超时,重试..')
                retry += 1
                if retry > self.MAX_RETRY:
                    Tools.writelog('warn', '超过最大重试次数,放弃')
                    raise  # BUGFIX: was `raise urllib2.URLError` (raises the class)
        parts = []
        while True:
            line = fp.readline()
            if not line:
                break
            if charset == "":
                charset = self.getCharset(line)
            parts.append(Tools.toUnicode(line, charset))
        fp.close()
        return "".join(parts)  # join once instead of quadratic +=

    def getPageInfo(self, url):
        """Log and fetch `url`; download errors propagate to the caller."""
        Tools.writelog("info", "开始抓取网页,url= " + url)
        info = self.downloadUrl(url)
        Tools.writelog("debug", "网页抓取成功")
        return info
- '''''
- 内容提取类
- '''
class InfoGripper():
    """Extracts video metadata (title, channel, tags, views, ...) from page HTML."""

    pageGripper = PageGripper()  # shared fetcher for all instances
    MAX_RETRY = 3                # retries for the view-count AJAX request

    def __init__(self):
        Tools.writelog('debug', "爬虫启动")

    def griptitle(self, data):
        """Video title; falls back to the <title> tag (up to the first '-' or '<')."""
        title = Tools.getFromPatten(u'box2t sp"><h3>(.*?)</h3>', data, True)
        if title == "":
            title = Tools.getFromPatten(u'<title>(.*?)[-<]', data, True)
        return title.strip()

    def gripchannel(self, data):
        """Channel name: the first <a> inside the "频道:" span."""
        zone = Tools.getFromPatten(u'频道:(.*?)</span>', data, True)
        return Tools.getFromPatten(u'<a.*?>(.*?)</a>', zone, True)

    def griptag(self, data):
        """All tags (space-joined) from the "标签:" zone."""
        zone = Tools.getFromPatten(u'标签:(.*?)</[^a].*>', data, True)
        return Tools.getFromPatten(u'>(.*?)</a>', zone, False)

    def gripviews(self, data):
        """View count from the viewcount <em> element (site-specific markup)."""
        return Tools.getFromPatten(u'已经有<em class="hot" id="viewcount">(.*?)</em>次观看', data)

    def griptime(self, data):
        """Publish time from the "在<em>...</em>发布" fragment."""
        return Tools.getFromPatten(u'在<em>(.*?)</em>发布', data, True)

    def gripuser(self, data):
        """Uploader name from the user-space link tooltip."""
        return Tools.getFromPatten(u'title="点击进入(.*?)的用户空间"', data, True)

    def getPageCharset(self, data):
        """Page charset declaration, normalized ("utf8" -> "utf-8")."""
        charset = Tools.getFromPatten(u'charset=(.*?)"', data, True)
        if charset == "utf8":
            charset = "utf-8"
        return charset

    def getCCData(self, data):
        """(ccSiteId, ccVid) for pages using the bokecc.com player; ("","") otherwise."""
        zone = Tools.getFromPatten(u'SWFObject(.*?)</script>', data, True)
        # only bokecc-hosted players carry CC ids
        if not re.match('.*bokecc.com.*', zone):
            return "", ""
        ccSiteId = Tools.getFromPatten(u'siteid=(.*?)[&,"]', zone, True)
        ccVid = Tools.getFromPatten(u'vid=(.*?)[&,"]', zone, True)
        return ccSiteId, ccVid

    def gripVideoId(self, data):
        """Site-internal video id from the `var vid = "..."` script variable."""
        return Tools.getFromPatten(u'var vid = "(.*?)"', data, True)

    def gripViewsAjax(self, vid, url, basedir):
        """Fetch the view count from the site's statistics AJAX endpoint.

        Returns the count as a string, or "" on HTTP error / repeated timeout.
        """
        host = Tools.getFromPatten(u'http://(.*?)/', url, True)
        ajaxAddr = "http://" + host + basedir + "/index.php/ajax/video_statistic/" + vid
        Tools.writelog('debug', u"开始获取点击量,url=" + ajaxAddr)
        retry = 0  # BUGFIX: was incremented without ever being initialized (NameError)
        while True:
            try:
                fp = urllib2.urlopen(ajaxAddr)
                break
            except urllib2.HTTPError as e:  # HTTP status error
                Tools.writelog('error', 'HTTP状态错误 code=' + "%d" % e.code)
                return ""
            except urllib2.URLError:  # network error / timeout
                Tools.writelog('warn', '页面访问超时,重试..')
                retry += 1
                # BUGFIX: MAX_RETRY previously did not exist on this class
                if retry > self.MAX_RETRY:
                    Tools.writelog('warn', '超过最大重试次数,放弃')
                    return ""
        content = fp.read()
        fp.close()
        views = Tools.getFromPatten(u'"viewcount":(.*?),', content, True)
        return views.replace('"', '')

    def gripViewsFromData(self, data):
        """View count scraped straight from the page markup (generic pattern)."""
        return Tools.getFromPatten(u'已经有<.*?>(.*?)<.*?>次观看', data, True)

    def gripBaseDir(self, data):
        """Site base directory from the `base_dir = '...'` script variable."""
        return Tools.getFromPatten(u"base_dir = '(.*?)'", data, True)

    def gripinfo(self, url):
        """Download `url` and return a dict of all extracted fields.

        rst['views'] is "error" when the count could not be obtained.
        Download failures are logged and re-raised.
        """
        try:
            data = self.pageGripper.getPageInfo(url)
        except Exception:
            Tools.writelog("error", url + " 抓取失败")
            raise
        Tools.writelog('info', '开始内容匹配')
        rst = {}
        rst['title'] = self.griptitle(data)
        rst['channel'] = self.gripchannel(data)
        rst['tag'] = self.griptag(data)
        rst['release'] = self.griptime(data)
        rst['user'] = self.gripuser(data)
        rst['ccsiteId'], rst['ccVid'] = self.getCCData(data)
        views = self.gripViewsFromData(data)
        if not views:
            # fall back to the AJAX statistics endpoint
            vid = self.gripVideoId(data)
            basedir = self.gripBaseDir(data)
            views = self.gripViewsAjax(vid, url, basedir)
            if views == "":
                views = "error"
        if views == "error":
            Tools.writelog("error", "获取观看次数失败")
        Tools.writelog("debug", "点击量:" + views)
        rst['views'] = views
        Tools.writelog('debug', 'title=%s,channel=%s,tag=%s' % (rst['title'], rst['channel'], rst['tag']))
        return rst
# ---- manual smoke test -----------------------------------------------------
if __name__ == '__main__':
    # Sample video pages covering the site templates the gripper supports.
    # (renamed from `list`, which shadowed the builtin; unused `list1` removed)
    urls = [
        'http://008yx.com/xbsp/index.php/video/index/3138',
        'http://vblog.xwhb.com/index.php/video/index/4067',
        'http://demo.ccvms.bokecc.com/index.php/video/index/3968',
        'http://vlog.cnhubei.com/wuhan/20100912_56145.html',
        'http://vlog.cnhubei.com/html/js/30271.html',
        'http://www.ddvtv.com/index.php/video/index/15',
        'http://boke.2500sz.com/index.php/video/index/60605',
        'http://video.zgkqw.com/index.php/video/index/334',
        'http://yule.hitmv.com/html/joke/27041.html',
        'http://www.ddvtv.com/index.php/video/index/11',
        'http://www.zgnyyy.com/index.php/video/index/700',
        'http://www.kdianshi.com/index.php/video/index/5330',
        'http://www.aoyatv.com/index.php/video/index/127',
        'http://v.ourracing.com/html/channel2/64.html',
        'http://v.zheye.net/index.php/video/index/93',
        'http://vblog.thmz.com/index.php/video/index/7616',
        'http://kdianshi.com/index.php/video/index/5330',
        'http://tv.seeyoueveryday.com/index.php/video/index/95146',
        'http://sp.zgyangzhi.com/html/ji/2.html',
        'http://www.xjapan.cc/index.php/video/index/146',
        'http://www.jojy.cn/vod/index.php/video/index/399',
        'http://v.cyzone.cn/index.php/video/index/99',
    ]
    infoGripper = InfoGripper()
    for url in urls:
        infoGripper.gripinfo(url)
    del infoGripper
WEB服务及任务调度
- '''''
- Created on 2010-9-15
- @author: chenggong
- '''
- # -*- coding: utf-8 -*-
- import string,cgi,time
- from os import curdir,sep
- from BaseHTTPServer import BaseHTTPRequestHandler,HTTPServer
- from InfoGripper import *
- import re
- import MySQLdb
- import time
- import threading
- import urllib
- import urllib2
# ---- service configuration ----
PORT = 8079         # TCP port the task web service listens on
VERSION = 0.1       # reported by the /vinfoant/version endpoint
DBCHARSET = "utf8"  # character set applied to every MySQL connection

# Query-string parameters of a /vinfoant/run request,
# extracted one by one in MyHandler.do_GET.
PARAMS = [
    'callback',       # master-controller URL to call back with results
    'sessionId',      # opaque session id echoed back in the callback
    'retry',          # max callback retry attempts
    'retryInterval',  # seconds to sleep between callback retries
    'dbhost',         # MySQL connection parameters for this task
    'dbport',
    'db',
    'dbuser',
    'dbpass',
    'videoId'         # primary key of the video row to crawl
]

# Column order used when SELECTing from the `videos` table.
# TaskThread.run reads index 3 (desc_url) and index 5 (title).
DBMAP = ['video_id',
    'ccsiteid',
    'ccvid',
    'desc_url',      # 3: page URL to crawl
    'site_id',
    'title',         # 5: checked for '-' before deciding to overwrite
    'post_time',
    'author',
    'elapse',
    'channel',
    'tags',
    'create_time',
    'check_time',
    'status']

'''''
ERROR CODE定义
'''
# Error codes reported back to the master controller via the callback URL.
ERR_OK = 0               # task finished successfully
ERR_PARAM = 1            # bad request parameters (not raised in this file)
ERR_HTTP_TIMEOUT = 5     # crawl target timed out (urllib2.URLError)
ERR_HTTP_STATUS = 6      # crawl target returned an HTTP error status
ERR_DB_CONNECT_FAIL = 8  # could not (re)connect to MySQL
ERR_DB_SQL_FAIL = 9      # a SQL statement failed
ERR_GRIPVIEW = 11        # page scraped OK but view count could not be fetched
ERR_UNKNOW = 12          # any other exception during scraping
- '''''
- 数据库适配器
- '''
class DBAdapter(object):
    """MySQL access layer: keeps one live connection, reconnecting when the
    connection parameters change, and writes crawl results to the DB."""

    def __init__(self):
        # parameters of the currently open connection (compared in connect())
        self.param = {'ip': '',
                      'port': 0,
                      'user': '',
                      'pw': '',
                      'db': ''}
        self.connect_once = False  # True once a connection has been opened

    def connect(self, ip, port, user, pw, db):
        """(Re)create the DB connection if any parameter changed.

        Raises on connection failure; a no-op when parameters are unchanged.
        """
        if (ip == self.param['ip'] and
                port == self.param['port'] and
                user == self.param['user'] and
                pw == self.param['pw'] and
                db == self.param['db']):
            return
        # NOTE(review): this log line includes the DB password — consider redacting.
        Tools.writelog('info', '更换数据库连接池,ip=' + ip + ',port=' + port + ',user=' + user + ',pw=' + pw + ',db=' + db)
        try:
            if self.connect_once:
                # release the previous connection before opening a new one
                self.cur.close()
                self.conn.close()
            self.conn = MySQLdb.connect(user=user, passwd=pw, db=db, host=ip, port=int(port))
            self.conn.set_character_set(DBCHARSET)
            self.connect_once = True
            self.cur = self.conn.cursor(MySQLdb.cursors.Cursor)
            self.param['ip'] = ip
            self.param['port'] = port
            self.param['user'] = user
            self.param['pw'] = pw
            self.param['db'] = db
        except Exception:
            Tools.writelog('error', u'数据库连接失败', True)
            raise
        else:
            Tools.writelog('info', u'数据库连接成功')

    def execute(self, sql, args=None):
        """Execute `sql`; `args` (optional, backward compatible) supplies
        parameterized values so scraped text cannot inject SQL."""
        Tools.writelog('debug', u'执行SQL: ' + sql)
        try:
            if args is None:
                self.cur.execute(sql)
            else:
                self.cur.execute(sql, args)
        except Exception:
            Tools.writelog('error', u'SQL执行错误:' + sql)
            raise

    def query(self, sql, args=None):
        """Execute a SELECT and return all rows."""
        self.execute(sql, args)
        return self.cur.fetchall()

    def updateErr(self, videoId):
        """Mark a video as failed: status=-1 and a fresh check_time."""
        nowtime = time.strftime('%Y-%m-%d-%H-%M-%S', time.localtime(time.time()))
        self.execute("UPDATE videos SET check_time=%s,status=-1 WHERE video_id=%s",
                     (nowtime, videoId))
        self.conn.commit()

    def update(self, obj, videoId, isUpdateTitle=True):
        """Write scraped fields to `videos` and today's view count to `counts`.

        @param obj: dict produced by InfoGripper.gripinfo
        @param isUpdateTitle: when False the title column is left untouched
        Returns ERR_OK or ERR_DB_SQL_FAIL.
        """
        Tools.writelog('debug', '开始更新数据库')
        try:
            # -- videos table: build SET clause with placeholders (the old code
            # concatenated scraped strings straight into SQL — injection risk).
            sets, args = [], []
            if obj['ccsiteId'] != "":
                sets.append("ccsiteid=%s")
                args.append(obj['ccsiteId'])
            if obj['ccVid'] != "":
                sets.append("ccvid=%s")
                args.append(obj['ccVid'])
            if isUpdateTitle:
                sets.append("title=%s")
                args.append(obj['title'])
            sets.append("post_time=%s")
            args.append(obj['release'])
            sets.append("author=%s")
            args.append(obj['user'])
            sets.append("channel=%s")
            args.append(obj['channel'])
            sets.append("tags=%s")
            args.append(obj['tag'])
            nowtime = time.strftime('%Y-%m-%d-%H-%M-%S', time.localtime(time.time()))
            sets.append("check_time=%s")
            args.append(nowtime)
            sets.append("status=0")
            args.append(videoId)
            self.execute("UPDATE videos SET " + ",".join(sets) + " WHERE video_id=%s",
                         tuple(args))
            # -- counts table: one row per (video, day)
            if obj['views'] != 'error':
                nowdate = time.strftime('%Y-%m-%d', time.localtime(time.time()))
                rst = self.query("SELECT * FROM counts WHERE date=%s and video_id=%s",
                                 (nowdate, videoId))
                if len(rst) > 0:  # today's row exists -> update it
                    self.execute("UPDATE counts SET count=%s WHERE video_id=%s AND date=%s",
                                 (obj['views'], videoId, nowdate))
                else:  # otherwise insert a new row
                    self.execute("INSERT INTO counts VALUES(null,%s,%s,%s)",
                                 (videoId, nowdate, obj['views']))
            self.conn.commit()
            Tools.writelog('debug', "db commit ok")
            return ERR_OK
        except Exception as e:
            print(e)
            return ERR_DB_SQL_FAIL
- '''''
- 任务线程类
- '''
class TaskThread(threading.Thread):
    """One crawl task: look up the video's URL, scrape it, store results,
    then report the outcome to the master controller."""

    def setTaskTool(self, dbAdapter, gripper):
        # shared DBAdapter / InfoGripper supplied by the HTTP handler
        self.dbAdapter = dbAdapter
        self.gripper = gripper

    def setParam(self, param):
        # param: dict of the PARAMS query-string values
        self.param = param
        self.videoId = param['videoId']
        assert self.videoId != ""

    def init(self):
        # per-run state
        self.views = "0"
        self.errcode = ERR_OK

    def run(self):
        Tools.writelog('debug', '开始爬虫任务,sessionId=' + self.param['sessionId'])
        self.init()
        try:
            # refresh the DB connection for this task's parameters
            self.dbAdapter.connect(self.param['dbhost'], self.param['dbport'],
                                   self.param['dbuser'], self.param['dbpass'],
                                   self.param['db'])
        except Exception:
            self.errcode = ERR_DB_CONNECT_FAIL
            # BUGFIX: was a bare `callback(...)` -> NameError on this path
            self.callback(self.errcode)
            return
        # fetch the row for this video id (column order given by DBMAP)
        sql = "SELECT " + ",".join(DBMAP) + " FROM videos WHERE video_id=" + self.videoId
        video = self.dbAdapter.query(sql)
        assert len(video) == 1  # exactly one record expected
        url = video[0][3]  # desc_url column
        assert url != ""
        try:
            rst = self.gripper.gripinfo(url)
        except urllib2.HTTPError:
            self.errcode = ERR_HTTP_STATUS  # bad HTTP status
            self.dbAdapter.updateErr(self.videoId)
        except urllib2.URLError:
            self.errcode = ERR_HTTP_TIMEOUT  # network error / timeout
            self.dbAdapter.updateErr(self.videoId)
        except Exception:
            self.errcode = ERR_UNKNOW  # anything else
            self.dbAdapter.updateErr(self.videoId)
        else:
            self.views = rst['views']
            if self.views == "error":
                self.views = "-1"
                self.errcode = ERR_GRIPVIEW  # page OK, view count failed
            # Special case: if the stored title already contains '-', keep it.
            title = video[0][5]
            assert title != ""
            if re.match('.*-.*', title):
                # BUGFIX: previously passed isUpdateTitle=True (and assigned to
                # the typo `self.errocde`), so the title was always overwritten.
                self.errcode = self.dbAdapter.update(rst, self.videoId, False)
            else:
                self.errcode = self.dbAdapter.update(rst, self.videoId)
        self.callback(self.errcode)
        Tools.writelog('info', '任务结束,sessionId=' + self.param['sessionId'])

    def callback(self, errcode):
        """POST-back the error code and view count to the master's callback URL,
        retrying up to param['retry'] times on URLError."""
        results = urllib.urlencode({'errorcode': errcode, 'count': int(self.views)})
        # escape '&' so the whole results blob survives as a single query parameter
        results = results.replace('&', '%26')
        url = self.param['callback']
        url += "?"
        url += "sessionId=" + self.param['sessionId']
        url += "&results=" + results
        retry = 0
        while True:
            try:
                Tools.writelog('debug', "回调主控,url=" + url)
                urllib2.urlopen(url)
                Tools.writelog('debug', '回调成功')
                break
            except urllib2.URLError:  # timeout / network error
                Tools.writelog('debug', '回调主控超时,%s秒后重试' % self.param['retryInterval'])
                retry += 1
                time.sleep(int(self.param['retryInterval']))
                if retry > int(self.param['retry']):
                    Tools.writelog('error', '回调主控失败')
                    return
- '''''
- WEB服务类
- '''
class MyHandler(BaseHTTPRequestHandler):
    """HTTP front-end exposing two endpoints:
    /vinfoant/version -> report VERSION; /vinfoant/run -> spawn a TaskThread."""

    # shared across all requests
    dbAdapter = DBAdapter()
    gripper = InfoGripper()

    def pageSuccess(self):
        # standard 200 / text/html response header
        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()

    def pageFail(self):
        self.send_error(404, "not found")

    def getValue(self, param):
        """Extract one query-string value from the request path."""
        # appending '&' lets the non-greedy regex terminate on the last parameter
        padded = self.path + '&'
        pattern = param + '=' + '(.*?)&'
        return Tools.getFromPatten(pattern, padded, True)

    def do_GET(self):
        if re.match('.*vinfoant/version.*', self.path):
            self.pageSuccess()
            self.wfile.write(VERSION)
        elif re.match('.*vinfoant/run.*', self.path):
            self.pageSuccess()
            # collect every expected parameter from the query string
            param = dict((name, self.getValue(name)) for name in PARAMS)
            worker = TaskThread()
            worker.setTaskTool(self.dbAdapter, self.gripper)
            worker.setParam(param)
            worker.start()  # crawl runs in the background
            self.wfile.write("ok")
        else:
            self.pageFail()
        return
def startHttpd():
    """Start the task web service on PORT; blocks until KeyboardInterrupt."""
    httpd = None  # BUGFIX: guard against Ctrl-C before the server is constructed
    try:
        Tools.writelog('debug', 'httpd start..listen on ' + str(PORT))
        httpd = HTTPServer(('', PORT), MyHandler)
        Tools.writelog('debug', 'success')
        httpd.serve_forever()
    except KeyboardInterrupt:
        Tools.writelog('debug', 'httpd close..')
        if httpd is not None:
            httpd.socket.close()

if __name__ == '__main__':
    startHttpd()