import hashlib
import json
import os
import pickle
import platform
import subprocess
import time
import urllib
import urllib.request
from _sha1 import sha1

from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver import DesiredCapabilities, ActionChains
class qzone_dlewares(object):
    """Drive a headless PhantomJS browser through Qzone login and feed "likes".

    The instance owns one Selenium PhantomJS driver (``self.driver``).  Typical
    use: ``load_cookies`` -> ``startQzoneRequest`` -> ``isLogin`` /
    ``loginQzone`` -> ``check_code`` -> ``paresHtml`` -> ``save_cookies``.

    NOTE(review): several string literals in this class are empty or truncated
    (the start URL, screenshot paths, the attribute predicates of every XPath,
    some page-side JavaScript expressions and the host inside the "like" URL).
    Their intended values cannot be derived from this file; they are kept
    verbatim and flagged where they occur, and must be filled in before the
    class can work against the real site.
    """

    # HTTP headers injected into every PhantomJS page request.
    headers = {'Accept': '*/*',
               'Accept-Language': 'en-US,en;q=0.8',
               'Cache-Control': 'max-age=0',
               'User-Agent': 'Mozilla/5.0 (Linux; U; Android 2.3.6; zh-cn; GT-S5660 Build/GINGERBREAD) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1 MicroMessenger/4.5.255',
               'Connection': 'keep-alive', }
    # Passed to Selenium as ``executable_path``.
    # NOTE(review): this is a directory; Selenium expects the path of the
    # phantomjs executable itself -- confirm the intended value.
    webdriverPath = 'E:\\phantomjs-2.1.1-windows\\bin\\'
    # Paging cursor returned by the last feed request (False until one succeeds).
    attachinfo = False

    def __init__(self, userName='', password='', *args, **kwargs):
        """Create the PhantomJS driver.

        Args:
            userName: account name typed into the login form.
            password: password typed into the login form.
            *args, **kwargs: accepted for call compatibility and ignored.
        """
        self.userName = userName
        self.password = password
        desired_capabilities = DesiredCapabilities.PHANTOMJS.copy()
        for key, value in self.headers.items():
            desired_capabilities['phantomjs.page.customHeaders.{}'.format(key)] = value
        # Do not load images.
        desired_capabilities['phantomjs.page.settings.loadImages'] = False
        self.driver = webdriver.PhantomJS(executable_path=self.webdriverPath,
                                          desired_capabilities=desired_capabilities)
        # Emulate a phone-sized viewport.
        self.driver.set_window_size(414, 736)

    def startQzoneRequest(self):
        """Open the start page and take a screenshot."""
        # NOTE(review): the URL and the screenshot file name are empty here.
        self.driver.get('')
        self.driver.save_screenshot('')

    def _find_login_elements(self):
        """Return the (account input, password input, login button) elements.

        Raises:
            NoSuchElementException: if any of the three is not on the page.
        """
        find = self.driver.find_element_by_xpath
        # NOTE(review): the attribute predicates of these XPaths are missing,
        # so all three expressions are identical (and not valid XPath).
        return find('//*[@]'), find('//*[@]'), find('//*[@]')

    def isLogin(self):
        """Return True when the login form is absent, False when it is shown."""
        try:
            self._find_login_elements()
        except NoSuchElementException:
            return True
        return False

    def loginQzone(self):
        """Fill in the login form with simulated user input and submit it."""
        u, p, go = self._find_login_elements()
        # Clear any pre-filled account and password.
        u.clear()
        p.clear()
        action = ActionChains(self.driver)
        # Focus the account box and type the account name.
        action.move_to_element(u)
        action.click(u)
        action.send_keys(self.userName)
        # Focus the password box and type the password.
        action.move_to_element(p)
        action.click(p)
        action.send_keys(self.password)
        # Move to the login button and click it.
        action.move_by_offset(go.location['x'], go.location['y'])
        action.click(go)
        # Nothing above is sent to the browser until perform().
        action.perform()
        # Give the page a second to react.
        time.sleep(1)
        # NOTE(review): the screenshot file name is empty here.
        self.driver.save_screenshot('')

    def save_verify_code(self, element):
        """Download the captcha image of *element* into the working directory.

        The file is named ``<element id>.jpg``.

        Returns:
            The file name the image was saved under.
        """
        url = element.get_attribute('src')
        fileName = element.get_attribute('id') + '.jpg'
        urllib.request.urlretrieve(url, fileName)
        return fileName

    def check_code(self):
        """Handle the captcha if the page shows one."""
        # Start from the top-level document.
        self.driver.switch_to.default_content()
        try:
            # NOTE(review): the attribute predicate of this XPath is missing.
            iframe = self.driver.find_element_by_xpath('//*[@]/iframe[2]')
        except NoSuchElementException:
            print('无需输入验证码')
        else:
            self.driver.switch_to.frame(iframe)
            self.verify_code()

    def verify_code(self):
        """Show the captcha image to the operator and submit the typed answer.

        Blocks on ``input()`` until the operator enters the code.
        """
        find = self.driver.find_element_by_xpath
        # NOTE(review): the attribute predicates of these XPaths are missing.
        que_code = find('//*[@]')
        que_input = find('//*[@]')
        que_but = find('//*[@]')
        # Save the captcha image, then open it with the platform's viewer.
        verify_path = self.save_verify_code(que_code)
        if self.isWindows():
            os.startfile(verify_path)
        else:
            subprocess.call(["xdg-open", verify_path])
        input_verify_code = input("验证码:")
        # Type the answer and press the confirm button like a user would.
        action = ActionChains(self.driver)
        action.move_to_element(que_input)
        action.click()
        action.send_keys(input_verify_code)
        action.move_to_element(que_but)
        action.click()
        action.perform()

    def paresHtml(self):
        """Fetch the active feed list and "like" every entry not yet liked.

        On success the paging cursor from the response is stored in
        ``self.attachinfo`` and reused by the next call.
        """
        time.sleep(1)
        # Work in the top-level document.
        self.driver.switch_to.default_content()
        # Request parameters are read from the page itself.
        qzonetoken = self.driver.execute_script('return window.shine0callback')
        # NOTE(review): the JavaScript expressions of the next two scripts are
        # missing ('return ()' is not valid JavaScript; 'return ' yields
        # undefined).
        g_tk = self.driver.execute_script('return ()')
        res_attach = self.attachinfo or self.driver.execute_script(
            'return ')
        # Feed-list endpoint.
        sctiveFeeds = '/webapp/json/mqzone_feeds/getActiveFeeds?qzonetoken=%s&g_tk=%s' % (
            qzonetoken, g_tk)
        payload = 'res_type=%s&res_attach=%s&refresh_type=%s&format=%s&attach_info=%s' % (
            '0', res_attach, '2', 'json', res_attach)
        resultStr = self._request(sctiveFeeds, 'POST', payload)
        result = json.loads(resultStr)
        print(resultStr)
        # Only a response with ret == 0 and code == 0 carries usable data.
        if result['ret'] == 0 and result['code'] == 0:
            self.attachinfo = result['data']['attachinfo']
            for item in result['data']['vFeeds']:
                self.paresLikeList(item, qzonetoken, g_tk)
        else:
            print(result['message'])

    def paresLikeList(self, item, qzonetoken, g_tk):
        """Send a "like" for one feed *item* unless it is already liked.

        Args:
            item: one entry of ``data.vFeeds`` from the feed response.
            qzonetoken, g_tk: tokens appended to the request URL.
        """
        # Skip entries already marked as liked.
        if 'like' in item and item['like']['isliked'] == 1:
            return
        # NOTE(review): the JavaScript expression of this script is missing.
        opuin = self.driver.execute_script('return ()')
        comm = item['comm']
        # NOTE(review): the host segment after "/proxy/domain/" is missing.
        dolike = '/proxy/domain//cgi-bin/likes/internal_dolike_app?qzonetoken=%s&g_tk=%s' % (
            qzonetoken, g_tk)
        payload = 'opuin=%s&unikey=%s&curkey=%s&appid=%s&opr_type=%s&format=%s' % (
            opuin, comm['orglikekey'], comm['curlikekey'], comm['appid'], 'like', 'purejson')
        likeresult = json.loads(self._request(dolike, 'POST', payload))
        if likeresult is None or likeresult['ret'] != 0:
            return
        # Pick the first available summary text for the log line.
        if 'summary' in item:
            content = item['summary']['summary']
        elif 'cell_summary' in item:
            content = item['cell_summary']['summary']
        elif 'original' in item:
            content = item['original']['cell_summary']['summary']
        else:
            content = '未知内容'
        print('点赞成功:%s %s' % (item['userinfo']['user']['nickname'], content))

    def _request(self, url, method, data):
        """Run a synchronous XMLHttpRequest inside the page and return its text.

        The request object and its result live in temporary ``window``
        globals, which are always reset afterwards, even when a script fails.
        """
        cname = 'request_%d' % time.time()
        result_name = '%s_result' % cname
        run = self.driver.execute_script
        run('window.%s = new XMLHttpRequest;' % cname)
        try:
            run('window.%s.withCredentials=true;' % cname)
            # json.dumps yields properly quoted/escaped JavaScript string
            # literals, so quotes inside url or data cannot break the script.
            run('window.%s.open(%s,%s,false)' % (
                cname, json.dumps(method), json.dumps(url)))
            run('window.%s.onload = function (e){window.%s = window.%s.responseText;}' % (
                cname, result_name, cname))
            run('window.%s.send(%s)' % (cname, json.dumps(data)))
            return run('return window.%s' % result_name)
        finally:
            # Release the page-side globals.
            run(' window.%s = undefined;window.%s = undefined;' % (result_name, cname))

    def sendRequest(self, url, method, data):
        """Yield the response text of one in-page request.

        Kept as a one-item generator so existing ``next(self.sendRequest(...))``
        callers keep working.  The page-side cleanup has already run by the
        time the value is yielded.
        """
        yield self._request(url, method, data)

    def isWindows(self):
        """Return True when running on Windows."""
        return platform.system() == "Windows"

    def save_cookies(self):
        """Pickle the driver's current cookies to the cookie file."""
        with open(self.hashCode(), 'wb') as f:
            pickle.dump(self.driver.get_cookies(), f)

    def load_cookies(self):
        """Load cookies from the cookie file, if present, into the driver.

        Failures while adding cookies are printed and otherwise ignored.
        """
        fileName = self.hashCode()
        if not self.file_exists(fileName):
            return
        # SECURITY: unpickling executes arbitrary code from the file; only the
        # file written by save_cookies() should ever be at this path.
        with open(fileName, 'rb') as f:
            cookies = pickle.load(file=f)
        try:
            for cookie in cookies:
                self.driver.add_cookie(cookie)
        except Exception as e:
            print(e)

    def delete_cookies(self):
        """Remove the cookie file; a file that is already gone is not an error."""
        try:
            os.remove(self.hashCode())
        except FileNotFoundError:
            pass

    def hashCode(self):
        """Return the cookie file name: the SHA-1 hex digest of a fixed tag."""
        return hashlib.sha1(b'qzone_cookies').hexdigest()

    def file_exists(self, filename):
        """Return True if *filename* can be opened for reading."""
        try:
            with open(filename):
                return True
        except IOError:
            return False

    def __del__(self):
        """Quit the browser when the instance is collected."""
        # The driver attribute is absent when __init__ failed part-way.
        driver = getattr(self, 'driver', None)
        if driver is not None:
            driver.quit()
