for i in range((shares["total_count"]-1)/ONESHAREPAGE):
try:
dbcurr.execute('INSERT INTO urlids(uk, start, limited, type, status) VALUES(%s, %s, %s, 0, 0)' % (uid, str(ONESHAREPAGE*(i+1)), str(ONESHAREPAGE)))
except Exception as ex:
print "E3", str(ex)
pass
if "records" in shares.keys():
for item in shares["records"]:
try:
dbcurr.execute('INSERT INTO share(userid, filename, shareid, status) VALUES(%s, "%s", %s, 0)' % (uid, item['title'], item['shareid'])) #item['title']恰好是文件名称
#返回的json信息:
except Exception as ex:
#print "E33", str(ex), item
pass
else:
print "delete 0", uid, start
dbcurr.execute('delete from urlids where uk=%s and type=0 and start>%s' % (uid, str(start)))
dbcurr.execute('delete from urlids where id=%s' % (id, ))
dbconn.commit()
except Exception as ex:
print "E5", str(ex), id
dbcurr.close()
dbconn.close() #关闭数据库
def worker():
global success, failed
dbconn = mdb.connect(DB_HOST, DB_USER, DB_PASS, 'baiduyun', charset='utf8')
dbcurr = dbconn.cursor()
dbcurr.execute('SET NAMES utf8')
dbcurr.execute('set global wait_timeout=60000')
#以上是数据库相关设置
while True:
#dbcurr.execute('select * from urlids where status=0 order by type limit 1')
dbcurr.execute('select * from urlids where status=0 and type>0 limit 1') #type>0,为非分享列表
d = dbcurr.fetchall()
#每次取出一条数据出来
#print d
if d: #如果数据存在
id = d[0][0] #请求url编号
uk = d[0][1] #用户编号
start = d[0][2]
limit = d[0][3]
type = d[0][4] #哪种类型
dbcurr.execute('update urlids set status=1 where id=%s' % (str(id),)) #状态更新为1,已经访问过了
url = ""
if type == 0: #分享
url = URL_SHARE.format(uk=uk, start=start, id=id).encode('utf-8') #分享列表格式化
#query_uk uk 查询编号
#start
#urlid id url编号
elif type == 1: #订阅
url = URL_FOLLOW.format(uk=uk, start=start, id=id).encode('utf-8') #订阅列表格式化
elif type == 2: #粉丝
url = URL_FANS.format(uk=uk, start=start, id=id).encode('utf-8') #关注列表格式化
if url:
hc_q.put((type, url)) #如果url存在,则放入请求队列,type表示从哪里获得数据
#通过以上的url就可以获得相应情况下的数据的json数据格式,如分享信息的,订阅信息的,粉丝信息的
#print "processed", url
else: #否则从订阅者或者粉丝的引出人中获得信息来存储,这个过程是爬虫树的下一层扩展
dbcurr.execute('select * from user where status=0 limit 1000')
d = dbcurr.fetchall()
if d:
for item in d:
try:
dbcurr.execute('insert into urlids(uk, start, limited, type, status) values("%s", 0, %s, 0, 0)' % (item[1], str(ONESHAREPAGE)))
#uk 查询号,其实是用户编号
#start 从第1条数据出发获取信息
#
dbcurr.execute('insert into urlids(uk, start, limited, type, status) values("%s", 0, %s, 1, 0)' % (item[1], str(ONEPAGE)))
dbcurr.execute('insert into urlids(uk, start, limited, type, status) values("%s", 0, %s, 2, 0)' % (item[1], str(ONEPAGE)))
dbcurr.execute('update user set status=1 where userid=%s' % (item[1],)) #做个标志,该条数据已经访问过了
#跟新了分享,订阅,粉丝三部分数据
except Exception as ex:
print "E6", str(ex)
else:
time.sleep(1)
dbconn.commit()
dbcurr.close()
dbconn.close()
def main():
print 'starting at:',now()
for item in range(16):
t = threading.Thread(target = req_worker, args = (item,))
t.setDaemon(True)
t.start() #请求线程开启,共开启16个线程
s = threading.Thread(target = worker, args = ())
s.setDaemon(True)
s.start() #worker线程开启
response_worker() #response_worker开始工作
print 'all Done at:', now()