python爬虫基础_webwechat

时间:2021-09-19 21:00:14

简单的模拟:借用微信网页版,写一个扫码页面和登录页面,实现简单的登录、联系人列表、发消息、收消息。

以下是笔记:

#!/usr/bin/env python
# coding:utf-8
# Module setup for the web-WeChat demo: imports plus the Flask application.
# The import and app-creation statements each sit on their own line so they
# are executed instead of being part of a comment or a fused statement.
import json
import re
import time

import requests
from bs4 import BeautifulSoup
from flask import Flask, render_template, request, session, jsonify, redirect, url_for

app = Flask(__name__)
app.debug = True
# A secret key is required for Flask's session to work.
# NOTE(review): hard-coded key and debug mode are only suitable for local experiments.
app.secret_key = "LSHM"


# Helper that converts an XML reply into a dict.
def xml_parser(text):
dic = {}
soup = BeautifulSoup(text, 'html.parser')
div = soup.find(name='error') # for item in div.children(recursive=False): # 不使用children, 因为它会找文本, find_all则只找标签
for item in div.find_all(recursive=False): # 不使用递归,也就是只找儿子
dic[item.name] = item.text
return dic @app.route('/')
def home():
return redirect(url_for('login')) @app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == "GET":
ctime = str(int(time.time() * 1000))
qrcode_url = "https://login.wx.qq.com/jslogin?appid=wx782c26e4c19acffb&redirect_uri=https%3A%2F%2Fwx.qq.com%2Fcgi-bin%2Fmmwebwx-bin%2Fwebwxnewloginpage&fun=new&lang=zh_CN&_={}".format(
ctime) ret = requests.get(qrcode_url)
# print(ret.text)
qrcode = re.findall('uuid = "(.*)";', ret.text)[0]
# print(qrcode)
session['qrcode'] = qrcode
return render_template("login.html", qr=qrcode)
else:
pass @app.route('/check_login')
def check_login():
'''
发送GET请求,检测是否已经扫码、登录
https://login.wx.qq.com/cgi-bin/mmwebwx-bin/login?loginicon=true&uuid=IY02Sx7eyQ==&tip=0&r=-1908176756&_=1530916511143
:return:
'''
response = {"code": 408}
qrcode = session.get("qrcode")
ctime = str(int(time.time() * 1000))
check_url = "https://login.wx.qq.com/cgi-bin/mmwebwx-bin/login?loginicon=true&uuid={0}&tip=0&r=-1908176756&_={1}".format(
qrcode, ctime)
# time.sleep(10)
ret = requests.get(check_url)
# print(ret.text) if "window.code=201" in ret.text: # 扫码成功
src = re.findall("window.userAvatar = '(.*)';", ret.text)[0]
response["code"] = 201
response["src"] = src
elif "window.code=200" in ret.text: # 确认登录
redirect_uri = re.findall('window.redirect_uri="(.*)";', ret.text)[0] # 获取重定向地址 # 向上面的地址发送请求, 添加2个必要的参数
redirect_uri = redirect_uri + "&fun=new&version=v2" ticket_ret = requests.get(redirect_uri) # 获取凭证
ticket_dict = xml_parser(ticket_ret.text) # 拿到字典
# print(ticket_dict)
session["ticket_dict"] = ticket_dict # 存入session
session["ticket_cookie"] = ticket_ret.cookies.get_dict() # 保存cookie 给后面使用 response["code"] = 200
return jsonify(response) @app.route("/index")
def index():
'''
用户数据初始化
https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxinit?r=-1912609442&pass_ticket=v8tvMrwfudoYLl0dyHNIX5QtJe4BtO%252FGoffihP5Ion0oScWCAU%252F18Avj6ZL1rj%252B6
:return:
'''
ticket_dict = session.get("ticket_dict")
init_url = "https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxinit?r=-1912609442&pass_ticket={0}".format(
ticket_dict.get("pass_ticket")) data_dict = {
"BaseRequest": {
"DeviceID": "e261019482970229",
"Sid": ticket_dict.get("wxsid"),
"Uin": ticket_dict.get("wxuin"),
"Skey": ticket_dict.get("skey"),
}
} init_ret = requests.post(
url=init_url,
json=data_dict, # Payload 对应传json
# data=json_dumps(data_dict), # 使用这种方式,必须要带上headers
# headers={
# 'Content-Type':'application/json'
# }
)
init_ret.encoding = "utf-8"
# print(init_ret.text)
# print(init_ret.json()) # print(json.loads(init_ret.text))
user_dict = init_ret.json()
# print(user_dict) # for user in user_dict['ContactList']: # 最近 联系人列表
# print(user.get('NickName')) session["current_user"] = user_dict['User']
# print(user_dict['SyncKey'])
session["SyncKey"] = user_dict['SyncKey'] # return "用户首页"
return render_template('index.html', user_dict=user_dict) @app.route("/get_img")
def get_img():
# 获取头像
current_user = session.get("current_user")
ticket_cookie = session.get("ticket_cookie") head_url = "https://wx.qq.com" + current_user["HeadImgUrl"]
img_ret = requests.get(head_url, cookies=ticket_cookie, headers={"Content-Type": "image/jpeg"})
return img_ret.content # 直接返回字节 @app.route("/user_list")
def user_list():
    """Fetch the contact list and render it with ``user_list.html``.

    Returns:
        The rendered page (receiving the upstream reply as ``wx_user_dict``),
        or a redirect to the login view when the session holds no ticket.

    Raises:
        requests.RequestException: if the upstream request fails or times out.
    """
    ticket_dict = session.get("ticket_dict")
    ticket_cookie = session.get("ticket_cookie")
    if not ticket_dict:
        return redirect(url_for('login'))

    ctime = int(time.time() * 1000)
    skey = ticket_dict.get('skey')
    user_list_url = "https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxgetcontact?lang=zh_CN&r={0}&seq=0&skey={1}".format(
        ctime, skey)
    r1 = requests.get(user_list_url, cookies=ticket_cookie, timeout=10)
    r1.encoding = "utf-8"
    wx_user_dict = r1.json()

    # Debug output; .get keeps a reply without these keys from raising.
    print(wx_user_dict.get('MemberCount'))
    for item in wx_user_dict.get('MemberList', []):
        print(item)

    return render_template("user_list.html", wx_user_dict=wx_user_dict)


