python爬取微信公众号文章的方法

时间:2022-04-27 02:24:42

最近在学习python3网络爬虫开发实践(崔庆才 著)刚好也学习到他使用代理爬取公众号文章这里,但是照着他的代码写,出现了一些问题。在这里我用到了这本书的前面讲的一些内容进行了完善。(作者写这个代码已经是半年前的事了,但腾讯的网站在这半年前进行了更新)

下面我直接上代码:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
timeout = 20
from requests import request, session, preparedrequest
import requests
from selenium import webdriver
from selenium.common.exceptions import nosuchelementexception
from bs4 import beautifulsoup as bs
import pymysql
 
# 要爬取的内容
keyword = '美女图片'
options = webdriver.chromeoptions()
# 设置中文
options.add_argument('lang=zh_cn.utf-8')
# 更换头部
options.add_argument(
 'user-agent="mozilla/5.0 (windows nt 10.0; win64; x64) applewebkit/537.36 (khtml, like gecko) chrome/69.0.3497.100 safari/537.36"')
browser = webdriver.chrome(chrome_options=options)
redis_host = '192.168.1.248'
redis_port = 6379
redis_password = '*****'
redis_key = 'requests'
proxy_pool_url = 'http://127.0.0.1:8080/random'
max_failed_time = 5
 
mysql_host = 'localhost'
mysql_port = 3306
mysql_user = 'moxiao'
mysql_password = '******'
 
 
class mysqlconn():
 def __init__(self, host=mysql_host, username=mysql_user, password=mysql_password, port=mysql_port):
  """
  mysql 初始化
  :param host:
  :param username:
  :param password:
  :param port:
  """
  try:
   self.db = pymysql.connection(host=host, user=username, password=password,
           database='weixin_data', port=port)
   self.cursor = self.db.cursor()
  except pymysql.mysqlerror as e:
   print(e.args)
 
 def insert(self, table, data):
  keys = ', '.join(data.keys())
  values = ', '.join(['%s'] * len(data))
  sql = 'insert into %s (%s) values (%s)' % (table, keys, values)
  try:
   self.cursor.execute(sql, tuple(data.values()))
   self.db.commit()
  except pymysql.mysqlerror as e:
   print(e.args)
   self.db.rollback()
 
 
class weixinrequest(request):
 def __init__(self, url, callback, method="get", headers=none, need_proxy=false, fail_time=0, timeout=timeout):
  super(weixinrequest, self).__init__(url=url, method=method, headers=headers)
  self.callback = callback
  self.need_proxy = need_proxy
  self.fail_time = fail_time
  self.timeout = timeout
 
 def prepare(self):
  p = preparedrequest()
  p.prepare(
   method=self.method,
   url=self.url,
   headers=self.headers,
  )
  return p
 
 
class weixinresponse():
 
 def __init__(self, text):
  self.text = text
 
 def set_status_code(self, status_code):
  self.status_code = status_code
 
 
import pickle
from redis import strictredis
 
 
class redisqueue():
 def __init__(self):
  """
   初始化redis
  """
  self.db = strictredis(host=redis_host, port=redis_port, password=redis_password, db=3)
 
 def add(self, request):
  """
   向队列添加序列化后的request
  :param request:请求对象
  :return:添加结果
  """
  if isinstance(request, weixinrequest):
   return self.db.rpush(redis_key, pickle.dumps(request))
  return false
 
 def pop(self):
  """
   取出下一个request并反序列化
  :return: request 或者 none
  """
  if self.db.llen(redis_key):
   return pickle.loads(self.db.lpop(redis_key))
  return false
 
 def empty(self):
  return self.db.llen(redis_key) == 0
 
 def del_all(self):
  return self.db.delete(redis_key)
 
 def get_proxy(self):
  """
   从代理池获取代理ip
  :return:
  """
  try:
   response = requests.get(proxy_pool_url)
   if response.status_code == 200:
    print('get proxy', response.text)
    return response.text
  except requests.connectionerror:
   return none
 
 
from urllib.parse import urlencode
from requests import readtimeout, connectionerror
from pyquery import pyquery as pq
 
