动态爬虫之qzone空间自动秒赞 - qq空间自动点赞

时间:2025-01-17 10:42:46
import hashlib
import json
import os
import pickle
import platform
import subprocess
import time
import urllib
import urllib.request
from _sha1 import sha1

from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver import DesiredCapabilities, ActionChains


  • class qzone_dlewares(object):
  • # 浏览器请求头
  • headers = {'Accept': '*/*',
  • 'Accept-Language': 'en-US,en;q=0.8',
  • 'Cache-Control': 'max-age=0',
  • 'User-Agent': 'Mozilla/5.0 (Linux; U; Android 2.3.6; zh-cn; GT-S5660 Build/GINGERBREAD) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1 MicroMessenger/4.5.255',
  • 'Connection': 'keep-alive', }
  • webdriverPath = 'E:\\phantomjs-2.1.1-windows\\bin\\'
  • attachinfo = False
  • # 初始化浏览器
  • def __init__(self, userName='', password='', *args, **kwargs):
  • self.userName = userName
  • self.password = password
  • desired_capabilities = .copy()
  • for key, value in self.():
  • desired_capabilities['.{}'.format(key)] = value
  • # 禁止加载图片
  • desired_capabilities[""] = False
  • self.driver = (executable_path=self.webdriverPath, desired_capabilities=desired_capabilities)
  • # 设置屏幕大小
  • self.driver.set_window_size(414, 736)
  • # 开始请求并且截图
  • def startQzoneRequest(self):
  • # 开始请求qzone
  • self.driver.get('')
  • # 截图保存到当前项目下
  • self.driver.save_screenshot('')
  • # 判断是否登录了
  • def isLogin(self):
  • try:
  • u = self.driver.find_element_by_xpath('//*[@]')
  • p = self.driver.find_element_by_xpath('//*[@]')
  • go = self.driver.find_element_by_xpath('//*[@]')
  • except NoSuchElementException:
  • return True
  • return False
  • def loginQzone(self):
  • u = self.driver.find_element_by_xpath('//*[@]')
  • p = self.driver.find_element_by_xpath('//*[@]')
  • go = self.driver.find_element_by_xpath('//*[@]')
  • # 清理账号和密码
  • ()
  • ()
  • # 移动到账号框模仿键盘输入账号
  • action = ActionChains(self.driver)
  • action.move_to_element(u)
  • (u)
  • # 模仿键盘输入账号
  • action.send_keys(self.userName)
  • # 移动到密码输入框
  • action.move_to_element(p)
  • (p)
  • # 模仿键盘输入密码
  • action.send_keys(self.password)
  • # 点击登录
  • action.move_by_offset(go.location['x'], go.location['y'])
  • (go)
  • # 执行登录
  • action.perform()
  • # 休息1秒保证能执行
  • time.sleep(1)
  • # 截图保存到当前项目下
  • self.driver.save_screenshot('')
  • def save_verify_code(self, element):
  • url = element.get_attribute('src')
  • fileName = element.get_attribute('id') + '.jpg'
  • (url, fileName)
  • # 校验码
  • def check_code(self):
  • # 先切换到默认的窗口
  • self.driver.switch_to.default_content()
  • iframe = None
  • try:
  • # 验证码
  • iframe = self.driver.find_element_by_xpath('//*[@]/iframe[2]')
  • except NoSuchElementException:
  • print('无需输入验证码')
  • else:
  • self.driver.switch_to.frame(iframe)
  • self.verify_code()
  • # 验证码
  • def verify_code(self):
  • que_code = self.driver.find_element_by_xpath('//*[@]')
  • que_input = self.driver.find_element_by_xpath('//*[@]')
  • que_but = self.driver.find_element_by_xpath('//*[@]')
  • # 保存验证码
  • self.save_verify_code(que_code)
  • verify_path = que_code.get_attribute('id') + '.jpg'
  • # 输入验证码
  • if (self.isWindows()):
  • (verify_path)
  • else:
  • .call(["xdg-open", verify_path])
  • input_verify_code = input("验证码:")
  • # 模仿用户输入
  • action = ActionChains(self.driver)
  • action.move_to_element(que_input)
  • ()
  • action.send_keys(input_verify_code)
  • action.move_to_element(que_but)
  • ()
  • # 执行
  • action.perform()
  • def paresHtml(self):
  • time.sleep(1)
  • # 切换到默认的容器
  • self.driver.switch_to.default_content()
  • # 获取动态列表参数
  • qzonetoken = self.driver.execute_script('return window.shine0callback')
  • g_tk = self.driver.execute_script('return ()')
  • res_type = '0'
  • res_attach = self.attachinfo and self.attachinfo or self.driver.execute_script(
  • 'return ')
  • refresh_type = '2'
  • attach_info = res_attach
  • format = 'json'
  • # 动态列表
  • sctiveFeeds = '/webapp/json/mqzone_feeds/getActiveFeeds?qzonetoken=%s&g_tk=%s' % (
  • qzonetoken, g_tk)
  • # 执行请求,并且返回结果
  • resultStr = next(self.sendRequest(sctiveFeeds, 'POST',
  • 'res_type=%s&res_attach=%s&refresh_type=%s&format=%s&attach_info=%s' % (
  • res_type, res_attach, refresh_type, format, attach_info)))
  • result = (resultStr)
  • print(resultStr)
  • # 判断数据是否正确
  • if result['ret'] == 0 and result['code'] == 0:
  • self.attachinfo = result['data']['attachinfo']
  • for item in result['data']['vFeeds']:
  • self.paresLikeList(item, qzonetoken, g_tk)
  • else:
  • print(result['message'])
  • resultStr = None
  • ()
  • def paresLikeList(self, item, qzonetoken, g_tk):
  • # 点赞参数
  • opuin = self.driver.execute_script('return ()')
  • unikey = item['comm']['orglikekey']
  • curkey = item['comm']['curlikekey']
  • appid = item['comm']['appid']
  • opr_type = 'like'
  • format = 'purejson'
  • likeresult = None
  • # 是否已经点赞,如果是那就不调用
  • liked = 'like' not in item or not 1 == item['like']['isliked']
  • if liked:
  • dolike = '/proxy/domain//cgi-bin/likes/internal_dolike_app?qzonetoken=%s&g_tk=%s' % (
  • qzonetoken, g_tk)
  • likeresult = (next(self.sendRequest(dolike, 'POST',
  • 'opuin=%s&unikey=%s&curkey=%s&appid=%s&opr_type=%s&format=%s' % (
  • opuin, unikey, curkey, appid, opr_type, format))))
  • if (not likeresult == None) and likeresult['ret'] == 0:
  • content = ''
  • if 'summary' in item:
  • content = item['summary']['summary']
  • elif 'cell_summary' in item:
  • content = item['cell_summary']['summary']
  • elif 'original' in item:
  • content = item['original']['cell_summary']['summary']
  • else:
  • content = '未知内容'
  • print('点赞成功:%s %s' % (item['userinfo']['user']['nickname'], content))
  • ()
  • def sendRequest(self, url, method, data):
  • cname = 'request_%d' % time.time()
  • self.driver.execute_script('window.%s = new XMLHttpRequest;' % cname)
  • self.driver.execute_script('window.%=true;' % cname)
  • self.driver.execute_script(
  • 'window.%(\'%s\',\'%s\',false)' % (cname, method, url))
  • self.driver.execute_script(
  • 'window.% = function (e){window.%s = window.%;}' % (
  • cname, ('%s_result' % cname), cname))
  • self.driver.execute_script(
  • 'window.%("%s")' % (cname, data))
  • yield self.driver.execute_script('return window.%s' % ('%s_result' % cname))
  • # 内存回收
  • self.driver.execute_script(' window.%s = undefined;window.%s = undefined;' % (('%s_result' % cname), cname))
  • # 是 windows 系统
  • def isWindows(self):
  • sysstr = ()
  • if (sysstr == "Windows"):
  • return True
  • return False
  • # 保存登录 cookies
  • def save_cookies(self):
  • with open(self.hashCode(), 'wb') as f:
  • obj = self.driver.get_cookies()
  • (obj, f)
  • f.close()
  • # 读取并设置 cookies
  • def load_cookies(self):
  • fileName = self.hashCode()
  • # 判断文件是否存在
  • if self.file_exists(fileName):
  • f = open(fileName, 'rb')
  • obj = (file=f)
  • f.close()
  • # 循环设置 cookie
  • try:
  • for cookie in obj:
  • self.driver.add_cookie(cookie)
  • except Exception as e:
  • print(e)
  • def delete_cookies(self):
  • (self.hashCode())
  • # hasCode
  • def hashCode(self):
  • sha = sha1()
  • (b'qzone_cookies')
  • return ()
  • # 判断文件是否存在
  • def file_exists(self, filename):
  • try:
  • with open(filename) as f:
  • return True
  • except IOError:
  • return False
  • # 退出浏览器
  • def __del__(self):
  • self.()
  • def startQzoneBaselanding():
  • # 事先输入账号和密码
  • userName = '***'
  • password = '***'
  • oldTime = time.time()
  • browser = qzone_dlewares(userName=userName, password=password)
  • # 加载cookies
  • browser.load_cookies()
  • initTime = time.time()
  • # 打开浏览器并且截图
  • ()
  • requestTime = time.time()
  • # 判断是否登录
  • if (not ()):
  • # 模仿用户登录
  • ()
  • # 检查code
  • browser.check_code()
  • currentTime = time.time()
  • # 解析动态
  • ()
  • # 运行完成后再截图一次
  • .save_screenshot('')
  • # 保存cookies
  • browser.save_cookies()
  • print('开始时间 %f' % oldTime)
  • print('结束时间 %f' % currentTime)
  • print('初始化时间 %f' % (initTime - oldTime))
  • print('加载页面时间 %f' % (requestTime - initTime))
  • print('模仿操作时间 %f' % (currentTime - requestTime))
  • print('总运行时间 %f' % (currentTime - oldTime))
  • return browser
  • if __name__ == '__main__':
  • # 第一次查询
  • browser = startQzoneBaselanding()
  • starttime = time.time()
  • while True:
  • # 两小时刷新
  • currentTime = (time.time() - starttime) / 60 / 60
  • if currentTime >= 2:
  • ()
  • browser.delete_cookies()
  • browser = startQzoneBaselanding()
  • continue
  • # 20秒刷新
  • time.sleep(20)
  • ()