def startQzoneBaselanding():
    """Run one full session: start the browser, log in if needed, like feeds.

    Prints timing figures for the individual phases.

    Returns:
        The ``qzone_dlewares`` instance, with its browser still open, so the
        caller can keep refreshing the feed.
    """
    # Account name and password are entered here in advance.
    userName = '***'
    password = '***'
    oldTime = time.time()
    browser = qzone_dlewares(userName=userName, password=password)
    # Restore cookies saved by an earlier run.
    browser.load_cookies()
    initTime = time.time()
    # Open the page and take a screenshot.
    browser.startQzoneRequest()
    requestTime = time.time()
    # Only go through the login form when it is actually shown.
    if not browser.isLogin():
        browser.loginQzone()
    # Handle the captcha, if any.
    browser.check_code()
    currentTime = time.time()
    # Fetch the feed and like its entries.
    browser.paresHtml()
    # Take another screenshot once everything has run.
    # NOTE(review): the screenshot file name is empty here.
    browser.driver.save_screenshot('')
    # Persist cookies for the next run.
    browser.save_cookies()
    print('开始时间 %f' % oldTime)
    print('结束时间 %f' % currentTime)
    print('初始化时间 %f' % (initTime - oldTime))
    print('加载页面时间 %f' % (requestTime - initTime))
    print('模仿操作时间 %f' % (currentTime - requestTime))
    print('总运行时间 %f' % (currentTime - oldTime))
    return browser
if __name__ == '__main__':
    # Initial session.
    browser = startQzoneBaselanding()
    starttime = time.time()
    while True:
        # Start over with a fresh login every two hours.
        elapsed_hours = (time.time() - starttime) / 60 / 60
        if elapsed_hours >= 2:
            # Shut the old browser down before a new one is started.
            browser.driver.quit()
            browser.delete_cookies()
            browser = startQzoneBaselanding()
            # Restart the two-hour window; otherwise every following
            # iteration would take this branch again without sleeping.
            starttime = time.time()
            continue
        # Refresh the feed every 20 seconds.
        time.sleep(20)
        browser.paresHtml()