# Sending a message requires both our own UserName and the recipient's UserName.
@app.route("/send", methods=["GET", "POST"])
def send():
if request.method == "GET":
return render_template('send.html') ticket_dict = session.get("ticket_dict") current_user = session["current_user"]
from_user = current_user['UserName']
to = request.form.get('to')
content = request.form.get('content')
ctime = str(time.time() * 1000)
msg_url = "https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxsendmsg?lang=zh_CN&pass_ticket={}".format(
ticket_dict['pass_ticket']) data_dict = {
"BaseRequest": {
"DeviceID": "e261019482970229",
"Sid": ticket_dict.get("wxsid"),
"Uin": ticket_dict.get("wxuin"),
"Skey": ticket_dict.get("skey"),
},
"Msg": {
"ClientMsgId": ctime,
"FromUserName": from_user,
"LocalID": ctime,
"ToUserName": to,
"Content": content,
"Type": 1
},
"Scene": 0
} ret = requests.post(
url=msg_url,
data=bytes(json.dumps(data_dict, ensure_ascii=False), encoding="utf-8")
)
print(ret.text)
response ={}
response['to'] = to
response['content'] = content
response['status'] = "成功"
return jsonify(response) @app.route("/recv", methods=["GET", "POST"])
def recv():
    """Check for new messages and return them as JSON.

    The ``synccheck`` endpoint is asked first; only when its reply contains
    ``selector:"2"`` are the messages fetched from
    https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxsync?sid={}&skey={}&lang=zh_CN&pass_ticket={}
    and the ``SyncKey`` stored in the session replaced by the one in the
    reply.

    Returns:
        JSON whose ``content``, ``FromUserName`` and ``ToUserName`` describe
        the last message of the fetched batch; when the batch is non-empty a
        ``messages`` list carries every message in the same shape. The plain
        text ``"ok"`` is returned when there is nothing to fetch, the session
        holds no login data, or an upstream request fails.
    """
    sync_url = "https://webpush.weixin.qq.com/cgi-bin/mmwebwx-bin/synccheck"
    synckey = session.get("SyncKey")
    ticket_dict = session.get("ticket_dict")
    ticket_cookie = session.get("ticket_cookie")
    if not synckey or not ticket_dict:
        # Not logged in / not initialised: nothing to poll for.
        return "ok"

    ctime = str(int(time.time() * 1000))
    # The sync key is sent as "key_val" pairs joined with "|".
    sync_data_str = "|".join(
        "%s_%s" % (item['Key'], item['Val']) for item in synckey['List']
    )
    sync_dict = {
        "r": ctime,
        "skey": ticket_dict['skey'],
        "sid": ticket_dict['wxsid'],
        "uin": ticket_dict['wxuin'],
        "deviceid": "e261019482970229",
        "synckey": sync_data_str,
    }

    try:
        # NOTE(review): generous timeout in case the server holds the poll
        # open for a while - tune after observing real response times.
        response_sync = requests.get(
            sync_url, params=sync_dict, cookies=ticket_cookie, timeout=35)
        if 'selector:"2"' not in response_sync.text:
            return "ok"

        fetch_msg_url = "https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxsync?sid={0}&skey={1}&lang=zh_CN&pass_ticket={2}".format(
            ticket_dict['wxsid'], ticket_dict['skey'], ticket_dict['pass_ticket'])
        form_data = {
            'BaseRequest': {
                'DeviceID': 'e261019482970229',
                'Sid': ticket_dict['wxsid'],
                'Skey': ticket_dict['skey'],
                'Uin': ticket_dict['wxuin']
            },
            'SyncKey': synckey,
            'rr': ctime
        }
        response_fetch_msg = requests.post(
            fetch_msg_url,
            json=form_data,
            headers={
                'Connection':'keep-alive',
                'Accept-Encoding':'gzip, deflate, br',
                'X-Requested-With':'XMLHttpRequest',
                'user-agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36'
            },
            cookies=ticket_cookie,
            timeout=10,
        )
        response_fetch_msg.encoding = 'utf-8'
        res_fetch_msg_dict = response_fetch_msg.json()
    except (requests.RequestException, ValueError) as exc:
        # Connection drops, timeouts and non-JSON replies are logged and
        # treated like "no new messages" so the page can poll again.
        app.logger.warning("recv request failed: %s", exc)
        return "ok"

    new_sync_key = res_fetch_msg_dict.get('SyncKey')
    if new_sync_key:
        # The next poll must use the key returned by this fetch.
        session["SyncKey"] = new_sync_key

    messages = [
        {
            'content': item['Content'],
            'FromUserName': item['FromUserName'],
            'ToUserName': item['ToUserName'],
        }
        for item in res_fetch_msg_dict.get('AddMsgList', [])
    ]
    response = {}
    if messages:
        # Top-level keys keep the original shape (last message of the batch);
        # "messages" additionally exposes the whole batch.
        response.update(messages[-1])
        response['messages'] = messages
    return jsonify(response)