vald_statues = [200]
 
 
class spider():
 base_url = 'http://weixin.sogou.com/weixin?'
 # 这里的page可以修改,即第几页,我本来想获取所有的个数再除以10 这样就能爬完了,但是我只是测试所以这里并没有做
 # 但如果需要做可以加到schedule方法的while循环内的最下面 即self.params['page']+=1
 params = {'type': 2, 's_from': 'input', 'query': keyword, 'page': 1, 'ie': 'utf8', '_sug_': 'n',
    '_sug_type_': ''}
 headers = {'host': 'weixin.sogou.com',
    'connection': 'keep-alive',
    'cache-control': 'max-age=0',
    'upgrade-insecure-requests': '1',
    'user-agent': 'mozilla/5.0 (windows nt 10.0; win64; x64) applewebkit/537.36 (khtml, like gecko) chrome/69.0.3497.100 safari/537.36',
    'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
    'accept-encoding': 'gzip, deflate',
    'accept-language': 'zh-cn,zh;q=0.9',
    'referer': 'http: // weixin.sogou.com /',
    'cookie': '你的cookie'} # todo 不可能把我的给你撒
 session = session()
 queue = redisqueue()
 queue.del_all()
 mysql = mysqlconn()
 
 def start(self):
  """
   初始化工作
  :return:
  """
  # 全局更新headers
  # 如果你试过用这个方法修改headers,那么就知道这个在这里好像没什么用,我在这里浪费了至少两个小时
  self.session.headers.update(self.headers)
  start_url = self.base_url + urlencode(self.params)
  # 这里我将need_proxy=false设为了false 即并没有使用代理 ps:我也就是测试一下
  # 真正修改了headers是在这里
  weixin_request = weixinrequest(url=start_url, callback=self.parse_index, headers=self.headers, need_proxy=false)
  # 调度第一个请求
  self.queue.add(weixin_request)
 
 def schedule(self):
  """
   调度请求
  :return:
  """
  while not self.queue.empty():
   weixin_request = self.queue.pop()
   callback = weixin_request.callback
   print('schedule', weixin_request.url)
   response = self.request(weixin_request)
   if response and response.status_code in vald_statues:
    results = list(callback(response))
    if results:
     for result in results:
      print('new result', result)
      if isinstance(result, weixinrequest):
       # 将新的文章详情的url也加入队列
       self.queue.add(result)
      if isinstance(result, dict):
       # 储存到mysql
       self.mysql.insert('articles', result)
    else:
     self.error(weixin_request)
   else:
    self.error(weixin_request)
 
 def request(self, weixin_request):
  """
   执行请求
  :param weixin_request:请求
  :return: 响应
  """
  if not 'http://mp.weixin.qq.com/s?src' in weixin_request.url:
   try:
    if weixin_request.need_proxy:
     proxy = self.queue.get_proxy()
     if proxy:
      proxies = {
       'http': 'http://' + proxy,
       'https': 'https://' + proxy
      }
      return self.session.send(weixin_request.prepare(),
             timeout=weixin_request.timeout, allow_redirects=false, proxies=proxies)
    return self.session.send(weixin_request.prepare(), timeout=weixin_request.timeout,
           allow_redirects=false)
   except (connectionerror, readtimeout) as e:
    print(e.args)
    return false
  else:
   print('-' * 20)
   browser.get(weixin_request.url)
   try:
    browser.find_element_by_class_name('rich_media_area_primary_inner')
    wr = weixinresponse(browser.page_source)
    wr.set_status_code(200)
    return wr
   except nosuchelementexception:
    wr = weixinresponse('')
    wr.set_status_code(403)
    return wr
 
 def parse_index(self, response):
  """
   解析索引页
  :param response: 响应
  :return: 新的响应
  """
  doc = pq(response.text)
  items = doc('.news-box .news-list li .txt-box h3 a').items()
  for item in items:
   url = item.attr('href')
   weixin_request = weixinrequest(url=url, callback=self.parse_detail)
   yield weixin_request
 
 def parse_detail(self, response):
  """
   解析详情页
  :param response: 响应
  :return: 微信公众号文章
  """
  doc = pq(response.text)
  profile_inner = doc('.profile_inner')
  data = {
   'title': doc('.rich_media_title').text(),
   'content': doc('.rich_media_content').text(),
   'date': doc('#publish_time').text(),
   # 'nickname':doc('#js_profile_qrcode > div > strong').text(),
   'nickname': profile_inner.find('.profile_nickname').text(),
   'wechat':
    [ns for ns in profile_inner.find('.profile_meta').find('.profile_meta_value').items()][
     0].text()
  }
  # 储存图片
  print('#' * 30)
  soup = bs(response.text)
  wn = soup.find_all('img')
  for img in wn:
   if img.has_attr('_width') and img.has_attr('data-src'):
    print(img.attrs['data-src'])
  yield data
 
 def error(self, weixin_request):
  """
   错误处理
  :param weixin_request:请求
  :return:
  """
  weixin_request.fail_time = weixin_request.fail_time + 1
  print('request failed', weixin_request.fail_time, 'times', weixin_request.url)
  if weixin_request.fail_time < max_failed_time:
   self.queue.add(weixin_request)
 
 def run(self):
  self.start()
  self.schedule()
 
 
if __name__ == '__main__':
 spider = spider()
 spider.run()

2018-10-6更新:

今天测试之后使用了cookie并不能登录这个网站了,也许是腾讯使用了新的安全验证,具体也无从得知,但使用浏览器访问没有问题

python爬取微信公众号文章的方法

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。

原文链接:https://blog.csdn.net/max_wcsdn/article/details/82666088