if __name__ == '__main__':
    app.run()

运行项目(manage.py)后,将自动跳转到login页面,
扫码登录后,会获得个人信息,可以点击 查看所有联系人 ,然后可以在此页面点击 发消息
进入发消息页面,目前只能按UserName来发,未实现按昵称发送。

如果有消息来,可以在收到消息部分看到对方的UserName和消息内容

已知问题:
收消息经常会卡住,
或者报错: requests.exceptions.ConnectionError: ('Connection aborted.', BadStatusLine('HTTP/1.1 0 -\r\n',))

找不出原因。

提交作业后,指导老师给予回复:

感觉自己还差得远呢。

1.我觉得你第26行代码没有必要存在,你可以在第27行代码那里再加一个@app.route('/')就好了

2.我觉得你第33行代码那里还是有问题的,因为这么写字符串太长了不符合pep8规范不说也不好阅读,我觉得你可以定义成下面字典的格式,

{
'loginicon': 'true',
'uuid': session['qrcode'],
'tip': 0,
'r': '-2034746127', # 时间戳取反
'_': int(time.time() * 1000)
}

response = requests.get(url=check_login_url, params=‘这个字典’)

这样代码是不是会很舒服?

3.还有你第53行代码是不是应该也是int(time.time()*1000)这样去写的?

4.你的第72行代码是不是应该join一下去拼接。

5.你的try处理还是没有。

6.你在@app.route("/recv")里是能够写更多的。比如图片语音消息的判断对不对,还有你在这里提交消息格式应该严格按照微信的来。

7.代码的优化这里不多说什么了,有问题和我沟通。